Merge branch 'preprocessing' into integration_2

This commit is contained in:
Kaushik Narayan R 2024-11-26 17:33:24 -07:00
commit 75a75a08d6
3 changed files with 231582 additions and 5 deletions

File diff suppressed because it is too large

@@ -0,0 +1,46 @@
#!/bin/bash
data_year=2023
data_month=10
# check the compressed size of each day's MAWI samplepoint-F trace via HTTP HEAD
total_size=0
for data_day in {01..31}; do
    pcap_size=$(curl -sI "http://mawi.nezu.wide.ad.jp/mawi/samplepoint-F/${data_year}/${data_year}${data_month}${data_day}1400.pcap.gz" |
        grep Content-Length |
        awk '{printf "%.3f", $2/1024/1024/1024}')
    echo "[o] ${data_year}-${data_month}-${data_day} - ${pcap_size} GB"
    total_size=$(echo "$total_size + $pcap_size" | bc -l)
done
echo "[+] Total size (compressed) of ${data_year}-${data_month} - ${total_size} GB"
# Total size (compressed) of 2023-10 - 193.292 GB
# download each day's trace, decompress it, and sample packets into per-day CSVs
mkdir -p csv_files
# for data_day in {08..08}; do    # single-day test run
for data_day in {01..31}; do
    if [[ ! -f "${data_year}${data_month}${data_day}1400.pcap.gz" ]]; then
        wget "http://mawi.nezu.wide.ad.jp/mawi/samplepoint-F/${data_year}/${data_year}${data_month}${data_day}1400.pcap.gz"
    fi
    echo "[+] decompression..."
    # gzip -d "${data_year}${data_month}${data_day}1400.pcap.gz"
    gzip -kd "${data_year}${data_month}${data_day}1400.pcap.gz"   # -k keeps the .gz so re-runs skip the download
    echo "[+] packet processing..."
    # sample 10000 packets from each day's trace
    python3 pcap_processor.py \
        --pcap_file "${data_year}${data_month}${data_day}1400.pcap" \
        --out_file "csv_files/${data_day}.csv" \
        --sample \
        --stream_size 10000
    rm "${data_year}${data_month}${data_day}1400.pcap"
done
# merge all per-day CSVs, keeping the header from the first file only
rm -f csv_files/merged.csv
awk '(NR == 1) || (FNR > 1)' csv_files/*.csv > csv_files/merged.csv
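
The awk line passes the very first line (NR == 1, the header of the first file) and then only data rows (FNR > 1) of every file. For reference, a minimal Python equivalent of that merge step; pandas and the per-day file pattern are assumptions here, the script's own merge is the awk one-liner:

# Hypothetical pandas equivalent of the awk merge: one header, all data rows.
import glob
import pandas as pd

day_files = sorted(glob.glob("csv_files/[0-9]*.csv"))  # skip merged.csv itself
merged = pd.concat((pd.read_csv(f) for f in day_files), ignore_index=True)
merged.to_csv("csv_files/merged.csv", index=False)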

@@ -189,11 +189,10 @@ if __name__ == "__main__":
     out_file = args._out
     streaming = args._stream
     sample = args._sample
-    samplesize = int(args._streamsize)
     DEBUG = args._debug
-    sample_size = samplesize #1000000
+    sample_size = int(args._samplesize) #1000000
     batch_size = 100 #100000
     # if preprocessed data ready for streaming
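
The underscore-prefixed attributes (args._samplesize, args._stream) look like argparse dest values; the parser itself sits outside this hunk. A hedged sketch of how the renamed option could be wired up, consistent with the --stream_size flag used in the download script (the flag-to-dest mapping is an assumption):

import argparse

parser = argparse.ArgumentParser()
# Assumed wiring: the CLI flag from the shell script maps onto args._samplesize.
parser.add_argument("--stream_size", dest="_samplesize", default=1000000,
                    help="number of packets to sample from the trace")
args = parser.parse_args()
sample_size = int(args._samplesize)  # matches the new line in this hunk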
@@ -243,9 +242,8 @@ if __name__ == "__main__":
         packet_data = create_pkt_object(pkt)
         producer.client.send(KAFKA_TOPIC, packet_data)
         cnt += 1
-        # print(f"streamed packet at index {idx} ")
-        if idx > sample_size:
-            break
+        #print(f"streamed packet at index {idx} ")
+        if idx > sample_size: break
     print(f"total seen: {seen_count-1}")
     print(f"total streamed: {cnt}")