temp push

Akash Sivakumar 2024-11-20 16:30:09 -07:00
parent 35881893f6
commit d4fc9382ed
4 changed files with 95 additions and 10 deletions

View File

@@ -16,5 +16,5 @@ COPY pcap_processor.py /app
 # Expose the port Kafka uses (optional, for communication with other services)
 EXPOSE 9092
-# Command to run your Python application
+# Command to allow custom runtime arguments
 CMD ["python", "pcap_processor.py"]

View File

@@ -48,4 +48,4 @@
 - -d or --debug: boolean value indicating if program is run in debug mode
-python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --sample-size 1000
+python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --stream_size 1000
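
For context, the flags above suggest a parser along these lines. This is a hypothetical reconstruction, not the parser actually in pcap_processor.py; the real argument destinations and defaults may differ:

import argparse

# Hypothetical sketch of the CLI documented above (-f, -s, --stream_size,
# -d/--debug); the real parser in pcap_processor.py may differ.
parser = argparse.ArgumentParser(description="Stream pcap packets to Kafka or CSV")
parser.add_argument("-f", "--file", required=True, help="path to the .pcap input file")
parser.add_argument("-s", "--stream", action="store_true",
                    help="stream packets to Kafka instead of writing a CSV")
parser.add_argument("--stream_size", type=int, default=1000,
                    help="maximum number of packets to process before stopping")
parser.add_argument("-d", "--debug", action="store_true",
                    help="run in debug mode")
args = parser.parse_args()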

View File

@@ -0,0 +1,64 @@
+version: '3.8'
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:latest
+    networks:
+      - kafka_network
+    deploy:
+      replicas: 1
+      restart_policy:
+        condition: on-failure
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+    ports:
+      - "2181:2181"
+    healthcheck:
+      test: ["CMD", "nc", "-z", "localhost", "2181"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+  kafka:
+    image: confluentinc/cp-kafka:latest
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL: PLAINTEXT
+      KAFKA_BROKER_ID: 1
+    networks:
+      - kafka_network
+    ports:
+      - "9092:9092"
+    deploy:
+      replicas: 1
+      restart_policy:
+        condition: on-failure
+    healthcheck:
+      test: ["CMD", "nc", "-z", "localhost", "9092"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+  pcap_streamer:
+    image: levenshtein/streamer_test3:latest
+    depends_on:
+      - kafka
+    networks:
+      - kafka_network
+    volumes:
+      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
+    environment:
+      PCAP_FILE: /data/pcap/202310081400.pcap
+    command: ["sh", "-c", "sleep 10 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
+    deploy:
+      replicas: 1
+      restart_policy:
+        condition: on-failure
+networks:
+  kafka_network:
+    driver: overlay
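
To check that this stack actually delivers packets end to end, a throwaway consumer can be run on the host against the pcap_stream topic. A minimal sketch, assuming kafka-python and the "9092:9092" port mapping above; it is not part of this commit:

from kafka import KafkaConsumer
import json

# Pull a handful of messages from the topic the streamer publishes to.
consumer = KafkaConsumer(
    'pcap_stream',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',   # start from the oldest retained message
    consumer_timeout_ms=10000,      # stop iterating after 10s of silence
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for i, msg in enumerate(consumer):
    print(msg.value)
    if i >= 4:                      # five messages is enough for a smoke test
        break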

View File

@@ -6,22 +6,40 @@ from scapy.packet import Packet
 from scapy.utils import PcapReader
 from scapy.layers.inet import IP, TCP, UDP
-from kafka import KafkaProducer
+from kafka import KafkaProducer, KafkaConsumer
 import json
 dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")
+class KafkaClient:
+    def __init__(self, topic_name=None, mode='producer'):
+        self.mode = mode
+        self.topic_name = topic_name
+        if mode == 'producer':
+            self.client = KafkaProducer(
+                bootstrap_servers=['localhost:9092'],
+                api_version=(0,11,5),
+                value_serializer=lambda x: json.dumps(x).encode('utf-8'))
+        elif mode == 'consumer' and topic_name is not None:
+            self.client = KafkaConsumer(
+                topic_name,
+                bootstrap_servers=['localhost:9092'],
+                api_version=(0,11,5),
+                value_deserializer=lambda x: json.loads(x.decode('utf-8')))
+        else:
+            raise ValueError("Consumer mode requires a topic_name")
 # Kafka Configuration
 KAFKA_TOPIC = 'pcap_stream'
 KAFKA_SERVER = 'localhost:9092' # Adjust to your Kafka server
 #KAFKA_SERVER = 'kafka_service:9092'
 # Initialize Kafka Producer
-producer = KafkaProducer(
-    bootstrap_servers=KAFKA_SERVER,
-    #value_serializer=lambda v: json.dumps(v).encode('utf-8') # Encode data as JSON
-    value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else str(v).encode('utf-8') #remove intermediate JSON encoding
-)
+# producer = KafkaProducer(
+#     bootstrap_servers=KAFKA_SERVER,
+#     value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else str(v).encode('utf-8') #remove intermediate JSON encoding
+# )
+producer = KafkaClient(topic_name=KAFKA_TOPIC)
 def pkt_filter(pkt: Packet) -> bool:
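
For the consumer side of the new wrapper, usage would look like the sketch below (not part of this commit): mode='consumer' hands back an iterable KafkaConsumer on .client, with JSON decoding already applied by the value_deserializer.

# Sketch: reading the stream back through the KafkaClient wrapper above.
consumer = KafkaClient(topic_name=KAFKA_TOPIC, mode='consumer')
for msg in consumer.client:   # blocks waiting for new messages
    packet = msg.value        # already a dict, decoded by value_deserializer
    print(packet)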
@@ -164,6 +182,7 @@ if __name__ == "__main__":
         prep_csv(out_file)
     pkts = []
+    cnt = 0
     for idx, pkt in enumerate(pcap_rdr):
         # filter packets
         if not pkt_filter(pkt):
@@ -182,10 +201,12 @@ if __name__ == "__main__":
         else:
             # direct streaming to kafka goes here
             packet_data = create_pkt_object(pkt)
-            producer.send(KAFKA_TOPIC, packet_data)
-            print(f"streamed packet at index {idx} ")
+            producer.client.send(KAFKA_TOPIC, packet_data)
+            cnt += 1
+            #print(f"streamed packet at index {idx} ")
         if idx > sample_size: break
+    print(f"total streamed: {cnt}")
     # flush remaining
     if not streaming and len(pkts) > 0:
         pkts_write_csv(pkts, out_file)
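
One caveat in the streaming path: kafka-python's KafkaProducer batches sends asynchronously, so a short-lived process can exit before every message is delivered. A hedged follow-up (not in this commit) would flush after the loop:

# Block until all buffered messages are actually sent to the broker.
if streaming:
    producer.client.flush()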