integration working in akash's system

This commit is contained in:
Akash Sivakumar
2024-11-22 20:26:57 -07:00
parent e65cf00d34
commit 5b1add9241
5 changed files with 244 additions and 2 deletions

View File

@@ -0,0 +1,25 @@
from kafka import KafkaConsumer
import json
KAFKA_TOPIC = 'pcap_stream_new' # Use the topic name from your producer
KAFKA_SERVER = 'localhost:9092' # Ensure this matches your Kafka server configuration
# Create a Kafka consumer
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=[KAFKA_SERVER],
value_deserializer=lambda x: x.decode('utf-8'),#json.loads(x.decode('utf-8')),
auto_offset_reset='earliest', # Ensures it starts reading from the beginning
enable_auto_commit=True
)
print("Consuming messages from topic:", KAFKA_TOPIC)
# Consume and print messages
for message in consumer:
print(type(message))

View File

@@ -21,7 +21,7 @@ class KafkaClient:
self.client = KafkaProducer(
bootstrap_servers=['kafka:9092'],
max_request_size = 200000000,
api_version=(0,11,5),
#api_version=(0,11,5),
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
elif mode == 'consumer' and topic_name is not None:
self.client = KafkaConsumer(
@@ -33,7 +33,7 @@ class KafkaClient:
raise ValueError("Consumer mode requires a topic_name")
# Kafka Configuration
KAFKA_TOPIC = 'pcap_stream'
KAFKA_TOPIC = 'pcap_stream_new'
KAFKA_SERVER = 'kafka:9092' # Adjust to your Kafka server
#KAFKA_SERVER = 'kafka_service:9092'