mirror of
https://github.com/20kaushik02/real-time-traffic-analysis-clickhouse.git
synced 2026-01-25 08:04:04 +00:00
Merge pull request #1 from 20kaushik02/preprocessing
Preprocessing part completed
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1 +1,4 @@
|
|||||||
*.pcap*
|
*.pcap*
|
||||||
|
*.gz*
|
||||||
|
*.csv*
|
||||||
|
*.tsv*
|
||||||
|
|||||||
16
preprocessing/README.md
Normal file
16
preprocessing/README.md
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
# Data filtering, preprocessing and selection for further use
|
||||||
|
|
||||||
|
- IP packet traces are taken [from here](https://mawi.wide.ad.jp/mawi/samplepoint-F/2023/)
|
||||||
|
- Filtering
|
||||||
|
- L4 - Limit to TCP and UDP
|
||||||
|
- L3 - IPv6 is only around 10%, let's drop it
|
||||||
|
- Selection of fields:
|
||||||
|
- Timestamp
|
||||||
|
- capture window is from 0500-0515 UTC
|
||||||
|
- nanosecond precision, use `DateTime64` data type in ClickHouse
|
||||||
|
- IP
|
||||||
|
- addresses - src, dst
|
||||||
|
- L4 protocol - TCP, UDP. use `Enum` data type in ClickHouse
|
||||||
|
- TCP/UDP - ports - sport, dport
|
||||||
|
- Packet size - in bytes
|
||||||
|
- `sample_output.csv` contains a partial subset of `202310081400.pcap`, ~600K packets
|
||||||
148
preprocessing/pcap_processor.py
Normal file
148
preprocessing/pcap_processor.py
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
from argparse import ArgumentParser
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
import csv
|
||||||
|
|
||||||
|
from scapy.packet import Packet
|
||||||
|
from scapy.utils import PcapReader
|
||||||
|
from scapy.layers.inet import IP, TCP, UDP
|
||||||
|
|
||||||
|
def dbg_print(*x):
    """Print *x* (rendered as a tuple) prefixed with ``[DEBUG]``.

    Only prints when the module-level ``DEBUG`` flag (set in ``__main__``
    from ``-d/--debug``) is truthy. ``DEBUG`` is looked up at call time,
    matching the original lambda's late binding.
    """
    # PEP 8 (E731): prefer a def over assigning a lambda to a name.
    if DEBUG:
        print(f"[DEBUG] {x}")
|
||||||
|
|
||||||
|
|
||||||
|
def pkt_filter(pkt: Packet) -> bool:
    """filter to include/exclude a packet

    Keeps only IPv4 packets that carry TCP or UDP; everything else
    (non-IP, IPv6, other L4 protocols) is dropped.

    Args:
        pkt: a parsed scapy packet.

    Returns:
        True if the packet should be processed, False otherwise.
    """
    # The original used `assert` + `except AssertionError`, which is
    # silently stripped under `python -O` (letting every packet through).
    # Plain short-circuit boolean logic evaluates the same checks in the
    # same order and survives optimized mode.
    return (
        IP in pkt
        and pkt[IP].version == 4
        and (TCP in pkt or UDP in pkt)
    )
|
||||||
|
|
||||||
|
|
||||||
|
def pkt_extract(pkt: Packet) -> list:
    """extract select attributes from a packet

    Assumes *pkt* already passed ``pkt_filter`` (IPv4 carrying TCP or UDP);
    otherwise the layer lookups below raise.

    Returns:
        ``[epoch_time, l4_proto_name, src_addr, dst_addr, sport, dport, pkt_len]``
        matching the CSV header written by ``prep_csv``.
    """
    l4_proto = None
    if TCP in pkt:
        l4_proto = TCP
    elif UDP in pkt:
        l4_proto = UDP

    # Hoist the repeated layer lookups (this runs once per packet).
    l4 = pkt[l4_proto]
    ip = pkt[IP]

    pkt_attrs = [
        float(pkt.time),
        l4.name,
        ip.src,
        ip.dst,
        l4.sport,
        l4.dport,
        len(pkt),
    ]

    # Only build the human-readable timestamp and formatted line when
    # debugging is enabled -- previously these were computed for every
    # packet even with DEBUG off, since arguments to dbg_print were
    # always evaluated before its internal DEBUG check.
    if DEBUG:
        pkt_time_str = str(datetime.fromtimestamp(float(pkt.time), timezone.utc))
        dbg_print(
            "[{}] {} {}:{} -> {}:{} - {} bytes".format(
                pkt_time_str,
                l4.name,
                ip.src,
                l4.sport,
                ip.dst,
                l4.dport,
                len(pkt),
            )
        )

    return pkt_attrs
|
||||||
|
|
||||||
|
|
||||||
|
def prep_csv(out_file: str):
    """Create (or truncate) *out_file* and write the CSV header row.

    The column order matches the row layout produced by ``pkt_extract``.
    """
    header = (
        "time",
        "l4_proto",
        "src_addr",
        "dst_addr",
        "src_port",
        "dst_port",
        "pkt_len",
    )
    with open(out_file, "w", newline="") as csvfile:
        csv.writer(csvfile).writerow(header)
|
||||||
|
|
||||||
|
|
||||||
|
def pkts_write_csv(pkts: list, out_file: str):
    """Append one CSV row per packet-attribute list in *pkts* to *out_file*.

    Opens in append mode so successive batch flushes accumulate after the
    header written by ``prep_csv``.
    """
    with open(out_file, "a", newline="") as fh:
        csv.writer(fh).writerows(pkts)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    argp = ArgumentParser()
    argp.add_argument("-f", "--pcap_file", required=True, dest="_pcap")
    argp.add_argument("-o", "--out_file", required=False, dest="_out")
    argp.add_argument(
        "-x",
        "--sample",
        required=False,
        default=False,
        dest="_sample",
        action="store_true",
    )
    argp.add_argument(
        "-s",
        "--stream",
        required=False,
        default=False,
        dest="_stream",
        action="store_true",
    )
    argp.add_argument(
        "-d",
        "--debug",
        required=False,
        default=False,
        dest="_debug",
        action="store_true",
    )
    args = argp.parse_args()

    pcap_file = args._pcap
    out_file = args._out
    streaming = args._stream
    sample = args._sample

    # Read by dbg_print / pkt_extract at call time.
    DEBUG = args._debug

    sample_size = 1000000  # max packet index to read when --sample is given
    batch_size = 100000    # rows buffered in memory between CSV flushes

    pcap_rdr = PcapReader(pcap_file)
    if not streaming:
        # was: `assert args._out and args._out != ""` -- asserts are
        # stripped under `python -O`; report the usage error via argparse.
        if not out_file:
            argp.error("-o/--out_file is required unless -s/--stream is given")
        prep_csv(out_file)

    pkts = []
    try:
        for idx, pkt in enumerate(pcap_rdr):
            # filter packets
            if not pkt_filter(pkt):
                continue

            if not streaming:
                # buffer rows; flushed in batches below and once at the end
                pkts.append(pkt_extract(pkt))

                if sample and idx > sample_size:
                    break

                # NOTE: idx counts all packets (including filtered-out ones),
                # so batches are approximate, matching the original behavior.
                if idx > 0 and idx % batch_size == 0:
                    pkts_write_csv(pkts, out_file)
                    pkts = []
            else:
                # direct streaming to kafka goes here
                pass

        # flush remaining
        if not streaming and len(pkts) > 0:
            pkts_write_csv(pkts, out_file)
    finally:
        # was missing: release the capture file handle even on error
        pcap_rdr.close()
|
||||||
|
|
||||||
601819
preprocessing/sample_output.csv
Normal file
601819
preprocessing/sample_output.csv
Normal file
File diff suppressed because it is too large
Load Diff
41
preprocessing/scratch.py
Normal file
41
preprocessing/scratch.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
from scapy.utils import PcapReader
|
||||||
|
from scapy.layers.inet import IP, TCP, UDP
|
||||||
|
|
||||||
|
# Scratch script: print a human-readable line for the first ~100 IPv4
# TCP/UDP packets of the trace, to eyeball the data before writing
# the real extractor (pcap_processor.py).
pcap_rdr = PcapReader("202310081400.pcap")
sample_size = 100

try:
    for idx, pkt in enumerate(pcap_rdr):
        # filter: IPv4 carrying TCP or UDP only (the original used
        # assert/except AssertionError, which `python -O` strips)
        if IP not in pkt or pkt[IP].version != 4 or (TCP not in pkt and UDP not in pkt):
            continue

        # The TCP and UDP branches were duplicated except for the layer
        # indexed -- select the layer once and format a single line.
        l4 = TCP if TCP in pkt else UDP
        proto_name = "TCP" if l4 is TCP else "UDP"
        print(
            "[{}] {} {}:{} -> {}:{} - {} bytes".format(
                datetime.fromtimestamp(float(pkt.time), timezone.utc),
                proto_name,
                pkt[IP].src,
                pkt[l4].sport,
                pkt[IP].dst,
                pkt[l4].dport,
                len(pkt),
            )
        )

        if idx > sample_size:
            break
finally:
    # was missing: close the capture file handle
    pcap_rdr.close()
|
||||||
Reference in New Issue
Block a user