Merge branch 'main' into preprocessing

This commit is contained in:
Kaushik Narayan R 2024-11-25 13:21:52 -07:00
commit 6884fc681d
12 changed files with 334 additions and 71 deletions

View File

@ -1,61 +0,0 @@
<!-- This file was generated automatically.
Do not edit it: it is likely to be discarded and generated again before it's read next time.
Files used to generate this file:
/etc/clickhouse-server/config.xml
/etc/clickhouse-server/config.d/docker_related_config.xml -->
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-keeper/clickhouse-keeper.log</log>
<errorlog>/var/log/clickhouse-keeper/clickhouse-keeper.err.log</errorlog>
<size>1000M</size>
<count>3</count>
</logger>
<listen_host>::</listen_host>
<path>/var/lib/clickhouse/data/</path>
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>clickhouse-keeper1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>clickhouse-keeper2</hostname>
<port>9234</port>
</server>
<server>
<id>3</id>
<hostname>clickhouse-keeper3</hostname>
<port>9234</port>
</server>
</raft_configuration>
</keeper_server>
<!-- Listen wildcard address to allow accepting connections from other containers and host network. -->
<listen_host>0.0.0.0</listen_host>
<listen_try>1</listen_try>
<!--
<logger>
<console>1</console>
</logger>
-->
</yandex>

View File

@ -0,0 +1,97 @@
import yaml
from jinja2 import Environment, FileSystemLoader
import subprocess
import json
import xml.etree.ElementTree as ET
import os
if __name__ == "__main__":
# extracting details of each running container in json format
try:
all_services = subprocess.check_output(["docker","ps","--format","json"],text=True).split('\n')[:-1]
except subprocess.CalledProcessError as e:
print(f"Command failed with return code {e.returncode}")
all_services = [json.loads(s) for s in all_services]
# extracting the name, removing the custom id from it and storing it in a list
all_service_names = [service['Names'].split('.')[0] for service in all_services]
# extracting only 'keeper1', 'server1'...
all_service_names = [ name.split('-')[-1] for name in all_service_names]
# removing all keeepers
all_service_names.remove('keeper1')
all_service_names.remove('keeper2')
all_service_names.remove('keeper3')
curr_num_servers = sorted(all_service_names)[-1][-1]
replication_factor = 2
curr_num_shards = curr_num_servers/replication_factor
# new shard template that is gonna be added to remote servers file of each node
new_shard_str = f'''
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse-server{curr_num_servers+1}</host>
<port>9000</port>
</replica>
<replica>
<host>clickhouse-server{curr_num_servers+2}</host>
<port>9000</port>
</replica>
</shard>
'''
# extracting existing remote-servers file
with open('../node1-config/remote-servers.xml','r') as f:
curr_remote_servers_xml = ET.parse(f)
cluster_root = curr_remote_servers_xml.find('.//cluster_1S_2R')
new_shard_xml = ET.fromstring(new_shard_str)
cluster_root.append(new_shard_xml)
# creating folders for new servers that contain the configuration files
os.makedirs(f'../node{curr_num_servers+1}-config',exist_ok=True)
os.makedirs(f'../node{curr_num_servers+2}-config',exist_ok=True)
# adding the new shard to each remote-servers file
for i in range(1,curr_num_servers+3):
output_path = f'../node{i}-config/remote-servers.xml'
curr_remote_servers_xml.write(output_path, encoding='utf-8', xml_declaration=False)
env = Environment(loader=FileSystemLoader('.'))
service_template = env.get_template('service.yml.jinja')
# loading existing docker-compose file
with open('../docker-compose.yaml','r') as f:
compose_f = yaml.safe_load(f)
# rendering the new service
new_service1 = service_template.render(server_num=curr_num_servers+1)
new_service2 = service_template.render(server_num=curr_num_servers+2)
# adding the new service to docker-compose
compose_f['services'].update(new_service1)
compose_f['services'].update(new_service2)
if compose_f:
with open('../docker-compose.yaml','w') as yamlfile:
yaml.safe_dump(compose_f, yamlfile)
config_template = env.get_template('config.xml.jinja')
macros_template = env.get_template('macros.xml.jinja')
use_keeper_template = env.get_template('use-keeper.xml.jinja')
for i in range(1,3):
config_content = config_template.render(node_num=curr_num_servers+i)
with open(f'../node{curr_num_servers + i}-config/config.xml','w') as f1:
f1.write(config_content)
macros_content = macros_template.render(shard_num="0{curr_num_shards}",replica_num=i)
with open(f'../node{curr_num_servers + i}-config/macros.xml','w') as f2:
f2.write(macros_content)
use_keeper_content = use_keeper_template.render()
with open(f'../node{curr_num_servers + i}-config/use-keeper.xml','w') as f3:
f3.write(use_keeper_content)

