diff --git a/clickhouse/Queries.sql b/clickhouse/Queries.sql new file mode 100644 index 0000000..111c82e --- /dev/null +++ b/clickhouse/Queries.sql @@ -0,0 +1,22 @@ +-- TABLE CREATION +CREATE TABLE traffic_records ON CLUSTER cluster_1S_2R +( + time_stamp DateTime64(3, 'Japan') CODEC(Delta, ZSTD), + protocol Enum('TCP' = 1, 'UDP' = 2), + from_IP IPv4, + to_IP IPv4, + port UInt16 CODEC(ZSTD), + INDEX port_idx port TYPE bloom_filter GRANULARITY 10 +) ENGINE = ReplicatedMergeTree( '/clickhouse/tables/{shard}/traffic_records', '{replica}') +ORDER BY time_stamp +TTL time_stamp + INTERVAL 5 DAY TO VOLUME 'cold_disk' +SETTINGS storage_policy = 'hot_cold'; + +CREATE TABLE ip_region_map ON CLUSTER cluster_1S_2R +( + ip_range_start IPv4, + ip_range_end IPv4, + region String, + INDEX region_ind region TYPE bloom_filter +) ENGINE = ReplicatedMergeTree( '/clickhouse/tables/{shard}/ip_region_map', '{replica}') +ORDER BY ip_range_start; \ No newline at end of file diff --git a/clickhouse/README b/clickhouse/README deleted file mode 100644 index af20090..0000000 --- a/clickhouse/README +++ /dev/null @@ -1,13 +0,0 @@ -Execution steps: - -ensure to pull clickhouse docker image -> docker-compose up -d -identify the custom networks created. 
-> docker network ls -inspect the keeper network and verify whether all the containers are connected to it -> docker network inspect {{dds_proj_clickhouse-keeper-network}} -if all the containers are not connected -> docker-compose restart -To execute queries -> docker exec -it clickhouse-server1 clickhouse-client -> docker exec -it clickhouse-server2 clickhouse-client \ No newline at end of file diff --git a/clickhouse/README.md b/clickhouse/README.md new file mode 100644 index 0000000..679c830 --- /dev/null +++ b/clickhouse/README.md @@ -0,0 +1,141 @@ + +# Kafka - Clickhouse Integration Testing + +## Execution Steps + +Ensure to pull the ClickHouse Docker image: +```bash +docker-compose up -d +``` +Identify the custom networks created: +```bash +docker network ls +``` +Inspect the keeper network and verify whether all the containers are connected to it: +```bash +docker network inspect dds_proj_clickhouse-keeper-network +``` +If all the containers are not connected: +```bash +docker-compose restart +``` +To execute queries: +```bash +docker exec -it clickhouse-server1 clickhouse-client +``` +```bash +docker exec -it clickhouse-server2 clickhouse-client +``` + +## Building Kafka Image + +Navigate to `/preprocessing` from the repo main directory: +```bash +docker build -t :latest -f Dockerfile.python . 
+``` +Tag the image: +```bash +docker tag :latest /:latest +``` +Push the image to Docker Hub: +```bash +docker push /:latest +``` + +## Testing the Integration + +Changes to make in `docker-compose.yml`: +- `pcap_streamer => volumes`: `` +- `pcap_streamer => image`: `/:latest` + +Navigate to `/clickhouse` from the repo main directory: +```bash +docker stack deploy -c docker-compose.yml --detach=false +``` +Check running services: +```bash +docker service ls +``` +Check logs of the service: +```bash +docker service logs +``` + +Get the container ID for the ClickHouse client: +```bash +docker ps +``` +Check if the topic has all streamed data: +```bash +docker exec -it clickhouse-kafka-1 kafka-console-consumer --bootstrap-server kafka:9092 --topic pcap_stream_new --from-beginning +``` +- Change the topic name if applicable. +- The output should display all JSON packets. + +Get into the ClickHouse client: +```bash +docker exec -it clickhouse-client +``` +Check if tables are available: +```bash +SHOW TABLES; +``` + +If tables are not available, create the following: + +Create a table for `packets_data`: +```sql +CREATE TABLE packets_data +( + time Float64, + l4_proto String, + src_addr String, + dst_addr String, + src_port UInt16, + dst_port UInt16, + pkt_len UInt32 +) ENGINE = MergeTree +ORDER BY time; +``` + +Create a table for `kafka_stream`: +```sql +CREATE TABLE kafka_stream +( + time Float64, + l4_proto String, + src_addr String, + dst_addr String, + src_port UInt16, + dst_port UInt16, + pkt_len UInt32 +) ENGINE = Kafka +SETTINGS kafka_broker_list = 'kafka:9092', + kafka_topic_list = 'pcap_stream_new', + kafka_group_name = 'clickhouse_consumer', + kafka_format = 'JSONEachRow', + kafka_num_consumers = 1; +``` + +Create a materialized view `kafka_to_packets`: +```sql +CREATE MATERIALIZED VIEW kafka_to_packets +TO packets_data AS +SELECT + time, + l4_proto, + src_addr, + dst_addr, + src_port, + dst_port, + pkt_len +FROM kafka_stream; +``` + +Check table contents to 
verify data availability: +```bash +SELECT * FROM packets_data LIMIT 10; +``` +```bash +SELECT COUNT(*) AS total_count_of_packets_streamed FROM packets_data; +``` diff --git a/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml b/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml new file mode 100644 index 0000000..08de94b --- /dev/null +++ b/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml @@ -0,0 +1,61 @@ + + + + + trace + /var/log/clickhouse-keeper/clickhouse-keeper.log + /var/log/clickhouse-keeper/clickhouse-keeper.err.log + 1000M + 3 + + :: + + /var/lib/clickhouse/data/ + /var/lib/clickhouse/tmp/ + /var/lib/clickhouse/user_files/ + /var/lib/clickhouse/format_schemas/ + + + 9181 + 2 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + 10000 + 30000 + trace + + + + 1 + clickhouse-keeper1 + 9234 + + + 2 + clickhouse-keeper2 + 9234 + + + 3 + clickhouse-keeper3 + 9234 + + + + + + + 0.0.0.0 + 1 + + + diff --git a/clickhouse/config_update_scripts/Dockerfile b/clickhouse/config_update_scripts/Dockerfile new file mode 100644 index 0000000..79e3995 --- /dev/null +++ b/clickhouse/config_update_scripts/Dockerfile @@ -0,0 +1,9 @@ +FROM python + +WORKDIR /update_scripts + +COPY . 
/update_scripts/ + +RUN pip install --no-cache-dir -r requirements.txt + +CMD ["python3","update_trigger.py"] \ No newline at end of file diff --git a/clickhouse/config_update_scripts/requirements.txt b/clickhouse/config_update_scripts/requirements.txt new file mode 100644 index 0000000..711648d --- /dev/null +++ b/clickhouse/config_update_scripts/requirements.txt @@ -0,0 +1,8 @@ +jinja2 +PyYAML +schedule +# NOTE: subprocess, json, re, xml and time are Python standard-library +# modules and must not be listed here; pip install would fail on them. +# 'yaml' is imported from the PyYAML distribution, which is why PyYAML +# (not 'yaml') appears above. Only jinja2, PyYAML and schedule are +# genuine third-party dependencies of these scripts. \ No newline at end of file diff --git a/clickhouse/config_update_scripts/update_compose.py b/clickhouse/config_update_scripts/update_compose.py index 9f29a20..66440e2 100644 --- a/clickhouse/config_update_scripts/update_compose.py +++ b/clickhouse/config_update_scripts/update_compose.py @@ -4,6 +4,7 @@ import subprocess import json import xml.etree.ElementTree as ET import os +import re if __name__ == "__main__": @@ -15,15 +16,11 @@ if __name__ == "__main__": all_services = [json.loads(s) for s in all_services] # extracting the name, removing the custom id from it and storing it in a list - all_service_names = [service['Names'].split('.')[0] for service in all_services] - # extracting only 'keeper1', 'server1'... + all_service_names = [service['Names'].split('.')[0] for service in all_services if re.findall(r'clickhouse-server',service['Names'])] + # extracting only 'server1','server2'... 
all_service_names = [ name.split('-')[-1] for name in all_service_names] - # removing all keeepers - all_service_names.remove('keeper1') - all_service_names.remove('keeper2') - all_service_names.remove('keeper3') - curr_num_servers = sorted(all_service_names)[-1][-1] + curr_num_servers = int(sorted(all_service_names)[-1][-1]) replication_factor = 2 curr_num_shards = curr_num_servers/replication_factor @@ -31,6 +28,7 @@ if __name__ == "__main__": # new shard template that is gonna be added to remote servers file of each node new_shard_str = f''' + {curr_num_shards+1} true clickhouse-server{curr_num_servers+1} @@ -59,21 +57,27 @@ if __name__ == "__main__": output_path = f'../node{i}-config/remote-servers.xml' curr_remote_servers_xml.write(output_path, encoding='utf-8', xml_declaration=False) - env = Environment(loader=FileSystemLoader('.')) + env = Environment(loader=FileSystemLoader('../jinja-templates')) service_template = env.get_template('service.yml.jinja') + volume_template = env.get_template('volume.yml.jinja') # loading existing docker-compose file with open('../docker-compose.yaml','r') as f: compose_f = yaml.safe_load(f) # rendering the new service - new_service1 = service_template.render(server_num=curr_num_servers+1) - new_service2 = service_template.render(server_num=curr_num_servers+2) + new_service1 = yaml.safe_load(service_template.render(server_num=curr_num_servers+1)) + new_service2 = yaml.safe_load(service_template.render(server_num=curr_num_servers+2)) + + new_volume1 = yaml.safe_load(volume_template.render(server_num=curr_num_servers+1)) + new_volume2 = yaml.safe_load(volume_template.render(server_num=curr_num_servers+2)) # adding the new service to docker-compose compose_f['services'].update(new_service1) compose_f['services'].update(new_service2) - + compose_f['volumes'].update(new_volume1) + compose_f['volumes'].update(new_volume2) + if compose_f: with open('../docker-compose.yaml','w') as yamlfile: yaml.safe_dump(compose_f, yamlfile) @@ 
-81,13 +85,14 @@ if __name__ == "__main__": config_template = env.get_template('config.xml.jinja') macros_template = env.get_template('macros.xml.jinja') use_keeper_template = env.get_template('use-keeper.xml.jinja') + storage_policy_template = env.get_template('storage-policy.xml.jinja') for i in range(1,3): config_content = config_template.render(node_num=curr_num_servers+i) with open(f'../node{curr_num_servers + i}-config/config.xml','w') as f1: f1.write(config_content) - macros_content = macros_template.render(shard_num="0{curr_num_shards}",replica_num=i) + macros_content = macros_template.render(shard_num="0"+str(int(curr_num_shards+1)),replica_num=i) with open(f'../node{curr_num_servers + i}-config/macros.xml','w') as f2: f2.write(macros_content) @@ -95,3 +100,7 @@ if __name__ == "__main__": with open(f'../node{curr_num_servers + i}-config/use-keeper.xml','w') as f3: f3.write(use_keeper_content) + storage_policy_content = storage_policy_template.render(server_num=curr_num_servers+i) + with open(f'../node{curr_num_servers + i}-config/storage-policy.xml','w') as f4: + f4.write(storage_policy_content) + diff --git a/clickhouse/config_update_scripts/update_trigger.py b/clickhouse/config_update_scripts/update_trigger.py new file mode 100644 index 0000000..fe154f5 --- /dev/null +++ b/clickhouse/config_update_scripts/update_trigger.py @@ -0,0 +1,34 @@ +import subprocess +import json +import re +import schedule +import time + +def check_util_exec(): + # extracting details of each running container in json format + try: + all_services = subprocess.check_output(["docker","stats","--no-stream","--format","json"],text=True).split('\n')[:-1] + except subprocess.CalledProcessError as e: + print(f"Command failed with return code {e.returncode}") + + all_services = [json.loads(s) for s in all_services] + + resource_util_exceed_flag = True # Flag to check if all of the containers have exceeded 80% memory utilization + for service in all_services: + if 
re.findall(r'clickhouse-server',service['Name']): + if float(service['MemPerc'][:-1]) < 80: + resource_util_exceed_flag = False + + if resource_util_exceed_flag: + process = subprocess.Popen(['python3','update_compose.py'],text=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) + stdout, stderr = process.communicate() # Wait for the process to finish and capture output + print("Standard Output:", stdout) + print("Standard Error:", stderr) + +if __name__ == "__main__": + schedule.every(30).seconds.do(check_util_exec) + while True: + schedule.run_pending() + time.sleep(1) + + \ No newline at end of file diff --git a/clickhouse/docker-compose.yaml b/clickhouse/docker-compose-old.yaml similarity index 64% rename from clickhouse/docker-compose.yaml rename to clickhouse/docker-compose-old.yaml index 7ee9aea..f7c95bc 100644 --- a/clickhouse/docker-compose.yaml +++ b/clickhouse/docker-compose-old.yaml @@ -2,14 +2,10 @@ services: clickhouse-keeper1: image: clickhouse/clickhouse-server:latest container_name: clickhouse-keeper1 - command: > - /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + command: /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml volumes: - ./clickhouse_keeper/keeper1-config.xml:/etc/clickhouse-server/config.xml - - ./clickhouse_data/data:/var/lib/clickhouse/data - - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp - - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files - - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas + - clickhouse_keeper1_data:/var/lib/clickhouse networks: clickhouse-keeper-network: @@ -19,15 +15,10 @@ services: clickhouse-keeper2: image: clickhouse/clickhouse-server:latest container_name: clickhouse-keeper2 - command: > - /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + command: /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml volumes: - 
./clickhouse_keeper/keeper2-config.xml:/etc/clickhouse-server/config.xml - - ./clickhouse_data/data:/var/lib/clickhouse/data - - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp - - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files - - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas - + - clickhouse_keeper2_data:/var/lib/clickhouse networks: clickhouse-keeper-network: aliases: @@ -36,15 +27,10 @@ services: clickhouse-keeper3: image: clickhouse/clickhouse-server:latest container_name: clickhouse-keeper3 - command: > - /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + command: /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml volumes: - ./clickhouse_keeper/keeper3-config.xml:/etc/clickhouse-server/config.xml - - ./clickhouse_data/data:/var/lib/clickhouse/data - - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp - - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files - - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas - + - clickhouse_keeper3_data:/var/lib/clickhouse networks: clickhouse-keeper-network: aliases: @@ -55,7 +41,8 @@ services: container_name: clickhouse-server1 volumes: - ./node1-config/:/etc/clickhouse-server/config.d/ - - clickhouse_data1:/var/lib/clickhouse + - clickhouse_server1_data:/var/lib/clickhouse + - clickhouse_server1_TTL:/clickhouse_data/server1 networks: clickhouse-server-network: aliases: @@ -69,10 +56,10 @@ services: # constraints: [node.labels.role == server] update_config: delay: 10s - # resources: - # limits: - # cpus: "0.50" - # memory: 100M + resources: + limits: + cpus: "0.50" + memory: 1200M depends_on: - clickhouse-keeper1 - clickhouse-keeper2 @@ -86,7 +73,8 @@ services: container_name: clickhouse-server2 volumes: - ./node2-config/:/etc/clickhouse-server/config.d/ - - clickhouse_data2:/var/lib/clickhouse + - clickhouse_server2_data:/var/lib/clickhouse + - clickhouse_server2_TTL:/clickhouse_data/server2 networks: 
clickhouse-server-network: aliases: @@ -100,10 +88,10 @@ services: # constraints: [node.labels.role == server] update_config: delay: 10s - # resources: - # limits: - # cpus: "0.50" - # memory: 100M + resources: + limits: + cpus: "0.50" + memory: 1200M depends_on: - clickhouse-keeper1 - clickhouse-keeper2 @@ -115,11 +103,23 @@ services: networks: clickhouse-server-network: driver: overlay + attachable: true clickhouse-keeper-network: driver: overlay + attachable: true volumes: - clickhouse_data1: + clickhouse_server1_data: driver: local - clickhouse_data2: + clickhouse_server2_data: + driver: local + clickhouse_keeper1_data: + driver: local + clickhouse_keeper2_data: + driver: local + clickhouse_keeper3_data: + driver: local + clickhouse_server1_TTL: + driver: local + clickhouse_server2_TTL: driver: local \ No newline at end of file diff --git a/clickhouse/docker-compose.yml b/clickhouse/docker-compose.yml new file mode 100644 index 0000000..d38b576 --- /dev/null +++ b/clickhouse/docker-compose.yml @@ -0,0 +1,155 @@ +services: + clickhouse-keeper1: + image: clickhouse/clickhouse-server:latest + container_name: clickhouse-keeper1 + command: > + /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + volumes: + - ./clickhouse_keeper/keeper1-config.xml:/etc/clickhouse-server/config.xml + - ./clickhouse_data/data:/var/lib/clickhouse/data + - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp + - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files + - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas + networks: + common-network: + aliases: + - clickhouse-keeper1 + + clickhouse-keeper2: + image: clickhouse/clickhouse-server:latest + container_name: clickhouse-keeper2 + command: > + /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + volumes: + - ./clickhouse_keeper/keeper2-config.xml:/etc/clickhouse-server/config.xml + - ./clickhouse_data/data:/var/lib/clickhouse/data + - 
./clickhouse_data/tmp:/var/lib/clickhouse/tmp + - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files + - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas + networks: + common-network: + aliases: + - clickhouse-keeper2 + + clickhouse-keeper3: + image: clickhouse/clickhouse-server:latest + container_name: clickhouse-keeper3 + command: > + /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml + volumes: + - ./clickhouse_keeper/keeper3-config.xml:/etc/clickhouse-server/config.xml + - ./clickhouse_data/data:/var/lib/clickhouse/data + - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp + - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files + - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas + networks: + common-network: + aliases: + - clickhouse-keeper3 + + clickhouse-server1: + image: clickhouse/clickhouse-server:latest + container_name: clickhouse-server1 + volumes: + - ./node1-config/:/etc/clickhouse-server/config.d/ + - clickhouse_data1:/var/lib/clickhouse + networks: + common-network: + aliases: + - clickhouse-server1 + depends_on: + - clickhouse-keeper1 + - clickhouse-keeper2 + - clickhouse-keeper3 + ports: + - "9001:9000" # Native client port + - "8123:8123" # HTTP interface + + clickhouse-server2: + image: clickhouse/clickhouse-server:latest + container_name: clickhouse-server2 + volumes: + - ./node2-config/:/etc/clickhouse-server/config.d/ + - clickhouse_data2:/var/lib/clickhouse + networks: + common-network: + aliases: + - clickhouse-server2 + depends_on: + - clickhouse-keeper1 + - clickhouse-keeper2 + - clickhouse-keeper3 + ports: + - "9002:9000" # Native client port + - "8125:8123" # HTTP interface + + zookeeper: + image: confluentinc/cp-zookeeper:latest + networks: + common-network: + aliases: + - zookeeper + deploy: + replicas: 1 + restart_policy: + condition: on-failure + environment: + ZOOKEEPER_CLIENT_PORT: 2182 + ports: + - "2182:2181" + + kafka: + image: 
confluentinc/cp-kafka:latest + depends_on: + - zookeeper + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2182 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_BROKER_ID: 1 + KAFKA_MESSAGE_MAX_BYTES: 200000000 + KAFKA_REPLICA_FETCH_MAX_BYTES: 200000000 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + networks: + common-network: + aliases: + - kafka + ports: + - "9092:9092" + volumes: + - kafka_data_new:/var/lib/kafka/data + deploy: + replicas: 1 + restart_policy: + condition: on-failure + + pcap_streamer: + image: levenshtein/streamer_test7:latest + depends_on: + - kafka + networks: + common-network: + aliases: + - pcap_streamer + volumes: + #- "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap" + - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project/project_github/real-time-traffic-analysis-clickhouse/preprocessing:/data/pcap" + command: ["sh", "-c", "sleep 60 && python /app/pcap_processor.py -c /data/pcap/sample_output.csv -s --stream_size 1000"] + deploy: + replicas: 1 + restart_policy: + condition: on-failure + +networks: + common-network: + driver: overlay + attachable: true + +volumes: + clickhouse_data1: + driver: local + clickhouse_data2: + driver: local + kafka_data_new: + driver: local diff --git a/clickhouse/jinja-templates/config.xml.jinja b/clickhouse/jinja-templates/config.xml.jinja index cd07efb..4a0f2ba 100644 --- a/clickhouse/jinja-templates/config.xml.jinja +++ b/clickhouse/jinja-templates/config.xml.jinja @@ -20,5 +20,6 @@ /etc/clickhouse-server/config.d/macros.xml /etc/clickhouse-server/config.d/remote-servers.xml /etc/clickhouse-server/config.d/use-keeper.xml + /etc/clickhouse-server/config.d/storage-policy.xml \ No newline at end of file diff --git a/clickhouse/jinja-templates/remote-servers.xml.jinja b/clickhouse/jinja-templates/remote-servers.xml.jinja deleted file mode 100644 index a6a9edd..0000000 --- 
a/clickhouse/jinja-templates/remote-servers.xml.jinja +++ /dev/null @@ -1,18 +0,0 @@ - - - - mysecretphrase - - true - - clickhouse-server1 - 9000 - - - clickhouse-server2 - 9000 - - - - - \ No newline at end of file diff --git a/clickhouse/jinja-templates/service.yml.jinja b/clickhouse/jinja-templates/service.yml.jinja index 1ae0414..1ee1631 100644 --- a/clickhouse/jinja-templates/service.yml.jinja +++ b/clickhouse/jinja-templates/service.yml.jinja @@ -3,7 +3,7 @@ clickhouse-server{{server_num}}: container_name: clickhouse-server{{server_num}} volumes: - ./node{{server_num}}-config/:/etc/clickhouse-server/config.d/ - - clickhouse_data{{server_num}}:/var/lib/clickhouse + - clickhouse_server{{server_num}}_data:/var/lib/clickhouse networks: clickhouse-server-network: aliases: @@ -17,10 +17,10 @@ clickhouse-server{{server_num}}: # constraints: [node.labels.role == server] update_config: delay: 10s - # resources: - # limits: - # cpus: "0.50" - # memory: 100M + resources: + limits: + cpus: "0.50" + memory: 100M depends_on: - clickhouse-keeper1 - clickhouse-keeper2 diff --git a/clickhouse/jinja-templates/storage-policy.xml.jinja b/clickhouse/jinja-templates/storage-policy.xml.jinja new file mode 100644 index 0000000..440d7e7 --- /dev/null +++ b/clickhouse/jinja-templates/storage-policy.xml.jinja @@ -0,0 +1,25 @@ + + + + /clickhouse_data{{server_num}}/hot + 300000000 + + + /clickhouse_data{{server_num}}/cold + 500000000 + + + + + + + hot_disk + + + cold_disk + + + 0.2 + + + \ No newline at end of file diff --git a/clickhouse/jinja-templates/volume.yml.jinja b/clickhouse/jinja-templates/volume.yml.jinja new file mode 100644 index 0000000..e9324d0 --- /dev/null +++ b/clickhouse/jinja-templates/volume.yml.jinja @@ -0,0 +1,4 @@ +clickhouse_server{{server_num}}_data: + driver: local +clickhouse_server{{server_num}}_TTL: + driver: local \ No newline at end of file diff --git a/clickhouse/node1-config/config.xml b/clickhouse/node1-config/config.xml index 4ced53d..261fe8d 100644 
--- a/clickhouse/node1-config/config.xml +++ b/clickhouse/node1-config/config.xml @@ -20,5 +20,6 @@ /etc/clickhouse-server/config.d/macros.xml /etc/clickhouse-server/config.d/remote-servers.xml /etc/clickhouse-server/config.d/use-keeper.xml + /etc/clickhouse-server/config.d/storage-policy.xml \ No newline at end of file diff --git a/clickhouse/node1-config/remote-servers.xml b/clickhouse/node1-config/remote-servers.xml index a6a9edd..e708fb7 100644 --- a/clickhouse/node1-config/remote-servers.xml +++ b/clickhouse/node1-config/remote-servers.xml @@ -3,6 +3,7 @@ mysecretphrase + 1 true clickhouse-server1 diff --git a/clickhouse/node1-config/storage-policy.xml b/clickhouse/node1-config/storage-policy.xml new file mode 100644 index 0000000..41054e4 --- /dev/null +++ b/clickhouse/node1-config/storage-policy.xml @@ -0,0 +1,27 @@ + + + + + /clickhouse_data/server1/hot/ + + + /clickhouse_data/server1/cold/ + + + + + + + hot_disk + 1073741824 + + + cold_disk + 1073741824 + + + 0.2 + + + + \ No newline at end of file diff --git a/clickhouse/node2-config/config.xml b/clickhouse/node2-config/config.xml index 68d5b06..f2928c0 100644 --- a/clickhouse/node2-config/config.xml +++ b/clickhouse/node2-config/config.xml @@ -20,5 +20,6 @@ /etc/clickhouse-server/config.d/macros.xml /etc/clickhouse-server/config.d/remote-servers.xml /etc/clickhouse-server/config.d/use-keeper.xml + /etc/clickhouse-server/config.d/storage-policy.xml \ No newline at end of file diff --git a/clickhouse/node2-config/remote-servers.xml b/clickhouse/node2-config/remote-servers.xml index a6a9edd..e708fb7 100644 --- a/clickhouse/node2-config/remote-servers.xml +++ b/clickhouse/node2-config/remote-servers.xml @@ -3,6 +3,7 @@ mysecretphrase + 1 true clickhouse-server1 diff --git a/clickhouse/node2-config/storage-policy.xml b/clickhouse/node2-config/storage-policy.xml new file mode 100644 index 0000000..0691d22 --- /dev/null +++ b/clickhouse/node2-config/storage-policy.xml @@ -0,0 +1,27 @@ + + + + + 
/clickhouse_data/server2/hot/ + + + /clickhouse_data/server2/cold/ + + + + + + + hot_disk + 1073741824 + + + cold_disk + 1073741824 + + + 0.2 + + + + \ No newline at end of file diff --git a/preprocessing/README.md b/preprocessing/README.md index 069efee..8afb3aa 100644 --- a/preprocessing/README.md +++ b/preprocessing/README.md @@ -49,3 +49,4 @@ python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --stream_size 1000 +python pcap_procesor.py -c sample_output.csv -s --stream_size 1000 diff --git a/preprocessing/docker-compose.yml b/preprocessing/docker-compose.yml index 89b39e5..d42305b 100644 --- a/preprocessing/docker-compose.yml +++ b/preprocessing/docker-compose.yml @@ -50,10 +50,13 @@ services: aliases: - pcap_streamer volumes: - - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap" + # - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap" + - "./:/data/pcap" + - "./:/data/csv" environment: PCAP_FILE: /data/pcap/202310081400.pcap - command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"] + # command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"] + command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -c /data/csv/sample_output.csv -s --stream_size 1000"] deploy: replicas: 1 restart_policy: diff --git a/preprocessing/pcap_consumer_test.py b/preprocessing/pcap_consumer_test.py new file mode 100644 index 0000000..d2312df --- /dev/null +++ b/preprocessing/pcap_consumer_test.py @@ -0,0 +1,25 @@ +from kafka import KafkaConsumer +import json + +KAFKA_TOPIC = 'pcap_stream_new' # Use the topic name from your producer +KAFKA_SERVER = 'localhost:9092' # Ensure this matches your Kafka server configuration + +# Create a Kafka consumer +consumer = KafkaConsumer( + KAFKA_TOPIC, + bootstrap_servers=[KAFKA_SERVER], + value_deserializer=lambda x: 
x.decode('utf-8'),#json.loads(x.decode('utf-8')), + auto_offset_reset='earliest', # Ensures it starts reading from the beginning + enable_auto_commit=True +) + + + +print("Consuming messages from topic:", KAFKA_TOPIC) + +# Consume and print messages + +for message in consumer: + print(type(message)) + + diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py index 0d70629..142b649 100644 --- a/preprocessing/pcap_processor.py +++ b/preprocessing/pcap_processor.py @@ -11,6 +11,8 @@ import json dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}") + + class KafkaClient: def __init__(self, topic_name=None, mode='producer'): self.mode = mode @@ -19,7 +21,7 @@ class KafkaClient: self.client = KafkaProducer( bootstrap_servers=['kafka:9092'], max_request_size = 200000000, - api_version=(0,11,5), + #api_version=(0,11,5), value_serializer=lambda x: json.dumps(x).encode('utf-8')) elif mode == 'consumer' and topic_name is not None: self.client = KafkaConsumer( @@ -31,7 +33,7 @@ class KafkaClient: raise ValueError("Consumer mode requires a topic_name") # Kafka Configuration -KAFKA_TOPIC = 'pcap_stream' +KAFKA_TOPIC = 'pcap_stream_new' KAFKA_SERVER = 'kafka:9092' # Adjust to your Kafka server #KAFKA_SERVER = 'kafka_service:9092' @@ -112,6 +114,20 @@ def create_pkt_object(pkt: Packet) -> dict: return res_json +def row_to_dict(pkt_row: list) -> dict: + """make dict from CSV row""" + + return { + "time": float(pkt_row[0]), + "l4_proto": pkt_row[1], + "src_addr": pkt_row[2], + "dst_addr": pkt_row[3], + "src_port": int(pkt_row[4]), + "dst_port": int(pkt_row[5]), + "pkt_len": int(pkt_row[6]), + } + + def prep_csv(out_file: str): with open(out_file, "w", newline="") as csvfile: writer = csv.writer(csvfile) @@ -138,7 +154,8 @@ def pkts_write_csv(pkts: list, out_file: str): if __name__ == "__main__": argp = ArgumentParser() - argp.add_argument("-f", "--pcap_file", required=True, dest="_pcap") + argp.add_argument("-f", "--pcap_file", required=False, dest="_pcap") + 
argp.add_argument("-c", "--csv_file", required=False, dest="_csv") argp.add_argument("-o", "--out_file", required=False, dest="_out") argp.add_argument( "--stream_size", required=False, default=100000, dest="_streamsize" @@ -170,6 +187,7 @@ if __name__ == "__main__": args = argp.parse_args() pcap_file = args._pcap + csv_file = args._csv out_file = args._out streaming = args._stream sample = args._sample @@ -177,52 +195,62 @@ if __name__ == "__main__": DEBUG = args._debug sample_size = int(args._streamsize) # 100000 - batch_size = 10000 # 10000 + batch_size = 100 #100000 - pcap_rdr = PcapReader(pcap_file) - if not streaming: - assert args._out and args._out != "" - prep_csv(out_file) + # if preprocessed data ready for streaming + if csv_file: + #print("true") + with open(csv_file, newline="") as f: + csv_rdr = csv.reader(f) + next(csv_rdr) # skip headers + pkts = [] - pkts = [] - cnt = 0 - for idx, pkt in enumerate(pcap_rdr): - # filter packets - if not pkt_filter(pkt): - continue + for idx, row in enumerate(csv_rdr): + # direct streaming to kafka goes here + producer.client.send(KAFKA_TOPIC, row_to_dict(row)) + dbg_print(row_to_dict(row)) + print("streamed packet", idx) + if idx > sample_size: + break + print(f"total streamed: {idx}") + # otherwise, process packets + else: + pcap_rdr = PcapReader(pcap_file) if not streaming: - # write to file - pkts.append(pkt_extract(pkt)) + assert args._out and args._out != "" + prep_csv(out_file) - if sample and idx > sample_size: - break + pkts = [] + cnt = 0 + seen_count = 0 + for idx, pkt in enumerate(pcap_rdr): + seen_count += 1 + # filter packets + if not pkt_filter(pkt): + continue - if idx > 0 and idx % batch_size == 0: - pkts_write_csv(pkts, out_file) - pkts = [] - else: - # Kafka Configuration - KAFKA_TOPIC = "pcap_stream" - KAFKA_SERVER = "localhost:9092" # Adjust to your Kafka server - # KAFKA_SERVER = 'kafka_service:9092' + if not streaming: + # write to file + pkts.append(pkt_extract(pkt)) - # Initialize Kafka 
Producer - producer = KafkaProducer( - bootstrap_servers=KAFKA_SERVER, - # value_serializer=lambda v: json.dumps(v).encode('utf-8') # Encode data as JSON - value_serializer=lambda v: ( - v.encode("utf-8") if isinstance(v, str) else str(v).encode("utf-8") - ), # remove intermediate JSON encoding - ) + if sample and idx > sample_size: + break - packet_data = create_pkt_object(pkt) - producer.client.send(KAFKA_TOPIC, packet_data) - cnt += 1 - #print(f"streamed packet at index {idx} ") - if idx > sample_size: break - - print(f"total streamed: {cnt}") - # flush remaining - if not streaming and len(pkts) > 0: - pkts_write_csv(pkts, out_file) + if idx > 0 and idx % batch_size == 0: + pkts_write_csv(pkts, out_file) + pkts = [] + else: + # direct streaming to kafka goes here + packet_data = create_pkt_object(pkt) + producer.client.send(KAFKA_TOPIC, packet_data) + cnt += 1 + # print(f"streamed packet at index {idx} ") + if idx > sample_size: + break + + print(f"total seen: {seen_count-1}") + print(f"total streamed: {cnt}") + # flush remaining + if not streaming and len(pkts) > 0: + pkts_write_csv(pkts, out_file)