diff --git a/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml b/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml
deleted file mode 100644
index 08de94b..0000000
--- a/clickhouse/clickhouse_data/data/preprocessed_configs/config.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-
-
-
-
- trace
- /var/log/clickhouse-keeper/clickhouse-keeper.log
- /var/log/clickhouse-keeper/clickhouse-keeper.err.log
- 1000M
- 3
-
- ::
-
- /var/lib/clickhouse/data/
- /var/lib/clickhouse/tmp/
- /var/lib/clickhouse/user_files/
- /var/lib/clickhouse/format_schemas/
-
-
- 9181
- 2
- /var/lib/clickhouse/coordination/log
- /var/lib/clickhouse/coordination/snapshots
-
- 10000
- 30000
- trace
-
-
-
- 1
- clickhouse-keeper1
- 9234
-
-
- 2
- clickhouse-keeper2
- 9234
-
-
- 3
- clickhouse-keeper3
- 9234
-
-
-
-
-
-
- 0.0.0.0
- 1
-
-
-
diff --git a/clickhouse/config_update_scripts/update_compose.py b/clickhouse/config_update_scripts/update_compose.py
new file mode 100644
index 0000000..9f29a20
--- /dev/null
+++ b/clickhouse/config_update_scripts/update_compose.py
@@ -0,0 +1,97 @@
+import yaml
+from jinja2 import Environment, FileSystemLoader
+import subprocess
+import json
+import xml.etree.ElementTree as ET
+import os
+
+if __name__ == "__main__":
+
+ # extracting details of each running container in json format
+ try:
+ all_services = subprocess.check_output(["docker","ps","--format","json"],text=True).split('\n')[:-1]
+ except subprocess.CalledProcessError as e:
+ print(f"Command failed with return code {e.returncode}")
+
+ all_services = [json.loads(s) for s in all_services]
+ # extracting the name, removing the custom id from it and storing it in a list
+ all_service_names = [service['Names'].split('.')[0] for service in all_services]
+ # extracting only 'keeper1', 'server1'...
+ all_service_names = [ name.split('-')[-1] for name in all_service_names]
+
+ # removing all keeepers
+ all_service_names.remove('keeper1')
+ all_service_names.remove('keeper2')
+ all_service_names.remove('keeper3')
+ curr_num_servers = sorted(all_service_names)[-1][-1]
+
+ replication_factor = 2
+ curr_num_shards = curr_num_servers/replication_factor
+
+ # new shard template that is gonna be added to remote servers file of each node
+ new_shard_str = f'''
+
+ true
+
+ clickhouse-server{curr_num_servers+1}
+ 9000
+
+
+ clickhouse-server{curr_num_servers+2}
+ 9000
+
+
+ '''
+ # extracting existing remote-servers file
+ with open('../node1-config/remote-servers.xml','r') as f:
+ curr_remote_servers_xml = ET.parse(f)
+
+ cluster_root = curr_remote_servers_xml.find('.//cluster_1S_2R')
+ new_shard_xml = ET.fromstring(new_shard_str)
+ cluster_root.append(new_shard_xml)
+
+ # creating folders for new servers that contain the configuration files
+ os.makedirs(f'../node{curr_num_servers+1}-config',exist_ok=True)
+ os.makedirs(f'../node{curr_num_servers+2}-config',exist_ok=True)
+
+ # adding the new shard to each remote-servers file
+ for i in range(1,curr_num_servers+3):
+ output_path = f'../node{i}-config/remote-servers.xml'
+ curr_remote_servers_xml.write(output_path, encoding='utf-8', xml_declaration=False)
+
+ env = Environment(loader=FileSystemLoader('.'))
+ service_template = env.get_template('service.yml.jinja')
+
+ # loading existing docker-compose file
+ with open('../docker-compose.yaml','r') as f:
+ compose_f = yaml.safe_load(f)
+
+ # rendering the new service
+ new_service1 = service_template.render(server_num=curr_num_servers+1)
+ new_service2 = service_template.render(server_num=curr_num_servers+2)
+
+ # adding the new service to docker-compose
+ compose_f['services'].update(new_service1)
+ compose_f['services'].update(new_service2)
+
+ if compose_f:
+ with open('../docker-compose.yaml','w') as yamlfile:
+ yaml.safe_dump(compose_f, yamlfile)
+
+ config_template = env.get_template('config.xml.jinja')
+ macros_template = env.get_template('macros.xml.jinja')
+ use_keeper_template = env.get_template('use-keeper.xml.jinja')
+
+ for i in range(1,3):
+ config_content = config_template.render(node_num=curr_num_servers+i)
+ with open(f'../node{curr_num_servers + i}-config/config.xml','w') as f1:
+ f1.write(config_content)
+
+ macros_content = macros_template.render(shard_num="0{curr_num_shards}",replica_num=i)
+ with open(f'../node{curr_num_servers + i}-config/macros.xml','w') as f2:
+ f2.write(macros_content)
+
+ use_keeper_content = use_keeper_template.render()
+ with open(f'../node{curr_num_servers + i}-config/use-keeper.xml','w') as f3:
+ f3.write(use_keeper_content)
+
diff --git a/clickhouse/docker-compose.yaml b/clickhouse/docker-compose.yaml
index 7841015..7ee9aea 100644
--- a/clickhouse/docker-compose.yaml
+++ b/clickhouse/docker-compose.yaml
@@ -1,5 +1,3 @@
-version: '3.8'
-
services:
clickhouse-keeper1:
image: clickhouse/clickhouse-server:latest
@@ -57,6 +55,7 @@ services:
container_name: clickhouse-server1
volumes:
- ./node1-config/:/etc/clickhouse-server/config.d/
+ - clickhouse_data1:/var/lib/clickhouse
networks:
clickhouse-server-network:
aliases:
@@ -64,6 +63,16 @@ services:
clickhouse-keeper-network:
aliases:
- clickhouse-server1
+ deploy:
+ replicas: 1
+ # placement:
+ # constraints: [node.labels.role == server]
+ update_config:
+ delay: 10s
+ # resources:
+ # limits:
+ # cpus: "0.50"
+ # memory: 100M
depends_on:
- clickhouse-keeper1
- clickhouse-keeper2
@@ -77,6 +86,7 @@ services:
container_name: clickhouse-server2
volumes:
- ./node2-config/:/etc/clickhouse-server/config.d/
+ - clickhouse_data2:/var/lib/clickhouse
networks:
clickhouse-server-network:
aliases:
@@ -84,6 +94,16 @@ services:
clickhouse-keeper-network:
aliases:
- clickhouse-server2
+ deploy:
+ replicas: 1
+ # placement:
+ # constraints: [node.labels.role == server]
+ update_config:
+ delay: 10s
+ # resources:
+ # limits:
+ # cpus: "0.50"
+ # memory: 100M
depends_on:
- clickhouse-keeper1
- clickhouse-keeper2
@@ -94,4 +114,12 @@ services:
networks:
clickhouse-server-network:
+ driver: overlay
clickhouse-keeper-network:
+ driver: overlay
+
+volumes:
+ clickhouse_data1:
+ driver: local
+ clickhouse_data2:
+ driver: local
\ No newline at end of file
diff --git a/clickhouse/jinja-templates/config.xml.jinja b/clickhouse/jinja-templates/config.xml.jinja
new file mode 100644
index 0000000..cd07efb
--- /dev/null
+++ b/clickhouse/jinja-templates/config.xml.jinja
@@ -0,0 +1,24 @@
+
+
+ debug
+ /var/log/clickhouse-server/clickhouse-server.log
+ /var/log/clickhouse-server/clickhouse-server.err.log
+ 1000M
+ 3
+
+ cluster_1S_2R node {{node_num}}
+ 0.0.0.0
+ 8123
+ 9000
+
+ 4096
+ 3
+ 100
+
+
+
+ /etc/clickhouse-server/config.d/macros.xml
+ /etc/clickhouse-server/config.d/remote-servers.xml
+ /etc/clickhouse-server/config.d/use-keeper.xml
+
+
\ No newline at end of file
diff --git a/clickhouse/jinja-templates/macros.xml.jinja b/clickhouse/jinja-templates/macros.xml.jinja
new file mode 100644
index 0000000..f7ade4c
--- /dev/null
+++ b/clickhouse/jinja-templates/macros.xml.jinja
@@ -0,0 +1,7 @@
+
+
+ {{shard_num}}
+ {{replica_num}}
+ cluster_1S_2R
+
+
\ No newline at end of file
diff --git a/clickhouse/jinja-templates/remote-servers.xml.jinja b/clickhouse/jinja-templates/remote-servers.xml.jinja
new file mode 100644
index 0000000..a6a9edd
--- /dev/null
+++ b/clickhouse/jinja-templates/remote-servers.xml.jinja
@@ -0,0 +1,18 @@
+
+
+
+ mysecretphrase
+
+ true
+
+ clickhouse-server1
+ 9000
+
+
+ clickhouse-server2
+ 9000
+
+
+
+
+
\ No newline at end of file
diff --git a/clickhouse/jinja-templates/service.yml.jinja b/clickhouse/jinja-templates/service.yml.jinja
new file mode 100644
index 0000000..1ae0414
--- /dev/null
+++ b/clickhouse/jinja-templates/service.yml.jinja
@@ -0,0 +1,30 @@
+clickhouse-server{{server_num}}:
+ image: clickhouse/clickhouse-server:latest
+ container_name: clickhouse-server{{server_num}}
+ volumes:
+ - ./node{{server_num}}-config/:/etc/clickhouse-server/config.d/
+ - clickhouse_data{{server_num}}:/var/lib/clickhouse
+ networks:
+ clickhouse-server-network:
+ aliases:
+ - clickhouse-server{{server_num}}
+ clickhouse-keeper-network:
+ aliases:
+ - clickhouse-server{{server_num}}
+ deploy:
+ replicas: 1
+ # placement:
+ # constraints: [node.labels.role == server]
+ update_config:
+ delay: 10s
+ # resources:
+ # limits:
+ # cpus: "0.50"
+ # memory: 100M
+ depends_on:
+ - clickhouse-keeper1
+ - clickhouse-keeper2
+ - clickhouse-keeper3
+ ports:
+ - "900{{server_num}}:9000" # Native client port
+    - "812{{server_num}}:8123" # HTTP interface (unique host port per server, mirrors the 900{{server_num}} native-port scheme; a fixed 8123 would collide once a second generated server is deployed)
diff --git a/clickhouse/jinja-templates/use-keeper.xml.jinja b/clickhouse/jinja-templates/use-keeper.xml.jinja
new file mode 100644
index 0000000..2b384dc
--- /dev/null
+++ b/clickhouse/jinja-templates/use-keeper.xml.jinja
@@ -0,0 +1,17 @@
+
+
+
+
+ clickhouse-keeper1
+ 9181
+
+
+ clickhouse-keeper2
+ 9181
+
+
+ clickhouse-keeper3
+ 9181
+
+
+
\ No newline at end of file
diff --git a/preprocessing/Dockerfile.python b/preprocessing/Dockerfile.python
index bba0ca6..b4b0742 100644
--- a/preprocessing/Dockerfile.python
+++ b/preprocessing/Dockerfile.python
@@ -16,5 +16,5 @@ COPY pcap_processor.py /app
# Expose the port Kafka uses (optional, for communication with other services)
EXPOSE 9092
-# Command to run your Python application
+# Command to allow custom runtime arguments
CMD ["python", "pcap_processor.py"]
diff --git a/preprocessing/README.md b/preprocessing/README.md
index bfd5208..069efee 100644
--- a/preprocessing/README.md
+++ b/preprocessing/README.md
@@ -48,4 +48,4 @@
- -d or --debug: boolean value indicating if program is run in debug mode
-python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --sample-size 1000
+python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --stream_size 1000
diff --git a/preprocessing/docker-compose.yml b/preprocessing/docker-compose.yml
new file mode 100644
index 0000000..89b39e5
--- /dev/null
+++ b/preprocessing/docker-compose.yml
@@ -0,0 +1,69 @@
+version: '3.8'
+
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:latest
+ networks:
+ kafka_network:
+ aliases:
+ - zookeeper
+ deploy:
+ replicas: 1
+ restart_policy:
+ condition: on-failure
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ports:
+ - "2181:2181"
+
+ kafka:
+ image: confluentinc/cp-kafka:latest
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
+ KAFKA_BROKER_ID: 1
+ KAFKA_MESSAGE_MAX_BYTES: 200000000
+ KAFKA_REPLICA_FETCH_MAX_BYTES: 200000000
+ networks:
+ kafka_network:
+ aliases:
+ - kafka
+ ports:
+ - "9092:9092"
+ volumes:
+ - kafka_data:/var/lib/kafka/data
+ deploy:
+ replicas: 1
+ restart_policy:
+ condition: on-failure
+
+ pcap_streamer:
+ image: levenshtein/streamer_test3:latest
+ depends_on:
+ - kafka
+ networks:
+ kafka_network:
+ aliases:
+ - pcap_streamer
+ volumes:
+ - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
+ environment:
+ PCAP_FILE: /data/pcap/202310081400.pcap
+ command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
+ deploy:
+ replicas: 1
+ restart_policy:
+ condition: on-failure
+
+networks:
+ kafka_network:
+ driver: overlay
+ attachable: true
+
+volumes:
+ kafka_data:
+ driver: local
\ No newline at end of file
diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py
index 8662f2d..0d70629 100644
--- a/preprocessing/pcap_processor.py
+++ b/preprocessing/pcap_processor.py
@@ -6,10 +6,42 @@ from scapy.packet import Packet
from scapy.utils import PcapReader
from scapy.layers.inet import IP, TCP, UDP
-from kafka import KafkaProducer
+from kafka import KafkaProducer, KafkaConsumer
+import json
dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")
+class KafkaClient:
+    """Thin wrapper that owns either a KafkaProducer or a KafkaConsumer.
+
+    The underlying kafka-python client is exposed as ``self.client``;
+    callers use it directly (e.g. ``producer.client.send(topic, data)``).
+    """
+
+    def __init__(self, topic_name=None, mode='producer'):
+        """Build the underlying Kafka client.
+
+        Args:
+            topic_name: Topic to subscribe to; required for consumer mode.
+                Not used by the producer (callers pass the topic to send()).
+            mode: 'producer' (default) or 'consumer'.
+
+        Raises:
+            ValueError: mode is 'consumer' but no topic_name was given
+                (also raised for any unrecognized mode).
+        """
+        self.mode = mode
+        self.topic_name = topic_name
+        if mode == 'producer':
+            # JSON-encode every payload. The large max_request_size matches
+            # the broker's KAFKA_MESSAGE_MAX_BYTES (200000000) set in the
+            # accompanying docker-compose.yml.
+            self.client = KafkaProducer(
+                bootstrap_servers=['kafka:9092'],
+                max_request_size = 200000000,
+                api_version=(0,11,5),
+                value_serializer=lambda x: json.dumps(x).encode('utf-8'))
+        elif mode == 'consumer' and topic_name is not None:
+            # NOTE(review): consumer connects to 'localhost:9092' while the
+            # producer uses 'kafka:9092' -- confirm this asymmetry is intended
+            # (consumer run from the host, producer inside the compose network).
+            self.client = KafkaConsumer(
+                topic_name,
+                bootstrap_servers=['localhost:9092'],
+                api_version=(0,11,5),
+                value_deserializer=lambda x: json.loads(x.decode('utf-8')))
+        else:
+            raise ValueError("Consumer mode requires a topic_name")
+
+# Kafka Configuration
+KAFKA_TOPIC = 'pcap_stream'
+KAFKA_SERVER = 'kafka:9092' # Adjust to your Kafka server
+#KAFKA_SERVER = 'kafka_service:9092'
+
+# Initialize Kafka Producer
+# producer = KafkaProducer(
+# bootstrap_servers=KAFKA_SERVER,
+# value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else str(v).encode('utf-8') #remove intermediate JSON encoding
+# )
+producer = KafkaClient(topic_name=KAFKA_TOPIC)
+
def pkt_filter(pkt: Packet) -> bool:
"""filter to include/exclude a packet"""
@@ -153,6 +185,7 @@ if __name__ == "__main__":
prep_csv(out_file)
pkts = []
+ cnt = 0
for idx, pkt in enumerate(pcap_rdr):
# filter packets
if not pkt_filter(pkt):
@@ -184,11 +217,12 @@ if __name__ == "__main__":
)
packet_data = create_pkt_object(pkt)
- producer.send(KAFKA_TOPIC, packet_data)
- print(f"streamed packet at index {idx} ")
- if idx > sample_size:
- break
-
+ producer.client.send(KAFKA_TOPIC, packet_data)
+ cnt += 1
+ #print(f"streamed packet at index {idx} ")
+ if idx > sample_size: break
+
+ print(f"total streamed: {cnt}")
# flush remaining
if not streaming and len(pkts) > 0:
pkts_write_csv(pkts, out_file)