aggregating data

2024-11-15 13:12:04 -07:00
parent afdddbaf54
commit e6c0182724
2 changed files with 65 additions and 22 deletions

@@ -7,22 +7,9 @@ from scapy.utils import PcapReader
 from scapy.layers.inet import IP, TCP, UDP
 from kafka import KafkaProducer
 import json
 dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")
-# Kafka Configuration
-KAFKA_TOPIC = 'pcap_stream'
-KAFKA_SERVER = 'localhost:9092' # Adjust to your Kafka server
-#KAFKA_SERVER = 'kafka_service:9092'
-# Initialize Kafka Producer
-producer = KafkaProducer(
-    bootstrap_servers=KAFKA_SERVER,
-    #value_serializer=lambda v: json.dumps(v).encode('utf-8') # Encode data as JSON
-    value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else str(v).encode('utf-8') #remove intermediate JSON encoding
-)
 def pkt_filter(pkt: Packet) -> bool:
     """filter to include/exclude a packet"""
@@ -87,11 +74,12 @@ def create_pkt_object(pkt: Packet) -> dict:
"dst_addr": pkt[IP].dst,
"src_port": pkt[l4_proto].sport,
"dst_port": pkt[l4_proto].dport,
"pkt_len": len(pkt)
"pkt_len": len(pkt),
}
return res_json
def prep_csv(out_file: str):
with open(out_file, "w", newline="") as csvfile:
writer = csv.writer(csvfile)
@@ -120,7 +108,9 @@ if __name__ == "__main__":
     argp = ArgumentParser()
     argp.add_argument("-f", "--pcap_file", required=True, dest="_pcap")
     argp.add_argument("-o", "--out_file", required=False, dest="_out")
-    argp.add_argument("--stream_size", required=False, default=10000, dest="_streamsize")
+    argp.add_argument(
+        "--stream_size", required=False, default=100000, dest="_streamsize"
+    )
     argp.add_argument(
         "-x",
         "--sample",
@@ -151,12 +141,11 @@ if __name__ == "__main__":
     out_file = args._out
     streaming = args._stream
     sample = args._sample
-    samplesize = int(args._streamsize)
     DEBUG = args._debug
-    sample_size = samplesize #1000000
-    batch_size = 100 #100000
+    sample_size = int(args._streamsize)  # 100000
+    batch_size = 10000  # 10000
     pcap_rdr = PcapReader(pcap_file)
     if not streaming:
@@ -180,13 +169,26 @@ if __name__ == "__main__":
                 pkts_write_csv(pkts, out_file)
                 pkts = []
         else:
             # direct streaming to kafka goes here
+            # Kafka Configuration
+            KAFKA_TOPIC = "pcap_stream"
+            KAFKA_SERVER = "localhost:9092"  # Adjust to your Kafka server
+            # KAFKA_SERVER = 'kafka_service:9092'
+            # Initialize Kafka Producer
+            producer = KafkaProducer(
+                bootstrap_servers=KAFKA_SERVER,
+                # value_serializer=lambda v: json.dumps(v).encode('utf-8') # Encode data as JSON
+                value_serializer=lambda v: (
+                    v.encode("utf-8") if isinstance(v, str) else str(v).encode("utf-8")
+                ),  # remove intermediate JSON encoding
+            )
             packet_data = create_pkt_object(pkt)
             producer.send(KAFKA_TOPIC, packet_data)
             print(f"streamed packet at index {idx} ")
-        if idx > sample_size: break
+        if idx > sample_size:
+            break
     # flush remaining
     if not streaming and len(pkts) > 0:
         pkts_write_csv(pkts, out_file)
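
For reference, a minimal consumer sketch (not part of this commit) for reading what the producer above emits. It assumes the kafka-python KafkaConsumer, the same pcap_stream topic and localhost:9092 broker configured in the diff, and it parses payloads with ast.literal_eval because the commit replaces JSON serialization with str()-encoding, so each message arrives as a Python-dict repr rather than JSON.

import ast
from kafka import KafkaConsumer

# Consume the packet records streamed by the script above.
consumer = KafkaConsumer(
    "pcap_stream",                       # KAFKA_TOPIC used by the producer
    bootstrap_servers="localhost:9092",  # KAFKA_SERVER used by the producer
    auto_offset_reset="earliest",
    # str(dict) is not JSON (single quotes), so decode with literal_eval
    value_deserializer=lambda b: ast.literal_eval(b.decode("utf-8")),
)

for msg in consumer:
    rec = msg.value  # dict with the keys built in create_pkt_object
    print(rec["dst_addr"], rec["dst_port"], rec["pkt_len"])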