Mirror of https://github.com/20kaushik02/real-time-traffic-analysis-clickhouse.git (synced 2025-12-06 06:44:07 +00:00)

Merge pull request #4 from 20kaushik02/integration_temp: Start integration

This commit is contained in commit 887b04a04e
@ -1,13 +0,0 @@
Execution steps:

ensure to pull clickhouse docker image
> docker-compose up -d
identify the custom networks created.
> docker network ls
inspect the keeper network and verify whether all the containers are connected to it
> docker network inspect {{dds_proj_clickhouse-keeper-network}}
if all the containers are not connected
> docker-compose restart
To execute queries
> docker exec -it clickhouse-server1 clickhouse-client
> docker exec -it clickhouse-server2 clickhouse-client
141 clickhouse/README.md Normal file
@ -0,0 +1,141 @@
# Kafka - ClickHouse Integration Testing

## Execution Steps

Pull the ClickHouse Docker image and bring the containers up:

```bash
docker-compose up -d
```

Identify the custom networks created:

```bash
docker network ls
```

Inspect the keeper network and verify that all the containers are connected to it:

```bash
docker network inspect dds_proj_clickhouse-keeper-network
```

If not all containers are connected, restart the stack:

```bash
docker-compose restart
```

To execute queries, open a client on either server:

```bash
docker exec -it clickhouse-server1 clickhouse-client
```

```bash
docker exec -it clickhouse-server2 clickhouse-client
```

## Building the Kafka Image

Navigate to `/preprocessing` from the repo main directory and build the image:

```bash
docker build -t <imagename>:latest -f Dockerfile.python .
```

Tag the image:

```bash
docker tag <imagename>:latest <dockerhub_id>/<imagename>:latest
```

Push the image to Docker Hub:

```bash
docker push <dockerhub_id>/<imagename>:latest
```

## Testing the Integration

Changes to make in `docker-compose.yml` (a sketch follows this list):

- `pcap_streamer => volumes`: `<local path where the .pcap file is stored; don't change /data/pcap after the : symbol>`
- `pcap_streamer => image`: `<dockerhub_id>/<imagename>:latest`
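
For reference, a minimal sketch of what the edited `pcap_streamer` entry might look like; the host path and the `your_dockerhub_id/pcap_streamer` image name below are placeholders, not the project's actual values:

```yaml
  pcap_streamer:
    # placeholder tag: use the image you pushed to Docker Hub
    image: your_dockerhub_id/pcap_streamer:latest
    volumes:
      # left-hand side: local directory holding the .pcap file;
      # keep /data/pcap on the right-hand side unchanged
      - "/path/to/local/pcap_dir:/data/pcap"
```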

Navigate to `/clickhouse` from the repo main directory and deploy the stack:

```bash
docker stack deploy -c docker-compose.yml <stackname> --detach=false
```

Check the running services:

```bash
docker service ls
```

Check the logs of a service:

```bash
docker service logs <servicename>
```

Get the container ID for the ClickHouse client:

```bash
docker ps
```

Check whether the topic has all the streamed data:

```bash
docker exec -it clickhouse-kafka-1 kafka-console-consumer --bootstrap-server kafka:9092 --topic pcap_stream_new --from-beginning
```

- Change the topic name if applicable.
- The output should display all of the streamed packets as JSON, one message per line (a sample follows this list).
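
For illustration, each consumed message should be a flat JSON object with the fields produced by the streamer; the values below are made up:

```json
{"time": 1696773600.123456, "l4_proto": "TCP", "src_addr": "192.0.2.10", "dst_addr": "198.51.100.7", "src_port": 443, "dst_port": 51234, "pkt_len": 1500}
```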

Get into the ClickHouse client:

```bash
docker exec -it clickhouse-client clickhouse-client
```

Check whether the tables are available:

```sql
SHOW TABLES;
```

If the tables are not available, create the following.

Create a table for `packets_data`:

```sql
CREATE TABLE packets_data
(
    time Float64,
    l4_proto String,
    src_addr String,
    dst_addr String,
    src_port UInt16,
    dst_port UInt16,
    pkt_len UInt32
) ENGINE = MergeTree
ORDER BY time;
```

Create a table for `kafka_stream`:

```sql
CREATE TABLE kafka_stream
(
    time Float64,
    l4_proto String,
    src_addr String,
    dst_addr String,
    src_port UInt16,
    dst_port UInt16,
    pkt_len UInt32
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'pcap_stream_new',
         kafka_group_name = 'clickhouse_consumer',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 1;
```

Create a materialized view `kafka_to_packets` that moves rows from the Kafka table into `packets_data`:

```sql
CREATE MATERIALIZED VIEW kafka_to_packets
TO packets_data AS
SELECT
    time,
    l4_proto,
    src_addr,
    dst_addr,
    src_port,
    dst_port,
    pkt_len
FROM kafka_stream;
```

Check the table contents to verify data availability:

```sql
SELECT * FROM packets_data LIMIT 10;
```

```sql
SELECT COUNT(*) AS total_count_of_packets_streamed FROM packets_data;
```
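
As an optional extra check (not part of the original steps), a quick aggregate over the same table shows how the streamed traffic breaks down by protocol; it assumes only the `packets_data` schema above:

```sql
SELECT l4_proto, count() AS packets, sum(pkt_len) AS bytes
FROM packets_data
GROUP BY l4_proto
ORDER BY packets DESC;
```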
@ -0,0 +1,61 @@
<!-- This file was generated automatically.
Do not edit it: it is likely to be discarded and generated again before it's read next time.
Files used to generate this file:
  /etc/clickhouse-server/config.xml
  /etc/clickhouse-server/config.d/docker_related_config.xml -->

<yandex>
    <logger>
        <level>trace</level>
        <log>/var/log/clickhouse-keeper/clickhouse-keeper.log</log>
        <errorlog>/var/log/clickhouse-keeper/clickhouse-keeper.err.log</errorlog>
        <size>1000M</size>
        <count>3</count>
    </logger>
    <listen_host>::</listen_host>

    <path>/var/lib/clickhouse/data/</path>
    <tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
    <user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
    <format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>

    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>3</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
        <coordination_settings>
            <operation_timeout_ms>10000</operation_timeout_ms>
            <session_timeout_ms>30000</session_timeout_ms>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>
        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>clickhouse-keeper1</hostname>
                <port>9234</port>
            </server>
            <server>
                <id>2</id>
                <hostname>clickhouse-keeper2</hostname>
                <port>9234</port>
            </server>
            <server>
                <id>3</id>
                <hostname>clickhouse-keeper3</hostname>
                <port>9234</port>
            </server>
        </raft_configuration>
    </keeper_server>

    <!-- Listen wildcard address to allow accepting connections from other containers and host network. -->

    <listen_host>0.0.0.0</listen_host>
    <listen_try>1</listen_try>

    <!--
    <logger>
        <console>1</console>
    </logger>
    -->
</yandex>
156 clickhouse/docker-compose.yml Normal file
@ -0,0 +1,156 @@
services:
  clickhouse-keeper1:
    image: clickhouse/clickhouse-server:latest
    container_name: clickhouse-keeper1
    command: >
      /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml
    volumes:
      - ./clickhouse_keeper/keeper1-config.xml:/etc/clickhouse-server/config.xml
      - ./clickhouse_data/data:/var/lib/clickhouse/data
      - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp
      - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files
      - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas
    networks:
      common-network:
        aliases:
          - clickhouse-keeper1

  clickhouse-keeper2:
    image: clickhouse/clickhouse-server:latest
    container_name: clickhouse-keeper2
    command: >
      /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml
    volumes:
      - ./clickhouse_keeper/keeper2-config.xml:/etc/clickhouse-server/config.xml
      - ./clickhouse_data/data:/var/lib/clickhouse/data
      - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp
      - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files
      - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas
    networks:
      common-network:
        aliases:
          - clickhouse-keeper2

  clickhouse-keeper3:
    image: clickhouse/clickhouse-server:latest
    container_name: clickhouse-keeper3
    command: >
      /usr/bin/clickhouse-keeper --config-file=/etc/clickhouse-server/config.xml
    volumes:
      - ./clickhouse_keeper/keeper3-config.xml:/etc/clickhouse-server/config.xml
      - ./clickhouse_data/data:/var/lib/clickhouse/data
      - ./clickhouse_data/tmp:/var/lib/clickhouse/tmp
      - ./clickhouse_data/user_files:/var/lib/clickhouse/user_files
      - ./clickhouse_data/format_schemas:/var/lib/clickhouse/format_schemas
    networks:
      common-network:
        aliases:
          - clickhouse-keeper3

  clickhouse-server1:
    image: clickhouse/clickhouse-server:latest
    container_name: clickhouse-server1
    volumes:
      - ./node1-config/:/etc/clickhouse-server/config.d/
      - clickhouse_data1:/var/lib/clickhouse
    networks:
      common-network:
        aliases:
          - clickhouse-server1
    depends_on:
      - clickhouse-keeper1
      - clickhouse-keeper2
      - clickhouse-keeper3
    ports:
      - "9001:9000" # Native client port
      - "8123:8123" # HTTP interface

  clickhouse-server2:
    image: clickhouse/clickhouse-server:latest
    container_name: clickhouse-server2
    volumes:
      - ./node2-config/:/etc/clickhouse-server/config.d/
      - clickhouse_data2:/var/lib/clickhouse
    networks:
      common-network:
        aliases:
          - clickhouse-server2
    depends_on:
      - clickhouse-keeper1
      - clickhouse-keeper2
      - clickhouse-keeper3
    ports:
      - "9002:9000" # Native client port
      - "8125:8123" # HTTP interface

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    networks:
      common-network:
        aliases:
          - zookeeper
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
    ports:
      - "2182:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2182
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_MESSAGE_MAX_BYTES: 200000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 200000000
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      common-network:
        aliases:
          - kafka
    ports:
      - "9092:9092"
    volumes:
      - kafka_data_new:/var/lib/kafka/data
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure

  pcap_streamer:
    image: levenshtein/streamer_test4:latest
    depends_on:
      - kafka
    networks:
      common-network:
        aliases:
          - pcap_streamer
    volumes:
      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
    environment:
      PCAP_FILE: /data/pcap/202310081400.pcap
    command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure

networks:
  common-network:
    driver: overlay
    attachable: true

volumes:
  clickhouse_data1:
    driver: local
  clickhouse_data2:
    driver: local
  kafka_data_new:
    driver: local
@ -50,10 +50,13 @@ services:
        aliases:
          - pcap_streamer
    volumes:
      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
      # - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
      - "./:/data/pcap"
      - "./:/data/csv"
    environment:
      PCAP_FILE: /data/pcap/202310081400.pcap
    command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
    # command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
    command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -c /data/csv/sample_output.csv -s --stream_size 1000"]
    deploy:
      replicas: 1
      restart_policy:
25 preprocessing/pcap_consumer_test.py Normal file
@ -0,0 +1,25 @@
from kafka import KafkaConsumer
import json

KAFKA_TOPIC = 'pcap_stream_new'  # Use the topic name from your producer
KAFKA_SERVER = 'localhost:9092'  # Ensure this matches your Kafka server configuration

# Create a Kafka consumer
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    value_deserializer=lambda x: x.decode('utf-8'),  # json.loads(x.decode('utf-8')),
    auto_offset_reset='earliest',  # Ensures it starts reading from the beginning
    enable_auto_commit=True
)


print("Consuming messages from topic:", KAFKA_TOPIC)

# Consume and print messages
for message in consumer:
    # message is a ConsumerRecord; message.value holds the decoded JSON string
    print(type(message))
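
A quick way to exercise this consumer from the host, assuming the stack is up with Kafka's port 9092 published (as in the compose file above) and that the `kafka-python` package is installed locally:

```bash
pip install kafka-python
python preprocessing/pcap_consumer_test.py
```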
@ -11,15 +11,17 @@ import json

dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")



class KafkaClient:
    def __init__(self, topic_name=None, mode='producer'):
        self.mode = mode
        self.topic_name = topic_name
        if mode == 'producer':
            self.client = KafkaProducer(
                bootstrap_servers=['kafka:9092'],
                bootstrap_servers=['localhost:9092'],
                max_request_size = 200000000,
                api_version=(0,11,5),
                #api_version=(0,11,5),
                value_serializer=lambda x: json.dumps(x).encode('utf-8'))
        elif mode == 'consumer' and topic_name is not None:
            self.client = KafkaConsumer(
@ -31,7 +33,7 @@ class KafkaClient:
            raise ValueError("Consumer mode requires a topic_name")

# Kafka Configuration
KAFKA_TOPIC = 'pcap_stream'
KAFKA_TOPIC = 'pcap_stream_new'
KAFKA_SERVER = 'kafka:9092' # Adjust to your Kafka server
#KAFKA_SERVER = 'kafka_service:9092'

@ -111,6 +113,21 @@ def create_pkt_object(pkt: Packet) -> dict:

    return res_json


def row_to_dict(pkt_row: list) -> dict:
    """make dict from CSV row"""

    return {
        "time": float(pkt_row[0]),
        "l4_proto": pkt_row[1],
        "src_addr": pkt_row[2],
        "dst_addr": pkt_row[3],
        "src_port": int(pkt_row[4]),
        "dst_port": int(pkt_row[5]),
        "pkt_len": int(pkt_row[6]),
    }


def prep_csv(out_file: str):
    with open(out_file, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
@ -137,7 +154,8 @@ def pkts_write_csv(pkts: list, out_file: str):

if __name__ == "__main__":
    argp = ArgumentParser()
    argp.add_argument("-f", "--pcap_file", required=True, dest="_pcap")
    argp.add_argument("-f", "--pcap_file", required=False, dest="_pcap")
    argp.add_argument("-c", "--csv_file", required=False, dest="_csv")
    argp.add_argument("-o", "--out_file", required=False, dest="_out")
    argp.add_argument("--stream_size", required=False, default=10000, dest="_streamsize")
    argp.add_argument(
@ -167,6 +185,7 @@ if __name__ == "__main__":
    args = argp.parse_args()

    pcap_file = args._pcap
    csv_file = args._csv
    out_file = args._out
    streaming = args._stream
    sample = args._sample
@ -177,38 +196,59 @@ if __name__ == "__main__":
sample_size = samplesize #1000000
batch_size = 100 #100000

pcap_rdr = PcapReader(pcap_file)
if not streaming:
assert args._out and args._out != ""
prep_csv(out_file)
# if preprocessed data ready for streaming
if csv_file:
with open(csv_file, newline="") as f:
csv_rdr = csv.reader(f)
next(csv_rdr) # skip headers
pkts = []

pkts = []
cnt = 0
for idx, pkt in enumerate(pcap_rdr):
# filter packets
if not pkt_filter(pkt):
continue
for idx, row in enumerate(csv_rdr):
# direct streaming to kafka goes here
producer.client.send(KAFKA_TOPIC, row_to_dict(row))
dbg_print(row_to_dict(row))
dbg_print("streamed packet", idx)
if idx > sample_size:
break
dbg_print(f"total streamed: {idx}")

# otherwise, process packets
else:
pcap_rdr = PcapReader(pcap_file)
if not streaming:
# write to file
pkts.append(pkt_extract(pkt))
assert args._out and args._out != ""
prep_csv(out_file)

if sample and idx > sample_size:
break
pkts = []
cnt = 0
seen_count = 0
for idx, pkt in enumerate(pcap_rdr):
seen_count += 1
# filter packets
if not pkt_filter(pkt):
continue

if idx > 0 and idx % batch_size == 0:
pkts_write_csv(pkts, out_file)
pkts = []
else:
# direct streaming to kafka goes here
packet_data = create_pkt_object(pkt)
producer.client.send(KAFKA_TOPIC, packet_data)
cnt += 1
#print(f"streamed packet at index {idx} ")
if idx > sample_size: break

print(f"total streamed: {cnt}")
# flush remaining
if not streaming and len(pkts) > 0:
pkts_write_csv(pkts, out_file)
if not streaming:
# write to file
pkts.append(pkt_extract(pkt))

if sample and idx > sample_size:
break

if idx > 0 and idx % batch_size == 0:
pkts_write_csv(pkts, out_file)
pkts = []
else:
# direct streaming to kafka goes here
packet_data = create_pkt_object(pkt)
producer.client.send(KAFKA_TOPIC, packet_data)
cnt += 1
# print(f"streamed packet at index {idx} ")
if idx > sample_size:
break

print(f"total seen: {seen_count-1}")
print(f"total streamed: {cnt}")
# flush remaining
if not streaming and len(pkts) > 0:
pkts_write_csv(pkts, out_file)