View File

@ -1,5 +1,3 @@
version: '3.8'
services:
clickhouse-keeper1:
image: clickhouse/clickhouse-server:latest
@ -57,6 +55,7 @@ services:
container_name: clickhouse-server1
volumes:
- ./node1-config/:/etc/clickhouse-server/config.d/
- clickhouse_data1:/var/lib/clickhouse
networks:
clickhouse-server-network:
aliases:
@ -64,6 +63,16 @@ services:
clickhouse-keeper-network:
aliases:
- clickhouse-server1
deploy:
replicas: 1
# placement:
# constraints: [node.labels.role == server]
update_config:
delay: 10s
# resources:
# limits:
# cpus: "0.50"
# memory: 100M
depends_on:
- clickhouse-keeper1
- clickhouse-keeper2
@ -77,6 +86,7 @@ services:
container_name: clickhouse-server2
volumes:
- ./node2-config/:/etc/clickhouse-server/config.d/
- clickhouse_data2:/var/lib/clickhouse
networks:
clickhouse-server-network:
aliases:
@ -84,6 +94,16 @@ services:
clickhouse-keeper-network:
aliases:
- clickhouse-server2
deploy:
replicas: 1
# placement:
# constraints: [node.labels.role == server]
update_config:
delay: 10s
# resources:
# limits:
# cpus: "0.50"
# memory: 100M
depends_on:
- clickhouse-keeper1
- clickhouse-keeper2
@ -94,4 +114,12 @@ services:
networks:
clickhouse-server-network:
driver: overlay
clickhouse-keeper-network:
driver: overlay
volumes:
clickhouse_data1:
driver: local
clickhouse_data2:
driver: local

View File

@ -0,0 +1,24 @@
<clickhouse>
<logger>
<level>debug</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>3</count>
</logger>
<display_name>cluster_1S_2R node {{node_num}}</display_name>
<listen_host>0.0.0.0</listen_host>
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<!-- Maximum connections and settings -->
<max_connections>4096</max_connections>
<keep_alive_timeout>3</keep_alive_timeout>
<max_concurrent_queries>100</max_concurrent_queries>
<!-- Additional configuration files can be included -->
<include_from>/etc/clickhouse-server/config.d/macros.xml</include_from>
<include_from>/etc/clickhouse-server/config.d/remote-servers.xml</include_from>
<include_from>/etc/clickhouse-server/config.d/use-keeper.xml</include_from>
<!-- <include_from>/etc/clickhouse-server/config.d/keeper-config.xml</include_from> -->
</clickhouse>

View File

@ -0,0 +1,7 @@
<clickhouse>
<macros>
<shard>{{shard_num}}</shard>
<replica>{{replica_num}}</replica>
<cluster>cluster_1S_2R</cluster>
</macros>
</clickhouse>

View File

@ -0,0 +1,18 @@
<clickhouse>
<remote_servers replace="true">
<cluster_1S_2R>
<secret>mysecretphrase</secret>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse-server1</host>
<port>9000</port>
</replica>
<replica>
<host>clickhouse-server2</host>
<port>9000</port>
</replica>
</shard>
</cluster_1S_2R>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,30 @@
clickhouse-server{{server_num}}:
image: clickhouse/clickhouse-server:latest
container_name: clickhouse-server{{server_num}}
volumes:
- ./node{{server_num}}-config/:/etc/clickhouse-server/config.d/
- clickhouse_data{{server_num}}:/var/lib/clickhouse
networks:
clickhouse-server-network:
aliases:
- clickhouse-server{{server_num}}
clickhouse-keeper-network:
aliases:
- clickhouse-server{{server_num}}
deploy:
replicas: 1
# placement:
# constraints: [node.labels.role == server]
update_config:
delay: 10s
# resources:
# limits:
# cpus: "0.50"
# memory: 100M
depends_on:
- clickhouse-keeper1
- clickhouse-keeper2
- clickhouse-keeper3
ports:
- "900{{server_num}}:9000" # Native client port
- "8123:8123" # HTTP interface

