Kafka-ClickHouse integration done

Kaushik Narayan R 2024-11-27 16:24:14 -07:00
parent 6fb0ff4b4c
commit 75bca2d704
8 changed files with 87 additions and 37 deletions

View File

@@ -18,8 +18,8 @@ SETTINGS storage_policy = 'hot_cold';
 CREATE TABLE ip_region_map (
     ip_range_start IPv4,
     ip_range_end IPv4,
-    region String,
+    region LowCardinality(String),
     INDEX region_idx region TYPE bloom_filter
 ) ENGINE = ReplicatedMergeTree(
     '/clickhouse/tables/{shard}/ip_region_map',

View File

@@ -0,0 +1,28 @@
-- local table creation
CREATE TABLE traffic_records (
    time_stamp DateTime64 (6, 'Japan') CODEC (Delta, ZSTD),
    l4_protocol Enum8 ('TCP' = 1, 'UDP' = 2),
    src_ip IPv4,
    dst_ip IPv4,
    src_port UInt16 CODEC (ZSTD),
    dst_port UInt16 CODEC (ZSTD),
    pkt_len UInt16 CODEC (ZSTD),
    INDEX port_idx src_port TYPE bloom_filter GRANULARITY 10
) ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/traffic_records',
    '{replica}'
)
ORDER BY time_stamp
TTL toDateTime(time_stamp) + INTERVAL 15 DAY TO VOLUME 'cold_vol'
SETTINGS storage_policy = 'hot_cold';

CREATE TABLE ip_region_map (
    ip_range_start IPv4,
    ip_range_end IPv4,
    region LowCardinality(String),
    INDEX region_idx region TYPE bloom_filter
) ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/ip_region_map',
    '{replica}'
)
ORDER BY ip_range_start;
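
Not part of the commit: a quick way to confirm the 15-day TTL move under the hot_cold policy is to look at which disk each active part of traffic_records sits on. A minimal Python sketch, assuming the clickhouse-connect client and ClickHouse's HTTP interface on localhost:8123 (both assumptions, not anything this repo pins down):

# Sketch only: host/port and the clickhouse-connect dependency are assumptions.
import clickhouse_connect

client = clickhouse_connect.get_client(host="localhost", port=8123)

# Each active part reports the disk it currently lives on; parts older than the
# 15-day TTL should end up on the cold volume's disk under the hot_cold policy.
rows = client.query(
    "SELECT name, disk_name, rows "
    "FROM system.parts "
    "WHERE table = 'traffic_records' AND active "
    "ORDER BY modification_time"
).result_rows

for part_name, disk, row_count in rows:
    print(f"part={part_name} disk={disk} rows={row_count}")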

View File

@@ -0,0 +1,22 @@
CREATE TABLE traffic_records_kafka_queue (
    time Float64,
    l4_proto String,
    src_addr String,
    dst_addr String,
    src_port UInt16,
    dst_port UInt16,
    pkt_len UInt32
) ENGINE = Kafka() SETTINGS kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'traffic_records_stream',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 1;
CREATE MATERIALIZED VIEW traffic_records_kafka_view TO traffic_records_all AS
SELECT time AS time_stamp,
    l4_proto AS l4_protocol,
    src_addr AS src_ip,
    dst_addr AS dst_ip,
    src_port,
    dst_port,
    pkt_len
FROM traffic_records_kafka_queue;
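
Not in the diff: a hedged end-to-end check of the Kafka engine table plus materialized view. The sketch below publishes one record shaped like the traffic_records_kafka_queue columns to traffic_records_stream as JSON, then counts rows in traffic_records_all. The library choices (kafka-python, clickhouse-connect), the host names, and the fixed 5-second wait are assumptions about the runtime environment, not part of the commit.

import json
import time

import clickhouse_connect
from kafka import KafkaProducer

# Broker address matches the compose service name; adjust if running outside the stack.
producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Field names mirror the queue table; the materialized view renames them into traffic_records_all.
producer.send(
    "traffic_records_stream",
    {
        "time": time.time(),
        "l4_proto": "TCP",
        "src_addr": "10.0.0.1",
        "dst_addr": "10.0.0.2",
        "src_port": 443,
        "dst_port": 51234,
        "pkt_len": 60,
    },
)
producer.flush()

# Give the Kafka engine's background consumer a moment to poll and flush.
time.sleep(5)
client = clickhouse_connect.get_client(host="clickhouse-server1", port=8123)
print(client.query("SELECT count() FROM traffic_records_all").result_rows[0][0])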

View File

@@ -41,8 +41,7 @@ services:
     container_name: clickhouse-server1
     volumes:
       - ../clickhouse/node1-config/:/etc/clickhouse-server/config.d/
-      - ../clickhouse/ddl/common/table_create.sql:/docker-entrypoint-initdb.d/common_table_create.sql
-      - ../clickhouse/ddl/distr/table_create.sql:/docker-entrypoint-initdb.d/distr_table_create.sql
+      - ../clickhouse/ddl/main:/docker-entrypoint-initdb.d
       - clickhouse_server1_data:/var/lib/clickhouse
       - clickhouse_server1_TTL:/clickhouse_data/server1
     networks:
@@ -79,7 +78,7 @@ services:
     container_name: clickhouse-server2
     volumes:
       - ../clickhouse/node2-config/:/etc/clickhouse-server/config.d/
-      - ../clickhouse/ddl/common/table_create.sql:/docker-entrypoint-initdb.d/common_table_create.sql
+      - ../clickhouse/ddl/common:/docker-entrypoint-initdb.d
       - clickhouse_server2_data:/var/lib/clickhouse
       - clickhouse_server2_TTL:/clickhouse_data/server2
     networks:

View File

@@ -28,6 +28,8 @@ services:
       KAFKA_BROKER_ID: 1
       KAFKA_MESSAGE_MAX_BYTES: 200000000
       KAFKA_REPLICA_FETCH_MAX_BYTES: 200000000
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    command: sh -c "/etc/confluent/docker/run & sleep 15; kafka-topics --create --if-not-exists --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic traffic_records_stream; wait"
     networks:
       data-network:
         aliases:
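
For completeness, topic creation can also be checked (or retried) from Python with kafka-python's admin client instead of relying on the compose command. This is an assumed-environment sketch, not something the stack runs:

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="kafka:9092")

# Create the topic only if the broker's startup command has not already done so.
if "traffic_records_stream" not in admin.list_topics():
    try:
        admin.create_topics(
            [NewTopic(name="traffic_records_stream", num_partitions=1, replication_factor=1)]
        )
    except TopicAlreadyExistsError:
        pass  # raced with another creator; nothing to do

print(admin.list_topics())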

View File

@@ -12,36 +12,33 @@ import json
 dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")
 
+# Kafka Configuration
+KAFKA_TOPIC = "traffic_records_stream"
+KAFKA_SERVER = "kafka:9092"  # Adjust to your Kafka server
+
 class KafkaClient:
-    def __init__(self, topic_name=None, mode='producer'):
+    def __init__(self, topic_name=None, mode="producer"):
         self.mode = mode
         self.topic_name = topic_name
-        if mode == 'producer':
+        if mode == "producer":
             self.client = KafkaProducer(
-                bootstrap_servers=['kafka:9092'],
-                max_request_size = 200000000,
-                #api_version=(0,11,5),
-                value_serializer=lambda x: json.dumps(x).encode('utf-8'))
-        elif mode == 'consumer' and topic_name is not None:
+                bootstrap_servers=[KAFKA_SERVER],
+                max_request_size=200000000,
+                # api_version=(0,11,5),
+                value_serializer=lambda x: json.dumps(x).encode("utf-8"),
+            )
+        elif mode == "consumer" and topic_name is not None:
             self.client = KafkaConsumer(
                 topic_name,
-                bootstrap_servers=['localhost:9092'],
-                api_version=(0,11,5),
-                value_deserializer=lambda x: json.loads(x.decode('utf-8')))
+                bootstrap_servers=["localhost:9092"],
+                api_version=(0, 11, 5),
+                value_deserializer=lambda x: json.loads(x.decode("utf-8")),
+            )
         else:
             raise ValueError("Consumer mode requires a topic_name")
 
-# Kafka Configuration
-KAFKA_TOPIC = 'pcap_stream_new'
-KAFKA_SERVER = 'kafka:9092'  # Adjust to your Kafka server
-#KAFKA_SERVER = 'kafka_service:9092'
-
-# Initialize Kafka Producer
-# producer = KafkaProducer(
-#     bootstrap_servers=KAFKA_SERVER,
-#     value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else str(v).encode('utf-8') #remove intermediate JSON encoding
-# )
 producer = KafkaClient(topic_name=KAFKA_TOPIC)
@@ -108,7 +105,7 @@ def create_pkt_object(pkt: Packet) -> dict:
         "dst_addr": pkt[IP].dst,
         "src_port": pkt[l4_proto].sport,
         "dst_port": pkt[l4_proto].dport,
-        "pkt_len": len(pkt)
+        "pkt_len": len(pkt),
     }
     return res_json
@@ -157,7 +154,9 @@ if __name__ == "__main__":
     argp.add_argument("-f", "--pcap_file", required=False, dest="_pcap")
     argp.add_argument("-c", "--csv_file", required=False, dest="_csv")
     argp.add_argument("-o", "--out_file", required=False, dest="_out")
-    argp.add_argument("--stream_size", required=False, default=10000, dest="_streamsize")
+    argp.add_argument(
+        "--stream_size", required=False, default=10000, dest="_streamsize"
+    )
     argp.add_argument(
         "-x",
         "--sample",
@@ -193,21 +192,21 @@ if __name__ == "__main__":
     DEBUG = args._debug
 
     sample_size = int(args._streamsize)  # 100000
-    batch_size = 100 #100000
+    batch_size = 100  # 100000
 
     # if preprocessed data ready for streaming
     if csv_file:
-        #print("true")
         with open(csv_file, newline="") as f:
             csv_rdr = csv.reader(f)
             next(csv_rdr)  # skip headers
             pkts = []
+            print("started stream from csv")
             for idx, row in enumerate(csv_rdr):
                 # direct streaming to kafka goes here
                 producer.client.send(KAFKA_TOPIC, row_to_dict(row))
                 dbg_print(row_to_dict(row))
-                print("streamed packet", idx)
+                dbg_print("streamed packet", idx)
                 if sample and idx > sample_size:
                     break
             print(f"total streamed: {idx}")
@@ -222,6 +221,8 @@ if __name__ == "__main__":
         pkts = []
         cnt = 0
         seen_count = 0
+
+        print("started stream from pcap")
         for idx, pkt in enumerate(pcap_rdr):
             seen_count += 1
             # filter packets
@@ -243,8 +244,9 @@ if __name__ == "__main__":
                 packet_data = create_pkt_object(pkt)
                 producer.client.send(KAFKA_TOPIC, packet_data)
                 cnt += 1
-                #print(f"streamed packet at index {idx} ")
-                if idx > sample_size: break
+                # print(f"streamed packet at index {idx} ")
+                if idx > sample_size:
+                    break
 
         print(f"total seen: {seen_count-1}")
         print(f"total streamed: {cnt}")

View File

@@ -18,14 +18,11 @@ if ($downStack) {
 elseif ($MasterNode) {
     Write-Output "[+] swarm master"
 
-    # cleanup
-    docker stack rm $stackName
-    docker service rm registry
-
     # data streaming
     Set-Location $scriptDir/../preprocessing
     docker service create --name registry -p 5000:5000 registry:2
-    docker build -t 127.0.0.1:5000/data-streamer:latest --no-cache --push -f Dockerfile.python .
+    # docker build -t 127.0.0.1:5000/data-streamer:latest --no-cache --push -f Dockerfile.python .
+    docker build -t 127.0.0.1:5000/data-streamer:latest --push -f Dockerfile.python .
 
     # execute
     Set-Location $scriptDir