Source code for src.sniffer

# coding: utf8
import socket
import threading

from .database import *
from .credential_harvester import *
from .network_mapper import *
from scapy.all import *

__author__ = 'Romain KRAFT <romain.kraft@protonmail.com>'

[docs]class Sniffer(threading.Thread): ''' Class used to sniff the network and saving into the database ''' def __init__(self, interface, database, pcap = None, output_pcap = None, mode = "stealthy"): """ Create the sniffer instance. You may specify a pcap file to read from it. :param interface: interface to use to sniff the traffic :param database: database wher infos are stored :param pcap: the pcap file to read, optional :param output_pcap: the pcap file to output, optional :param mode: mode to be used by sniffer, optional """ threading.Thread.__init__(self) self.credentials_engine = CredentialEngine() self.network_mapper = NetworkMapper() self.database = database self.interface = interface # In case of pcap file is in input self.pcap = pcap # In cas we're storing packets into a pcap self.output_pcap = output_pcap # Mode to be used by sniffer valid_modes = ['stealthy', 'recon'] # to be updated each time we implement a new mode! if mode not in valid_modes: print(f"INFO: mode {mode} not implemented, downgrading to stealthy!") mode = 'stealthy' self.mode = mode
[docs] def run(self): self.capture()
[docs] def analyse(self, packet): """ Analyse the packet for useful informations before storing it :param packet: packet to analyse """ ''' Check if it's DHCP in order to process it ''' if packet.haslayer(DHCP) and self.mode != 'stealthy': self.network_mapper.process_dhcp_packet(packet) return # Search for credentials in the live data self.credentials_engine.process(packet) # Store content for mapping the network self.store_packet(packet)
[docs] def store_packet(self, packet): """ Callback function that will store all packet received by the capture. :param packet: packet to process """ if(packet.haslayer(IP) ): # Getting protocol as string # TODO: find a better way to get the protocol proto = None try: if(packet.haslayer(TCP)): proto = socket.getservbyport(packet[TCP].dport) elif(packet.haslayer(UDP)): proto = socket.getservbyport(packet[UDP].dport) except : proto = 'tcp' if(packet.haslayer(TCP)): # TODO: find a better way to handle random ports if (packet[TCP].dport < 50000): # Inserting packet in the database. self.database.insert_port(packet[IP].dst, packet[TCP].dport, proto) elif(packet.haslayer(UDP)): # TODO: find a better way to handle random ports if (packet[UDP].dport < 50000): # Inserting packet in the database. self.database.insert_port(packet[IP].dst, packet[UDP].dport, proto) if self.output_pcap: wrpcap(self.output_pcap, packet, append=True)
[docs] def capture( self ): """ Launch the sniffing process. """ print("Capturing") try: if (self.pcap == None) : # Case no pcap given, listening on an interface sniff(iface = self.interface, prn=self.analyse, store=0) else : # Case pcap given, analysing it sniff(offline = self.pcap, prn=self.analyse, store=0) except KeyboardInterrupt: print("INFO : User asked to exit, closing the sniffer !")