diff --git a/clickhouse/ddl/common/table_create.sql b/clickhouse/ddl/common/01_table_create.sql
similarity index 93%
rename from clickhouse/ddl/common/table_create.sql
rename to clickhouse/ddl/common/01_table_create.sql
index 06fc528..0f90004 100644
--- a/clickhouse/ddl/common/table_create.sql
+++ b/clickhouse/ddl/common/01_table_create.sql
@@ -18,8 +18,8 @@ SETTINGS storage_policy = 'hot_cold';
 
 CREATE TABLE ip_region_map (
     ip_range_start IPv4,
-    ip_range_end IPv4,
-    region String,
+    ip_range_end IPv4,
+    region LowCardinality(String),
     INDEX region_idx region TYPE bloom_filter
 ) ENGINE = ReplicatedMergeTree(
     '/clickhouse/tables/{shard}/ip_region_map',
diff --git a/clickhouse/ddl/main/01_table_create.sql b/clickhouse/ddl/main/01_table_create.sql
new file mode 100644
index 0000000..0f90004
--- /dev/null
+++ b/clickhouse/ddl/main/01_table_create.sql
@@ -0,0 +1,28 @@
+-- local table creation
+CREATE TABLE traffic_records (
+    time_stamp DateTime64 (6, 'Japan') CODEC (Delta, ZSTD),
+    l4_protocol Enum8 ('TCP' = 1, 'UDP' = 2),
+    src_ip IPv4,
+    dst_ip IPv4,
+    src_port UInt16 CODEC (ZSTD),
+    dst_port UInt16 CODEC (ZSTD),
+    pkt_len UInt16 CODEC (ZSTD),
+    INDEX port_idx src_port TYPE bloom_filter GRANULARITY 10
+) ENGINE = ReplicatedMergeTree(
+    '/clickhouse/tables/{shard}/traffic_records',
+    '{replica}'
+)
+ORDER BY time_stamp
+TTL toDateTime(time_stamp) + INTERVAL 15 DAY TO VOLUME 'cold_vol'
+SETTINGS storage_policy = 'hot_cold';
+
+CREATE TABLE ip_region_map (
+    ip_range_start IPv4,
+    ip_range_end IPv4,
+    region LowCardinality(String),
+    INDEX region_idx region TYPE bloom_filter
+) ENGINE = ReplicatedMergeTree(
+    '/clickhouse/tables/{shard}/ip_region_map',
+    '{replica}'
+)
+ORDER BY ip_range_start;
\ No newline at end of file
diff --git a/clickhouse/ddl/distr/table_create.sql b/clickhouse/ddl/main/02_dist_table_create.sql
similarity index 100%
rename from clickhouse/ddl/distr/table_create.sql
rename to clickhouse/ddl/main/02_dist_table_create.sql
diff --git a/clickhouse/ddl/main/03_create_kafka_table.sql b/clickhouse/ddl/main/03_create_kafka_table.sql
new file mode 100644
index 0000000..b320fe0
--- /dev/null
+++ b/clickhouse/ddl/main/03_create_kafka_table.sql
@@ -0,0 +1,22 @@
+CREATE TABLE traffic_records_kafka_queue (
+    time Float64,
+    l4_proto String,
+    src_addr String,
+    dst_addr String,
+    src_port UInt16,
+    dst_port UInt16,
+    pkt_len UInt32
+) ENGINE = Kafka() SETTINGS kafka_broker_list = 'kafka:9092',
+kafka_topic_list = 'traffic_records_stream',
+kafka_group_name = 'clickhouse_consumer',
+kafka_format = 'JSONEachRow',
+kafka_num_consumers = 1;
+CREATE MATERIALIZED VIEW traffic_records_kafka_view TO traffic_records_all AS
+SELECT time AS time_stamp,
+    l4_proto AS l4_protocol,
+    src_addr AS src_ip,
+    dst_addr AS dst_ip,
+    src_port,
+    dst_port,
+    pkt_len
+FROM traffic_records_kafka_queue;
\ No newline at end of file
diff --git a/clickhouse/docker-compose.yaml b/clickhouse/docker-compose.yaml
index 5eee4a4..4ba1a62 100644
--- a/clickhouse/docker-compose.yaml
+++ b/clickhouse/docker-compose.yaml
@@ -41,8 +41,7 @@ services:
     container_name: clickhouse-server1
     volumes:
       - ../clickhouse/node1-config/:/etc/clickhouse-server/config.d/
-      - ../clickhouse/ddl/common/table_create.sql:/docker-entrypoint-initdb.d/common_table_create.sql
-      - ../clickhouse/ddl/distr/table_create.sql:/docker-entrypoint-initdb.d/distr_table_create.sql
+      - ../clickhouse/ddl/main:/docker-entrypoint-initdb.d
       - clickhouse_server1_data:/var/lib/clickhouse
       - clickhouse_server1_TTL:/clickhouse_data/server1
     networks:
@@ -79,7 +78,7 @@ services:
     container_name: clickhouse-server2
     volumes:
       - ../clickhouse/node2-config/:/etc/clickhouse-server/config.d/
-      - ../clickhouse/ddl/common/table_create.sql:/docker-entrypoint-initdb.d/common_table_create.sql
+      - ../clickhouse/ddl/common:/docker-entrypoint-initdb.d
       - clickhouse_server2_data:/var/lib/clickhouse
       - clickhouse_server2_TTL:/clickhouse_data/server2
     networks:
diff --git a/preprocessing/docker-compose.yml b/preprocessing/docker-compose.yml
index 72515ea..a587882 100644
--- a/preprocessing/docker-compose.yml
+++ b/preprocessing/docker-compose.yml
@@ -28,6 +28,8 @@ services:
       KAFKA_BROKER_ID: 1
       KAFKA_MESSAGE_MAX_BYTES: 200000000
       KAFKA_REPLICA_FETCH_MAX_BYTES: 200000000
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    command: sh -c "((sleep 15 && kafka-topics --create --if-not-exists --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic traffic_records_stream) &) && /etc/confluent/docker/run"
     networks:
       data-network:
         aliases:
diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py
index 0fd40af..11a15fd 100644
--- a/preprocessing/pcap_processor.py
+++ b/preprocessing/pcap_processor.py
@@ -12,36 +12,33 @@ import json
 
 dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")
 
+# Kafka Configuration
+KAFKA_TOPIC = "traffic_records_stream"
+KAFKA_SERVER = "kafka:9092"  # Adjust to your Kafka server
+
 
 class KafkaClient:
-    def __init__(self, topic_name=None, mode='producer'):
+    def __init__(self, topic_name=None, mode="producer"):
         self.mode = mode
         self.topic_name = topic_name
-        if mode == 'producer':
+        if mode == "producer":
             self.client = KafkaProducer(
-                bootstrap_servers=['kafka:9092'],
-                max_request_size = 200000000,
-                #api_version=(0,11,5),
-                value_serializer=lambda x: json.dumps(x).encode('utf-8'))
-        elif mode == 'consumer' and topic_name is not None:
+                bootstrap_servers=[KAFKA_SERVER],
+                max_request_size=200000000,
+                # api_version=(0,11,5),
+                value_serializer=lambda x: json.dumps(x).encode("utf-8"),
+            )
+        elif mode == "consumer" and topic_name is not None:
             self.client = KafkaConsumer(
-                    topic_name,
-                    bootstrap_servers=['localhost:9092'],
-                    api_version=(0,11,5),
-                    value_deserializer=lambda x: json.loads(x.decode('utf-8')))
+                topic_name,
+                bootstrap_servers=["localhost:9092"],
+                api_version=(0, 11, 5),
+                value_deserializer=lambda x: json.loads(x.decode("utf-8")),
+            )
         else:
             raise ValueError("Consumer mode requires a topic_name")
 
