mirror of https://github.com/20kaushik02/real-time-traffic-analysis-clickhouse.git (synced 2026-01-25 16:04:04 +00:00)
Merge branch 'main' into preprocessing
@@ -11,6 +11,8 @@ import json
 
+dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")
+
 
 
 class KafkaClient:
     def __init__(self, topic_name=None, mode='producer'):
         self.mode = mode
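The added dbg_print helper gates on the module-level DEBUG flag (set from the --debug argument later in __main__) and prints its positional arguments as a tuple. A standalone sketch with DEBUG forced on, just to show the behavior:

# Minimal sketch of the debug helper; DEBUG is hard-coded here only for illustration.
DEBUG = True
dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")

dbg_print("parsed row", {"src_port": 443})
# prints: [DEBUG] ('parsed row', {'src_port': 443})
# with DEBUG = False the `and` short-circuits and nothing is printed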
@@ -19,7 +21,7 @@ class KafkaClient:
             self.client = KafkaProducer(
                 bootstrap_servers=['kafka:9092'],
                 max_request_size = 200000000,
-                api_version=(0,11,5),
+                #api_version=(0,11,5),
                 value_serializer=lambda x: json.dumps(x).encode('utf-8'))
         elif mode == 'consumer' and topic_name is not None:
             self.client = KafkaConsumer(
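Only the producer branch of KafkaClient appears in this hunk; the commit's change is to stop pinning api_version and let kafka-python negotiate the broker version. A minimal sketch of what the full wrapper plausibly looks like; the consumer arguments below are assumptions, not part of this diff:

# Hypothetical reconstruction of the KafkaClient wrapper around kafka-python.
# Only the producer lines shown in the hunk are confirmed by the commit.
import json
from kafka import KafkaProducer, KafkaConsumer

class KafkaClient:
    def __init__(self, topic_name=None, mode='producer'):
        self.mode = mode
        if mode == 'producer':
            self.client = KafkaProducer(
                bootstrap_servers=['kafka:9092'],
                max_request_size=200000000,
                # api_version pinning removed; kafka-python probes the broker instead
                value_serializer=lambda x: json.dumps(x).encode('utf-8'))
        elif mode == 'consumer' and topic_name is not None:
            self.client = KafkaConsumer(
                topic_name,                                   # assumed signature
                bootstrap_servers=['kafka:9092'],
                value_deserializer=lambda m: json.loads(m.decode('utf-8')))
        else:
            raise ValueError("Consumer mode requires a topic_name")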
@@ -31,7 +33,7 @@ class KafkaClient:
             raise ValueError("Consumer mode requires a topic_name")
 
 
 # Kafka Configuration
-KAFKA_TOPIC = 'pcap_stream'
+KAFKA_TOPIC = 'pcap_stream_new'
 KAFKA_SERVER = 'kafka:9092'  # Adjust to your Kafka server
 #KAFKA_SERVER = 'kafka_service:9092'
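Because the commit renames the topic from pcap_stream to pcap_stream_new, any downstream consumer has to subscribe to the new name. A throwaway check, assuming the KafkaClient wrapper sketched above:

# Hypothetical smoke test: confirm records arrive on the renamed topic.
consumer = KafkaClient(topic_name='pcap_stream_new', mode='consumer').client
for msg in consumer:
    print(msg.value)   # one dict per streamed packet/row
    break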
@@ -112,6 +114,20 @@ def create_pkt_object(pkt: Packet) -> dict:
     return res_json
 
 
+def row_to_dict(pkt_row: list) -> dict:
+    """make dict from CSV row"""
+
+    return {
+        "time": float(pkt_row[0]),
+        "l4_proto": pkt_row[1],
+        "src_addr": pkt_row[2],
+        "dst_addr": pkt_row[3],
+        "src_port": int(pkt_row[4]),
+        "dst_port": int(pkt_row[5]),
+        "pkt_len": int(pkt_row[6]),
+    }
+
+
 def prep_csv(out_file: str):
     with open(out_file, "w", newline="") as csvfile:
         writer = csv.writer(csvfile)
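The new row_to_dict assumes the column order that prep_csv/pkts_write_csv write out (time, l4_proto, src_addr, dst_addr, src_port, dst_port, pkt_len), converting the string fields back to numeric types. A quick illustration with a made-up row, as csv.reader would yield it:

# Example input: csv.reader returns each row as a list of strings.
row = ["1715000000.123456", "TCP", "10.0.0.5", "192.168.1.9", "443", "51234", "1500"]
print(row_to_dict(row))
# {'time': 1715000000.123456, 'l4_proto': 'TCP', 'src_addr': '10.0.0.5',
#  'dst_addr': '192.168.1.9', 'src_port': 443, 'dst_port': 51234, 'pkt_len': 1500}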
@@ -138,7 +154,8 @@ def pkts_write_csv(pkts: list, out_file: str):
 
 if __name__ == "__main__":
     argp = ArgumentParser()
-    argp.add_argument("-f", "--pcap_file", required=True, dest="_pcap")
+    argp.add_argument("-f", "--pcap_file", required=False, dest="_pcap")
+    argp.add_argument("-c", "--csv_file", required=False, dest="_csv")
     argp.add_argument("-o", "--out_file", required=False, dest="_out")
     argp.add_argument(
         "--stream_size", required=False, default=100000, dest="_streamsize"
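With -f/--pcap_file now optional alongside the new -c/--csv_file, argparse no longer enforces that any input was given. A guard like the following (not part of the commit, just a sketch) would catch the case where both are missing:

# Hypothetical post-parse check: require at least one input source.
args = argp.parse_args()
if not args._pcap and not args._csv:
    argp.error("provide either -f/--pcap_file or -c/--csv_file")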
@@ -170,6 +187,7 @@ if __name__ == "__main__":
     args = argp.parse_args()
 
     pcap_file = args._pcap
+    csv_file = args._csv
     out_file = args._out
     streaming = args._stream
     sample = args._sample
@@ -177,52 +195,62 @@ if __name__ == "__main__":
     DEBUG = args._debug
 
     sample_size = int(args._streamsize)  # 100000
-    batch_size = 10000  # 10000
+    batch_size = 100  # 100000
 
-    pcap_rdr = PcapReader(pcap_file)
-    if not streaming:
-        assert args._out and args._out != ""
-        prep_csv(out_file)
-
-    pkts = []
-    cnt = 0
-    for idx, pkt in enumerate(pcap_rdr):
-        # filter packets
-        if not pkt_filter(pkt):
-            continue
-
-        if not streaming:
-            # write to file
-            pkts.append(pkt_extract(pkt))
-
-            if sample and idx > sample_size:
-                break
-
-            if idx > 0 and idx % batch_size == 0:
-                pkts_write_csv(pkts, out_file)
-                pkts = []
-        else:
-            # Kafka Configuration
-            KAFKA_TOPIC = "pcap_stream"
-            KAFKA_SERVER = "localhost:9092"  # Adjust to your Kafka server
-            # KAFKA_SERVER = 'kafka_service:9092'
-
-            # Initialize Kafka Producer
-            producer = KafkaProducer(
-                bootstrap_servers=KAFKA_SERVER,
-                # value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Encode data as JSON
-                value_serializer=lambda v: (
-                    v.encode("utf-8") if isinstance(v, str) else str(v).encode("utf-8")
-                ),  # remove intermediate JSON encoding
-            )
-
-            packet_data = create_pkt_object(pkt)
-            producer.client.send(KAFKA_TOPIC, packet_data)
-            cnt += 1
-            #print(f"streamed packet at index {idx} ")
-            if idx > sample_size: break
-
-    print(f"total streamed: {cnt}")
-    # flush remaining
-    if not streaming and len(pkts) > 0:
-        pkts_write_csv(pkts, out_file)
+    # if preprocessed data ready for streaming
+    if csv_file:
+        # print("true")
+        with open(csv_file, newline="") as f:
+            csv_rdr = csv.reader(f)
+            next(csv_rdr)  # skip headers
+            pkts = []
+
+            for idx, row in enumerate(csv_rdr):
+                # direct streaming to kafka goes here
+                producer.client.send(KAFKA_TOPIC, row_to_dict(row))
+                dbg_print(row_to_dict(row))
+                print("streamed packet", idx)
+                if idx > sample_size:
+                    break
+            print(f"total streamed: {idx}")
+
+    # otherwise, process packets
+    else:
+        pcap_rdr = PcapReader(pcap_file)
+        if not streaming:
+            assert args._out and args._out != ""
+            prep_csv(out_file)
+
+        pkts = []
+        cnt = 0
+        seen_count = 0
+        for idx, pkt in enumerate(pcap_rdr):
+            seen_count += 1
+            # filter packets
+            if not pkt_filter(pkt):
+                continue
+
+            if not streaming:
+                # write to file
+                pkts.append(pkt_extract(pkt))
+
+                if sample and idx > sample_size:
+                    break
+
+                if idx > 0 and idx % batch_size == 0:
+                    pkts_write_csv(pkts, out_file)
+                    pkts = []
+            else:
+                # direct streaming to kafka goes here
+                packet_data = create_pkt_object(pkt)
+                producer.client.send(KAFKA_TOPIC, packet_data)
+                cnt += 1
+                # print(f"streamed packet at index {idx} ")
+                if idx > sample_size:
+                    break
+
+        print(f"total seen: {seen_count-1}")
+        print(f"total streamed: {cnt}")
+        # flush remaining
+        if not streaming and len(pkts) > 0:
+            pkts_write_csv(pkts, out_file)
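The new csv_file branch calls producer.client.send(...), but producer is never constructed inside this hunk, so it presumably comes from the KafkaClient wrapper elsewhere in the script. A sketch of the assumed wiring for the CSV streaming path, using the script's own csv import, KafkaClient, KAFKA_TOPIC, and row_to_dict:

# Assumed setup; the diff itself does not show where `producer` is created.
producer = KafkaClient(mode='producer')   # value_serializer json.dumps-encodes each dict

with open(csv_file, newline="") as f:
    csv_rdr = csv.reader(f)
    next(csv_rdr)                         # skip the header row
    for idx, row in enumerate(csv_rdr):
        producer.client.send(KAFKA_TOPIC, row_to_dict(row))
producer.client.flush()                   # push any buffered records before exiting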
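To verify end to end what actually lands on the topic, a small standalone consumer can print a few records back. This uses kafka-python directly, with the broker address and topic name taken from the producer configuration above; adjust as needed:

# Standalone sanity check for the streamed packet dicts.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'pcap_stream_new',
    bootstrap_servers=['kafka:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
for i, msg in enumerate(consumer):
    print(msg.value)        # e.g. {'time': ..., 'src_addr': ..., 'pkt_len': ...}
    if i >= 4:
        break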