View File

@ -0,0 +1,17 @@
<clickhouse>
<zookeeper>
<!-- where are the ZK nodes -->
<node>
<host>clickhouse-keeper1</host>
<port>9181</port>
</node>
<node>
<host>clickhouse-keeper2</host>
<port>9181</port>
</node>
<node>
<host>clickhouse-keeper3</host>
<port>9181</port>
</node>
</zookeeper>
</clickhouse>

View File

@ -16,5 +16,5 @@ COPY pcap_processor.py /app
# Expose the port Kafka uses (optional, for communication with other services)
EXPOSE 9092
# Command to run your Python application
# Command to allow custom runtime arguments
CMD ["python", "pcap_processor.py"]

View File

@ -48,4 +48,4 @@
- -d or --debug: boolean value indicating if program is run in debug mode
python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --sample-size 1000
python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --stream_size 1000

View File

@ -0,0 +1,69 @@
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
networks:
kafka_network:
aliases:
- zookeeper
deploy:
replicas: 1
restart_policy:
condition: on-failure
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_BROKER_ID: 1
KAFKA_MESSAGE_MAX_BYTES: 200000000
KAFKA_REPLICA_FETCH_MAX_BYTES: 200000000
networks:
kafka_network:
aliases:
- kafka
ports:
- "9092:9092"
volumes:
- kafka_data:/var/lib/kafka/data
deploy:
replicas: 1
restart_policy:
condition: on-failure
pcap_streamer:
image: levenshtein/streamer_test3:latest
depends_on:
- kafka
networks:
kafka_network:
aliases:
- pcap_streamer
volumes:
- "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
environment:
PCAP_FILE: /data/pcap/202310081400.pcap
command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
deploy:
replicas: 1
restart_policy:
condition: on-failure
networks:
kafka_network:
driver: overlay
attachable: true
volumes:
kafka_data:
driver: local

View File

@ -6,10 +6,42 @@ from scapy.packet import Packet
from scapy.utils import PcapReader
from scapy.layers.inet import IP, TCP, UDP
from kafka import KafkaProducer
from kafka import KafkaProducer, KafkaConsumer
import json
dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")
class KafkaClient:
def __init__(self, topic_name=None, mode='producer'):
self.mode = mode
self.topic_name = topic_name
if mode == 'producer':
self.client = KafkaProducer(
bootstrap_servers=['kafka:9092'],
max_request_size = 200000000,
api_version=(0,11,5),
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
elif mode == 'consumer' and topic_name is not None:
self.client = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9092'],
api_version=(0,11,5),
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
else:
raise ValueError("Consumer mode requires a topic_name")
# Kafka Configuration
KAFKA_TOPIC = 'pcap_stream'
KAFKA_SERVER = 'kafka:9092' # Adjust to your Kafka server
#KAFKA_SERVER = 'kafka_service:9092'
# Initialize Kafka Producer
# producer = KafkaProducer(
# bootstrap_servers=KAFKA_SERVER,
# value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else str(v).encode('utf-8') #remove intermediate JSON encoding
# )
producer = KafkaClient(topic_name=KAFKA_TOPIC)
def pkt_filter(pkt: Packet) -> bool:
"""filter to include/exclude a packet"""
@ -153,6 +185,7 @@ if __name__ == "__main__":
prep_csv(out_file)
pkts = []
cnt = 0
for idx, pkt in enumerate(pcap_rdr):
# filter packets
if not pkt_filter(pkt):
@ -184,11 +217,12 @@ if __name__ == "__main__":
)
packet_data = create_pkt_object(pkt)
producer.send(KAFKA_TOPIC, packet_data)
print(f"streamed packet at index {idx} ")
if idx > sample_size:
break
producer.client.send(KAFKA_TOPIC, packet_data)
cnt += 1
#print(f"streamed packet at index {idx} ")
if idx > sample_size: break
print(f"total streamed: {cnt}")
# flush remaining
if not streaming and len(pkts) > 0:
pkts_write_csv(pkts, out_file)