-# Kafka Configuration
-KAFKA_TOPIC = 'pcap_stream_new'
-KAFKA_SERVER = 'kafka:9092' # Adjust to your Kafka server
-#KAFKA_SERVER = 'kafka_service:9092'
-# Initialize Kafka Producer
-# producer = KafkaProducer(
-#     bootstrap_servers=KAFKA_SERVER,
-#     value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else str(v).encode('utf-8') #remove intermediate JSON encoding
-# )
 producer = KafkaClient(topic_name=KAFKA_TOPIC)
@@ -108,7 +105,7 @@ def create_pkt_object(pkt: Packet) -> dict:
         "dst_addr": pkt[IP].dst,
         "src_port": pkt[l4_proto].sport,
         "dst_port": pkt[l4_proto].dport,
-        "pkt_len": len(pkt)
+        "pkt_len": len(pkt),
     }
     return res_json
 
@@ -157,7 +154,9 @@ if __name__ == "__main__":
     argp.add_argument("-f", "--pcap_file", required=False, dest="_pcap")
     argp.add_argument("-c", "--csv_file", required=False, dest="_csv")
    argp.add_argument("-o", "--out_file", required=False, dest="_out")
-    argp.add_argument("--stream_size", required=False, default=10000, dest="_streamsize")
+    argp.add_argument(
+        "--stream_size", required=False, default=10000, dest="_streamsize"
+    )
     argp.add_argument(
         "-x",
         "--sample",
@@ -193,21 +192,21 @@ if __name__ == "__main__":
     DEBUG = args._debug
 
     sample_size = int(args._streamsize)  # 100000
-    batch_size = 100 #100000
+    batch_size = 100  # 100000
 
     # if preprocessed data ready for streaming
     if csv_file:
-        #print("true")
         with open(csv_file, newline="") as f:
             csv_rdr = csv.reader(f)
             next(csv_rdr)  # skip headers
             pkts = []
+            print("started stream from csv")
             for idx, row in enumerate(csv_rdr):
                 # direct streaming to kafka goes here
                 producer.client.send(KAFKA_TOPIC, row_to_dict(row))
                 dbg_print(row_to_dict(row))
-                print("streamed packet", idx)
+                dbg_print("streamed packet", idx)
                 if sample and idx > sample_size:
                     break
             print(f"total streamed: {idx}")
 
@@ -222,6 +221,8 @@ if __name__ == "__main__":
         pkts = []
         cnt = 0
         seen_count = 0
+
+        print("started stream from pcap")
         for idx, pkt in enumerate(pcap_rdr):
             seen_count += 1
             # filter packets
@@ -243,8 +244,9 @@ if __name__ == "__main__":
             packet_data = create_pkt_object(pkt)
             producer.client.send(KAFKA_TOPIC, packet_data)
             cnt += 1
-            #print(f"streamed packet at index {idx} ")
-            if idx > sample_size: break
+            # print(f"streamed packet at index {idx} ")
+            if idx > sample_size:
+                break
 
         print(f"total seen: {seen_count-1}")
         print(f"total streamed: {cnt}")
diff --git a/scripts/deploy.ps1 b/scripts/deploy.ps1
index cea301b..c5fe9cd 100644
--- a/scripts/deploy.ps1
+++ b/scripts/deploy.ps1
@@ -18,14 +18,11 @@ if ($downStack) {
 elseif ($MasterNode) {
     Write-Output "[+] swarm master"
 
-    # cleanup
-    docker stack rm $stackName
-    docker service rm registry
-
     # data streaming
     Set-Location $scriptDir/../preprocessing
     docker service create --name registry -p 5000:5000 registry:2
-    docker build -t 127.0.0.1:5000/data-streamer:latest --no-cache --push -f Dockerfile.python .
+    # docker build -t 127.0.0.1:5000/data-streamer:latest --no-cache --push -f Dockerfile.python .
+    docker build -t 127.0.0.1:5000/data-streamer:latest --push -f Dockerfile.python .
 
     # execute
     Set-Location $scriptDir