From d8631a5cfc3bb542354a1d1f56249d44d9ff7ac2 Mon Sep 17 00:00:00 2001
From: Akash Sivakumar
Date: Sun, 3 Nov 2024 18:33:11 -0700
Subject: [PATCH] added Kafka streaming part

---
 preprocessing/README.md         | 13 +++++++++
 preprocessing/pcap_processor.py | 48 ++++++++++++++++++++++++++++++---
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/preprocessing/README.md b/preprocessing/README.md
index c0eabd7..ab835fb 100644
--- a/preprocessing/README.md
+++ b/preprocessing/README.md
@@ -14,3 +14,16 @@
 - TCP/UDP - ports - sport, dport
 - Packet size - in bytes
 - `sample_output.csv` contains a partial subset of `202310081400.pcap`, ~600K packets
+
+# Streaming from pcap file using Kafka
+- Run `pcap_processor.py`
+- Arguments
+  - -f or --pcap_file: path to the pcap file (required)
+  - -o or --out_file: path to the output csv file
+  - -x or --sample: flag indicating whether the data should be sampled
+  - -s or --stream: flag indicating whether packets should be streamed to Kafka
+  - --stream_size: maximum number of packets to sample/stream
+  - -d or --debug: flag indicating whether the program runs in debug mode
+
+
+python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --stream_size 1000
\ No newline at end of file
diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py
index c4a14f0..8905352 100644
--- a/preprocessing/pcap_processor.py
+++ b/preprocessing/pcap_processor.py
@@ -6,8 +6,21 @@
 from scapy.packet import Packet
 from scapy.utils import PcapReader
 from scapy.layers.inet import IP, TCP, UDP
+from kafka import KafkaProducer
+import json
+
 dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")
 
+# Kafka Configuration
+KAFKA_TOPIC = 'pcap_stream'
+KAFKA_SERVER = 'localhost:9092'  # Adjust to your Kafka server
+
+# Initialize Kafka Producer
+producer = KafkaProducer(
+    bootstrap_servers=KAFKA_SERVER,
+    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Encode data as JSON
+)
+
 
 def pkt_filter(pkt: Packet) -> bool:
     """filter to include/exclude a packet"""
@@ -53,6 +66,30 @@ def pkt_extract(pkt: Packet) -> list:
     return pkt_attrs
 
 
+def create_pkt_object(pkt: Packet) -> dict:
+    """create a dictionary of packet attributes
+    key: attribute name, value: attribute value"""
+
+    l4_proto = None
+    if TCP in pkt:
+        l4_proto = TCP
+    elif UDP in pkt:
+        l4_proto = UDP
+
+    pkt_time_str = str(datetime.fromtimestamp(float(pkt.time), timezone.utc))
+
+    res_json = {
+        "time": float(pkt.time),
+        "l4_proto": pkt.getlayer(l4_proto).name,
+        "src_addr": pkt[IP].src,
+        "dst_addr": pkt[IP].dst,
+        "src_port": pkt[l4_proto].sport,
+        "dst_port": pkt[l4_proto].dport,
+        "pkt_len": len(pkt)
+    }
+
+    return res_json
+
 def prep_csv(out_file: str):
     with open(out_file, "w", newline="") as csvfile:
         writer = csv.writer(csvfile)
@@ -81,6 +118,7 @@ if __name__ == "__main__":
     argp = ArgumentParser()
     argp.add_argument("-f", "--pcap_file", required=True, dest="_pcap")
     argp.add_argument("-o", "--out_file", required=False, dest="_out")
+    argp.add_argument("--stream_size", required=False, default=10000, dest="_streamsize")
     argp.add_argument(
         "-x",
         "--sample",
@@ -111,11 +149,12 @@ if __name__ == "__main__":
     out_file = args._out
     streaming = args._stream
     sample = args._sample
+    samplesize = int(args._streamsize)
 
     DEBUG = args._debug
 
-    sample_size = 1000000
-    batch_size = 100000
+    sample_size = samplesize  # 1000000
+    batch_size = 100  # 100000
 
     pcap_rdr = PcapReader(pcap_file)
     if not streaming:
@@ -140,7 +179,10 @@ if __name__ == "__main__":
                 pkts = []
         else:
            # direct streaming to kafka goes here
-            pass
+            packet_data = create_pkt_object(pkt)
+            producer.send(KAFKA_TOPIC, packet_data)
+            print(f"streamed packet at index {idx}")
+
         if idx > sample_size:
             break
     # flush remaining
     if not streaming and len(pkts) > 0:
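
For a quick end-to-end check of the stream added above, a minimal consumer sketch can read the topic back. This is illustrative only, not part of the patch: it assumes the kafka-python package, the KAFKA_TOPIC/KAFKA_SERVER values configured in pcap_processor.py, the JSON value format produced by create_pkt_object(), and a hypothetical file name consumer_check.py.

# consumer_check.py -- illustrative sketch, not part of this patch
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'pcap_stream',                       # KAFKA_TOPIC from pcap_processor.py
    bootstrap_servers='localhost:9092',  # KAFKA_SERVER from pcap_processor.py
    auto_offset_reset='earliest',        # read from the start of the topic
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),  # mirrors the producer's JSON serializer
)

# each message value is the dict built by create_pkt_object()
for msg in consumer:
    pkt = msg.value
    print(pkt["time"], pkt["l4_proto"], pkt["src_addr"], pkt["dst_addr"], pkt["pkt_len"])

Running it in a second terminal while pcap_processor.py streams should print one line per packet sent to the pcap_stream topic.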