From e6c0182724dbb0feddd553e05c8602af50a6a118 Mon Sep 17 00:00:00 2001
From: Kaushik Narayan R
Date: Fri, 15 Nov 2024 13:12:04 -0700
Subject: [PATCH] aggregating data

---
 preprocessing/pcap_aggregation.sh | 41 +++++++++++++++++++++++++++
 preprocessing/pcap_processor.py   | 46 ++++++++++++++++---------------
 2 files changed, 65 insertions(+), 22 deletions(-)
 create mode 100644 preprocessing/pcap_aggregation.sh

diff --git a/preprocessing/pcap_aggregation.sh b/preprocessing/pcap_aggregation.sh
new file mode 100644
index 0000000..2b8d494
--- /dev/null
+++ b/preprocessing/pcap_aggregation.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+
+data_year=2023
+data_month=10
+
+# report the compressed size of each day's trace
+
+total_size=0
+for data_day in {01..31}; do
+    pcap_size=$(curl -sI "http://mawi.nezu.wide.ad.jp/mawi/samplepoint-F/${data_year}/${data_year}${data_month}${data_day}1400.pcap.gz" |
+        grep Content-Length |
+        awk '{printf "%.3f", $2/1024/1024/1024}')
+    echo "${data_year}-${data_month}-${data_day} - ${pcap_size} GB"
+    total_size=$(echo $total_size + $pcap_size | bc -l)
+done
+
+echo "Total size (compressed) of ${data_year}-${data_month} - ${total_size} GB"
+# Total size (compressed) of 2023-10 - 193.292 GB
+
+# extracting data
+
+mkdir -p csv_files
+
+for data_day in {01..31}; do
+    if [[ ! -f "${data_year}${data_month}${data_day}1400.pcap.gz" ]]; then
+        wget "http://mawi.nezu.wide.ad.jp/mawi/samplepoint-F/${data_year}/${data_year}${data_month}${data_day}1400.pcap.gz"
+    fi
+    gzip -d "${data_year}${data_month}${data_day}1400.pcap.gz"
+
+    # sample 10000 packets from each day
+    python3 pcap_processor.py \
+        --pcap_file "${data_year}${data_month}${data_day}1400.pcap" \
+        --out_file csv_files/${data_day}.csv \
+        --sample \
+        --stream_size 10000
+
+    rm "${data_year}${data_month}${data_day}1400.pcap"
+done
+
+# merge all CSVs together
+awk '(NR == 1) || (FNR > 1)' csv_files/*.csv > csv_files/merged.csv
diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py
index bcda443..8662f2d 100644
--- a/preprocessing/pcap_processor.py
+++ b/preprocessing/pcap_processor.py
@@ -7,22 +7,9 @@
 from scapy.utils import PcapReader
 from scapy.layers.inet import IP, TCP, UDP
 from kafka import KafkaProducer
-import json
 
 dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")
 
-# Kafka Configuration
-KAFKA_TOPIC = 'pcap_stream'
-KAFKA_SERVER = 'localhost:9092' # Adjust to your Kafka server
-#KAFKA_SERVER = 'kafka_service:9092'
-
-# Initialize Kafka Producer
-producer = KafkaProducer(
-    bootstrap_servers=KAFKA_SERVER,
-    #value_serializer=lambda v: json.dumps(v).encode('utf-8') # Encode data as JSON
-    value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else str(v).encode('utf-8') #remove intermediate JSON encoding
-)
-
 
 def pkt_filter(pkt: Packet) -> bool:
     """filter to include/exclude a packet"""
@@ -87,11 +74,12 @@ def create_pkt_object(pkt: Packet) -> dict:
         "dst_addr": pkt[IP].dst,
         "src_port": pkt[l4_proto].sport,
         "dst_port": pkt[l4_proto].dport,
-        "pkt_len": len(pkt)
+        "pkt_len": len(pkt),
     }
 
     return res_json
 
+
 def prep_csv(out_file: str):
     with open(out_file, "w", newline="") as csvfile:
         writer = csv.writer(csvfile)
@@ -120,7 +108,9 @@ if __name__ == "__main__":
     argp = ArgumentParser()
     argp.add_argument("-f", "--pcap_file", required=True, dest="_pcap")
     argp.add_argument("-o", "--out_file", required=False, dest="_out")
-    argp.add_argument("--stream_size", required=False, default=10000, dest="_streamsize")
+    argp.add_argument(
+        "--stream_size", required=False, default=100000, dest="_streamsize"
+    )
     argp.add_argument(
         "-x",
         "--sample",
@@ -151,12 +141,11 @@ if __name__ == "__main__":
     out_file = args._out
     streaming = args._stream
     sample = args._sample
-    samplesize = int(args._streamsize)
     DEBUG = args._debug
 
-    sample_size = samplesize #1000000
-    batch_size = 100 #100000
+    sample_size = int(args._streamsize)  # 100000
+    batch_size = 10000  # 10000
 
     pcap_rdr = PcapReader(pcap_file)
 
     if not streaming:
@@ -180,13 +169,26 @@ if __name__ == "__main__":
                 pkts_write_csv(pkts, out_file)
                 pkts = []
         else:
-            # direct streaming to kafka goes here
+            # Kafka Configuration
+            KAFKA_TOPIC = "pcap_stream"
+            KAFKA_SERVER = "localhost:9092"  # Adjust to your Kafka server
+            # KAFKA_SERVER = 'kafka_service:9092'
+
+            # Initialize Kafka Producer
+            producer = KafkaProducer(
+                bootstrap_servers=KAFKA_SERVER,
+                # value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Encode data as JSON
+                value_serializer=lambda v: (
+                    v.encode("utf-8") if isinstance(v, str) else str(v).encode("utf-8")
+                ),  # remove intermediate JSON encoding
+            )
+
             packet_data = create_pkt_object(pkt)
             producer.send(KAFKA_TOPIC, packet_data)
             print(f"streamed packet at index {idx} ")
-        if idx > sample_size: break
-
+        if idx > sample_size:
+            break
+
     # flush remaining
     if not streaming and len(pkts) > 0:
         pkts_write_csv(pkts, out_file)
-
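
Once this patch is applied and pcap_aggregation.sh has run, csv_files/merged.csv holds the sampled packets from every day of the month. Below is a minimal sanity-check sketch, assuming pandas is installed; only the columns visible in this diff (dst_addr, src_port, dst_port, pkt_len) are assumed, since the full CSV header is written elsewhere in create_pkt_object / prep_csv.

    # sanity-check the aggregated CSV (illustrative only, not part of the repo)
    import pandas as pd

    df = pd.read_csv("csv_files/merged.csv")
    print(df.shape)                                # ~10000 rows per sampled day (--stream_size)
    print(df["pkt_len"].describe())                # packet-size distribution
    print(df["dst_port"].value_counts().head(10))  # most common destination ports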