Mirror of https://github.com/20kaushik02/real-time-traffic-analysis-clickhouse.git
Commit b1fc1dbc49 (parent 5d20e14dbf): Fixed integration issue with CSV streaming

The commit switches the pcap_streamer container from replaying a raw pcap file to streaming the preprocessed CSV output, points the Kafka producer at the broker's compose-network alias instead of localhost, corrects a Keeper server_id, and updates the ClickHouse client instructions.
In what appears to be the README's setup walkthrough, the hardcoded container name is replaced with the replica's actual container ID:

````diff
@@ -74,7 +74,7 @@ docker exec -it clickhouse-kafka-1 kafka-console-consumer --bootstrap-server kaf

 Get into the ClickHouse client:
 ```bash
-docker exec -it clickhouse-client clickhouse-client
+docker exec -it <server1's container ID from docker ps> clickhouse-client
 ```
 Check if tables are available:
 ```bash
````
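The "check if tables are available" step is cut off above. One way to verify from the host, sketched with the clickhouse-driver package (not part of this repo; host and port are assumptions about how the server's ports are published):

```python
# Minimal sketch: list tables over ClickHouse's native protocol.
# host/port are assumptions; adjust to whatever the compose file publishes.
from clickhouse_driver import Client

client = Client(host="localhost", port=9000)
for (table,) in client.execute("SHOW TABLES"):
    print(table)
```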
In one replica's Keeper configuration, the node's ID changes so that each member of the coordination ensemble carries a distinct server_id:

```diff
@@ -21,7 +21,7 @@

 <keeper_server>
     <tcp_port>9181</tcp_port>
-    <server_id>3</server_id>
+    <server_id>2</server_id>
     <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
     <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
     <coordination_settings>
```
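Two replicas sharing a server_id would keep the ensemble from forming, which is presumably what this change addresses. ClickHouse Keeper answers ZooKeeper-style four-letter-word commands on the client port configured above, so a quick health probe is possible over a raw socket; a sketch, assuming port 9181 is reachable from wherever this runs and that `ruok` is in the node's four-letter-word whitelist:

```python
# Sketch: send the "ruok" four-letter-word command to a Keeper node.
# Host and port reachability are assumptions, not confirmed by this diff.
import socket

with socket.create_connection(("localhost", 9181), timeout=5) as s:
    s.sendall(b"ruok")
    print(s.recv(64).decode())  # a healthy node replies "imok"
```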
In the compose file, the streamer image is bumped, and the service is rewired to stream the preprocessed CSV (mounted from the repo's preprocessing directory) instead of the raw pcap; the PCAP_FILE environment variable goes away along with the -f invocation:

```diff
@@ -125,7 +125,7 @@ services:
       condition: on-failure

   pcap_streamer:
-    image: levenshtein/streamer_test4:latest
+    image: levenshtein/streamer_test7:latest
     depends_on:
       - kafka
     networks:
@@ -133,10 +133,9 @@ services:
       aliases:
         - pcap_streamer
     volumes:
-      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
-    environment:
-      PCAP_FILE: /data/pcap/202310081400.pcap
-    command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
+      #- "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
+      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project/project_github/real-time-traffic-analysis-clickhouse/preprocessing:/data/pcap"
+    command: ["sh", "-c", "sleep 60 && python /app/pcap_processor.py -c /data/pcap/sample_output.csv -s --stream_size 1000"]
     deploy:
       replicas: 1
       restart_policy:
```
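The longer `sleep 60` papers over Kafka's startup time. A more robust pattern (not what this commit does) is to retry until the broker accepts connections; a sketch with kafka-python, the library the client code below already uses, with arbitrary attempt/delay values:

```python
# Sketch: poll for the broker instead of sleeping a fixed interval.
# The bootstrap address matches the compose alias used in this repo.
import time

from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable

def wait_for_kafka(bootstrap="kafka:9092", attempts=30, delay=2.0):
    """Return a producer once the broker answers, else raise."""
    for _ in range(attempts):
        try:
            return KafkaProducer(bootstrap_servers=[bootstrap])
        except NoBrokersAvailable:
            time.sleep(delay)
    raise RuntimeError(f"Kafka not reachable at {bootstrap}")
```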
A notes file gains the CSV invocation alongside the existing pcap one: -f replays a raw capture, -c streams an already-preprocessed CSV.

```diff
@@ -49,3 +49,4 @@


 python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --stream_size 1000
+python pcap_processor.py -c sample_output.csv -s --stream_size 1000
```
In the Kafka client wrapper, the producer now targets the broker through its compose-network alias; inside a container, localhost does not resolve to the kafka service, which is the likely integration failure this commit fixes:

```diff
@@ -19,7 +19,7 @@ class KafkaClient:
         self.topic_name = topic_name
         if mode == 'producer':
             self.client = KafkaProducer(
-                bootstrap_servers=['localhost:9092'],
+                bootstrap_servers=['kafka:9092'],
                 max_request_size = 200000000,
                 #api_version=(0,11,5),
                 value_serializer=lambda x: json.dumps(x).encode('utf-8'))
```
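Hardcoding the alias fixes the in-network case but would break the same script run outside Docker. A hedged alternative (not in this commit; KAFKA_BOOTSTRAP is an illustrative variable name, not one this repo defines) reads the address from the environment with the alias as the default:

```python
# Sketch: configurable bootstrap address, defaulting to the compose alias.
import json
import os

from kafka import KafkaProducer

bootstrap = os.environ.get("KAFKA_BOOTSTRAP", "kafka:9092")
producer = KafkaProducer(
    bootstrap_servers=[bootstrap],
    max_request_size=200000000,
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)
```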
In pcap_processor.py's main block, the CSV branch gains a commented-out debug print, and the per-packet progress and final count move from dbg_print to unconditional print:

```diff
@@ -198,6 +198,7 @@ if __name__ == "__main__":

    # if preprocessed data ready for streaming
    if csv_file:
+        #print("true")
        with open(csv_file, newline="") as f:
            csv_rdr = csv.reader(f)
            next(csv_rdr) # skip headers
@@ -207,10 +208,10 @@ if __name__ == "__main__":
                # direct streaming to kafka goes here
                producer.client.send(KAFKA_TOPIC, row_to_dict(row))
                dbg_print(row_to_dict(row))
-                dbg_print("streamed packet", idx)
+                print("streamed packet", idx)
                if idx > sample_size:
                    break
-        dbg_print(f"total streamed: {idx}")
+        print(f"total streamed: {idx}")

    # otherwise, process packets
    else:
```
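Read together, the CSV path skips the header row, sends each row to Kafka as a JSON-serializable dict, and stops after the requested sample size. A self-contained sketch of that loop: the enumerate bookkeeping and the final flush are assumptions about the elided lines, and the topic and row_to_dict are passed in rather than read from the script's globals:

```python
# Sketch of the CSV streaming path; see the assumptions stated above.
import csv

def stream_csv(producer, topic, csv_file, sample_size, row_to_dict):
    """Stream up to sample_size CSV rows to Kafka as dicts."""
    idx = 0
    with open(csv_file, newline="") as f:
        csv_rdr = csv.reader(f)
        next(csv_rdr)  # skip the header row
        for idx, row in enumerate(csv_rdr, start=1):
            producer.send(topic, row_to_dict(row))
            print("streamed packet", idx)
            if idx > sample_size:
                break
    producer.flush()  # assumption: drain queued sends before exiting
    print(f"total streamed: {idx}")
```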