diff --git a/network_monitor_thread.py b/network_monitor_thread.py index cf4cd9b..53f95de 100644 --- a/network_monitor_thread.py +++ b/network_monitor_thread.py @@ -1,8 +1,11 @@ import sys -from PyQt5.QtGui import * -from PyQt5.QtCore import * +from PyQt5.QtGui import * +from PyQt5.QtCore import * from PyQt5.QtWidgets import * from scapy.all import * +from scapy.layers.inet import * +from scapy.layers.l2 import * + class NetworkMonitorThread(QObject): def __init__(self, interface, parent=None): @@ -10,19 +13,20 @@ class NetworkMonitorThread(QObject): self.interface = interface self.packetList = [] self.end = False - + quitBool = pyqtSignal(int) + def endSniff(self): QApplication.processEvents() print("Ending") self.end = True self.quitBool.emit(1) - + def sniffStatus(self): QApplication.processEvents() return self.end - - def getLayers(self,packet): + + def getLayers(self, packet): QApplication.processEvents() layers = [] counter = 0 @@ -35,9 +39,10 @@ class NetworkMonitorThread(QObject): break counter += 1 return layers - + packetData = pyqtSignal(tuple) - def handlePacket(self,packet): + + def handlePacket(self, packet): self.packetList.append(packet) QApplication.processEvents() tableViewPart = dict() @@ -50,9 +55,9 @@ class NetworkMonitorThread(QObject): tableViewPart['destination'] = packet.dst tableViewPart['length'] = len(packet) tableViewPart['layers'] = self.getLayers(packet) - + QApplication.processEvents() - (protocol,info) = self.getInfo(packet) + (protocol, info) = self.getInfo(packet) tableViewPart['info'] = info if protocol: tableViewPart['Protocol'] = protocol @@ -62,17 +67,19 @@ class NetworkMonitorThread(QObject): tableViewPart['Protocol'] = "Other" QApplication.processEvents() self.packetData.emit((packet, tableViewPart)) - - def getInfo(self,packet): + + def getInfo(self, packet): QApplication.processEvents() info = "" protocol = "" if UDP in packet: protocol = "UDP" - info = "{} -> {} len={} chksum={}".format(packet[UDP].sport, - packet[UDP].dport, - packet[UDP].len, - packet[UDP].chksum) + info = "{} -> {} len={} chksum={}".format( + packet[UDP].sport, + packet[UDP].dport, + packet[UDP].len, + packet[UDP].chksum + ) elif TCP in packet: flags = { 'F': 'FIN', @@ -84,26 +91,35 @@ class NetworkMonitorThread(QObject): 'E': 'ECE', 'C': 'CWR', } - + flgs = str([flags[x] for x in packet.sprintf('%TCP.flags%')]) protocol = "TCP" - info = "{} -> {} {} seq={} ack={} window={}".format(packet[TCP].sport, - packet[TCP].dport, - flgs, - packet[TCP].seq, - packet[TCP].ack, - packet[TCP].window) + info = "{} -> {} {} seq={} ack={} window={}".format( + packet[TCP].sport, + packet[TCP].dport, + flgs, + packet[TCP].seq, + packet[TCP].ack, + packet[TCP].window + ) + elif ARP in packet: protocol = "ARP" - info = "hwtype={} ptype={} hwlen={} plen={} op={}".format(packet[ARP].hwtype, - packet[ARP].ptype, - packet[ARP].hwlen, - packet[ARP].plen, - packet[ARP].op) + info = "hwtype={} ptype={} hwlen={} plen={} op={}".format( + packet[ARP].hwtype, + packet[ARP].ptype, + packet[ARP].hwlen, + packet[ARP].plen, + packet[ARP].op + ) QApplication.processEvents() - return (protocol,info) - - + return (protocol, info) + def startSniff(self): QApplication.processEvents() - self.pkts = sniff(count=0, iface=self.interface, prn=self.handlePacket, stop_filter=lambda x: self.sniffStatus()) \ No newline at end of file + self.pkts = sniff( + count=0, + iface=self.interface, + prn=self.handlePacket, + stop_filter={lambda x: self.sniffStatus()} + ) diff --git a/ping.py b/ping.py index b65b2ec..e7dd7ea 100644 --- a/ping.py +++ b/ping.py @@ -1,28 +1,40 @@ import sys -from PyQt5.QtGui import * -from PyQt5.QtCore import * +from PyQt5.QtGui import * +from PyQt5.QtCore import * from PyQt5.QtWidgets import * from scapy.all import * +from scapy.layers.inet import IP +from scapy.layers.inet import ICMP + class Ping(QWidget): def __init__(self): super().__init__() - - + self.hostn = QLabel("Enter Address/Hostname to ping:") self.host = QLineEdit() - + self.startPingBtn = QPushButton() self.startPingBtn.setText('Ping') self.startPingBtn.clicked.connect(self.startPing) - + self.result = QTextEdit() + self.result.setReadOnly(True) + self.layoutPing = QVBoxLayout(self) self.layoutPing.addWidget(self.hostn) self.layoutPing.addWidget(self.host) self.layoutPing.addWidget(self.startPingBtn) self.layoutPing.addWidget(self.result) self.setLayout(self.layoutPing) - - def startPing(): - pass \ No newline at end of file + + def startPing(self): + if(len(self.host.text()) >= 1): + try: + packet = IP(dst=self.host.text(), ttl=10)/ICMP() + output = sr1(packet, timeout=2) + if output is not None: + self.result.setText(output.summary()) + except socket.gaierror as e: + self.result.setText( + "Invalid address/Could not get address info") diff --git a/trace_route.py b/trace_route.py index f8aa393..c695495 100644 --- a/trace_route.py +++ b/trace_route.py @@ -1,35 +1,43 @@ -from PyQt5.QtGui import * -from PyQt5.QtCore import * +from PyQt5.QtGui import * +from PyQt5.QtCore import * from PyQt5.QtWidgets import * from scapy.all import * from scapy.layers.inet import traceroute + class TraceRoute(QWidget): def __init__(self): super().__init__() - + self.hostn = QLabel("Enter Address/Hostname to trace:") self.host = QLineEdit() - + self.startTraceBtn = QPushButton() self.startTraceBtn.setText('Trace Route') self.startTraceBtn.clicked.connect(self.startTrace) - + self.result = QTextEdit() self.result.setReadOnly(True) - + self.layoutTrace = QVBoxLayout(self) self.layoutTrace.addWidget(self.hostn) self.layoutTrace.addWidget(self.host) self.layoutTrace.addWidget(self.startTraceBtn) self.layoutTrace.addWidget(self.result) self.setLayout(self.layoutTrace) - + def startTrace(self): - result, unans = traceroute(target=self.host.text(), dport=80, verbose=0) - output = [] - output.append("\tRoute path\tResponse time") - for snd, rcv in result: - output.append(str(f"{snd.ttl}\t{rcv.src}\t\t{(int((rcv.time - snd.sent_time)*1000))} ms")) - output.append(f"\nUnanswered packets: {len(unans)}") - self.result.setText('\n'.join(output)) \ No newline at end of file + if(len(self.host.text()) >= 1): + try: + result, unans = traceroute( + target=self.host.text(), dport=80, verbose=0) + output = [] + output.append("\tRoute path\tResponse time") + for snd, rcv in result: + output.append( + str(f"{snd.ttl}\t{rcv.src}\t\t{(int((rcv.time - snd.sent_time)*1000))} ms")) + output.append(f"\nUnanswered packets: {len(unans)}") + self.result.setText('\n'.join(output)) + except socket.gaierror as e: + self.result.setText( + "Invalid address/Could not get address info")