Merge pull request #6 from 20kaushik02/integration_2

ocean of little drops
Kaushik Narayan Ravishankar 2024-11-28 22:23:07 -07:00 committed by GitHub
commit 5de749c09a
20 changed files with 199 additions and 271 deletions

@@ -1,61 +0,0 @@
<!-- This file was generated automatically.
Do not edit it: it is likely to be discarded and generated again before it's read next time.
Files used to generate this file:
/etc/clickhouse-server/config.xml
/etc/clickhouse-server/config.d/docker_related_config.xml -->
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-keeper/clickhouse-keeper.log</log>
<errorlog>/var/log/clickhouse-keeper/clickhouse-keeper.err.log</errorlog>
<size>1000M</size>
<count>3</count>
</logger>
<listen_host>::</listen_host>
<path>/var/lib/clickhouse/data/</path>
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>clickhouse-keeper1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>clickhouse-keeper2</hostname>
<port>9234</port>
</server>
<server>
<id>3</id>
<hostname>clickhouse-keeper3</hostname>
<port>9234</port>
</server>
</raft_configuration>
</keeper_server>
<!-- Listen wildcard address to allow accepting connections from other containers and host network. -->
<listen_host>0.0.0.0</listen_host>
<listen_try>1</listen_try>
<!--
<logger>
<console>1</console>
</logger>
-->
</yandex>

@@ -18,6 +18,12 @@
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<http_control>
<port>9182</port>
<readiness>
<endpoint>/ready</endpoint>
</readiness>
</http_control>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>

@@ -18,6 +18,12 @@
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<http_control>
<port>9182</port>
<readiness>
<endpoint>/ready</endpoint>
</readiness>
</http_control>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>

@@ -18,6 +18,12 @@
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<http_control>
<port>9182</port>
<readiness>
<endpoint>/ready</endpoint>
</readiness>
</http_control>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
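The <http_control> block added to each keeper config exposes an HTTP readiness probe on port 9182. A minimal sketch of exercising it by hand (assuming it is run from a container attached to clickhouse-keeper-network, since the port is not published to the host):

wget -q --tries=1 --spider http://clickhouse-keeper1:9182/ready && echo "keeper1 is ready"

The entrypoint wait scripts further down run exactly this check in a loop against all three keepers.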

@@ -1,36 +0,0 @@
INSERT INTO
traffic_records_all
VALUES
(
'1698728400.40122',
'UDP',
'142.12.217.111',
'163.213.146.100',
443,
47104,
74
) (
'1698728400.401217',
'UDP',
'45.144.255.42',
'131.174.60.217',
51820,
63998,
42
) (
'1698728400.401218',
'TCP',
'152.199.153.111',
'202.215.192.69',
80,
65305,
66
) (
'1698728400.401219',
'UDP',
'45.144.255.42',
'131.174.60.217',
51820,
63998,
42
)

@@ -11,6 +11,12 @@ services:
clickhouse-keeper-network:
aliases:
- clickhouse-keeper1
deploy:
replicas: 1
# placement:
# constraints: [node.labels.main == true]
restart_policy:
condition: on-failure
clickhouse-keeper2:
image: clickhouse/clickhouse-server:latest
@@ -23,6 +29,12 @@ services:
clickhouse-keeper-network:
aliases:
- clickhouse-keeper2
deploy:
replicas: 1
# placement:
# constraints: [node.labels.main == true]
restart_policy:
condition: on-failure
clickhouse-keeper3:
image: clickhouse/clickhouse-server:latest
@@ -35,13 +47,20 @@ services:
clickhouse-keeper-network:
aliases:
- clickhouse-keeper3
deploy:
replicas: 1
# placement:
# constraints: [node.labels.main == true]
restart_policy:
condition: on-failure
clickhouse-server1:
image: clickhouse/clickhouse-server:latest
container_name: clickhouse-server1
volumes:
- ../clickhouse/node1-config/:/etc/clickhouse-server/config.d/
- ../clickhouse/ddl/main:/docker-entrypoint-initdb.d
- ../clickhouse/node-entrypoints/main:/docker-entrypoint-initdb.d
- ../preprocessing/geoip.csv:/tmp/seedData/csv/ip_region_map.csv
- clickhouse_server1_data:/var/lib/clickhouse
- clickhouse_server1_TTL:/clickhouse_data/server1
networks:
@@ -57,7 +76,9 @@ services:
deploy:
replicas: 1
# placement:
# constraints: [node.labels.role == server]
# constraints: [node.labels.main == true]
restart_policy:
condition: on-failure
update_config:
delay: 10s
resources:
@@ -78,7 +99,7 @@ services:
container_name: clickhouse-server2
volumes:
- ../clickhouse/node2-config/:/etc/clickhouse-server/config.d/
- ../clickhouse/ddl/common:/docker-entrypoint-initdb.d
- ../clickhouse/node-entrypoints/common:/docker-entrypoint-initdb.d
- clickhouse_server2_data:/var/lib/clickhouse
- clickhouse_server2_TTL:/clickhouse_data/server2
networks:
@@ -91,7 +112,9 @@ services:
deploy:
replicas: 1
# placement:
# constraints: [node.labels.role == server]
# constraints: [node.labels.main == true]
restart_policy:
condition: on-failure
update_config:
delay: 10s
resources:
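The placement constraints stay commented out in this compose file; if they are re-enabled, the target swarm node needs the matching label first. A rough sketch with standard Docker commands (the node name is a placeholder):

docker node ls --format '{{.Hostname}}'                      # find the node to label
docker node update --label-add main=true some-manager-node   # lets node.labels.main == true match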

@@ -0,0 +1,29 @@
#!/bin/bash
set -e
keeper_hostnames=(
"clickhouse-keeper1"
"clickhouse-keeper2"
"clickhouse-keeper3"
)
keeper_healthy=(false false false)
can_proceed=false
while ! $can_proceed ; do
for keeper_idx in "${!keeper_hostnames[@]}"; do
if wget -q --tries=1 --spider "http://${keeper_hostnames[$keeper_idx]}:9182/ready" ; then
echo "keeper healthy"
keeper_healthy[$keeper_idx]=true
fi
done
can_proceed=true
for keeper_idx in "${!keeper_hostnames[@]}"; do
if ! ${keeper_healthy[$keeper_idx]} ; then
can_proceed=false
sleep 5
break
fi
done
done
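This wait script lives under the node-entrypoints directories that the compose file now mounts at /docker-entrypoint-initdb.d, so the servers' first-run init (the SQL files in the same directory) is presumably held back until every keeper answers its /ready probe. As a sketch, it can also be run by hand to watch the quorum come up (container name from the compose file; the script filename is hypothetical):

docker exec -it clickhouse-server1 bash /docker-entrypoint-initdb.d/wait-for-keepers.sh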

@@ -0,0 +1,29 @@
#!/bin/bash
set -e
keeper_hostnames=(
"clickhouse-keeper1"
"clickhouse-keeper2"
"clickhouse-keeper3"
)
keeper_healthy=(false false false)
can_proceed=false
while ! $can_proceed ; do
for keeper_idx in "${!keeper_hostnames[@]}"; do
if wget -q --tries=1 --spider "http://${keeper_hostnames[$keeper_idx]}:9182/ready" ; then
echo "keeper healthy"
keeper_healthy[$keeper_idx]=true
fi
done
can_proceed=true
for keeper_idx in "${!keeper_hostnames[@]}"; do
if ! ${keeper_healthy[$keeper_idx]} ; then
can_proceed=false
sleep 5
break
fi
done
done

@@ -0,0 +1,3 @@
INSERT INTO ip_region_map
FROM INFILE '/tmp/seedData/csv/ip_region_map.csv'
FORMAT CSVWithNames;
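This seed statement expects the GeoIP CSV that the compose file mounts at /tmp/seedData/csv/ip_region_map.csv. A minimal sketch of running it by hand (container name from the compose file, default client credentials assumed):

docker exec -it clickhouse-server1 clickhouse-client \
  --query "INSERT INTO ip_region_map FROM INFILE '/tmp/seedData/csv/ip_region_map.csv' FORMAT CSVWithNames"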

@@ -11,6 +11,7 @@ kafka_topic_list = 'traffic_records_stream',
kafka_group_name = 'clickhouse_consumer',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 1;
CREATE MATERIALIZED VIEW traffic_records_kafka_view TO traffic_records_all AS
SELECT time AS time_stamp,
l4_proto AS l4_protocol,
@@ -19,4 +20,4 @@ SELECT time AS time_stamp,
src_port,
dst_port,
pkt_len
FROM traffic_records_kafka_queue;
FROM traffic_records_kafka_queue;
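The materialized view shown here forwards rows consumed from the traffic_records_stream topic into traffic_records_all. A quick sanity check that data is flowing (container name from the compose file, default client credentials assumed):

docker exec -it clickhouse-server1 clickhouse-client --query "SELECT count() FROM traffic_records_all"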

@@ -7,6 +7,8 @@ services:
- zookeeper
deploy:
replicas: 1
# placement:
# constraints: [node.labels.worker == true]
restart_policy:
condition: on-failure
environment:
@@ -40,6 +42,8 @@ services:
- kafka_data:/var/lib/kafka/data
deploy:
replicas: 1
# placement:
# constraints: [node.labels.worker == true]
restart_policy:
condition: on-failure
@@ -53,9 +57,11 @@ services:
- data-streamer
volumes:
- "../preprocessing/10k_sample_2023_10_01-2023_10_31.csv:/data/csv/main.csv:ro"
command: "sh -c 'sleep 30 && python /app/pcap_processor.py -c /data/csv/main.csv -x --stream_size 100000'"
command: "sh -c 'sleep 30 && python /app/pcap_processor.py -c /data/csv/main.csv -x --stream_size 100000 -l 0.1'"
deploy:
replicas: 1
# placement:
# constraints: [node.labels.worker == true]
restart_policy:
condition: on-failure

@@ -1,4 +1,4 @@
ip_range_start,ip_range_end,country
ip_range_start,ip_range_end,region
0.0.0.0,0.255.255.255,-
1.0.0.0,1.0.0.255,Australia
1.0.1.0,1.0.3.255,China

@@ -1,6 +1,7 @@
from argparse import ArgumentParser
from datetime import datetime, timezone
import csv
import time
from scapy.packet import Packet
from scapy.utils import PcapReader
@@ -173,6 +174,13 @@ if __name__ == "__main__":
dest="_stream",
action="store_true",
)
argp.add_argument(
"-l",
"--delay",
required=False,
default=0,
dest="_delay"
)
argp.add_argument(
"-d",
"--debug",
@@ -187,6 +195,7 @@ if __name__ == "__main__":
csv_file = args._csv
out_file = args._out
streaming = args._stream
batch_delay = float(args._delay)
sample = args._sample
DEBUG = args._debug
@@ -207,6 +216,8 @@ if __name__ == "__main__":
producer.client.send(KAFKA_TOPIC, row_to_dict(row))
dbg_print(row_to_dict(row))
dbg_print("streamed packet", idx)
if idx > 0 and idx % batch_size == 0:
time.sleep(batch_delay)
if sample and idx > sample_size:
break
print(f"total streamed: {idx}")

@@ -1,4 +1,13 @@
# Full setup
# End-to-end stack management
## Windows (Powershell)
`deploy.ps1 -MasterNode` to deploy stack with current node as manager
`deploy.ps1 -downStack` to bring down stack (run from manager node)
## Linux/macOS (Bash)
`deploy.sh -M` and `deploy.sh -D` for the same
Add `-S` if Docker requires `sudo` privileges
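A minimal sketch of the Bash flow (the script resolves its own directory, so it can be called from anywhere; flags match deploy.sh below):

./scripts/deploy.sh -M        # initialise the swarm on this node, build the streamer image and deploy the stack
./scripts/deploy.sh -S -M     # same, with sudo-wrapped docker commands
./scripts/deploy.sh -D        # remove the stack, the local registry service and the stack's volumes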

scripts/deploy.sh Normal file → Executable file
@@ -1,7 +1,8 @@
#!/bin/bash
while getopts "M:D:T:A" flag; do
while getopts "SMDT:A" flag; do
case "${flag}" in
S) sudoRequired=true ;;
M) masterNode=true ;;
D) downStack=true ;;
T) swarmToken=$OPTARG ;;
@@ -9,34 +10,36 @@ while getopts "M:D:T:A" flag; do
esac
done
echo "masterNode: $masterNode"
echo "downStack: $downStack"
echo "swarmToken: $swarmToken"
echo "managerAddr: $managerAddr"
$scriptDir = $(readlink -f "$0")
scriptDir=$(dirname $(readlink -f "$0"))
# echo $scriptDir # ===> /Project/scripts
$stackName="TheWebFarm"
stackName="TheWebFarm"
dockerCmd="docker"
if [[ $sudoRequired ]]; then
dockerCmd="sudo docker"
fi
if [[ $downStack ]]; then
echo "[+] Removing stack..."
docker stack rm $stackName
docker service rm registry
echo "$dockerCmd stack rm $stackName"
$dockerCmd stack rm $stackName
$dockerCmd service rm registry
sleep 20
docker volume rm $(docker volume ls --filter name=$stackName -q)
elif ($MasterNode); then
$dockerCmd volume rm $($dockerCmd volume ls --filter name=$stackName -q)
elif ($masterNode); then
echo "[+] swarm master"
$dockerCmd swarm init
# data streaming
cd $scriptDir/../preprocessing
docker service create --name registry -p 5000:5000 registry:2
# docker build -t 127.0.0.1:5000/data-streamer:latest --no-cache --push -f Dockerfile.python .
docker build -t 127.0.0.1:5000/data-streamer:latest --push -f Dockerfile.python .
$dockerCmd service create --name registry -p 5000:5000 registry:2
# $dockerCmd build -t 127.0.0.1:5000/data-streamer:latest --no-cache --push -f Dockerfile.python .
$dockerCmd build -t 127.0.0.1:5000/data-streamer:latest --push -f Dockerfile.python .
# execute
cd $scriptDir
docker stack deploy -d \
$dockerCmd stack deploy -d \
-c ../preprocessing/docker-compose.yml \
-c ../clickhouse/docker-compose.yaml \
-c ../ui/docker-compose.yaml \
@@ -49,5 +52,5 @@ elif ($MasterNode); then
else
echo "[+] swarm follower"
echo "[+] joining swarm with token $swarmToken"
docker swarm join --token $swarmToken $managerAddr
$dockerCmd swarm join --token $swarmToken $managerAddr
fi
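For follower nodes, the script wraps a plain docker swarm join; the token it expects can be printed on the manager with a standard Docker command (shown as a hedged sketch, since how the token and manager address are passed to the followers is left to the operator):

docker swarm join-token worker    # run on the manager; prints the full `docker swarm join --token ...` command for workers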

@@ -154,14 +154,17 @@
"legend": {
"displayMode": "list",
"placement": "bottom",
"showLegend": true
"showLegend": true,
"values": [
"percent"
]
},
"pieType": "pie",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "/^ProtocolCount$/",
"fields": "/^Protocol frequency$/",
"values": true
},
"tooltip": {
@@ -190,11 +193,11 @@
},
"pluginVersion": "4.5.1",
"queryType": "table",
"rawSql": "SELECT\r\n l4_protocol as Protocol,\r\n COUNT(Protocol) as ProtocolCount\r\n FROM traffic_records_all\r\n GROUP BY Protocol",
"rawSql": "SELECT\r\n l4_protocol as Protocol,\r\n COUNT(Protocol) as \"Protocol frequency\"\r\n FROM traffic_records_all\r\n GROUP BY Protocol",
"refId": "A"
}
],
"title": "Distribution of L4 protocol",
"title": "Distribution of L4 protocol (frequency)",
"type": "piechart"
},
{
@@ -287,11 +290,11 @@
},
"pluginVersion": "4.5.1",
"queryType": "table",
"rawSql": "SELECT \r\n Port, \r\n SourcePortCount, \r\n DestPortCount\r\nFROM\r\n(\r\n SELECT \r\n src_port AS Port, \r\n COUNT(*) AS SourcePortCount\r\n FROM traffic_records_all\r\n GROUP BY src_port\r\n ORDER BY SourcePortCount DESC\r\n LIMIT 40\r\n) AS src\r\nINNER JOIN\r\n(\r\n SELECT \r\n dst_port AS Port, \r\n COUNT(*) AS DestPortCount\r\n FROM traffic_records_all\r\n GROUP BY dst_port\r\n ORDER BY DestPortCount DESC\r\n LIMIT 40\r\n) AS dst\r\nUSING (Port)\r\nORDER BY (SourcePortCount + DestPortCount) DESC\r\nLIMIT 40;\r\n",
"rawSql": "SELECT \r\n Port, \r\n SourcePortCount AS \"Source port frequency\",\r\n DestPortCount AS \"Destination port frequency\"\r\nFROM\r\n(\r\n SELECT \r\n src_port AS Port, \r\n COUNT(*) AS SourcePortCount\r\n FROM traffic_records_all\r\n GROUP BY src_port\r\n ORDER BY SourcePortCount DESC\r\n LIMIT 40\r\n) AS src\r\nINNER JOIN\r\n(\r\n SELECT \r\n dst_port AS Port, \r\n COUNT(*) AS DestPortCount\r\n FROM traffic_records_all\r\n GROUP BY dst_port\r\n ORDER BY DestPortCount DESC\r\n LIMIT 40\r\n) AS dst\r\nUSING (Port)\r\nORDER BY (SourcePortCount + DestPortCount) DESC\r\nLIMIT 40;\r\n",
"refId": "A"
}
],
"title": "Top ports (by count)",
"title": "Top ports (frequency)",
"type": "barchart"
},
{
@@ -305,67 +308,48 @@
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"fillOpacity": 80,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineWidth": 1,
"scaleDistribution": {
"type": "linear"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
}
"mappings": []
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 16
"w": 6,
"x": 18,
"y": 8
},
"id": 2,
"id": 6,
"options": {
"barRadius": 0,
"barWidth": 0.97,
"fullHighlight": false,
"groupWidth": 0.7,
"displayLabels": [
"percent",
"name"
],
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
"showLegend": true,
"values": [
"percent"
]
},
"pieType": "pie",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "/^Protocol bandwidth$/",
"values": true
},
"orientation": "horizontal",
"showValue": "auto",
"stacking": "none",
"tooltip": {
"mode": "single",
"sort": "none"
},
"xField": "SourcePort",
"xTickLabelRotation": 0,
"xTickLabelSpacing": 100
}
},
"pluginVersion": "11.3.1",
"targets": [
@@ -384,109 +368,12 @@
},
"pluginVersion": "4.5.1",
"queryType": "table",
"rawSql": "SELECT\r\n src_port as SourcePort,\r\n COUNT(SourcePort) as SourcePortCount\r\n FROM traffic_records_all\r\n GROUP BY SourcePort\r\n ORDER BY SourcePortCount DESC\r\n LIMIT 10",
"rawSql": "SELECT\n l4_protocol as Protocol,\n SUM(pkt_len)/1024.0/1024.0 as \"Protocol bandwidth\"\n FROM traffic_records_all\n GROUP BY Protocol",
"refId": "A"
}
],
"title": "Top 10 source ports (by count)",
"type": "barchart"
},
{
"datasource": {
"type": "grafana-clickhouse-datasource",
"uid": "PDEE91DDB90597936"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"fillOpacity": 80,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineWidth": 1,
"scaleDistribution": {
"type": "linear"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 16
},
"id": 3,
"options": {
"barRadius": 0,
"barWidth": 0.97,
"fullHighlight": false,
"groupWidth": 0.7,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"orientation": "horizontal",
"showValue": "auto",
"stacking": "none",
"tooltip": {
"mode": "single",
"sort": "none"
},
"xField": "DestPort",
"xTickLabelRotation": 0,
"xTickLabelSpacing": 100
},
"pluginVersion": "11.3.1",
"targets": [
{
"editorType": "sql",
"format": 1,
"meta": {
"builderOptions": {
"columns": [],
"database": "",
"limit": 1000,
"mode": "list",
"queryType": "table",
"table": ""
}
},
"pluginVersion": "4.5.1",
"queryType": "table",
"rawSql": "SELECT\r\n dst_port as DestPort,\r\n COUNT(DestPort) as DestPortCount\r\n FROM traffic_records_all\r\n GROUP BY DestPort\r\n ORDER BY DestPortCount DESC\r\n LIMIT 10",
"refId": "A"
}
],
"title": "Top 10 destination ports (by count)",
"type": "barchart"
"title": "Distribution of L4 protocol (bandwidth)",
"type": "piechart"
}
],
"preload": false,
@@ -503,6 +390,6 @@
"timezone": "browser",
"title": "Internet traffic capture analysis",
"uid": "be59fkbp3zs3kc",
"version": 11,
"version": 1,
"weekStart": ""
}
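The reshaped pie chart reads from the rawSql above; outside Grafana, the same aggregate can be sanity-checked directly (container name from the compose file, default client credentials assumed):

docker exec -it clickhouse-server1 clickhouse-client --query '
  SELECT l4_protocol AS Protocol,
         SUM(pkt_len)/1024.0/1024.0 AS "Protocol bandwidth"
  FROM traffic_records_all
  GROUP BY Protocol'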

@ -14,6 +14,12 @@ services:
clickhouse-server-network:
aliases:
- grafana
deploy:
replicas: 1
# placement:
# constraints: [node.labels.worker == true]
restart_policy:
condition: on-failure
depends_on:
- clickhouse-server1
environment: