Merge branch 'preprocessing' into integration_2

Kaushik Narayan R 2024-11-26 22:48:44 -07:00
commit d07d94cc1e
7 changed files with 252148 additions and 19 deletions

View File

@@ -74,7 +74,7 @@ docker exec -it clickhouse-kafka-1 kafka-console-consumer --bootstrap-server kaf
 Get into the ClickHouse client:
 ```bash
-docker exec -it clickhouse-client clickhouse-client
+docker exec -it <server1's container ID from docker ps> clickhouse-client
 ```
 Check if tables are available:
 ```bash
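
The change above points the exec at whichever server container ID `docker ps` reports instead of a fixed container name. For a scripted version of the same "check if tables are available" step, a minimal sketch with the third-party clickhouse-driver package (not part of this commit; the host, port, and absence of credentials are assumptions) is:

```python
# Hypothetical scripted check, assuming the server's native port (9000) is
# published to the host and the third-party clickhouse-driver package is installed.
from clickhouse_driver import Client

client = Client(host="localhost", port=9000)

# Equivalent of running SHOW TABLES inside clickhouse-client.
for (table,) in client.execute("SHOW TABLES"):
    print(table)
```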

View File

@@ -21,7 +21,7 @@
     <keeper_server>
         <tcp_port>9181</tcp_port>
-        <server_id>3</server_id>
+        <server_id>2</server_id>
         <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
         <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
         <coordination_settings>
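
The server_id change above distinguishes this keeper replica from the others in the ensemble. To confirm which node answers on that port, ClickHouse Keeper's four-letter-word commands can be sent over plain TCP. The sketch below is illustrative only; it assumes 9181 is reachable from where it runs and that `ruok`/`srvr` are in the keeper's allowed 4lw command list (both are in the default allow list):

```python
# Illustrative probe of a ClickHouse Keeper node via its 4-letter-word commands.
# Assumptions: port 9181 is reachable from this host, and "ruok"/"srvr" are
# permitted by the keeper's four_letter_word_allow_list (they are by default).
import socket

def keeper_4lw(cmd: bytes, host: str = "localhost", port: int = 9181) -> str:
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(cmd)
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    return b"".join(chunks).decode()

print(keeper_4lw(b"ruok"))  # a healthy node answers "imok"
print(keeper_4lw(b"srvr"))  # mode (leader/follower), zxid, connection stats
```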

View File

@@ -125,7 +125,7 @@ services:
         condition: on-failure
   pcap_streamer:
-    image: levenshtein/streamer_test4:latest
+    image: levenshtein/streamer_test7:latest
     depends_on:
       - kafka
     networks:
@@ -133,10 +133,9 @@ services:
         aliases:
           - pcap_streamer
     volumes:
-      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
-    environment:
-      PCAP_FILE: /data/pcap/202310081400.pcap
-    command: ["sh", "-c", "sleep 30 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
+      #- "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
+      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project/project_github/real-time-traffic-analysis-clickhouse/preprocessing:/data/pcap"
+    command: ["sh", "-c", "sleep 60 && python /app/pcap_processor.py -c /data/pcap/sample_output.csv -s --stream_size 1000"]
     deploy:
       replicas: 1
       restart_policy:

View File

@@ -49,3 +49,4 @@
 python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --stream_size 1000
+python pcap_processor.py -c sample_output.csv -s --stream_size 1000
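
Together, the two invocations above outline the CLI that pcap_processor.py parses: -f for a raw pcap, -c for a preprocessed CSV, -s to stream, and --stream_size for a cap. A rough argparse sketch of that interface follows; the dest names `_streamsize` and `_debug` appear in the later diff in this commit, while the option spelling of the debug flag, the help strings, and the defaults are guesses:

```python
# Rough reconstruction of the CLI implied by the commands above; help text,
# defaults, and the debug flag's spelling are assumptions, not code from this repo.
import argparse

parser = argparse.ArgumentParser(description="Stream pcap or preprocessed CSV data to Kafka")
parser.add_argument("-f", dest="_pcapfile", help="raw pcap file to process")
parser.add_argument("-c", dest="_csvfile", help="preprocessed CSV ready for streaming")
parser.add_argument("-s", dest="_stream", action="store_true", help="stream records to Kafka")
parser.add_argument("--stream_size", dest="_streamsize", default=1000,
                    help="maximum number of records to stream")
parser.add_argument("--debug", dest="_debug", action="store_true", help="verbose debug output")
args = parser.parse_args()
```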

preprocessing/geoip.csv (new file, 252127 additions)

File diff suppressed because it is too large.

View File

@@ -5,14 +5,15 @@ import csv
 sample_size = 100
 batch_size = 10000
-sample = True
+sample = False

 def int_to_ipv4(num: int) -> str:
     return socket.inet_ntoa(struct.pack("!L", num))

-with open("IP2LOCATION-LITE-DB3.csv", "r") as input_file, open(
+# with open("IP2LOCATION-LITE-DB3.csv", "r") as input_file, open(
+with open("IP2LOCATION-LITE-DB1.csv", "r") as input_file, open(
     "geoip.csv", "w", newline=""
 ) as output_file:
     reader = csv.reader(input_file)
@@ -21,11 +22,11 @@ with open("IP2LOCATION-LITE-DB3.csv", "r") as input_file, open(
     # header row
     writer.writerow(
         [
-            "ip_from",
-            "ip_to",
+            "ip_range_start",
+            "ip_range_end",
             "country",
-            "region",
-            "city",
+            # "region",
+            # "city",
         ]
     )
@@ -35,8 +36,8 @@ with open("IP2LOCATION-LITE-DB3.csv", "r") as input_file, open(
             int_to_ipv4(int(record[0])),
             int_to_ipv4(int(record[1])),
             record[3],
-            record[4],
-            record[5],
+            # record[4],
+            # record[5],
         ]
         records.append(new_record)
         if sample and idx > sample_size:
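
After this change each geoip.csv row is just (ip_range_start, ip_range_end, country), with both bounds written as dotted IPv4 strings. Looking an address up then means converting it back to an integer and finding the range that contains it. A minimal sketch follows; the bisect-based search and the assumption that rows are sorted by range start are illustrative additions, not code from this commit:

```python
# Illustrative lookup against the generated geoip.csv; not part of this repo.
# Assumes a header row, rows of (ip_range_start, ip_range_end, country) with
# both bounds as dotted IPv4 strings, sorted ascending by range start.
import bisect
import csv
import socket
import struct

def ipv4_to_int(addr: str) -> int:
    """Inverse of int_to_ipv4 above."""
    return struct.unpack("!L", socket.inet_aton(addr))[0]

starts, rows = [], []
with open("geoip.csv", newline="") as f:
    reader = csv.reader(f)
    next(reader)  # skip header
    for ip_from, ip_to, country in reader:
        starts.append(ipv4_to_int(ip_from))
        rows.append((ipv4_to_int(ip_to), country))

def lookup(addr: str):
    n = ipv4_to_int(addr)
    i = bisect.bisect_right(starts, n) - 1  # last range starting at or below addr
    if i >= 0 and n <= rows[i][0]:
        return rows[i][1]
    return None

print(lookup("8.8.8.8"))
```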

View File

@@ -19,7 +19,7 @@ class KafkaClient:
         self.topic_name = topic_name
         if mode == 'producer':
             self.client = KafkaProducer(
-                bootstrap_servers=['localhost:9092'],
+                bootstrap_servers=['kafka:9092'],
                 max_request_size = 200000000,
                 #api_version=(0,11,5),
                 value_serializer=lambda x: json.dumps(x).encode('utf-8'))
@@ -192,11 +192,12 @@ if __name__ == "__main__":
     DEBUG = args._debug
-    sample_size = int(args._samplesize) #1000000
+    sample_size = int(args._streamsize) # 100000
     batch_size = 100 #100000
     # if preprocessed data ready for streaming
     if csv_file:
+        #print("true")
         with open(csv_file, newline="") as f:
             csv_rdr = csv.reader(f)
             next(csv_rdr) # skip headers
@@ -206,10 +207,10 @@ if __name__ == "__main__":
                 # direct streaming to kafka goes here
                 producer.client.send(KAFKA_TOPIC, row_to_dict(row))
                 dbg_print(row_to_dict(row))
-                dbg_print("streamed packet", idx)
+                print("streamed packet", idx)
                 if idx > sample_size:
                     break
-            dbg_print(f"total streamed: {idx}")
+            print(f"total streamed: {idx}")
     # otherwise, process packets
     else:
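
Since the producer serializes each row as JSON before sending, a matching kafka-python consumer can be used to verify what actually lands on the topic. Only bootstrap_servers=['kafka:9092'] and the JSON encoding come from this diff; the topic name below is a placeholder, and kafka:9092 only resolves from inside the compose network:

```python
# Hypothetical consumer for inspecting the streamed rows; not part of this repo.
# KAFKA_TOPIC is a placeholder -- substitute the topic name used by pcap_processor.py.
# Use localhost:9092 instead (if that port is published) when running from the host.
import json
from kafka import KafkaConsumer

KAFKA_TOPIC = "packets"  # placeholder topic name

consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=["kafka:9092"],
    auto_offset_reset="earliest",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    print(message.offset, message.value)
```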