added option for streaming from csv

Kaushik Narayan R 2024-11-25 22:33:32 -07:00
parent 3710362d2a
commit 5d20e14dbf
2 changed files with 73 additions and 35 deletions

File 1 of 2: compose service definition for pcap_streamer

@@ -50,10 +50,13 @@ services:
         aliases:
           - pcap_streamer
     volumes:
-      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
+      # - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
+      - "./:/data/pcap"
+      - "./:/data/csv"
     environment:
       PCAP_FILE: /data/pcap/202310081400.pcap
-    command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
+    # command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
+    command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -c /data/csv/sample_output.csv -s --stream_size 1000"]
     deploy:
       replicas: 1
       restart_policy:
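In plain terms: the hard-coded Windows host mount is commented out in favour of mounting the project directory at both /data/pcap and /data/csv, and the streamer's command now runs pcap_processor.py with -c against /data/csv/sample_output.csv (a preprocessed CSV, presumably written by an earlier run of the processor with -o) instead of re-parsing the raw pcap; the old -f command is kept commented out for reference.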

File 2 of 2: pcap_processor.py

@@ -113,6 +113,21 @@ def create_pkt_object(pkt: Packet) -> dict:
     return res_json


+def row_to_dict(pkt_row: list) -> dict:
+    """make dict from CSV row"""
+    return {
+        "time": float(pkt_row[0]),
+        "l4_proto": pkt_row[1],
+        "src_addr": pkt_row[2],
+        "dst_addr": pkt_row[3],
+        "src_port": int(pkt_row[4]),
+        "dst_port": int(pkt_row[5]),
+        "pkt_len": int(pkt_row[6]),
+    }
+
+
 def prep_csv(out_file: str):
     with open(out_file, "w", newline="") as csvfile:
         writer = csv.writer(csvfile)
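A quick illustration of the new helper (the input values here are made up; they only need to follow the column order of the CSV the script itself writes):

row_to_dict(["1696773600.25", "TCP", "192.0.2.1", "198.51.100.7", "443", "51234", "1500"])
# -> {"time": 1696773600.25, "l4_proto": "TCP", "src_addr": "192.0.2.1",
#     "dst_addr": "198.51.100.7", "src_port": 443, "dst_port": 51234, "pkt_len": 1500}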
@@ -139,7 +154,8 @@ def pkts_write_csv(pkts: list, out_file: str):

 if __name__ == "__main__":
     argp = ArgumentParser()
-    argp.add_argument("-f", "--pcap_file", required=True, dest="_pcap")
+    argp.add_argument("-f", "--pcap_file", required=False, dest="_pcap")
+    argp.add_argument("-c", "--csv_file", required=False, dest="_csv")
     argp.add_argument("-o", "--out_file", required=False, dest="_out")
     argp.add_argument("--stream_size", required=False, default=10000, dest="_streamsize")
     argp.add_argument(
@@ -169,6 +185,7 @@ if __name__ == "__main__":
     args = argp.parse_args()

     pcap_file = args._pcap
+    csv_file = args._csv
     out_file = args._out
     streaming = args._stream
     sample = args._sample
@@ -179,41 +196,59 @@ if __name__ == "__main__":
     sample_size = samplesize #1000000
     batch_size = 100 #100000

-    pcap_rdr = PcapReader(pcap_file)
-    if not streaming:
-        assert args._out and args._out != ""
-        prep_csv(out_file)
-
-    pkts = []
-    cnt = 0
-    seen_count = 0
-    for idx, pkt in enumerate(pcap_rdr):
-        seen_count += 1
-        # filter packets
-        if not pkt_filter(pkt):
-            continue
-
-        if not streaming:
-            # write to file
-            pkts.append(pkt_extract(pkt))
-            if sample and idx > sample_size:
-                break
-
-            if idx > 0 and idx % batch_size == 0:
-                pkts_write_csv(pkts, out_file)
-                pkts = []
-        else:
-            # direct streaming to kafka goes here
-            packet_data = create_pkt_object(pkt)
-            producer.client.send(KAFKA_TOPIC, packet_data)
-            cnt += 1
-            #print(f"streamed packet at index {idx} ")
-            if idx > sample_size: break
-
-    print(f"total seen: {seen_count-1}")
-    print(f"total streamed: {cnt}")
-
-    # flush remaining
-    if not streaming and len(pkts) > 0:
-        pkts_write_csv(pkts, out_file)
+    # if preprocessed data ready for streaming
+    if csv_file:
+        with open(csv_file, newline="") as f:
+            csv_rdr = csv.reader(f)
+            next(csv_rdr)  # skip headers
+            pkts = []
+            for idx, row in enumerate(csv_rdr):
+                # direct streaming to kafka goes here
+                producer.client.send(KAFKA_TOPIC, row_to_dict(row))
+                dbg_print(row_to_dict(row))
+                dbg_print("streamed packet", idx)
+                if idx > sample_size:
+                    break
+            dbg_print(f"total streamed: {idx}")
+    # otherwise, process packets
+    else:
+        pcap_rdr = PcapReader(pcap_file)
+        if not streaming:
+            assert args._out and args._out != ""
+            prep_csv(out_file)
+
+        pkts = []
+        cnt = 0
+        seen_count = 0
+        for idx, pkt in enumerate(pcap_rdr):
+            seen_count += 1
+            # filter packets
+            if not pkt_filter(pkt):
+                continue
+
+            if not streaming:
+                # write to file
+                pkts.append(pkt_extract(pkt))
+                if sample and idx > sample_size:
+                    break
+
+                if idx > 0 and idx % batch_size == 0:
+                    pkts_write_csv(pkts, out_file)
+                    pkts = []
+            else:
+                # direct streaming to kafka goes here
+                packet_data = create_pkt_object(pkt)
+                producer.client.send(KAFKA_TOPIC, packet_data)
+                cnt += 1
+                # print(f"streamed packet at index {idx} ")
+                if idx > sample_size:
+                    break
+
+        print(f"total seen: {seen_count-1}")
+        print(f"total streamed: {cnt}")
+
+        # flush remaining
+        if not streaming and len(pkts) > 0:
+            pkts_write_csv(pkts, out_file)
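For context, below is a minimal standalone sketch of what the new CSV streaming path does, written against kafka-python; the broker address, topic name, and the stream_csv helper are assumptions for illustration and are not taken from this repository (the actual script wraps its producer as producer.client):

# Minimal sketch of the CSV replay path (assumptions: kafka-python as the
# client library, broker at localhost:9092, topic name "packets", and a
# header row followed by columns in the order expected by row_to_dict).
import csv
import json

from kafka import KafkaProducer

KAFKA_TOPIC = "packets"  # assumed topic name


def row_to_dict(pkt_row: list) -> dict:
    """Make a packet dict from one CSV row (same field order as in the commit)."""
    return {
        "time": float(pkt_row[0]),
        "l4_proto": pkt_row[1],
        "src_addr": pkt_row[2],
        "dst_addr": pkt_row[3],
        "src_port": int(pkt_row[4]),
        "dst_port": int(pkt_row[5]),
        "pkt_len": int(pkt_row[6]),
    }


def stream_csv(csv_file: str, stream_size: int = 1000) -> None:
    """Replay a preprocessed CSV into Kafka, mirroring the csv_file branch above."""
    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",  # assumed broker address
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    with open(csv_file, newline="") as f:
        rdr = csv.reader(f)
        next(rdr)  # skip the header row
        for idx, row in enumerate(rdr):
            producer.send(KAFKA_TOPIC, row_to_dict(row))
            if idx > stream_size:  # same stopping rule as --stream_size
                break
    producer.flush()


if __name__ == "__main__":
    stream_csv("sample_output.csv", stream_size=1000)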