Akash Sivakumar 2024-11-14 17:13:33 -07:00
commit 5ade3d90fc
2 changed files with 72 additions and 1 deletions

@ -1,5 +1,7 @@
# Data filtering, preprocessing and selection for further use
## Traffic data
- IP packet traces are taken [from here](https://mawi.wide.ad.jp/mawi/samplepoint-F/2023/)
- Filtering
    - L4 - Limit to TCP and UDP
@ -15,7 +17,27 @@
- Packet size - in bytes
- `sample_output.csv` contains a partial subset of `202310081400.pcap`, ~600K packets
## IP geolocation database
- This project uses the IP2Location LITE database for [IP geolocation](https://lite.ip2location.com)
- A bit of preprocessing drops the country code column and converts the IP addresses from decimal (integer) format to dotted string format
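The decimal-to-dotted conversion can be done entirely with the Python standard library; for example:

```python
import socket
import struct


def int_to_ipv4(num: int) -> str:
    # Pack the integer as an unsigned 32-bit big-endian value,
    # then render it in dotted-quad notation.
    return socket.inet_ntoa(struct.pack("!L", num))


print(int_to_ipv4(16909060))  # → 1.2.3.4
```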
# Setting up Kafka
- Download and install Kafka [from here](https://kafka.apache.org/downloads)
- Run all commands below in separate terminals from the installation location
- Zookeeper:
    - Windows: `.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties`
    - Mac: `bin/zookeeper-server-start.sh config/zookeeper.properties`
- Kafka Broker:
    - Windows: `.\bin\windows\kafka-server-start.bat .\config\server.properties`
    - Mac: `bin/kafka-server-start.sh config/server.properties`
- Creating a Kafka topic (`%topicname%`/`$topicname` is the desired topic name):
    - Windows: `.\bin\windows\kafka-topics.bat --create --topic %topicname% --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1`
    - Mac: `bin/kafka-topics.sh --create --topic $topicname --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1`
# Streaming from pcap file using Kafka
- Start Zookeeper and the Kafka broker (steps above) whenever the Python code is run after a machine reboot
- Run `pcap_processor.py`
- Arguments
    - `-f` or `--pcap_file`: path to the pcap file (mandatory)
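A hypothetical sketch of how such a pcap-to-Kafka producer could be structured (it assumes the `scapy` and `kafka-python` packages and a broker on `localhost:9092`; the topic name `packets` and the record fields are illustrative, not taken from the project):

```python
import argparse
import json


def parse_args(argv=None):
    # Mirrors the documented interface: -f/--pcap_file is mandatory.
    parser = argparse.ArgumentParser(
        description="Stream packets from a pcap file to Kafka"
    )
    parser.add_argument("-f", "--pcap_file", required=True, help="pcap file path")
    return parser.parse_args(argv)


def stream_pcap(pcap_file, topic="packets", bootstrap="localhost:9092"):
    # Imported lazily so parse_args() is usable without scapy/kafka-python installed.
    from kafka import KafkaProducer
    from scapy.all import PcapReader

    producer = KafkaProducer(bootstrap_servers=bootstrap)
    with PcapReader(pcap_file) as packets:
        for pkt in packets:
            # Illustrative payload: timestamp and packet size in bytes
            record = {"time": float(pkt.time), "size": len(pkt)}
            producer.send(topic, json.dumps(record).encode("utf-8"))
    producer.flush()
```

Calling `stream_pcap(parse_args().pcap_file)` would then stream the file, but requires a running broker and the external packages above.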

@ -0,0 +1,49 @@
import struct
import socket
import csv

sample_size = 100
batch_size = 10000
sample = True  # when True, convert only the first ~sample_size records


def int_to_ipv4(num: int) -> str:
    # Convert a 32-bit integer to dotted-quad notation, e.g. 16909060 -> "1.2.3.4"
    return socket.inet_ntoa(struct.pack("!L", num))


with open("IP2LOCATION-LITE-DB3.csv", "r") as input_file, open(
    "geoip.csv", "w", newline=""
) as output_file:
    reader = csv.reader(input_file)
    writer = csv.writer(output_file)
    # header row
    writer.writerow(
        [
            "ip_from",
            "ip_to",
            "country",
            "region",
            "city",
        ]
    )
    records = []
    for idx, record in enumerate(reader):
        # Drop record[2] (the country code); keep country name, region and city,
        # and convert the integer range bounds to dotted strings
        new_record = [
            int_to_ipv4(int(record[0])),
            int_to_ipv4(int(record[1])),
            record[3],
            record[4],
            record[5],
        ]
        records.append(new_record)
        if sample and idx > sample_size:
            break
        # Write out in batches to keep memory usage bounded
        if idx > 0 and idx % batch_size == 0:
            writer.writerows(records)
            records = []
    # Flush any remaining records
    if len(records) > 0:
        writer.writerows(records)
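Once `geoip.csv` exists, an address can be geolocated by binary-searching the sorted ranges. A minimal sketch, assuming rows are loaded with integer range bounds sorted by `ip_from` (the helper names and the sample row are illustrative, not real IP2Location data):

```python
import bisect
import socket
import struct


def ipv4_to_int(addr: str) -> int:
    # Inverse of int_to_ipv4: dotted-quad string -> 32-bit integer
    return struct.unpack("!L", socket.inet_aton(addr))[0]


def lookup(rows, addr):
    # rows: (ip_from, ip_to, country, region, city) tuples with integer
    # bounds, sorted by ip_from as in the IP2Location data.
    ip = ipv4_to_int(addr)
    starts = [row[0] for row in rows]  # precompute once for repeated lookups
    i = bisect.bisect_right(starts, ip) - 1
    if i >= 0 and rows[i][0] <= ip <= rows[i][1]:
        return rows[i][2:]
    return None


# Illustrative range, not real IP2Location data
rows = [(ipv4_to_int("10.0.0.0"), ipv4_to_int("10.0.0.255"),
         "CountryA", "RegionB", "CityC")]
print(lookup(rows, "10.0.0.42"))  # → ('CountryA', 'RegionB', 'CityC')
print(lookup(rows, "10.0.1.1"))  # → None
```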