diff --git a/preprocessing/docker-compose.yml b/preprocessing/docker-compose.yml
index 0973767..98bf52c 100644
--- a/preprocessing/docker-compose.yml
+++ b/preprocessing/docker-compose.yml
@@ -57,7 +57,7 @@ services:
       - data-streamer
     volumes:
       - "../preprocessing/10k_sample_2023_10_01-2023_10_31.csv:/data/csv/main.csv:ro"
-    command: "sh -c 'sleep 30 && python /app/pcap_processor.py -c /data/csv/main.csv -x --stream_size 100000'"
+    command: "sh -c 'sleep 30 && python /app/pcap_processor.py -c /data/csv/main.csv -x --stream_size 100000 -l 0.1'"
     deploy:
       replicas: 1
       # placement:
diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py
index 11a15fd..fc31e9f 100644
--- a/preprocessing/pcap_processor.py
+++ b/preprocessing/pcap_processor.py
@@ -1,6 +1,7 @@
 from argparse import ArgumentParser
 from datetime import datetime, timezone
 import csv
+import time
 
 from scapy.packet import Packet
 from scapy.utils import PcapReader
@@ -173,6 +174,13 @@ if __name__ == "__main__":
         dest="_stream",
         action="store_true",
     )
+    argp.add_argument(
+        "-l",
+        "--delay",
+        required=False,
+        default=0,
+        dest="_delay"
+    )
     argp.add_argument(
         "-d",
         "--debug",
@@ -187,6 +195,7 @@
     csv_file = args._csv
     out_file = args._out
     streaming = args._stream
+    batch_delay = float(args._delay)
     sample = args._sample
     DEBUG = args._debug
 
@@ -207,6 +216,8 @@
             producer.client.send(KAFKA_TOPIC, row_to_dict(row))
             dbg_print(row_to_dict(row))
             dbg_print("streamed packet", idx)
+            if idx > 0 and idx % batch_size == 0:
+                time.sleep(batch_delay)
             if sample and idx > sample_size:
                 break
     print(f"total streamed: {idx}")
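
Two notes on the new throttling hunk. First, `batch_size` is referenced but never defined in this diff, so it must already exist elsewhere in `pcap_processor.py`; if it does not, the streaming loop will raise a `NameError`. Second, `--delay` is added without `type=float`, which is why the explicit `float(args._delay)` cast is needed for `-l 0.1` to work; passing `type=float` to `add_argument` would make the cast unnecessary. Below is a minimal, self-contained sketch of the throttling pattern the hunk implements — the `batch_size` value and the `stream_rows` helper are illustrative assumptions, not code from this PR:

```python
import time

# Hypothetical values for illustration only. In pcap_processor.py,
# batch_size is assumed to be defined elsewhere (the diff does not add it),
# and batch_delay comes from the new -l/--delay flag (e.g. -l 0.1).
batch_size = 1000   # rows sent between pauses (assumed)
batch_delay = 0.1   # seconds to sleep per batch, matching `-l 0.1`

def stream_rows(rows, send):
    """Send every row, pausing batch_delay seconds after each full batch."""
    idx = 0
    for idx, row in enumerate(rows):
        send(row)
        # Same guard as the diff: skip the sleep at idx 0, then pause once
        # every batch_size rows to cap the average publish rate.
        if idx > 0 and idx % batch_size == 0:
            time.sleep(batch_delay)
    print(f"total streamed: {idx}")

if __name__ == "__main__":
    # Usage example: a no-op "send" stands in for the Kafka producer.
    stream_rows(range(5000), send=lambda row: None)
```

With these values, a 30-second pipeline would pause for 0.1 s after every 1000 rows, bounding throughput at roughly `batch_size / batch_delay` rows per second when the send itself is fast.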