From e65cf00d34cc721347e271f9d85bc1675c8a63e9 Mon Sep 17 00:00:00 2001 From: Akash Sivakumar Date: Wed, 20 Nov 2024 20:10:15 -0700 Subject: [PATCH 01/15] branch test commit --- preprocessing/pcap_processor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py index 09c556a..99aaa09 100644 --- a/preprocessing/pcap_processor.py +++ b/preprocessing/pcap_processor.py @@ -11,6 +11,8 @@ import json dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}") + + class KafkaClient: def __init__(self, topic_name=None, mode='producer'): self.mode = mode From 4d9526ae60ce276841533ba796136ada178e8620 Mon Sep 17 00:00:00 2001 From: Lalit Arvind Date: Fri, 22 Nov 2024 17:32:57 -0700 Subject: [PATCH 02/15] added weight tag to remote servers config files --- .../config_update_scripts/update_compose.py | 1 + .../jinja-templates/remote-servers.xml.jinja | 18 ------------------ clickhouse/node1-config/remote-servers.xml | 1 + clickhouse/node2-config/remote-servers.xml | 1 + 4 files changed, 3 insertions(+), 18 deletions(-) delete mode 100644 clickhouse/jinja-templates/remote-servers.xml.jinja diff --git a/clickhouse/config_update_scripts/update_compose.py b/clickhouse/config_update_scripts/update_compose.py index 9f29a20..74d85d1 100644 --- a/clickhouse/config_update_scripts/update_compose.py +++ b/clickhouse/config_update_scripts/update_compose.py @@ -31,6 +31,7 @@ if __name__ == "__main__": # new shard template that is gonna be added to remote servers file of each node new_shard_str = f''' + {curr_num_shards+1} true clickhouse-server{curr_num_servers+1} diff --git a/clickhouse/jinja-templates/remote-servers.xml.jinja b/clickhouse/jinja-templates/remote-servers.xml.jinja deleted file mode 100644 index a6a9edd..0000000 --- a/clickhouse/jinja-templates/remote-servers.xml.jinja +++ /dev/null @@ -1,18 +0,0 @@ - - - - mysecretphrase - - true - - clickhouse-server1 - 9000 - - - clickhouse-server2 - 9000 - - - - - \ No newline at end of file diff --git a/clickhouse/node1-config/remote-servers.xml b/clickhouse/node1-config/remote-servers.xml index a6a9edd..e708fb7 100644 --- a/clickhouse/node1-config/remote-servers.xml +++ b/clickhouse/node1-config/remote-servers.xml @@ -3,6 +3,7 @@ mysecretphrase + 1 true clickhouse-server1 diff --git a/clickhouse/node2-config/remote-servers.xml b/clickhouse/node2-config/remote-servers.xml index a6a9edd..e708fb7 100644 --- a/clickhouse/node2-config/remote-servers.xml +++ b/clickhouse/node2-config/remote-servers.xml @@ -3,6 +3,7 @@ mysecretphrase + 1 true clickhouse-server1 From 5b1add9241f2873afd74927d86294cd23411d033 Mon Sep 17 00:00:00 2001 From: Akash Sivakumar Date: Fri, 22 Nov 2024 20:26:57 -0700 Subject: [PATCH 03/15] integration working in akash's system --- .../data/preprocessed_configs/config.xml | 61 +++++++ ...r-compose.yaml => docker-compose-old.yaml} | 0 clickhouse/docker-compose.yml | 156 ++++++++++++++++++ preprocessing/pcap_consumer_test.py | 25 +++ preprocessing/pcap_processor.py | 4 +- 5 files changed, 244 insertions(+), 2 deletions(-) create mode 100644 clickhouse/clickhouse_data/data/preprocessed_configs/config.xml rename clickhouse/{docker-compose.yaml => docker-compose-old.yaml} (100%) create mode 100644 clickhouse/docker-compose.yml create mode 100644 preprocessing/pcap_consumer_test.py diff --git a/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml b/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml new file mode 100644 index 0000000..d34f75c --- /dev/null +++ b/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml @@ -0,0 +1,61 @@ + + + + + trace + /var/log/clickhouse-keeper/clickhouse-keeper.log + /var/log/clickhouse-keeper/clickhouse-keeper.err.log + 1000M + 3 + + :: + + /var/lib/clickhouse/data/ + /var/lib/clickhouse/tmp/ + /var/lib/clickhouse/user_files/ + /var/lib/clickhouse/format_schemas/ + + + 9181 + 3 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + 10000 + 30000 + trace + + + + 1 + clickhouse-keeper1 + 9234 + + + 2 + clickhouse-keeper2 + 9234 + + + 3 + clickhouse-keeper3 + 9234 + + + + + + + 0.0.0.0 + 1 + + + diff --git a/clickhouse/docker-compose.yaml b/clickhouse/docker-compose-old.yaml similarity index 100% rename from clickhouse/docker-compose.yaml rename to clickhouse/docker-compose-old.yaml diff --git a/clickhouse/docker-compose.yml b/clickhouse/docker-compose.yml new file mode 100644 index 0000000..1463068 --- /dev/null +++ b/clickhouse/docker-compose.yml @@ -0,0 +1,156 @@ +services: + clickhouse-keeper1: + image: clickhouse/clickhouse-server:latest + container_name: clickhouse-keeper1 + command: > + /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + volumes: + - ./clickhouse_keeper/keeper1-config.xml:/etc/clickhouse-server/config.xml + - ./clickhouse_data/data:/var/lib/clickhouse/data + - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp + - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files + - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas + networks: + common-network: + aliases: + - clickhouse-keeper1 + + clickhouse-keeper2: + image: clickhouse/clickhouse-server:latest + container_name: clickhouse-keeper2 + command: > + /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + volumes: + - ./clickhouse_keeper/keeper2-config.xml:/etc/clickhouse-server/config.xml + - ./clickhouse_data/data:/var/lib/clickhouse/data + - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp + - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files + - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas + networks: + common-network: + aliases: + - clickhouse-keeper2 + + clickhouse-keeper3: + image: clickhouse/clickhouse-server:latest + container_name: clickhouse-keeper3 + command: > + /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + volumes: + - ./clickhouse_keeper/keeper3-config.xml:/etc/clickhouse-server/config.xml + - ./clickhouse_data/data:/var/lib/clickhouse/data + - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp + - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files + - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas + networks: + common-network: + aliases: + - clickhouse-keeper3 + + clickhouse-server1: + image: clickhouse/clickhouse-server:latest + container_name: clickhouse-server1 + volumes: + - ./node1-config/:/etc/clickhouse-server/config.d/ + - clickhouse_data1:/var/lib/clickhouse + networks: + common-network: + aliases: + - clickhouse-server1 + depends_on: + - clickhouse-keeper1 + - clickhouse-keeper2 + - clickhouse-keeper3 + ports: + - "9001:9000" # Native client port + - "8123:8123" # HTTP interface + + clickhouse-server2: + image: clickhouse/clickhouse-server:latest + container_name: clickhouse-server2 + volumes: + - ./node2-config/:/etc/clickhouse-server/config.d/ + - clickhouse_data2:/var/lib/clickhouse + networks: + common-network: + aliases: + - clickhouse-server2 + depends_on: + - clickhouse-keeper1 + - clickhouse-keeper2 + - clickhouse-keeper3 + ports: + - "9002:9000" # Native client port + - "8125:8123" # HTTP interface + + zookeeper: + image: confluentinc/cp-zookeeper:latest + networks: + common-network: + aliases: + - zookeeper + deploy: + replicas: 1 + restart_policy: + condition: on-failure + environment: + ZOOKEEPER_CLIENT_PORT: 2182 + ports: + - "2182:2181" + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2182 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_BROKER_ID: 1 + KAFKA_MESSAGE_MAX_BYTES: 200000000 + KAFKA_REPLICA_FETCH_MAX_BYTES: 200000000 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + networks: + common-network: + aliases: + - kafka + ports: + - "9092:9092" + volumes: + - kafka_data_new:/var/lib/kafka/data + deploy: + replicas: 1 + restart_policy: + condition: on-failure + + pcap_streamer: + image: levenshtein/streamer_test4:latest + depends_on: + - kafka + networks: + common-network: + aliases: + - pcap_streamer + volumes: + - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap" + environment: + PCAP_FILE: /data/pcap/202310081400.pcap + command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"] + deploy: + replicas: 1 + restart_policy: + condition: on-failure + +networks: + common-network: + driver: overlay + attachable: true + +volumes: + clickhouse_data1: + driver: local + clickhouse_data2: + driver: local + kafka_data_new: + driver: local diff --git a/preprocessing/pcap_consumer_test.py b/preprocessing/pcap_consumer_test.py new file mode 100644 index 0000000..d2312df --- /dev/null +++ b/preprocessing/pcap_consumer_test.py @@ -0,0 +1,25 @@ +from kafka import KafkaConsumer +import json + +KAFKA_TOPIC = 'pcap_stream_new' # Use the topic name from your producer +KAFKA_SERVER = 'localhost:9092' # Ensure this matches your Kafka server configuration + +# Create a Kafka consumer +consumer = KafkaConsumer( + KAFKA_TOPIC, + bootstrap_servers=[KAFKA_SERVER], + value_deserializer=lambda x: x.decode('utf-8'),#json.loads(x.decode('utf-8')), + auto_offset_reset='earliest', # Ensures it starts reading from the beginning + enable_auto_commit=True +) + + + +print("Consuming messages from topic:", KAFKA_TOPIC) + +# Consume and print messages + +for message in consumer: + print(type(message)) + + diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py index 99aaa09..b963875 100644 --- a/preprocessing/pcap_processor.py +++ b/preprocessing/pcap_processor.py @@ -21,7 +21,7 @@ class KafkaClient: self.client = KafkaProducer( bootstrap_servers=['kafka:9092'], max_request_size = 200000000, - api_version=(0,11,5), + #api_version=(0,11,5), value_serializer=lambda x: json.dumps(x).encode('utf-8')) elif mode == 'consumer' and topic_name is not None: self.client = KafkaConsumer( @@ -33,7 +33,7 @@ class KafkaClient: raise ValueError("Consumer mode requires a topic_name") # Kafka Configuration -KAFKA_TOPIC = 'pcap_stream' +KAFKA_TOPIC = 'pcap_stream_new' KAFKA_SERVER = 'kafka:9092' # Adjust to your Kafka server #KAFKA_SERVER = 'kafka_service:9092' From 98601a1078d597d3f0484b5d52a787af7942088d Mon Sep 17 00:00:00 2001 From: Akash Sivakumar <73591598+Akash-0818@users.noreply.github.com> Date: Fri, 22 Nov 2024 20:48:29 -0700 Subject: [PATCH 04/15] Update README --- clickhouse/README | 152 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 140 insertions(+), 12 deletions(-) diff --git a/clickhouse/README b/clickhouse/README index af20090..16831ea 100644 --- a/clickhouse/README +++ b/clickhouse/README @@ -1,13 +1,141 @@ -Execution steps: -ensure to pull clickhouse docker image -> docker-compose up -d -identify the custom networks created. -> docker network ls -inspect the keeper network and verify whether all the containers are connected to it -> docker network inspect {{dds_proj_clickhouse-keeper-network}} -if all the containers are not connected -> docker-compose restart -To execute queries -> docker exec -it clickhouse-server1 clickhouse-client -> docker exec -it clickhouse-server2 clickhouse-client \ No newline at end of file +# Kafka - Clickhouse Integration Testing + +## Execution Steps + +Ensure to pull the ClickHouse Docker image: +```bash +docker-compose up -d +``` +Identify the custom networks created: +```bash +docker network ls +``` +Inspect the keeper network and verify whether all the containers are connected to it: +```bash +docker network inspect dds_proj_clickhouse-keeper-network +``` +If all the containers are not connected: +```bash +docker-compose restart +``` +To execute queries: +```bash +docker exec -it clickhouse-server1 clickhouse-client +``` +```bash +docker exec -it clickhouse-server2 clickhouse-client +``` + +## Building Kafka Image + +Navigate to `/preprocessing` from the repo main directory: +```bash +docker build -t :latest -f Dockerfile.python . +``` +Tag the image: +```bash +docker tag :latest /:latest +``` +Push the image to Docker Hub: +```bash +docker push /:latest +``` + +## Testing the Integration + +Changes to make in `docker-compose.yml`: +- `pcap_streamer => volumes`: `` +- `pcap_streamer => image`: `/:latest` + +Navigate to `/clickhouse` from the repo main directory: +```bash +docker stack deploy -c docker-compose.yml --detach=false +``` +Check running services: +```bash +docker service ls +``` +Check logs of the service: +```bash +docker service logs +``` + +Get the container ID for the ClickHouse client: +```bash +docker ps +``` +Check if the topic has all streamed data: +```bash +docker exec -it clickhouse-kafka-1 kafka-console-consumer --bootstrap-server kafka:9092 --topic pcap_stream_new --from-beginning +``` +- Change the topic name if applicable. +- The output should display all JSON packets. + +Get into the ClickHouse client: +```bash +docker exec -it clickhouse-client clickhouse-client +``` +Check if tables are available: +```bash +SHOW TABLES; +``` + +If tables are not available, create the following: + +Create a table for `packets_data`: +```sql +CREATE TABLE packets_data +( + time Float64, + l4_proto String, + src_addr String, + dst_addr String, + src_port UInt16, + dst_port UInt16, + pkt_len UInt32 +) ENGINE = MergeTree +ORDER BY time; +``` + +Create a table for `kafka_stream`: +```sql +CREATE TABLE kafka_stream +( + time Float64, + l4_proto String, + src_addr String, + dst_addr String, + src_port UInt16, + dst_port UInt16, + pkt_len UInt32 +) ENGINE = Kafka +SETTINGS kafka_broker_list = 'kafka:9092', + kafka_topic_list = 'pcap_stream_new', + kafka_group_name = 'clickhouse_consumer', + kafka_format = 'JSONEachRow', + kafka_num_consumers = 1; +``` + +Create a materialized view `kafka_to_packets`: +```sql +CREATE MATERIALIZED VIEW kafka_to_packets +TO packets_data AS +SELECT + time, + l4_proto, + src_addr, + dst_addr, + src_port, + dst_port, + pkt_len +FROM kafka_stream; +``` + +Check table contents to verify data availability: +```bash +SELECT * FROM packets_data LIMIT 10; +``` +```bash +SELECT COUNT(*) AS total_count_of_packets_streamed FROM packets_data; +``` From 30e90013d7db6f1cf68f02c05cf0d04592b7d406 Mon Sep 17 00:00:00 2001 From: Akash Sivakumar <73591598+Akash-0818@users.noreply.github.com> Date: Fri, 22 Nov 2024 20:49:07 -0700 Subject: [PATCH 05/15] Rename README to README.md --- clickhouse/{README => README.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename clickhouse/{README => README.md} (100%) diff --git a/clickhouse/README b/clickhouse/README.md similarity index 100% rename from clickhouse/README rename to clickhouse/README.md From 52fc788eeeb4a7d65cca99b264f6b9d09aa67e3b Mon Sep 17 00:00:00 2001 From: Lalit Arvind Date: Sat, 23 Nov 2024 15:50:38 -0700 Subject: [PATCH 06/15] Added resources, created a prototype trigger and its corresponding dockerfile for update configurations, Testing required --- clickhouse/config_update_scripts/Dockerfile | 9 +++++ .../config_update_scripts/requirements.txt | 8 +++++ .../config_update_scripts/update_compose.py | 9 ++--- .../config_update_scripts/update_trigger.py | 34 +++++++++++++++++++ clickhouse/docker-compose.yaml | 23 ++++++++----- 5 files changed, 69 insertions(+), 14 deletions(-) create mode 100644 clickhouse/config_update_scripts/Dockerfile create mode 100644 clickhouse/config_update_scripts/requirements.txt create mode 100644 clickhouse/config_update_scripts/update_trigger.py diff --git a/clickhouse/config_update_scripts/Dockerfile b/clickhouse/config_update_scripts/Dockerfile new file mode 100644 index 0000000..79e3995 --- /dev/null +++ b/clickhouse/config_update_scripts/Dockerfile @@ -0,0 +1,9 @@ +FROM python + +WORKDIR /update_scripts + +COPY . /update_scripts/ + +RUN pip install --no-cache-dir -r requirements.txt + +CMD ["python3","update_trigger.py"] \ No newline at end of file diff --git a/clickhouse/config_update_scripts/requirements.txt b/clickhouse/config_update_scripts/requirements.txt new file mode 100644 index 0000000..711648d --- /dev/null +++ b/clickhouse/config_update_scripts/requirements.txt @@ -0,0 +1,8 @@ +subprocess +json +jinja2 +yaml +re +xml +schedule +time \ No newline at end of file diff --git a/clickhouse/config_update_scripts/update_compose.py b/clickhouse/config_update_scripts/update_compose.py index 74d85d1..4786cbb 100644 --- a/clickhouse/config_update_scripts/update_compose.py +++ b/clickhouse/config_update_scripts/update_compose.py @@ -4,6 +4,7 @@ import subprocess import json import xml.etree.ElementTree as ET import os +import re if __name__ == "__main__": @@ -15,14 +16,10 @@ if __name__ == "__main__": all_services = [json.loads(s) for s in all_services] # extracting the name, removing the custom id from it and storing it in a list - all_service_names = [service['Names'].split('.')[0] for service in all_services] - # extracting only 'keeper1', 'server1'... + all_service_names = [service['Names'].split('.')[0] for service in all_services if re.findall(r'clickhouse-server',service['Names'])] + # extracting only 'server1','server2'... all_service_names = [ name.split('-')[-1] for name in all_service_names] - # removing all keeepers - all_service_names.remove('keeper1') - all_service_names.remove('keeper2') - all_service_names.remove('keeper3') curr_num_servers = sorted(all_service_names)[-1][-1] replication_factor = 2 diff --git a/clickhouse/config_update_scripts/update_trigger.py b/clickhouse/config_update_scripts/update_trigger.py new file mode 100644 index 0000000..fe154f5 --- /dev/null +++ b/clickhouse/config_update_scripts/update_trigger.py @@ -0,0 +1,34 @@ +import subprocess +import json +import re +import schedule +import time + +def check_util_exec(): + # extracting details of each running container in json format + try: + all_services = subprocess.check_output(["docker","stats","--no-stream","--format","json"],text=True).split('\n')[:-1] + except subprocess.CalledProcessError as e: + print(f"Command failed with return code {e.returncode}") + + all_services = [json.loads(s) for s in all_services] + + resource_util_exceed_flag = True # Flag to check if all of the containers have exceeded 80% memory utilization + for service in all_services: + if re.findall(r'clickhouse-server',service['Name']): + if float(service['MemPerc'][:-1]) < 80: + resource_util_exceed_flag = False + + if resource_util_exceed_flag: + process = subprocess.Popen(['python3','update_compose.py'],text=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + stdout, stderr = process.communicate() # Wait for the process to finish and capture output + print("Standard Output:", stdout) + print("Standard Error:", stderr) + +if __name__ == "__main__": + schedule.every(30).seconds.do(check_util_exec) + while True: + schedule.run_pending() + time.sleep(1) + + \ No newline at end of file diff --git a/clickhouse/docker-compose.yaml b/clickhouse/docker-compose.yaml index 7ee9aea..ad37e56 100644 --- a/clickhouse/docker-compose.yaml +++ b/clickhouse/docker-compose.yaml @@ -1,4 +1,9 @@ services: + registry: + image: registry:2 + ports: + - "5000:5000" + clickhouse-keeper1: image: clickhouse/clickhouse-server:latest container_name: clickhouse-keeper1 @@ -69,10 +74,10 @@ services: # constraints: [node.labels.role == server] update_config: delay: 10s - # resources: - # limits: - # cpus: "0.50" - # memory: 100M + resources: + limits: + cpus: "0.50" + memory: 1200M depends_on: - clickhouse-keeper1 - clickhouse-keeper2 @@ -100,10 +105,10 @@ services: # constraints: [node.labels.role == server] update_config: delay: 10s - # resources: - # limits: - # cpus: "0.50" - # memory: 100M + resources: + limits: + cpus: "0.50" + memory: 1200M depends_on: - clickhouse-keeper1 - clickhouse-keeper2 @@ -115,8 +120,10 @@ services: networks: clickhouse-server-network: driver: overlay + attachable: true clickhouse-keeper-network: driver: overlay + attachable: true volumes: clickhouse_data1: From 745870bb9c1361d2a845eb1950d9b0dc3134290e Mon Sep 17 00:00:00 2001 From: Lalit Arvind Date: Sun, 24 Nov 2024 15:21:35 -0700 Subject: [PATCH 07/15] Added storage policy for TTL in each server's config files and jinja templates. Testing needed --- .../config_update_scripts/update_compose.py | 5 ++++ clickhouse/jinja-templates/config.xml.jinja | 1 + .../jinja-templates/storage-policy.xml.jinja | 25 +++++++++++++++++++ clickhouse/node1-config/config.xml | 1 + clickhouse/node1-config/storage-policy.xml | 25 +++++++++++++++++++ clickhouse/node2-config/config.xml | 1 + clickhouse/node2-config/storage-policy.xml | 25 +++++++++++++++++++ 7 files changed, 83 insertions(+) create mode 100644 clickhouse/jinja-templates/storage-policy.xml.jinja create mode 100644 clickhouse/node1-config/storage-policy.xml create mode 100644 clickhouse/node2-config/storage-policy.xml diff --git a/clickhouse/config_update_scripts/update_compose.py b/clickhouse/config_update_scripts/update_compose.py index 4786cbb..6ab8ac2 100644 --- a/clickhouse/config_update_scripts/update_compose.py +++ b/clickhouse/config_update_scripts/update_compose.py @@ -79,6 +79,7 @@ if __name__ == "__main__": config_template = env.get_template('config.xml.jinja') macros_template = env.get_template('macros.xml.jinja') use_keeper_template = env.get_template('use-keeper.xml.jinja') + storage_policy_template = env.get_template('storage-policy.xml.jinja') for i in range(1,3): config_content = config_template.render(node_num=curr_num_servers+i) @@ -93,3 +94,7 @@ if __name__ == "__main__": with open(f'../node{curr_num_servers + i}-config/use-keeper.xml','w') as f3: f3.write(use_keeper_content) + storage_policy_content = storage_policy_template.render(server_num=curr_num_servers+i) + with open(f'../node{curr_num_servers + i}-config/storage-policy.xml','w') as f4: + f4.write(storage_policy_content) + diff --git a/clickhouse/jinja-templates/config.xml.jinja b/clickhouse/jinja-templates/config.xml.jinja index cd07efb..4a0f2ba 100644 --- a/clickhouse/jinja-templates/config.xml.jinja +++ b/clickhouse/jinja-templates/config.xml.jinja @@ -20,5 +20,6 @@ /etc/clickhouse-server/config.d/macros.xml /etc/clickhouse-server/config.d/remote-servers.xml /etc/clickhouse-server/config.d/use-keeper.xml + /etc/clickhouse-server/config.d/storage-policy.xml \ No newline at end of file diff --git a/clickhouse/jinja-templates/storage-policy.xml.jinja b/clickhouse/jinja-templates/storage-policy.xml.jinja new file mode 100644 index 0000000..8f8653c --- /dev/null +++ b/clickhouse/jinja-templates/storage-policy.xml.jinja @@ -0,0 +1,25 @@ + + + + ../clickhouse_data{{server_num}}/hot + 300000000 + + + ../clickhouse_data{{server_num}}/cold + 500000000 + + + + + + + hot_disk + + + cold_disk + + + 0.2 + + + \ No newline at end of file diff --git a/clickhouse/node1-config/config.xml b/clickhouse/node1-config/config.xml index 4ced53d..261fe8d 100644 --- a/clickhouse/node1-config/config.xml +++ b/clickhouse/node1-config/config.xml @@ -20,5 +20,6 @@ /etc/clickhouse-server/config.d/macros.xml /etc/clickhouse-server/config.d/remote-servers.xml /etc/clickhouse-server/config.d/use-keeper.xml + /etc/clickhouse-server/config.d/storage-policy.xml \ No newline at end of file diff --git a/clickhouse/node1-config/storage-policy.xml b/clickhouse/node1-config/storage-policy.xml new file mode 100644 index 0000000..067390b --- /dev/null +++ b/clickhouse/node1-config/storage-policy.xml @@ -0,0 +1,25 @@ + + + + ../clickhouse_data1/hot + 300000000 + + + ../clickhouse_data1/cold + 500000000 + + + + + + + hot_disk + + + cold_disk + + + 0.2 + + + \ No newline at end of file diff --git a/clickhouse/node2-config/config.xml b/clickhouse/node2-config/config.xml index 68d5b06..f2928c0 100644 --- a/clickhouse/node2-config/config.xml +++ b/clickhouse/node2-config/config.xml @@ -20,5 +20,6 @@ /etc/clickhouse-server/config.d/macros.xml /etc/clickhouse-server/config.d/remote-servers.xml /etc/clickhouse-server/config.d/use-keeper.xml + /etc/clickhouse-server/config.d/storage-policy.xml \ No newline at end of file diff --git a/clickhouse/node2-config/storage-policy.xml b/clickhouse/node2-config/storage-policy.xml new file mode 100644 index 0000000..ceda009 --- /dev/null +++ b/clickhouse/node2-config/storage-policy.xml @@ -0,0 +1,25 @@ + + + + ../clickhouse_data2/hot + 300000000 + + + ../clickhouse_data2/cold + 500000000 + + + + + + + hot_disk + + + cold_disk + + + 0.2 + + + \ No newline at end of file From 34b9ae609e160231ccbbabf6cffbae3b3aa9ac0e Mon Sep 17 00:00:00 2001 From: Lalit Arvind Date: Sun, 24 Nov 2024 17:14:56 -0700 Subject: [PATCH 08/15] table creation queries --- clickhouse/Queries.sql | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 clickhouse/Queries.sql diff --git a/clickhouse/Queries.sql b/clickhouse/Queries.sql new file mode 100644 index 0000000..111c82e --- /dev/null +++ b/clickhouse/Queries.sql @@ -0,0 +1,22 @@ +-- TABLE CREATION +CREATE TABLE traffic_records ON cluster_1S_2R +( + time_stamp DateTime64(3, 'Japan') CODEC(Delta, ZSTD), + protocol Enum('TCP' = 1, 'UDP' = 2), + from_IP IPv4, + to_IP IPv4, + port UInt16 CODEC(ZSTD), + INDEX port_idx port TYPE bloom_filter GRANULARITY 10 +) ENGINE = ReplicatedMergeTree( '/clickhouse/tables/{shard}/traffic_records', '{replica}') +ORDER BY time_stamp +SETTINGS storage_policy = 'hot_cold' +TTL time_stamp + INTERVAL 5 DAY TO VOLUME 'cold_disk'; + +CREATE TABLE ip_region_map ON cluster_1S_2R +( + ip_range_start IPv4, + ip_range_end IPv4, + region String, + INDEX region_ind region TYPE bloom_filter +) ENGINE = ReplicatedMergeTree( '/clickhouse/tables/{shard}/ip_region_map', '{replica}') +ORDER BY ip_range_start; \ No newline at end of file From cf50af1a9e6e8d1c280e4a153672c4fc9913114c Mon Sep 17 00:00:00 2001 From: Lalit Arvind Date: Mon, 25 Nov 2024 00:30:05 -0700 Subject: [PATCH 09/15] Corrected storage policy errors --- clickhouse/docker-compose.yaml | 9 ++-- clickhouse/node1-config/storage-policy.xml | 52 +++++++++++----------- clickhouse/node2-config/storage-policy.xml | 52 +++++++++++----------- 3 files changed, 58 insertions(+), 55 deletions(-) diff --git a/clickhouse/docker-compose.yaml b/clickhouse/docker-compose.yaml index ad37e56..25acf42 100644 --- a/clickhouse/docker-compose.yaml +++ b/clickhouse/docker-compose.yaml @@ -1,9 +1,4 @@ services: - registry: - image: registry:2 - ports: - - "5000:5000" - clickhouse-keeper1: image: clickhouse/clickhouse-server:latest container_name: clickhouse-keeper1 @@ -61,6 +56,8 @@ services: volumes: - ./node1-config/:/etc/clickhouse-server/config.d/ - clickhouse_data1:/var/lib/clickhouse + - ./server_TTL/server1/hot/:/clickhouse_data/server1/hot/ + - ./server_TTL/server1/cold/:/clickhouse_data/server1/cold/ networks: clickhouse-server-network: aliases: @@ -92,6 +89,8 @@ services: volumes: - ./node2-config/:/etc/clickhouse-server/config.d/ - clickhouse_data2:/var/lib/clickhouse + - ./server_TTL/server2/hot/:/clickhouse_data/server2/hot/ + - ./server_TTL/server2/cold/:/clickhouse_data/server2/cold/ networks: clickhouse-server-network: aliases: diff --git a/clickhouse/node1-config/storage-policy.xml b/clickhouse/node1-config/storage-policy.xml index 067390b..41054e4 100644 --- a/clickhouse/node1-config/storage-policy.xml +++ b/clickhouse/node1-config/storage-policy.xml @@ -1,25 +1,27 @@ - - - - ../clickhouse_data1/hot - 300000000 - - - ../clickhouse_data1/cold - 500000000 - - - - - - - hot_disk - - - cold_disk - - - 0.2 - - - \ No newline at end of file + + + + + /clickhouse_data/server1/hot/ + + + /clickhouse_data/server1/cold/ + + + + + + + hot_disk + 1073741824 + + + cold_disk + 1073741824 + + + 0.2 + + + + \ No newline at end of file diff --git a/clickhouse/node2-config/storage-policy.xml b/clickhouse/node2-config/storage-policy.xml index ceda009..0691d22 100644 --- a/clickhouse/node2-config/storage-policy.xml +++ b/clickhouse/node2-config/storage-policy.xml @@ -1,25 +1,27 @@ - - - - ../clickhouse_data2/hot - 300000000 - - - ../clickhouse_data2/cold - 500000000 - - - - - - - hot_disk - - - cold_disk - - - 0.2 - - - \ No newline at end of file + + + + + /clickhouse_data/server2/hot/ + + + /clickhouse_data/server2/cold/ + + + + + + + hot_disk + 1073741824 + + + cold_disk + 1073741824 + + + 0.2 + + + + \ No newline at end of file From ee03026d471f22170ec7cfbaecf15f72e25f7d0d Mon Sep 17 00:00:00 2001 From: Lalit Arvind Date: Mon, 25 Nov 2024 00:44:27 -0700 Subject: [PATCH 10/15] Changed some bind volumes to named volumes --- clickhouse/docker-compose.yaml | 41 ++++++++++++++++------------------ 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/clickhouse/docker-compose.yaml b/clickhouse/docker-compose.yaml index 25acf42..79848ce 100644 --- a/clickhouse/docker-compose.yaml +++ b/clickhouse/docker-compose.yaml @@ -6,10 +6,7 @@ services: /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml volumes: - ./clickhouse_keeper/keeper1-config.xml:/etc/clickhouse-server/config.xml - - ./clickhouse_data/data:/var/lib/clickhouse/data - - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp - - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files - - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas + - clickhouse_keeper1_data:/var/lib/clickhouse networks: clickhouse-keeper-network: @@ -23,11 +20,7 @@ services: /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml volumes: - ./clickhouse_keeper/keeper2-config.xml:/etc/clickhouse-server/config.xml - - ./clickhouse_data/data:/var/lib/clickhouse/data - - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp - - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files - - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas - + - clickhouse_keeper2_data:/var/lib/clickhouse networks: clickhouse-keeper-network: aliases: @@ -40,11 +33,7 @@ services: /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml volumes: - ./clickhouse_keeper/keeper3-config.xml:/etc/clickhouse-server/config.xml - - ./clickhouse_data/data:/var/lib/clickhouse/data - - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp - - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files - - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas - + - clickhouse_keeper3_data:/var/lib/clickhouse networks: clickhouse-keeper-network: aliases: @@ -55,9 +44,8 @@ services: container_name: clickhouse-server1 volumes: - ./node1-config/:/etc/clickhouse-server/config.d/ - - clickhouse_data1:/var/lib/clickhouse - - ./server_TTL/server1/hot/:/clickhouse_data/server1/hot/ - - ./server_TTL/server1/cold/:/clickhouse_data/server1/cold/ + - clickhouse_server1_data:/var/lib/clickhouse + - clickhouse_server1_TTL:/clickhouse_data/server1 networks: clickhouse-server-network: aliases: @@ -88,9 +76,8 @@ services: container_name: clickhouse-server2 volumes: - ./node2-config/:/etc/clickhouse-server/config.d/ - - clickhouse_data2:/var/lib/clickhouse - - ./server_TTL/server2/hot/:/clickhouse_data/server2/hot/ - - ./server_TTL/server2/cold/:/clickhouse_data/server2/cold/ + - clickhouse_server2_data:/var/lib/clickhouse + - clickhouse_server2_TTL:/clickhouse_data/server2 networks: clickhouse-server-network: aliases: @@ -125,7 +112,17 @@ networks: attachable: true volumes: - clickhouse_data1: + clickhouse_server1_data: driver: local - clickhouse_data2: + clickhouse_server2_data: + driver: local + clickhouse_keeper1_data: + driver: local + clickhouse_keeper2_data: + driver: local + clickhouse_keeper3_data: + driver: local + clickhouse_server1_TTL: + driver: local + clickhouse_server2_TTL: driver: local \ No newline at end of file From 351a6473a40bb01d74de929714867f68f526d7a1 Mon Sep 17 00:00:00 2001 From: Lalit Arvind Date: Mon, 25 Nov 2024 01:11:09 -0700 Subject: [PATCH 11/15] added volume template and modified update_compose accordingly. updated service template --- .../config_update_scripts/update_compose.py | 16 +++++++++++----- clickhouse/jinja-templates/service.yml.jinja | 10 +++++----- clickhouse/jinja-templates/volume.yml.jinja | 4 ++++ 3 files changed, 20 insertions(+), 10 deletions(-) create mode 100644 clickhouse/jinja-templates/volume.yml.jinja diff --git a/clickhouse/config_update_scripts/update_compose.py b/clickhouse/config_update_scripts/update_compose.py index 6ab8ac2..5e677e2 100644 --- a/clickhouse/config_update_scripts/update_compose.py +++ b/clickhouse/config_update_scripts/update_compose.py @@ -20,7 +20,7 @@ if __name__ == "__main__": # extracting only 'server1','server2'... all_service_names = [ name.split('-')[-1] for name in all_service_names] - curr_num_servers = sorted(all_service_names)[-1][-1] + curr_num_servers = int(sorted(all_service_names)[-1][-1]) replication_factor = 2 curr_num_shards = curr_num_servers/replication_factor @@ -57,21 +57,27 @@ if __name__ == "__main__": output_path = f'../node{i}-config/remote-servers.xml' curr_remote_servers_xml.write(output_path, encoding='utf-8', xml_declaration=False) - env = Environment(loader=FileSystemLoader('.')) + env = Environment(loader=FileSystemLoader('../jinja-templates')) service_template = env.get_template('service.yml.jinja') + volume_template = env.get_template('volume.yml.jinja') # loading existing docker-compose file with open('../docker-compose.yaml','r') as f: compose_f = yaml.safe_load(f) # rendering the new service - new_service1 = service_template.render(server_num=curr_num_servers+1) - new_service2 = service_template.render(server_num=curr_num_servers+2) + new_service1 = yaml.safe_load(service_template.render(server_num=curr_num_servers+1)) + new_service2 = yaml.safe_load(service_template.render(server_num=curr_num_servers+2)) + + new_volume1 = yaml.safe_load(volume_template.render(server_num=curr_num_servers+1)) + new_volume2 = yaml.safe_load(volume_template.render(server_num=curr_num_servers+2)) # adding the new service to docker-compose compose_f['services'].update(new_service1) compose_f['services'].update(new_service2) - + compose_f['volumes'].update(new_volume1) + compose_f['volumes'].update(new_volume2) + if compose_f: with open('../docker-compose.yaml','w') as yamlfile: yaml.safe_dump(compose_f, yamlfile) diff --git a/clickhouse/jinja-templates/service.yml.jinja b/clickhouse/jinja-templates/service.yml.jinja index 1ae0414..1ee1631 100644 --- a/clickhouse/jinja-templates/service.yml.jinja +++ b/clickhouse/jinja-templates/service.yml.jinja @@ -3,7 +3,7 @@ clickhouse-server{{server_num}}: container_name: clickhouse-server{{server_num}} volumes: - ./node{{server_num}}-config/:/etc/clickhouse-server/config.d/ - - clickhouse_data{{server_num}}:/var/lib/clickhouse + - clickhouse_server{{server_num}}_data:/var/lib/clickhouse networks: clickhouse-server-network: aliases: @@ -17,10 +17,10 @@ clickhouse-server{{server_num}}: # constraints: [node.labels.role == server] update_config: delay: 10s - # resources: - # limits: - # cpus: "0.50" - # memory: 100M + resources: + limits: + cpus: "0.50" + memory: 100M depends_on: - clickhouse-keeper1 - clickhouse-keeper2 diff --git a/clickhouse/jinja-templates/volume.yml.jinja b/clickhouse/jinja-templates/volume.yml.jinja new file mode 100644 index 0000000..e9324d0 --- /dev/null +++ b/clickhouse/jinja-templates/volume.yml.jinja @@ -0,0 +1,4 @@ +clickhouse_server{{server_num}}_data: + driver: local +clickhouse_server{{server_num}}_TTL: + driver: local \ No newline at end of file From 13dc2b4089705cf972e5a86178171ce5a1777491 Mon Sep 17 00:00:00 2001 From: Lalit Arvind Date: Mon, 25 Nov 2024 10:59:05 -0700 Subject: [PATCH 12/15] updated some errors while testing --- clickhouse/config_update_scripts/update_compose.py | 2 +- clickhouse/docker-compose.yaml | 9 +++------ clickhouse/jinja-templates/storage-policy.xml.jinja | 4 ++-- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/clickhouse/config_update_scripts/update_compose.py b/clickhouse/config_update_scripts/update_compose.py index 5e677e2..66440e2 100644 --- a/clickhouse/config_update_scripts/update_compose.py +++ b/clickhouse/config_update_scripts/update_compose.py @@ -92,7 +92,7 @@ if __name__ == "__main__": with open(f'../node{curr_num_servers + i}-config/config.xml','w') as f1: f1.write(config_content) - macros_content = macros_template.render(shard_num="0{curr_num_shards}",replica_num=i) + macros_content = macros_template.render(shard_num="0"+str(int(curr_num_shards+1)),replica_num=i) with open(f'../node{curr_num_servers + i}-config/macros.xml','w') as f2: f2.write(macros_content) diff --git a/clickhouse/docker-compose.yaml b/clickhouse/docker-compose.yaml index 79848ce..f7c95bc 100644 --- a/clickhouse/docker-compose.yaml +++ b/clickhouse/docker-compose.yaml @@ -2,8 +2,7 @@ services: clickhouse-keeper1: image: clickhouse/clickhouse-server:latest container_name: clickhouse-keeper1 - command: > - /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + command: /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml volumes: - ./clickhouse_keeper/keeper1-config.xml:/etc/clickhouse-server/config.xml - clickhouse_keeper1_data:/var/lib/clickhouse @@ -16,8 +15,7 @@ services: clickhouse-keeper2: image: clickhouse/clickhouse-server:latest container_name: clickhouse-keeper2 - command: > - /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + command: /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml volumes: - ./clickhouse_keeper/keeper2-config.xml:/etc/clickhouse-server/config.xml - clickhouse_keeper2_data:/var/lib/clickhouse @@ -29,8 +27,7 @@ services: clickhouse-keeper3: image: clickhouse/clickhouse-server:latest container_name: clickhouse-keeper3 - command: > - /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + command: /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml volumes: - ./clickhouse_keeper/keeper3-config.xml:/etc/clickhouse-server/config.xml - clickhouse_keeper3_data:/var/lib/clickhouse diff --git a/clickhouse/jinja-templates/storage-policy.xml.jinja b/clickhouse/jinja-templates/storage-policy.xml.jinja index 8f8653c..440d7e7 100644 --- a/clickhouse/jinja-templates/storage-policy.xml.jinja +++ b/clickhouse/jinja-templates/storage-policy.xml.jinja @@ -1,11 +1,11 @@ - ../clickhouse_data{{server_num}}/hot + /clickhouse_data{{server_num}}/hot 300000000 - ../clickhouse_data{{server_num}}/cold + /clickhouse_data{{server_num}}/cold 500000000 From 35d48a2cf4b11f2bdaf5e5275b8eee12ed9a87e2 Mon Sep 17 00:00:00 2001 From: Akash Sivakumar Date: Mon, 25 Nov 2024 16:53:13 -0700 Subject: [PATCH 13/15] pcap check --- preprocessing/pcap_processor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py index b963875..9c9edc8 100644 --- a/preprocessing/pcap_processor.py +++ b/preprocessing/pcap_processor.py @@ -19,7 +19,7 @@ class KafkaClient: self.topic_name = topic_name if mode == 'producer': self.client = KafkaProducer( - bootstrap_servers=['kafka:9092'], + bootstrap_servers=['localhost:9092'], max_request_size = 200000000, #api_version=(0,11,5), value_serializer=lambda x: json.dumps(x).encode('utf-8')) @@ -186,7 +186,9 @@ if __name__ == "__main__": pkts = [] cnt = 0 + seen_count = 0 for idx, pkt in enumerate(pcap_rdr): + seen_count += 1 # filter packets if not pkt_filter(pkt): continue @@ -209,6 +211,7 @@ if __name__ == "__main__": #print(f"streamed packet at index {idx} ") if idx > sample_size: break + print(f"total seen: {seen_count-1}") print(f"total streamed: {cnt}") # flush remaining if not streaming and len(pkts) > 0: From 5d20e14dbf4717f324491802fb774950f29d3ec8 Mon Sep 17 00:00:00 2001 From: Kaushik Narayan R Date: Mon, 25 Nov 2024 22:33:32 -0700 Subject: [PATCH 14/15] added option for streaming from csv --- preprocessing/docker-compose.yml | 7 ++- preprocessing/pcap_processor.py | 101 +++++++++++++++++++++---------- 2 files changed, 73 insertions(+), 35 deletions(-) diff --git a/preprocessing/docker-compose.yml b/preprocessing/docker-compose.yml index 89b39e5..d42305b 100644 --- a/preprocessing/docker-compose.yml +++ b/preprocessing/docker-compose.yml @@ -50,10 +50,13 @@ services: aliases: - pcap_streamer volumes: - - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap" + # - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap" + - "./:/data/pcap" + - "./:/data/csv" environment: PCAP_FILE: /data/pcap/202310081400.pcap - command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"] + # command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"] + command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -c /data/csv/sample_output.csv -s --stream_size 1000"] deploy: replicas: 1 restart_policy: diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py index 9c9edc8..f5f7552 100644 --- a/preprocessing/pcap_processor.py +++ b/preprocessing/pcap_processor.py @@ -113,6 +113,21 @@ def create_pkt_object(pkt: Packet) -> dict: return res_json + +def row_to_dict(pkt_row: list) -> dict: + """make dict from CSV row""" + + return { + "time": float(pkt_row[0]), + "l4_proto": pkt_row[1], + "src_addr": pkt_row[2], + "dst_addr": pkt_row[3], + "src_port": int(pkt_row[4]), + "dst_port": int(pkt_row[5]), + "pkt_len": int(pkt_row[6]), + } + + def prep_csv(out_file: str): with open(out_file, "w", newline="") as csvfile: writer = csv.writer(csvfile) @@ -139,7 +154,8 @@ def pkts_write_csv(pkts: list, out_file: str): if __name__ == "__main__": argp = ArgumentParser() - argp.add_argument("-f", "--pcap_file", required=True, dest="_pcap") + argp.add_argument("-f", "--pcap_file", required=False, dest="_pcap") + argp.add_argument("-c", "--csv_file", required=False, dest="_csv") argp.add_argument("-o", "--out_file", required=False, dest="_out") argp.add_argument("--stream_size", required=False, default=10000, dest="_streamsize") argp.add_argument( @@ -169,6 +185,7 @@ if __name__ == "__main__": args = argp.parse_args() pcap_file = args._pcap + csv_file = args._csv out_file = args._out streaming = args._stream sample = args._sample @@ -179,41 +196,59 @@ if __name__ == "__main__": sample_size = samplesize #1000000 batch_size = 100 #100000 - pcap_rdr = PcapReader(pcap_file) - if not streaming: - assert args._out and args._out != "" - prep_csv(out_file) + # if preprocessed data ready for streaming + if csv_file: + with open(csv_file, newline="") as f: + csv_rdr = csv.reader(f) + next(csv_rdr) # skip headers + pkts = [] - pkts = [] - cnt = 0 - seen_count = 0 - for idx, pkt in enumerate(pcap_rdr): - seen_count += 1 - # filter packets - if not pkt_filter(pkt): - continue + for idx, row in enumerate(csv_rdr): + # direct streaming to kafka goes here + producer.client.send(KAFKA_TOPIC, row_to_dict(row)) + dbg_print(row_to_dict(row)) + dbg_print("streamed packet", idx) + if idx > sample_size: + break + dbg_print(f"total streamed: {idx}") + # otherwise, process packets + else: + pcap_rdr = PcapReader(pcap_file) if not streaming: - # write to file - pkts.append(pkt_extract(pkt)) + assert args._out and args._out != "" + prep_csv(out_file) - if sample and idx > sample_size: - break + pkts = [] + cnt = 0 + seen_count = 0 + for idx, pkt in enumerate(pcap_rdr): + seen_count += 1 + # filter packets + if not pkt_filter(pkt): + continue - if idx > 0 and idx % batch_size == 0: - pkts_write_csv(pkts, out_file) - pkts = [] - else: - # direct streaming to kafka goes here - packet_data = create_pkt_object(pkt) - producer.client.send(KAFKA_TOPIC, packet_data) - cnt += 1 - #print(f"streamed packet at index {idx} ") - if idx > sample_size: break - - print(f"total seen: {seen_count-1}") - print(f"total streamed: {cnt}") - # flush remaining - if not streaming and len(pkts) > 0: - pkts_write_csv(pkts, out_file) + if not streaming: + # write to file + pkts.append(pkt_extract(pkt)) + if sample and idx > sample_size: + break + + if idx > 0 and idx % batch_size == 0: + pkts_write_csv(pkts, out_file) + pkts = [] + else: + # direct streaming to kafka goes here + packet_data = create_pkt_object(pkt) + producer.client.send(KAFKA_TOPIC, packet_data) + cnt += 1 + # print(f"streamed packet at index {idx} ") + if idx > sample_size: + break + + print(f"total seen: {seen_count-1}") + print(f"total streamed: {cnt}") + # flush remaining + if not streaming and len(pkts) > 0: + pkts_write_csv(pkts, out_file) From b1fc1dbc49599536b71395fc9117ae6a3773643f Mon Sep 17 00:00:00 2001 From: Akash Sivakumar Date: Tue, 26 Nov 2024 21:41:17 -0700 Subject: [PATCH 15/15] Fixed integration issue with csv streaming --- clickhouse/README.md | 2 +- .../clickhouse_data/data/preprocessed_configs/config.xml | 2 +- clickhouse/docker-compose.yml | 9 ++++----- preprocessing/README.md | 1 + preprocessing/pcap_processor.py | 7 ++++--- 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/clickhouse/README.md b/clickhouse/README.md index 16831ea..679c830 100644 --- a/clickhouse/README.md +++ b/clickhouse/README.md @@ -74,7 +74,7 @@ docker exec -it clickhouse-kafka-1 kafka-console-consumer --bootstrap-server kaf Get into the ClickHouse client: ```bash -docker exec -it clickhouse-client clickhouse-client +docker exec -it clickhouse-client ``` Check if tables are available: ```bash diff --git a/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml b/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml index d34f75c..08de94b 100644 --- a/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml +++ b/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml @@ -21,7 +21,7 @@ 9181 - 3 + 2 /var/lib/clickhouse/coordination/log /var/lib/clickhouse/coordination/snapshots diff --git a/clickhouse/docker-compose.yml b/clickhouse/docker-compose.yml index 1463068..d38b576 100644 --- a/clickhouse/docker-compose.yml +++ b/clickhouse/docker-compose.yml @@ -125,7 +125,7 @@ services: condition: on-failure pcap_streamer: - image: levenshtein/streamer_test4:latest + image: levenshtein/streamer_test7:latest depends_on: - kafka networks: @@ -133,10 +133,9 @@ services: aliases: - pcap_streamer volumes: - - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap" - environment: - PCAP_FILE: /data/pcap/202310081400.pcap - command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"] + #- "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap" + - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project/project_github/real-time-traffic-analysis-clickhouse/preprocessing:/data/pcap" + command: ["sh", "-c", "sleep 60 && python /app/pcap_processor.py -c /data/pcap/sample_output.csv -s --stream_size 1000"] deploy: replicas: 1 restart_policy: diff --git a/preprocessing/README.md b/preprocessing/README.md index 069efee..8afb3aa 100644 --- a/preprocessing/README.md +++ b/preprocessing/README.md @@ -49,3 +49,4 @@ python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --stream_size 1000 +python pcap_procesor.py -c sample_output.csv -s --stream_size 1000 diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py index f5f7552..35826e3 100644 --- a/preprocessing/pcap_processor.py +++ b/preprocessing/pcap_processor.py @@ -19,7 +19,7 @@ class KafkaClient: self.topic_name = topic_name if mode == 'producer': self.client = KafkaProducer( - bootstrap_servers=['localhost:9092'], + bootstrap_servers=['kafka:9092'], max_request_size = 200000000, #api_version=(0,11,5), value_serializer=lambda x: json.dumps(x).encode('utf-8')) @@ -198,6 +198,7 @@ if __name__ == "__main__": # if preprocessed data ready for streaming if csv_file: + #print("true") with open(csv_file, newline="") as f: csv_rdr = csv.reader(f) next(csv_rdr) # skip headers @@ -207,10 +208,10 @@ if __name__ == "__main__": # direct streaming to kafka goes here producer.client.send(KAFKA_TOPIC, row_to_dict(row)) dbg_print(row_to_dict(row)) - dbg_print("streamed packet", idx) + print("streamed packet", idx) if idx > sample_size: break - dbg_print(f"total streamed: {idx}") + print(f"total streamed: {idx}") # otherwise, process packets else: