mirror of
https://github.com/20kaushik02/AUNMS---Network-Monitoring-System.git
synced 2025-12-06 08:04:06 +00:00
Ping function, some imports and error handling
This commit is contained in:
parent
26d762f1fc
commit
3cc3aa40f8
@ -1,8 +1,11 @@
|
|||||||
import sys
|
import sys
|
||||||
from PyQt5.QtGui import *
|
from PyQt5.QtGui import *
|
||||||
from PyQt5.QtCore import *
|
from PyQt5.QtCore import *
|
||||||
from PyQt5.QtWidgets import *
|
from PyQt5.QtWidgets import *
|
||||||
from scapy.all import *
|
from scapy.all import *
|
||||||
|
from scapy.layers.inet import *
|
||||||
|
from scapy.layers.l2 import *
|
||||||
|
|
||||||
|
|
||||||
class NetworkMonitorThread(QObject):
|
class NetworkMonitorThread(QObject):
|
||||||
def __init__(self, interface, parent=None):
|
def __init__(self, interface, parent=None):
|
||||||
@ -10,19 +13,20 @@ class NetworkMonitorThread(QObject):
|
|||||||
self.interface = interface
|
self.interface = interface
|
||||||
self.packetList = []
|
self.packetList = []
|
||||||
self.end = False
|
self.end = False
|
||||||
|
|
||||||
quitBool = pyqtSignal(int)
|
quitBool = pyqtSignal(int)
|
||||||
|
|
||||||
def endSniff(self):
|
def endSniff(self):
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
print("Ending")
|
print("Ending")
|
||||||
self.end = True
|
self.end = True
|
||||||
self.quitBool.emit(1)
|
self.quitBool.emit(1)
|
||||||
|
|
||||||
def sniffStatus(self):
|
def sniffStatus(self):
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
return self.end
|
return self.end
|
||||||
|
|
||||||
def getLayers(self,packet):
|
def getLayers(self, packet):
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
layers = []
|
layers = []
|
||||||
counter = 0
|
counter = 0
|
||||||
@ -35,9 +39,10 @@ class NetworkMonitorThread(QObject):
|
|||||||
break
|
break
|
||||||
counter += 1
|
counter += 1
|
||||||
return layers
|
return layers
|
||||||
|
|
||||||
packetData = pyqtSignal(tuple)
|
packetData = pyqtSignal(tuple)
|
||||||
def handlePacket(self,packet):
|
|
||||||
|
def handlePacket(self, packet):
|
||||||
self.packetList.append(packet)
|
self.packetList.append(packet)
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
tableViewPart = dict()
|
tableViewPart = dict()
|
||||||
@ -50,9 +55,9 @@ class NetworkMonitorThread(QObject):
|
|||||||
tableViewPart['destination'] = packet.dst
|
tableViewPart['destination'] = packet.dst
|
||||||
tableViewPart['length'] = len(packet)
|
tableViewPart['length'] = len(packet)
|
||||||
tableViewPart['layers'] = self.getLayers(packet)
|
tableViewPart['layers'] = self.getLayers(packet)
|
||||||
|
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
(protocol,info) = self.getInfo(packet)
|
(protocol, info) = self.getInfo(packet)
|
||||||
tableViewPart['info'] = info
|
tableViewPart['info'] = info
|
||||||
if protocol:
|
if protocol:
|
||||||
tableViewPart['Protocol'] = protocol
|
tableViewPart['Protocol'] = protocol
|
||||||
@ -62,17 +67,19 @@ class NetworkMonitorThread(QObject):
|
|||||||
tableViewPart['Protocol'] = "Other"
|
tableViewPart['Protocol'] = "Other"
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
self.packetData.emit((packet, tableViewPart))
|
self.packetData.emit((packet, tableViewPart))
|
||||||
|
|
||||||
def getInfo(self,packet):
|
def getInfo(self, packet):
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
info = ""
|
info = ""
|
||||||
protocol = ""
|
protocol = ""
|
||||||
if UDP in packet:
|
if UDP in packet:
|
||||||
protocol = "UDP"
|
protocol = "UDP"
|
||||||
info = "{} -> {} len={} chksum={}".format(packet[UDP].sport,
|
info = "{} -> {} len={} chksum={}".format(
|
||||||
packet[UDP].dport,
|
packet[UDP].sport,
|
||||||
packet[UDP].len,
|
packet[UDP].dport,
|
||||||
packet[UDP].chksum)
|
packet[UDP].len,
|
||||||
|
packet[UDP].chksum
|
||||||
|
)
|
||||||
elif TCP in packet:
|
elif TCP in packet:
|
||||||
flags = {
|
flags = {
|
||||||
'F': 'FIN',
|
'F': 'FIN',
|
||||||
@ -84,26 +91,35 @@ class NetworkMonitorThread(QObject):
|
|||||||
'E': 'ECE',
|
'E': 'ECE',
|
||||||
'C': 'CWR',
|
'C': 'CWR',
|
||||||
}
|
}
|
||||||
|
|
||||||
flgs = str([flags[x] for x in packet.sprintf('%TCP.flags%')])
|
flgs = str([flags[x] for x in packet.sprintf('%TCP.flags%')])
|
||||||
protocol = "TCP"
|
protocol = "TCP"
|
||||||
info = "{} -> {} {} seq={} ack={} window={}".format(packet[TCP].sport,
|
info = "{} -> {} {} seq={} ack={} window={}".format(
|
||||||
packet[TCP].dport,
|
packet[TCP].sport,
|
||||||
flgs,
|
packet[TCP].dport,
|
||||||
packet[TCP].seq,
|
flgs,
|
||||||
packet[TCP].ack,
|
packet[TCP].seq,
|
||||||
packet[TCP].window)
|
packet[TCP].ack,
|
||||||
|
packet[TCP].window
|
||||||
|
)
|
||||||
|
|
||||||
elif ARP in packet:
|
elif ARP in packet:
|
||||||
protocol = "ARP"
|
protocol = "ARP"
|
||||||
info = "hwtype={} ptype={} hwlen={} plen={} op={}".format(packet[ARP].hwtype,
|
info = "hwtype={} ptype={} hwlen={} plen={} op={}".format(
|
||||||
packet[ARP].ptype,
|
packet[ARP].hwtype,
|
||||||
packet[ARP].hwlen,
|
packet[ARP].ptype,
|
||||||
packet[ARP].plen,
|
packet[ARP].hwlen,
|
||||||
packet[ARP].op)
|
packet[ARP].plen,
|
||||||
|
packet[ARP].op
|
||||||
|
)
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
return (protocol,info)
|
return (protocol, info)
|
||||||
|
|
||||||
|
|
||||||
def startSniff(self):
|
def startSniff(self):
|
||||||
QApplication.processEvents()
|
QApplication.processEvents()
|
||||||
self.pkts = sniff(count=0, iface=self.interface, prn=self.handlePacket, stop_filter=lambda x: self.sniffStatus())
|
self.pkts = sniff(
|
||||||
|
count=0,
|
||||||
|
iface=self.interface,
|
||||||
|
prn=self.handlePacket,
|
||||||
|
stop_filter={lambda x: self.sniffStatus()}
|
||||||
|
)
|
||||||
|
|||||||
30
ping.py
30
ping.py
@ -1,28 +1,40 @@
|
|||||||
import sys
|
import sys
|
||||||
from PyQt5.QtGui import *
|
from PyQt5.QtGui import *
|
||||||
from PyQt5.QtCore import *
|
from PyQt5.QtCore import *
|
||||||
from PyQt5.QtWidgets import *
|
from PyQt5.QtWidgets import *
|
||||||
from scapy.all import *
|
from scapy.all import *
|
||||||
|
from scapy.layers.inet import IP
|
||||||
|
from scapy.layers.inet import ICMP
|
||||||
|
|
||||||
|
|
||||||
class Ping(QWidget):
|
class Ping(QWidget):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
|
|
||||||
self.hostn = QLabel("Enter Address/Hostname to ping:")
|
self.hostn = QLabel("Enter Address/Hostname to ping:")
|
||||||
self.host = QLineEdit()
|
self.host = QLineEdit()
|
||||||
|
|
||||||
self.startPingBtn = QPushButton()
|
self.startPingBtn = QPushButton()
|
||||||
self.startPingBtn.setText('Ping')
|
self.startPingBtn.setText('Ping')
|
||||||
self.startPingBtn.clicked.connect(self.startPing)
|
self.startPingBtn.clicked.connect(self.startPing)
|
||||||
|
|
||||||
self.result = QTextEdit()
|
self.result = QTextEdit()
|
||||||
|
self.result.setReadOnly(True)
|
||||||
|
|
||||||
self.layoutPing = QVBoxLayout(self)
|
self.layoutPing = QVBoxLayout(self)
|
||||||
self.layoutPing.addWidget(self.hostn)
|
self.layoutPing.addWidget(self.hostn)
|
||||||
self.layoutPing.addWidget(self.host)
|
self.layoutPing.addWidget(self.host)
|
||||||
self.layoutPing.addWidget(self.startPingBtn)
|
self.layoutPing.addWidget(self.startPingBtn)
|
||||||
self.layoutPing.addWidget(self.result)
|
self.layoutPing.addWidget(self.result)
|
||||||
self.setLayout(self.layoutPing)
|
self.setLayout(self.layoutPing)
|
||||||
|
|
||||||
def startPing():
|
def startPing(self):
|
||||||
pass
|
if(len(self.host.text()) >= 1):
|
||||||
|
try:
|
||||||
|
packet = IP(dst=self.host.text(), ttl=10)/ICMP()
|
||||||
|
output = sr1(packet, timeout=2)
|
||||||
|
if output is not None:
|
||||||
|
self.result.setText(output.summary())
|
||||||
|
except socket.gaierror as e:
|
||||||
|
self.result.setText(
|
||||||
|
"Invalid address/Could not get address info")
|
||||||
|
|||||||
@ -1,35 +1,43 @@
|
|||||||
from PyQt5.QtGui import *
|
from PyQt5.QtGui import *
|
||||||
from PyQt5.QtCore import *
|
from PyQt5.QtCore import *
|
||||||
from PyQt5.QtWidgets import *
|
from PyQt5.QtWidgets import *
|
||||||
from scapy.all import *
|
from scapy.all import *
|
||||||
from scapy.layers.inet import traceroute
|
from scapy.layers.inet import traceroute
|
||||||
|
|
||||||
|
|
||||||
class TraceRoute(QWidget):
|
class TraceRoute(QWidget):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
self.hostn = QLabel("Enter Address/Hostname to trace:")
|
self.hostn = QLabel("Enter Address/Hostname to trace:")
|
||||||
self.host = QLineEdit()
|
self.host = QLineEdit()
|
||||||
|
|
||||||
self.startTraceBtn = QPushButton()
|
self.startTraceBtn = QPushButton()
|
||||||
self.startTraceBtn.setText('Trace Route')
|
self.startTraceBtn.setText('Trace Route')
|
||||||
self.startTraceBtn.clicked.connect(self.startTrace)
|
self.startTraceBtn.clicked.connect(self.startTrace)
|
||||||
|
|
||||||
self.result = QTextEdit()
|
self.result = QTextEdit()
|
||||||
self.result.setReadOnly(True)
|
self.result.setReadOnly(True)
|
||||||
|
|
||||||
self.layoutTrace = QVBoxLayout(self)
|
self.layoutTrace = QVBoxLayout(self)
|
||||||
self.layoutTrace.addWidget(self.hostn)
|
self.layoutTrace.addWidget(self.hostn)
|
||||||
self.layoutTrace.addWidget(self.host)
|
self.layoutTrace.addWidget(self.host)
|
||||||
self.layoutTrace.addWidget(self.startTraceBtn)
|
self.layoutTrace.addWidget(self.startTraceBtn)
|
||||||
self.layoutTrace.addWidget(self.result)
|
self.layoutTrace.addWidget(self.result)
|
||||||
self.setLayout(self.layoutTrace)
|
self.setLayout(self.layoutTrace)
|
||||||
|
|
||||||
def startTrace(self):
|
def startTrace(self):
|
||||||
result, unans = traceroute(target=self.host.text(), dport=80, verbose=0)
|
if(len(self.host.text()) >= 1):
|
||||||
output = []
|
try:
|
||||||
output.append("\tRoute path\tResponse time")
|
result, unans = traceroute(
|
||||||
for snd, rcv in result:
|
target=self.host.text(), dport=80, verbose=0)
|
||||||
output.append(str(f"{snd.ttl}\t{rcv.src}\t\t{(int((rcv.time - snd.sent_time)*1000))} ms"))
|
output = []
|
||||||
output.append(f"\nUnanswered packets: {len(unans)}")
|
output.append("\tRoute path\tResponse time")
|
||||||
self.result.setText('\n'.join(output))
|
for snd, rcv in result:
|
||||||
|
output.append(
|
||||||
|
str(f"{snd.ttl}\t{rcv.src}\t\t{(int((rcv.time - snd.sent_time)*1000))} ms"))
|
||||||
|
output.append(f"\nUnanswered packets: {len(unans)}")
|
||||||
|
self.result.setText('\n'.join(output))
|
||||||
|
except socket.gaierror as e:
|
||||||
|
self.result.setText(
|
||||||
|
"Invalid address/Could not get address info")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user