mirror of
https://github.com/20kaushik02/real-time-traffic-analysis-clickhouse.git
synced 2025-12-06 08:04:06 +00:00
stream slow option
This commit is contained in:
parent
030659c3e1
commit
fe7f2e570d
@ -57,7 +57,7 @@ services:
|
|||||||
- data-streamer
|
- data-streamer
|
||||||
volumes:
|
volumes:
|
||||||
- "../preprocessing/10k_sample_2023_10_01-2023_10_31.csv:/data/csv/main.csv:ro"
|
- "../preprocessing/10k_sample_2023_10_01-2023_10_31.csv:/data/csv/main.csv:ro"
|
||||||
command: "sh -c 'sleep 30 && python /app/pcap_processor.py -c /data/csv/main.csv -x --stream_size 100000'"
|
command: "sh -c 'sleep 30 && python /app/pcap_processor.py -c /data/csv/main.csv -x --stream_size 100000 -l 0.1'"
|
||||||
deploy:
|
deploy:
|
||||||
replicas: 1
|
replicas: 1
|
||||||
# placement:
|
# placement:
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
from argparse import ArgumentParser
|
from argparse import ArgumentParser
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
import csv
|
import csv
|
||||||
|
import time
|
||||||
|
|
||||||
from scapy.packet import Packet
|
from scapy.packet import Packet
|
||||||
from scapy.utils import PcapReader
|
from scapy.utils import PcapReader
|
||||||
@ -173,6 +174,13 @@ if __name__ == "__main__":
|
|||||||
dest="_stream",
|
dest="_stream",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
)
|
)
|
||||||
|
argp.add_argument(
|
||||||
|
"-l",
|
||||||
|
"--delay",
|
||||||
|
required=False,
|
||||||
|
default=0,
|
||||||
|
dest="_delay"
|
||||||
|
)
|
||||||
argp.add_argument(
|
argp.add_argument(
|
||||||
"-d",
|
"-d",
|
||||||
"--debug",
|
"--debug",
|
||||||
@ -187,6 +195,7 @@ if __name__ == "__main__":
|
|||||||
csv_file = args._csv
|
csv_file = args._csv
|
||||||
out_file = args._out
|
out_file = args._out
|
||||||
streaming = args._stream
|
streaming = args._stream
|
||||||
|
batch_delay = float(args._delay)
|
||||||
sample = args._sample
|
sample = args._sample
|
||||||
|
|
||||||
DEBUG = args._debug
|
DEBUG = args._debug
|
||||||
@ -207,6 +216,8 @@ if __name__ == "__main__":
|
|||||||
producer.client.send(KAFKA_TOPIC, row_to_dict(row))
|
producer.client.send(KAFKA_TOPIC, row_to_dict(row))
|
||||||
dbg_print(row_to_dict(row))
|
dbg_print(row_to_dict(row))
|
||||||
dbg_print("streamed packet", idx)
|
dbg_print("streamed packet", idx)
|
||||||
|
if idx > 0 and idx % batch_size == 0:
|
||||||
|
time.sleep(batch_delay)
|
||||||
if sample and idx > sample_size:
|
if sample and idx > sample_size:
|
||||||
break
|
break
|
||||||
print(f"total streamed: {idx}")
|
print(f"total streamed: {idx}")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user