From b1fc1dbc49599536b71395fc9117ae6a3773643f Mon Sep 17 00:00:00 2001
From: Akash Sivakumar
Date: Tue, 26 Nov 2024 21:41:17 -0700
Subject: [PATCH] Fixed integration issue with csv streaming

---
 clickhouse/README.md                                      | 2 +-
 .../clickhouse_data/data/preprocessed_configs/config.xml  | 2 +-
 clickhouse/docker-compose.yml                             | 9 ++++-----
 preprocessing/README.md                                   | 1 +
 preprocessing/pcap_processor.py                           | 7 ++++---
 5 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/clickhouse/README.md b/clickhouse/README.md
index 16831ea..679c830 100644
--- a/clickhouse/README.md
+++ b/clickhouse/README.md
@@ -74,7 +74,7 @@ docker exec -it clickhouse-kafka-1 kafka-console-consumer --bootstrap-server kaf
 
 Get into the ClickHouse client:
 ```bash
-docker exec -it clickhouse-client clickhouse-client
+docker exec -it clickhouse-client
 ```
 Check if tables are available:
 ```bash
diff --git a/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml b/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml
index d34f75c..08de94b 100644
--- a/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml
+++ b/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml
@@ -21,7 +21,7 @@
 
         <tcp_port>9181</tcp_port>
-        <server_id>3</server_id>
+        <server_id>2</server_id>
 
         <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
         <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
diff --git a/clickhouse/docker-compose.yml b/clickhouse/docker-compose.yml
index 1463068..d38b576 100644
--- a/clickhouse/docker-compose.yml
+++ b/clickhouse/docker-compose.yml
@@ -125,7 +125,7 @@ services:
        condition: on-failure
 
   pcap_streamer:
-    image: levenshtein/streamer_test4:latest
+    image: levenshtein/streamer_test7:latest
     depends_on:
       - kafka
     networks:
@@ -133,10 +133,9 @@
         aliases:
           - pcap_streamer
     volumes:
-      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
-    environment:
-      PCAP_FILE: /data/pcap/202310081400.pcap
-    command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
+      #- "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
+      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project/project_github/real-time-traffic-analysis-clickhouse/preprocessing:/data/pcap"
+    command: ["sh", "-c", "sleep 60 && python /app/pcap_processor.py -c /data/pcap/sample_output.csv -s --stream_size 1000"]
     deploy:
       replicas: 1
       restart_policy:
diff --git a/preprocessing/README.md b/preprocessing/README.md
index 069efee..8afb3aa 100644
--- a/preprocessing/README.md
+++ b/preprocessing/README.md
@@ -49,3 +49,4 @@
 python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --stream_size 1000
 
 
+python pcap_processor.py -c sample_output.csv -s --stream_size 1000
diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py
index f5f7552..35826e3 100644
--- a/preprocessing/pcap_processor.py
+++ b/preprocessing/pcap_processor.py
@@ -19,7 +19,7 @@ class KafkaClient:
         self.topic_name = topic_name
         if mode == 'producer':
             self.client = KafkaProducer(
-                bootstrap_servers=['localhost:9092'],
+                bootstrap_servers=['kafka:9092'],
                 max_request_size = 200000000,
                 #api_version=(0,11,5),
                 value_serializer=lambda x: json.dumps(x).encode('utf-8'))
@@ -198,6 +198,7 @@ if __name__ == "__main__":
 
     # if preprocessed data ready for streaming
     if csv_file:
+        #print("true")
         with open(csv_file, newline="") as f:
             csv_rdr = csv.reader(f)
             next(csv_rdr) # skip headers
@@ -207,10 +208,10 @@ if __name__ == "__main__":
                 # direct streaming to kafka goes here
                 producer.client.send(KAFKA_TOPIC, row_to_dict(row))
                 dbg_print(row_to_dict(row))
-                dbg_print("streamed packet", idx)
+                print("streamed packet", idx)
                 if idx > sample_size:
                     break
-        dbg_print(f"total streamed: {idx}")
+        print(f"total streamed: {idx}")
 
     # otherwise, process packets
     else:
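For context on what the fixed pipeline does (this is not part of the patch itself): the streamer container now mounts the preprocessing directory, waits for the broker to come up, and replays rows from `sample_output.csv` into Kafka at the in-network address `kafka:9092`. The sketch below is a minimal stand-alone approximation of that loop. It assumes the `kafka-python` package, a hypothetical topic name `packet_data` (the real `KAFKA_TOPIC` is defined in `pcap_processor.py`), and a simplified `row_to_dict` that just zips the CSV header with each row; none of these specifics are taken from the repository.

```python
# Minimal sketch of the CSV -> Kafka streaming loop this patch exercises.
# Assumptions (not from the repo): kafka-python is installed, the topic is
# "packet_data", and row_to_dict is a stand-in for the real column mapping.
import csv
import json

from kafka import KafkaProducer

KAFKA_TOPIC = "packet_data"  # assumed topic name

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],  # compose service alias, as in the patch
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)


def row_to_dict(header, row):
    # Hypothetical helper: map CSV header names onto each row's values.
    return dict(zip(header, row))


def stream_csv(csv_file, sample_size=1000):
    streamed = 0
    with open(csv_file, newline="") as f:
        rdr = csv.reader(f)
        header = next(rdr)  # skip the header row, keep it for dict keys
        for idx, row in enumerate(rdr):
            producer.send(KAFKA_TOPIC, row_to_dict(header, row))
            streamed += 1
            if idx > sample_size:
                break
    producer.flush()  # push any buffered records to the broker
    print(f"total streamed: {streamed}")


if __name__ == "__main__":
    stream_csv("/data/pcap/sample_output.csv")
```

Pointing `bootstrap_servers` at the compose service alias `kafka:9092` instead of `localhost:9092` is what lets the producer reach the broker from inside the streamer container, which is the integration issue this patch addresses.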