diff --git a/preprocessing/Dockerfile.python b/preprocessing/Dockerfile.python
index bba0ca6..b4b0742 100644
--- a/preprocessing/Dockerfile.python
+++ b/preprocessing/Dockerfile.python
@@ -16,5 +16,5 @@ COPY pcap_processor.py /app
 # Expose the port Kafka uses (optional, for communication with other services)
 EXPOSE 9092
 
-# Command to run your Python application
+# Command to allow custom runtime arguments
 CMD ["python", "pcap_processor.py"]
diff --git a/preprocessing/README.md b/preprocessing/README.md
index bfd5208..069efee 100644
--- a/preprocessing/README.md
+++ b/preprocessing/README.md
@@ -48,4 +48,4 @@
 
 - -d or --debug: boolean value indicating if program is run in debug mode
 
-python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --sample-size 1000
+python pcap_processor.py -f C:/Users/akash/storage/Asu/sem3/dds/project/202310081400.pcap -s --stream_size 1000
diff --git a/preprocessing/docker-compose.yml b/preprocessing/docker-compose.yml
new file mode 100644
index 0000000..008ded4
--- /dev/null
+++ b/preprocessing/docker-compose.yml
@@ -0,0 +1,64 @@
+version: '3.8'
+
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:latest
+    networks:
+      - kafka_network
+    deploy:
+      replicas: 1
+      restart_policy:
+        condition: on-failure
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+    ports:
+      - "2181:2181"
+    healthcheck:
+      test: ["CMD", "nc", "-z", "localhost", "2181"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  kafka:
+    image: confluentinc/cp-kafka:latest
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL: PLAINTEXT
+      KAFKA_BROKER_ID: 1
+    networks:
+      - kafka_network
+    ports:
+      - "9092:9092"
+    deploy:
+      replicas: 1
+      restart_policy:
+        condition: on-failure
+    healthcheck:
+      test: ["CMD", "nc", "-z", "localhost", "9092"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  pcap_streamer:
+    image: levenshtein/streamer_test3:latest
+    depends_on:
+      - kafka
+    networks:
+      - kafka_network
+    volumes:
+      - "/host_mnt/c/Users/akash/storage/Asu/sem3/dds/project:/data/pcap"
+    environment:
+      PCAP_FILE: /data/pcap/202310081400.pcap
+    command: ["sh", "-c", "sleep 10 && python /app/pcap_processor.py -f /data/pcap/202310081400.pcap -s --stream_size 1000"]
+    deploy:
+      replicas: 1
+      restart_policy:
+        condition: on-failure
+
+networks:
+  kafka_network:
+    driver: overlay
diff --git a/preprocessing/pcap_processor.py b/preprocessing/pcap_processor.py
index bcda443..8e5cdaa 100644
--- a/preprocessing/pcap_processor.py
+++ b/preprocessing/pcap_processor.py
@@ -6,22 +6,40 @@
 from scapy.packet import Packet
 from scapy.utils import PcapReader
 from scapy.layers.inet import IP, TCP, UDP
-from kafka import KafkaProducer
+from kafka import KafkaProducer, KafkaConsumer
 import json
 
 dbg_print = lambda *x: DEBUG and print(f"[DEBUG] {x}")
 
+class KafkaClient:
+    def __init__(self, topic_name=None, mode='producer'):
+        self.mode = mode
+        self.topic_name = topic_name
+        if mode == 'producer':
+            self.client = KafkaProducer(
+                bootstrap_servers=['localhost:9092'],
+                api_version=(0,11,5),
+                value_serializer=lambda x: json.dumps(x).encode('utf-8'))
+        elif mode == 'consumer' and topic_name is not None:
+            self.client = KafkaConsumer(
+                topic_name,
+                bootstrap_servers=['localhost:9092'],
+                api_version=(0,11,5),
+                value_deserializer=lambda x: json.loads(x.decode('utf-8')))
+        else:
+            raise ValueError("Consumer mode requires a topic_name")
+
 # Kafka Configuration
 KAFKA_TOPIC = 'pcap_stream'
 KAFKA_SERVER = 'localhost:9092' # Adjust to your Kafka server
 #KAFKA_SERVER = 'kafka_service:9092'
 
 # Initialize Kafka Producer
-producer = KafkaProducer(
-    bootstrap_servers=KAFKA_SERVER,
-    #value_serializer=lambda v: json.dumps(v).encode('utf-8') # Encode data as JSON
-    value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else str(v).encode('utf-8') #remove intermediate JSON encoding
-)
+# producer = KafkaProducer(
+#     bootstrap_servers=KAFKA_SERVER,
+#     value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else str(v).encode('utf-8') #remove intermediate JSON encoding
+# )
+producer = KafkaClient(topic_name=KAFKA_TOPIC)
 
 
 def pkt_filter(pkt: Packet) -> bool:
@@ -164,6 +182,7 @@ if __name__ == "__main__":
         prep_csv(out_file)
 
     pkts = []
+    cnt = 0
     for idx, pkt in enumerate(pcap_rdr):
         # filter packets
         if not pkt_filter(pkt):
@@ -182,10 +201,12 @@ if __name__ == "__main__":
         else:
             # direct streaming to kafka goes here
             packet_data = create_pkt_object(pkt)
-            producer.send(KAFKA_TOPIC, packet_data)
-            print(f"streamed packet at index {idx} ")
+            producer.client.send(KAFKA_TOPIC, packet_data)
+            cnt += 1
+            #print(f"streamed packet at index {idx} ")
         if idx > sample_size:
             break
+    print(f"total streamed: {cnt}")
     # flush remaining
     if not streaming and len(pkts) > 0:
         pkts_write_csv(pkts, out_file)