Source code for src.database

# coding: utf8
from datetime import datetime
import sqlite3
import os
import sys

__author__ = 'Romain KRAFT <romain.kraft@protonmail.com>'

class DHCPInstance:
    """Representation of a DHCP Request.

    Plain mutable record: every field starts as ``None`` and is expected to
    be filled in by whoever parses the DHCP packet.

    Attributes set by ``__init__``:

    - ``server_id``: identifier of the DHCP server
    - ``subnet_mask``: subnet mask announced by the server
    - ``name_server``: name servers announced by the server
      (``DataBase.insert_dhcp`` iterates over it when it is not ``None``)
    - ``domain``: domain announced by the server
    """

    def __init__(self):
        self.server_id = None
        self.subnet_mask = None
        self.name_server = None
        self.domain = None

    def __repr__(self):
        # The timestamp is the moment the representation is built, not the
        # moment the request was captured.
        return (
            f"[+] {datetime.now()} - DHCP Request :\n"
            f"\t- server_id -> {self.server_id}\n"
            f"\t- subnet_mask -> {self.subnet_mask}\n"
            f"\t- name_server -> {self.name_server}\n"
            f"\t- domain -> {self.domain}"
        )
class DataBase:
    """The SQLite database.

    Every ``DataBase`` object is a thin handle over one shared
    ``sqlite3.Connection`` stored on the class (``DataBase.instance``).  The
    first construction must receive a ``database_path``; later constructions
    may omit it and simply reuse the open connection.
    """

    # Path given to the first successful construction.
    database_path = ""
    # Shared sqlite3 connection, or None when no database is open.
    instance = None
    # Counter used as the ``name_server`` id linking DHCP_INFO and DNS rows.
    dhcp_request_number = 0

    # Tables created by create_table_scheme(), in dependency order.
    _TABLE_STATEMENTS = (
        # All ip addresses found by the sniffer.
        """
        CREATE TABLE IF NOT EXISTS "IPAddress" (
            "ip_address" string PRIMARY KEY
        )""",
        # All ports found by the sniffer.
        """
        CREATE TABLE IF NOT EXISTS "Port" (
            "ip_address" string NOT NULL,
            "port_number" int NOT NULL,
            "protocol" string NOT NULL,
            PRIMARY KEY (ip_address, port_number),
            FOREIGN KEY (ip_address) REFERENCES IPAddress(ip_address)
        )""",
        # All the credentials harvested by the script.
        """
        CREATE TABLE IF NOT EXISTS "Credentials" (
            "src_ip" string NOT NULL,
            "dst_ip" string NOT NULL,
            "dst_port" string NOT NULL,
            "type" string NOT NULL,
            "data" string NOT NULL,
            FOREIGN KEY (src_ip) REFERENCES IPAddress(ip_address),
            FOREIGN KEY (dst_ip) REFERENCES IPAddress(ip_address),
            FOREIGN KEY (dst_port) REFERENCES Port(port_number)
        )""",
        # All the http requests harvested.
        """
        CREATE TABLE IF NOT EXISTS "HttpRequest" (
            src_ip string NOT NULL,
            dst_ip string NOT NULL,
            dst_port string NOT NULL,
            method string NOT NULL,
            host string NOT NULL,
            url string NOT NULL,
            cookies string,
            get_params string,
            post_params string,
            FOREIGN KEY (src_ip) REFERENCES IPAddress(ip_address),
            FOREIGN KEY (dst_ip) REFERENCES IPAddress(ip_address),
            FOREIGN KEY (dst_port) REFERENCES Port(port_number)
        )""",
        # DHCP information: one DNS row per announced name server ...
        """
        CREATE TABLE IF NOT EXISTS "DNS" (
            name_server int NOT NULL,
            ip_address string NOT NULL,
            FOREIGN KEY (ip_address) REFERENCES IPAddress(ip_address)
        )""",
        # ... and one DHCP_INFO row per request, linked through name_server.
        """
        CREATE TABLE IF NOT EXISTS "DHCP_INFO" (
            server_id string NOT NULL,
            subnet_mask string NOT NULL,
            name_server int NOT NULL,
            domain string NOT NULL,
            FOREIGN KEY (server_id) REFERENCES IPAddress(ip_address),
            FOREIGN KEY (name_server) REFERENCES DNS(name_server)
        )""",
    )

    class _Singleton:
        """Holder that opens the single connection, avoiding a global variable."""

        def __init__(self, database_path, keep=False):
            """Connect to a sqlite3 database, create it if it doesn't already exist.

            :param database_path: the path where the database will be created
            :param keep: when False an existing file at ``database_path`` is
                deleted and recreated; when True it is opened as is
            :raises Exception: if the existing file cannot be replaced, or if
                ``database_path`` is a directory
            """
            if os.path.isfile(database_path) and not keep:
                # An old database file is in the way: start from scratch.
                try:
                    print("INFO : The database has been removed to create a new one ...")
                    os.remove(database_path)
                    self.database = sqlite3.connect(
                        database_path, check_same_thread=False
                    )
                except (OSError, sqlite3.Error) as err:
                    # Same exception type and message as before, but the real
                    # cause is now chained instead of being discarded.
                    raise Exception(
                        f"ERROR : {database_path} is already existing and not a valid Sqlite3 file !"
                    ) from err
            elif os.path.isdir(database_path):
                # A directory can never be turned into a database file.
                raise Exception(
                    f"ERROR : {database_path} can't be created, a folder with the same name already exists !"
                )
            else:
                # Reached both when keeping an existing file and when the
                # file does not exist yet (sqlite3.connect creates it).
                print("INFO : Appending new data to existing database")
                self.database = sqlite3.connect(
                    database_path, check_same_thread=False
                )

    def __init__(self, database_path=None, keep=False):
        """Connect to a sqlite3 database, create it if it doesn't already exist.

        When a connection is already open the arguments are ignored and the
        new object reuses it.  When none is open and no path is given, an
        error is printed and the process exits with status -1.

        :param database_path: the path where the database will be created
            (default=None)
        :param keep: keep an already existing database file instead of
            replacing it (default=False)
        """
        if DataBase.instance is not None:
            return
        if not database_path:
            print("You must specify a databse_path when no database instance exists")
            sys.exit(-1)
        DataBase.database_path = database_path
        DataBase.instance = DataBase._Singleton(database_path, keep).database

    def create_table_scheme(self) -> None:
        """Create the relational schema of the current database.

        Tables created (see ``_TABLE_STATEMENTS``)::

            IPAddress    ip_address (PK)
            Port         ip_address (PK, FK), port_number (PK), protocol
            Credentials  src_ip, dst_ip, dst_port, type, data
            HttpRequest  src_ip, dst_ip, dst_port, method, host, url,
                         cookies, get_params, post_params
            DNS          name_server, ip_address
            DHCP_INFO    server_id, subnet_mask, name_server, domain

        The statements use ``IF NOT EXISTS`` so that calling this on a
        database reopened with ``keep=True`` (or calling it twice) is a
        no-op instead of an ``OperationalError``.

        NOTE(review): only IPAddress and Port declare a key, so the
        ``INSERT OR IGNORE`` used by the other insert methods never actually
        ignores a duplicate row in the remaining tables.
        """
        cursor = DataBase.instance.cursor()
        for statement in DataBase._TABLE_STATEMENTS:
            cursor.execute(statement)
        # Committing modifications
        DataBase.instance.commit()

    def insert_ip(self, ip_address) -> None:
        """Insert an ip into the IPAddress table.

        The row is only added if the ip_address doesn't already exist.

        :param ip_address: the ip to insert
        """
        DataBase.instance.execute(
            "INSERT OR IGNORE INTO IPAddress(ip_address) VALUES (?)",
            (ip_address,),
        )
        # Committing modifications
        DataBase.instance.commit()

    def insert_port(self, ip_address, port_number, protocol) -> None:
        """Insert a port for an associated ip.

        The ip itself is inserted first if it is not known yet.

        :param ip_address: the ip address owning the port
        :param port_number: the port to insert
        :param protocol: the protocol to insert
        """
        self.insert_ip(ip_address)
        DataBase.instance.execute(
            "INSERT OR IGNORE INTO Port(ip_address,port_number,protocol) VALUES (?,?,?)",
            (ip_address, port_number, protocol),
        )
        # Committing modifications
        DataBase.instance.commit()

    def insert_credentials(self, src, dest, type, infos) -> None:
        """Insert found credentials into the database.

        :param src: (ip,port) of the source
        :param dest: (ip,port) of the receiver
        :param type: type of credential; also stored as the protocol of the
            receiver's port
        :param infos: the credential data, stored as ``str(infos)``
        """
        src_ip, _src_port = src
        dst_ip, dst_port = dest
        # Register the receiver's ip and port if they are not already
        # existing (insert_port takes care of the ip as well).
        self.insert_port(dst_ip, dst_port, type)
        DataBase.instance.execute(
            "INSERT OR IGNORE INTO \"Credentials\" VALUES (?,?,?,?,?)",
            (src_ip, dst_ip, dst_port, type, str(infos)),
        )
        # Without this commit the row stays in an open transaction and is
        # lost if the connection is closed before another method commits.
        DataBase.instance.commit()

    def insert_http_request(self, src, dst, method, host, url, cookies=None,
                            get_params=None, post_params=None) -> None:
        """Store HTTP data into the sqlite database.

        :param src: (ip,port) of the sender
        :param dst: (ip,port) of the receiver
        :param method: method of the http request
        :param host: host of the http request
        :param url: url of the http request
        :param cookies: cookies of the http request (default=None)
        :param get_params: get parameters of the http request (default=None)
        :param post_params: post parameters of the http request (default=None)
        """
        src_ip, _src_port = src
        dst_ip, dst_port = dst
        # Register the receiver's ip and port if they are not already
        # existing (insert_port takes care of the ip as well).
        self.insert_port(dst_ip, dst_port, 'HTTP')
        DataBase.instance.execute(
            "INSERT OR IGNORE INTO \"HttpRequest\" VALUES (?,?,?,?,?,?,?,?,?)",
            (src_ip, dst_ip, dst_port, method, host, url,
             cookies, get_params, post_params),
        )
        DataBase.instance.commit()

    def insert_dhcp(self, dhcp_instance) -> None:
        """Store DHCP information.

        One DHCP_INFO row is written for the request and one DNS row per
        announced name server; both carry the current
        ``dhcp_request_number`` so they can be joined.

        :param dhcp_instance: instance of class DHCPInstance containing the
            dhcp info to store
        """
        request_id = DataBase.dhcp_request_number
        # Inserting server id ip
        self.insert_ip(dhcp_instance.server_id)
        cursor = DataBase.instance.cursor()
        cursor.execute(
            "INSERT OR IGNORE INTO \"DHCP_INFO\" VALUES (?,?,?,?)",
            (dhcp_instance.server_id, dhcp_instance.subnet_mask,
             request_id, dhcp_instance.domain),
        )
        # Inserting all name server ip
        if dhcp_instance.name_server is not None:
            for name_server_ip in dhcp_instance.name_server:
                self.insert_ip(name_server_ip)
                cursor.execute(
                    "INSERT OR IGNORE INTO \"DNS\" VALUES (?,?)",
                    (request_id, name_server_ip),
                )
        # The counter lives on the class, like the connection: bumping it on
        # ``self`` would make every new DataBase() handle restart at 0 and
        # reuse ids already stored in the shared database.
        DataBase.dhcp_request_number = request_id + 1
        DataBase.instance.commit()

    def close(self) -> None:
        """Close the connection to database.

        Safe to call when no connection is open (for example twice in a
        row): the message is still printed but nothing else happens.
        """
        print("INFO : Closing database")
        if DataBase.instance is not None:
            DataBase.instance.close()
            DataBase.instance = None