-
Notifications
You must be signed in to change notification settings - Fork 6
/
hostdb.py
157 lines (116 loc) · 5.54 KB
/
hostdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python2
# standard Python library imports
# secondary Python library imports
# (should be in your distribution's repository)
import sqlite3
# Implements a database of host and port information, with API calls for plugins
class HostDatabase:
def __init__(self):
self.db = self._create_database()
def _create_database(self):
"""Private method: create an empty database"""
self.db = sqlite3.connect(':memory:')
self.cursor = self.db.cursor()
self.cursor.executescript("""
CREATE TABLE Hosts(
id INTEGER PRIMARY KEY,
hostip TEXT UNIQUE,
hostname TEXT
);
CREATE TABLE Ports(
id INTEGER PRIMARY KEY,
portnum INTEGER CHECK (portnum>0 and portnum<65535),
protocol TEXT,
svcname TEXT,
state TEXT,
hostip TEXT,
foreignid INTEGER,
FOREIGN KEY (foreignid) REFERENCES Hosts(id)
);
""")
# ----------- API and Utility Methods ----------------------------------------------
#
# These methods are exposed to the plugins and may be overriden by them.
#
#-----------------------------------------------------------------------------------
def add_host(self, host_ip, host_name):
"""Database API: Add a host to the database, where:
- host_ip: a string containing the host's Internet Protocol (IP) number
- host_name: the name associated with this host"""
# note: FIXME - skip if this is already in the database
self.cursor.execute("INSERT INTO Hosts(hostip, hostname) VALUES (?, ?);", (host_ip, host_name))
def get_host_iplist(self):
results = self.cursor.execute("SELECT hostip from Hosts")
return results
def get_host_id(self, host_ip):
results = self.cursor.execute("SELECT id from Hosts WHERE hostip=?",[host_ip])
return results[0]
def add_port(self, portnum, port_protocol, service_name, state, hostip):
"""Database API: Add port information for a given host to the database, where:
- host_ip: a string containing the host's Internet Protocol (IP) number
- portnum: a string containing the port number
- protocol: one of "tcp", "udp"
- service_name: name of the service or program responding to queries on this port
- status: can be "Open", "Closed", or "Filered". """
self.cursor.execute("INSERT INTO Ports(portnum, protocol, svcname, state, hostip) VALUES (?, ?, ?, ?, ?);", (portnum, port_protocol, service_name, state, hostip))
def get_host_portinfo(self, host_ip):
"""Database API: Return all port information for the specified host, where:
- host_ip: a string containing the host's Internet Protocol (IP) number
"""
results = self.cursor.execute("SELECT * From Ports WHERE Ports.hostip=?", [host_ip])
return results
def list_matching_ports(self, portnum, protocol="TCP", state="open"):
"""Database API: Return list of all hosts that have this port open, where:
- portnum: a string containing the port number
- protocol: one of "tcp", "udp"
- service_name: name of the service or program responding to queries on this port
- status: optional. Can be "Open", "Closed", or "Filered". """
results = self.cursor.execute("SELECT hostip,portnum,protocol,state FROM Ports WHERE portnum=? AND state=?", [portnum, state])
return results
# loads /etc/services and returns dictionary of "ip/port":servicename
def load_service_defs(fname):
services = parse_service_defs(open(fname).readlines())
return services
# feed it lines of services definition (like from /etc/services) and it will return a
# dictionary of "ip/port":servicename
def parse_service_defs(lines):
services = {}
for _line in lines:
line = _line.strip()
if line == '': continue
if line[0]=='#': continue
portAndType = line.split()[1]
serviceName = line.split()[0]
services[portAndType] = serviceName
return services
# implement test routines here
if __name__=='__main__':
db = HostDatabase()
servicedefs = load_service_defs('/etc/services')
port_protocol = "TCP"
# create some example hosts and ports
IP='1.1.1.1'
HOSTNAME='www.test123.com'
db.add_host(host_ip=IP, host_name=HOSTNAME)
for portnum in range(20, 100):
try:
# note: FIXME - skip if this is already in the database
service_name = servicedefs[portnum]
except Exception:
service_name = "????"
db.add_port(portnum=str(portnum), port_protocol="TCP", service_name=service_name, state="open", hostip=IP)
IP='2.2.2.2'
HOSTNAME='www.testABC.com'
db.add_host(host_ip=IP, host_name=HOSTNAME)
for portnum in range(10, 30):
try:
# note: FIXME - skip if this is already in the database
service_name = servicedefs[portnum]
except Exception:
service_name = "????"
db.add_port(portnum=str(portnum), port_protocol="TCP", service_name=service_name, state="open", hostip=IP)
# sift database for all open port 25/TCP
ports = db.list_matching_ports("25", protocol="TCP", state="open")
print 'ports is', ports
for p in ports:
print p