Kafka - ClickHouse Integration Testing
Execution Steps
Start the ClickHouse containers (this pulls the ClickHouse Docker image if it is not already present):
docker-compose up -d
Identify the custom networks created:
docker network ls
Inspect the keeper network and verify whether all the containers are connected to it:
docker network inspect dds_proj_clickhouse-keeper-network
If any container is not connected, restart the services:
docker-compose restart
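The connectivity check above can also be scripted. The following is a minimal sketch, assuming the containers of interest are clickhouse-server1 and clickhouse-server2 (the names used in this guide); add the keeper container names from your own compose file. The helper name missing_containers is hypothetical.

```shell
#!/bin/sh
# Print every expected container name that is absent from the attached list.
# $1: space-separated names attached to the network; remaining args: expected names.
missing_containers() {
    attached=" $1 "
    shift
    for name in "$@"; do
        case "$attached" in
            *" $name "*) ;;        # attached, nothing to report
            *) echo "$name" ;;     # not attached
        esac
    done
}

# Usage (requires a running Docker daemon):
#   attached=$(docker network inspect -f '{{range .Containers}}{{.Name}} {{end}}' dds_proj_clickhouse-keeper-network)
#   missing_containers "$attached" clickhouse-server1 clickhouse-server2
```

An empty result means every listed container is attached; any name printed is a candidate for the restart step.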
To execute queries, open a client on either server:
docker exec -it clickhouse-server1 clickhouse-client
docker exec -it clickhouse-server2 clickhouse-client
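For quick checks it can be handy to run one statement on both servers without opening an interactive session. This is a rough sketch that relies on the --query option of clickhouse-client; the function name ch_query_all and the SERVERS variable are hypothetical.

```shell
#!/bin/sh
# Container names taken from the commands in this guide.
SERVERS="clickhouse-server1 clickhouse-server2"

# Run one SQL statement on every server and prefix each result with the server name.
# $1: SQL text.
ch_query_all() {
    for server in $SERVERS; do
        printf '%s: ' "$server"
        docker exec "$server" clickhouse-client --query "$1"
    done
}

# Usage:
#   ch_query_all "SELECT version()"
```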
Building Kafka Image
Navigate to /preprocessing from the repo main directory and build the image:
docker build -t <imagename>:latest -f Dockerfile.python .
Tag the image:
docker tag <imagename>:latest <dockerhub_id>/<imagename>:latest
Push the image to Docker Hub:
docker push <dockerhub_id>/<imagename>:latest
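The build, tag, and push steps can be chained so that a failure stops the sequence. The sketch below assumes it is run from /preprocessing; the function names image_ref and build_and_push are hypothetical, and the Docker Hub ID and image name are passed in as arguments.

```shell
#!/bin/sh
# Compose the Docker Hub reference <dockerhub_id>/<imagename>:latest.
# $1: Docker Hub ID, $2: image name.
image_ref() {
    echo "$1/$2:latest"
}

# Build, tag, and push; each step runs only if the previous one succeeded.
build_and_push() {
    ref=$(image_ref "$1" "$2")
    docker build -t "$2:latest" -f Dockerfile.python . &&
        docker tag "$2:latest" "$ref" &&
        docker push "$ref"
}

# Usage:
#   build_and_push <dockerhub_id> <imagename>
```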
Testing the Integration
Changes to make in docker-compose.yml:
- pcap_streamer => volumes: <local path where .pcap file is stored; don't change /data/pcap after : symbol>
- pcap_streamer => image: <dockerhub_id>/<imagename>:latest
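The volume rule can be sanity-checked before deploying. This is a small sketch with a hypothetical helper, valid_pcap_volume, that accepts a mapping only when the container side is still /data/pcap; it does not handle mappings with a trailing mode such as :ro.

```shell
#!/bin/sh
# Succeed only if a host:container volume mapping keeps /data/pcap as the container path.
# $1: mapping string as it would appear under volumes in docker-compose.yml.
valid_pcap_volume() {
    case "$1" in
        ?*:/data/pcap) return 0 ;;   # non-empty host path, unchanged container path
        *) return 1 ;;
    esac
}

# Usage (the host path here is only an example):
#   valid_pcap_volume "/home/user/pcaps:/data/pcap" && echo "mapping looks fine"
```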
Navigate to /clickhouse from the repo main directory and deploy the stack:
docker stack deploy -c docker-compose.yml <stackname> --detach=false
Check running services:
docker service ls
Check logs of the service:
docker service logs <servicename>
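Reading the service list by eye gets tedious as the stack grows. The sketch below, with a hypothetical helper named not_ready_services, filters the NAME and REPLICAS columns and prints only the services whose running replica count differs from the desired count.

```shell
#!/bin/sh
# Read "NAME REPLICAS" lines (for example "mystack_kafka 1/1") from stdin and
# print the services whose running count differs from the desired count.
not_ready_services() {
    while read -r name replicas _rest; do
        [ -z "$name" ] && continue
        running=${replicas%%/*}     # part before the slash
        desired=${replicas##*/}     # part after the slash
        [ "$running" = "$desired" ] || echo "$name"
    done
}

# Usage:
#   docker service ls --format '{{.Name}} {{.Replicas}}' | not_ready_services
```

Any service it prints is a good candidate for the docker service logs command.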
Get the container ID for the ClickHouse client:
docker ps
Check that the topic contains all of the streamed data:
docker exec -it clickhouse-kafka-1 kafka-console-consumer --bootstrap-server kafka:9092 --topic pcap_stream_new --from-beginning
- Change the topic name if applicable.
- The output should display all JSON packets.
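To get a number instead of scrolling through the output, the consumer output can be piped through a counter. This is a rough sketch: the helper count_json_lines is hypothetical and only counts lines that start with { and end with }, so it is a sanity check rather than JSON validation.

```shell
#!/bin/sh
# Count stdin lines that look like a single JSON object.
count_json_lines() {
    # grep -c prints 0 but exits non-zero when nothing matches; keep the exit status clean.
    grep -c '^{.*}$' || true
}

# Usage (drop -it when piping, and add --timeout-ms so the consumer exits
# once no new message arrives within the given time):
#   docker exec clickhouse-kafka-1 kafka-console-consumer \
#       --bootstrap-server kafka:9092 --topic pcap_stream_new \
#       --from-beginning --timeout-ms 10000 | count_json_lines
```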
Get into the ClickHouse client (substitute the container ID or name reported by docker ps if it differs):
docker exec -it clickhouse-client clickhouse-client
Check if tables are available:
SHOW TABLES;
If tables are not available, create the following:
Create a table for packets_data:
CREATE TABLE packets_data
(
    time Float64,
    l4_proto String,
    src_addr String,
    dst_addr String,
    src_port UInt16,
    dst_port UInt16,
    pkt_len UInt32
) ENGINE = MergeTree
ORDER BY time;
Create a table for kafka_stream:
CREATE TABLE kafka_stream
(
    time Float64,
    l4_proto String,
    src_addr String,
    dst_addr String,
    src_port UInt16,
    dst_port UInt16,
    pkt_len UInt32
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'pcap_stream_new',
         kafka_group_name = 'clickhouse_consumer',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 1;
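With kafka_format = 'JSONEachRow', each Kafka message is expected to be one JSON object per line whose keys match the column names above. The sketch below prints such a record; the helper packet_json is hypothetical and the values in the usage line are made up for illustration, not taken from a real capture.

```shell
#!/bin/sh
# Print one newline-terminated JSON record whose keys match the kafka_stream columns.
# Arguments, in order: time l4_proto src_addr dst_addr src_port dst_port pkt_len
packet_json() {
    printf '{"time":%s,"l4_proto":"%s","src_addr":"%s","dst_addr":"%s","src_port":%s,"dst_port":%s,"pkt_len":%s}\n' "$@"
}

# Usage (illustrative values):
#   packet_json 1700000000.25 TCP 10.0.0.1 10.0.0.2 443 51514 60
```

A record of this shape could be sent to the topic with a console producer to test the table by hand, assuming the Kafka image ships one.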
Create a materialized view kafka_to_packets:
CREATE MATERIALIZED VIEW kafka_to_packets
TO packets_data AS
SELECT
    time,
    l4_proto,
    src_addr,
    dst_addr,
    src_port,
    dst_port,
    pkt_len
FROM kafka_stream;
Check table contents to verify data availability:
SELECT * FROM packets_data LIMIT 10;
SELECT COUNT(*) AS total_count_of_packets_streamed FROM packets_data;
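The final count can also be compared against an expected number from the shell, for example the number of packets in the source capture. This is a minimal sketch, assuming the client container is named clickhouse-client as in the command used earlier; the helpers packets_count and counts_match are hypothetical.

```shell
#!/bin/sh
# Print the current number of rows in packets_data.
packets_count() {
    docker exec clickhouse-client clickhouse-client --query "SELECT count() FROM packets_data"
}

# Succeed when the row count equals the expected number of packets.
# $1: expected count.
counts_match() {
    [ "$(packets_count)" = "$1" ]
}

# Usage:
#   counts_match <expected_count> && echo "all packets ingested"
```

Because the materialized view consumes the topic asynchronously, the count may lag for a short while after streaming ends, so a mismatch is worth rechecking before treating it as data loss.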