From 5d20e14dbf4717f324491802fb774950f29d3ec8 Mon Sep 17 00:00:00 2001
From: Kaushik Narayan R
Date: Mon, 25 Nov 2024 22:33:32 -0700
Subject: [PATCH] Add option for streaming from CSV

---
 preprocessing/docker-compose.yml |   7 ++-
 preprocessing/pcap_processor.py  | 101 +++++++++++++++++++++----------
 2 files changed, 73 insertions(+), 35 deletions(-)

diff --git a/preprocessing/docker-compose.yml b/preprocessing/docker-compose.yml
index 89b39e5..d42305b 100644
--- a/preprocessing/docker-compose.yml
+++ b/preprocessing/docker-compose.yml
@@ -50,10 +50,13 @@ services:
         aliases:
           - pcap_streamer
     volumes:
-      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
+      # - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
+      - "./:/data/pcap"
+      - "./:/data/csv"
     environment:
       PCAP_FILE: /data/pcap/202310081400.pcap
-    command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
+    # command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
+    command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -c /data/csv/sample_output.csv -s --stream_size 1000"]
     deploy:
       replicas: 1
       restart_policy:
diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py
index 9c9edc8..f5f7552 100644
--- a/preprocessing/pcap_processor.py
+++ b/preprocessing/pcap_processor.py
@@ -113,6 +113,21 @@ def create_pkt_object(pkt: Packet) -> dict:
 
     return res_json
 
+
+def row_to_dict(pkt_row: list) -> dict:
+    """make dict from CSV row"""
+
+    return {
+        "time": float(pkt_row[0]),
+        "l4_proto": pkt_row[1],
+        "src_addr": pkt_row[2],
+        "dst_addr": pkt_row[3],
+        "src_port": int(pkt_row[4]),
+        "dst_port": int(pkt_row[5]),
+        "pkt_len": int(pkt_row[6]),
+    }
+
+
 def prep_csv(out_file: str):
     with open(out_file, "w", newline="") as csvfile:
         writer = csv.writer(csvfile)
@@ -139,7 +154,8 @@ def pkts_write_csv(pkts: list, out_file: str):
 
 if __name__ == "__main__":
     argp = ArgumentParser()
-    argp.add_argument("-f", "--pcap_file", required=True, dest="_pcap")
+    argp.add_argument("-f", "--pcap_file", required=False, dest="_pcap")
+    argp.add_argument("-c", "--csv_file", required=False, dest="_csv")
     argp.add_argument("-o", "--out_file", required=False, dest="_out")
     argp.add_argument("--stream_size", required=False, default=10000, dest="_streamsize")
     argp.add_argument(
@@ -169,6 +185,7 @@ if __name__ == "__main__":
     args = argp.parse_args()
 
     pcap_file = args._pcap
+    csv_file = args._csv
     out_file = args._out
     streaming = args._stream
     sample = args._sample
@@ -179,41 +196,59 @@ if __name__ == "__main__":
     sample_size = samplesize #1000000
     batch_size = 100 #100000
 
-    pcap_rdr = PcapReader(pcap_file)
-    if not streaming:
-        assert args._out and args._out != ""
-        prep_csv(out_file)
+    # if preprocessed data ready for streaming
+    if csv_file:
+        with open(csv_file, newline="") as f:
+            csv_rdr = csv.reader(f)
+            next(csv_rdr)  # skip headers
+            pkts = []
 
-    pkts = []
-    cnt = 0
-    seen_count = 0
-    for idx, pkt in enumerate(pcap_rdr):
-        seen_count += 1
-        # filter packets
-        if not pkt_filter(pkt):
-            continue
+            for idx, row in enumerate(csv_rdr):
+                # direct streaming to kafka goes here
+                pkt_dict = row_to_dict(row)
+                producer.client.send(KAFKA_TOPIC, pkt_dict)
+                dbg_print("streamed packet", idx, pkt_dict)
+                if idx > sample_size:
+                    break
+            dbg_print(f"total streamed: {idx}")
+    # otherwise, process packets
+    else:
+        pcap_rdr = PcapReader(pcap_file)
 
         if not streaming:
-            # write to file
-            pkts.append(pkt_extract(pkt))
+            assert args._out and args._out != ""
+            prep_csv(out_file)
 
-            if sample and idx > sample_size:
-                break
+        pkts = []
+        cnt = 0
+        seen_count = 0
+        for idx, pkt in enumerate(pcap_rdr):
+            seen_count += 1
+            # filter packets
+            if not pkt_filter(pkt):
+                continue
 
-            if idx > 0 and idx % batch_size == 0:
-                pkts_write_csv(pkts, out_file)
-                pkts = []
-        else:
-            # direct streaming to kafka goes here
-            packet_data = create_pkt_object(pkt)
-            producer.client.send(KAFKA_TOPIC, packet_data)
-            cnt += 1
-            #print(f"streamed packet at index {idx} ")
-            if idx > sample_size: break
-
-    print(f"total seen: {seen_count-1}")
-    print(f"total streamed: {cnt}")
-    # flush remaining
-    if not streaming and len(pkts) > 0:
-        pkts_write_csv(pkts, out_file)
+            if not streaming:
+                # write to file
+                pkts.append(pkt_extract(pkt))
+                if sample and idx > sample_size:
+                    break
+
+                if idx > 0 and idx % batch_size == 0:
+                    pkts_write_csv(pkts, out_file)
+                    pkts = []
+            else:
+                # direct streaming to kafka goes here
+                packet_data = create_pkt_object(pkt)
+                producer.client.send(KAFKA_TOPIC, packet_data)
+                cnt += 1
+                # print(f"streamed packet at index {idx} ")
+                if idx > sample_size:
+                    break
+
+        print(f"total seen: {seen_count-1}")
+        print(f"total streamed: {cnt}")
+        # flush remaining
+        if not streaming and len(pkts) > 0:
+            pkts_write_csv(pkts, out_file)
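-- 
A quick way to sanity-check the new CSV path is to read the streamed
dicts back off the topic. Below is a minimal consumer sketch using
kafka-python; the topic name, the broker address, and the assumption
that producer.client JSON-encodes values are all placeholders, since
none of them appear in this diff.

    import json

    from kafka import KafkaConsumer

    # Topic and broker are assumed values; the patch only references
    # KAFKA_TOPIC and producer.client without defining them.
    consumer = KafkaConsumer(
        "pcap_stream",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        # assumes the producer serializes row_to_dict() output as JSON
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )

    for msg in consumer:
        pkt = msg.value  # same shape as row_to_dict(): time, l4_proto, ...
        print(pkt["time"], pkt["src_addr"], "->", pkt["dst_addr"], pkt["pkt_len"])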