From be893ccbc6cbd21cd1bdd2070ead9e52415812e3 Mon Sep 17 00:00:00 2001 From: Lukas Rist Date: Wed, 8 Dec 2021 10:54:12 +0100 Subject: [PATCH] formatting with black --- oschameleon/__init__.py | 4 +- oschameleon/oschameleonRun.py | 74 ++++-- oschameleon/osfuscation.py | 8 +- oschameleon/parse_fp.py | 201 +++++++------- oschameleon/root_fork.py | 43 ++- oschameleon/session/__init__.py | 1 - oschameleon/session/ext_ip.py | 2 +- oschameleon/session/log.py | 2 +- oschameleon/session/session.py | 17 +- oschameleon/stack_packet/ICMP_.py | 25 +- oschameleon/stack_packet/IP_.py | 39 +-- oschameleon/stack_packet/TCP_.py | 375 ++++++++++++++++++++------- oschameleon/stack_packet/UDP_.py | 13 +- oschameleon/stack_packet/helper.py | 28 +- oschameleon/stack_packet/template.py | 2 +- setup.py | 18 +- 16 files changed, 572 insertions(+), 280 deletions(-) diff --git a/oschameleon/__init__.py b/oschameleon/__init__.py index 1db1c12..e3246b7 100644 --- a/oschameleon/__init__.py +++ b/oschameleon/__init__.py @@ -1,2 +1,2 @@ -__title__ = 'oschameleon' -__version__ = '0.1.2' \ No newline at end of file +__title__ = "oschameleon" +__version__ = "0.1.2" diff --git a/oschameleon/oschameleonRun.py b/oschameleon/oschameleonRun.py index ba546eb..b03ea8b 100755 --- a/oschameleon/oschameleonRun.py +++ b/oschameleon/oschameleonRun.py @@ -1,8 +1,8 @@ -''' +""" Created on 01.12.2016 @author: manuel -''' +""" import argparse import gevent.monkey import grp @@ -22,19 +22,47 @@ class OSChameleon(object): def __init__(self, template=None, template_directory=None, args=None): if float(nfqueue.nfq_bindings_version()) < 0.6: - print("Found nfqueue version: {} but need at least 0.6, aborting.".format(nfqueue.nfq_bindings_version())) + print( + "Found nfqueue version: {} but need at least 0.6, aborting.".format( + nfqueue.nfq_bindings_version() + ) + ) exit(1) - self.parser = argparse.ArgumentParser(description='OSChameleon sample usage') - self.parser.add_argument('--template', metavar='template/SIMATIC_300_PLC.txt', type=str, help='path to the nmap fingerprint template', default="template/SIMATIC_300_PLC.txt") - self.parser.add_argument('--server', metavar='IP', type=str, help='server ip for iptables', default='127.0.0.1') - self.parser.add_argument('--public_ip', metavar='IP', help='running in production with public ip', default=False) - self.parser.add_argument('--interface', metavar='eth0', help='network interface', default='eth0') - self.parser.add_argument('--debug', metavar='True/False', help='verbose debugging output', default=False) + self.parser = argparse.ArgumentParser(description="OSChameleon sample usage") + self.parser.add_argument( + "--template", + metavar="template/SIMATIC_300_PLC.txt", + type=str, + help="path to the nmap fingerprint template", + default="template/SIMATIC_300_PLC.txt", + ) + self.parser.add_argument( + "--server", + metavar="IP", + type=str, + help="server ip for iptables", + default="127.0.0.1", + ) + self.parser.add_argument( + "--public_ip", + metavar="IP", + help="running in production with public ip", + default=False, + ) + self.parser.add_argument( + "--interface", metavar="eth0", help="network interface", default="eth0" + ) + self.parser.add_argument( + "--debug", + metavar="True/False", + help="verbose debugging output", + default=False, + ) self.args = self.parser.parse_args() gevent.monkey.patch_all() - if self.args.debug == 'True': + if self.args.debug == "True": self.args.debug = True else: self.args.debug = False @@ -49,18 +77,26 @@ def start(self): self.drop_privileges() except KeyboardInterrupt: flush_tables() - print ("bye") + print("bye") def root_process(self): if self.args.debug: - print("Child: Running as {0}/{1}.".format(pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0])) + print( + "Child: Running as {0}/{1}.".format( + pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0] + ) + ) data = OSFuscation.run(self.args.debug, self.args.template, self.args.server) if self.args.debug: - print('OSFuscation return value', data) + print("OSFuscation return value", data) - def drop_privileges(self, uid_name='nobody', gid_name='nogroup'): + def drop_privileges(self, uid_name="nobody", gid_name="nogroup"): if self.args.debug: - print("Init: Running as {0}/{1}.".format(pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0])) + print( + "Init: Running as {0}/{1}.".format( + pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0] + ) + ) wanted_uid = pwd.getpwnam(uid_name)[2] wanted_gid = grp.getgrnam(gid_name)[2] @@ -81,7 +117,11 @@ def drop_privileges(self, uid_name='nobody', gid_name='nogroup'): new_uid_name = pwd.getpwuid(os.getuid())[0] new_gid_name = grp.getgrgid(os.getgid())[0] if self.args.debug: - print("Parent: Privileges dropped, running as {0}/{1}.".format(new_uid_name, new_gid_name)) + print( + "Parent: Privileges dropped, running as {0}/{1}.".format( + new_uid_name, new_gid_name + ) + ) while True: try: gevent.sleep(1) @@ -90,6 +130,6 @@ def drop_privileges(self, uid_name='nobody', gid_name='nogroup'): break -if __name__ == '__main__': +if __name__ == "__main__": p = OSChameleon() p.start() diff --git a/oschameleon/osfuscation.py b/oschameleon/osfuscation.py index 77d761c..5cb695b 100755 --- a/oschameleon/osfuscation.py +++ b/oschameleon/osfuscation.py @@ -120,9 +120,9 @@ def run(cls, debug=False, template_path='', server_ip=None): os_pattern = get_os_pattern(template_path, debug) if debug: - print '*' * 30 - print os_pattern - print '*' * 30 + print('*' * 30) + print(os_pattern) + print('*' * 30) # Flush the IP tables first flush_tables() @@ -148,7 +148,7 @@ def run(cls, debug=False, template_path='', server_ip=None): q.unbind(socket.AF_INET) q.close() flush_tables() - print 'Exiting...' + print('Exiting...') if __name__ == '__main__': diff --git a/oschameleon/parse_fp.py b/oschameleon/parse_fp.py index 3eca1ab..d421d2c 100755 --- a/oschameleon/parse_fp.py +++ b/oschameleon/parse_fp.py @@ -3,9 +3,10 @@ import random from stack_packet.template import OSPatternTemplate + # from pdfminer.rijndael import KEYLENGTH -''' +""" TODO ISR TS not correct @@ -15,26 +16,26 @@ U1 IE R T -''' +""" def _str2int(string): try: return int(string) except ValueError: - return int('0x' + string, 16) + return int("0x" + string, 16) def _str2hexint(string): try: return int(string, 16) except ValueError: - return int('0x' + string, 16) + return int("0x" + string, 16) def _parse_range(value, step=5): - if '-' in value: - return map(_str2int, value.split('-')) + if "-" in value: + return map(_str2int, value.split("-")) else: return _str2int(value), _str2int(value) + step @@ -58,22 +59,22 @@ def _upper_end_hex(string, start): # timestamp def set_ip_timestamp(os_pattern, fp): - ts = fp['SEQ']['TS'] + ts = fp["SEQ"]["TS"] ts_ans = 0 - if ts == 'U': + if ts == "U": ts_ans = 1 - elif ts == '0': + elif ts == "0": ts_ans = 0 - elif ts == '1': + elif ts == "1": ts_ans = random.randint(0, 5) - elif ts == '7': + elif ts == "7": ts_ans = random.randint(70, 150) - elif ts == '8': + elif ts == "8": ts_ans = random.randint(150, 350) else: # round(log2(average increments per second)) = A @@ -88,27 +89,27 @@ def set_ip_timestamp(os_pattern, fp): def set_probes_to_send(os_pattern, fp): for c in fp.keys(): ans = 1 - if 'R' in fp[c].keys(): - if fp[c]['R'] == 'N': + if "R" in fp[c].keys(): + if fp[c]["R"] == "N": ans = 0 if c in os_pattern.PROBES_2_SEND.keys(): os_pattern.PROBES_2_SEND[c] = ans - elif c == 'T1': - os_pattern.PROBES_2_SEND['P1'] = ans - os_pattern.PROBES_2_SEND['P2'] = ans - os_pattern.PROBES_2_SEND['P3'] = ans - os_pattern.PROBES_2_SEND['P4'] = ans - os_pattern.PROBES_2_SEND['P5'] = ans - os_pattern.PROBES_2_SEND['P6'] = ans + elif c == "T1": + os_pattern.PROBES_2_SEND["P1"] = ans + os_pattern.PROBES_2_SEND["P2"] = ans + os_pattern.PROBES_2_SEND["P3"] = ans + os_pattern.PROBES_2_SEND["P4"] = ans + os_pattern.PROBES_2_SEND["P5"] = ans + os_pattern.PROBES_2_SEND["P6"] = ans # if no WIN and OPS probe test, set to 0 ans = 1 - if 'R' in fp['OPS'].keys() and 'R' in fp['WIN'].keys(): - if fp['OPS']['R'] == 'N' and fp['WIN']['R'] == 'N': + if "R" in fp["OPS"].keys() and "R" in fp["WIN"].keys(): + if fp["OPS"]["R"] == "N" and fp["WIN"]["R"] == "N": ans = 0 - os_pattern.PROBES_2_SEND['P5'] = ans - os_pattern.PROBES_2_SEND['P6'] = ans + os_pattern.PROBES_2_SEND["P5"] = ans + os_pattern.PROBES_2_SEND["P6"] = ans return os_pattern @@ -117,26 +118,26 @@ def set_probes_to_send(os_pattern, fp): def set_ip_fragment_flag(os_pattern, fp): # set the ip fragment flag for c in fp.keys(): - if 'DF' in fp[c].keys(): + if "DF" in fp[c].keys(): ans = 0 - if fp[c]['DF'] == 'Y': + if fp[c]["DF"] == "Y": ans = 1 if c in os_pattern.TCP_OPTIONS.keys(): - if c == 'T1': - os_pattern.TCP_OPTIONS['P1']['DF'] = ans - os_pattern.TCP_OPTIONS['P2']['DF'] = ans - os_pattern.TCP_OPTIONS['P3']['DF'] = ans - os_pattern.TCP_OPTIONS['P4']['DF'] = ans - os_pattern.TCP_OPTIONS['P5']['DF'] = ans - os_pattern.TCP_OPTIONS['P6']['DF'] = ans + if c == "T1": + os_pattern.TCP_OPTIONS["P1"]["DF"] = ans + os_pattern.TCP_OPTIONS["P2"]["DF"] = ans + os_pattern.TCP_OPTIONS["P3"]["DF"] = ans + os_pattern.TCP_OPTIONS["P4"]["DF"] = ans + os_pattern.TCP_OPTIONS["P5"]["DF"] = ans + os_pattern.TCP_OPTIONS["P6"]["DF"] = ans else: - os_pattern.TCP_OPTIONS[c]['DF'] = ans + os_pattern.TCP_OPTIONS[c]["DF"] = ans - if 'DFI' in fp[c].keys(): + if "DFI" in fp[c].keys(): ans = 0 - if fp[c]['DFI'] == 'Y': + if fp[c]["DFI"] == "Y": ans = 1 - os_pattern.TCP_OPTIONS[c]['DF'] = ans + os_pattern.TCP_OPTIONS[c]["DF"] = ans return os_pattern @@ -150,34 +151,34 @@ def split_tcp_option(value): ans = 0 # MSS - if value[ch] == 'M': + if value[ch] == "M": upper = _upper_end_hex(value, ch + 1) - ans = value[ch + 1:upper] + ans = value[ch + 1 : upper] # int('0x' + string, 16) - ans = int('0x' + ans, 16) - current_probe.append(('MSS', ans)) + ans = int("0x" + ans, 16) + current_probe.append(("MSS", ans)) # NOP - if value[ch] == 'N': - current_probe.append(('NOP', 0)) + if value[ch] == "N": + current_probe.append(("NOP", 0)) # EOL - if value[ch] == 'L': - current_probe.append(('EOL', 0)) + if value[ch] == "L": + current_probe.append(("EOL", 0)) # Window size - if value[ch] == 'W': + if value[ch] == "W": upper = _upper_end_hex(value, ch + 1) - ans = value[ch + 1:upper] - current_probe.append(('WScale', ans)) + ans = value[ch + 1 : upper] + current_probe.append(("WScale", ans)) # Timestamp - if value[ch] == 'T': - ans = value[ch + 1:ch + 3] - tsval = value[ch + 1:ch + 2] - tsver = value[ch + 2:ch + 3] + if value[ch] == "T": + ans = value[ch + 1 : ch + 3] + tsval = value[ch + 1 : ch + 2] + tsver = value[ch + 2 : ch + 3] # timestamp.append(('Timestamp', (str(tsval+tsver)))) timestamp.append(tsval) timestamp.append(tsver) # selective ack permitted - if value[ch] == 'S': - current_probe.append(('SAckOK', '')) + if value[ch] == "S": + current_probe.append(("SAckOK", "")) return current_probe, timestamp @@ -188,34 +189,34 @@ def set_tcp_option(os_pattern, fp): # python: options=[('Experiment', (0xf989, 0xcafe, 0x0102, 0x0002)), ('NOP', 0), ('NOP', 0)]) # pkt[TCP].options # M5B4NNT11 - if 'O1' in fp['OPS'].keys(): - for current_option in fp['OPS']: + if "O1" in fp["OPS"].keys(): + for current_option in fp["OPS"]: # get the number of the probe O1, O2 current = current_option[1] # the value in the template - value = fp['OPS'][current_option] + value = fp["OPS"][current_option] # current_probe, timestamp = split_tcp_option(value) # set value in os_pattern - os_pattern.TCP_OPTIONS['P' + current]['O'] = current_probe + os_pattern.TCP_OPTIONS["P" + current]["O"] = current_probe if timestamp: - os_pattern.TCP_Timestamp['P' + current]['tsval'] = timestamp[0] - os_pattern.TCP_Timestamp['P' + current]['tsver'] = timestamp[1] + os_pattern.TCP_Timestamp["P" + current]["tsval"] = timestamp[0] + os_pattern.TCP_Timestamp["P" + current]["tsver"] = timestamp[1] # os_pattern.TCP_Timestamp['P' + current] = timestamp # os_pattern.TCP_OPTIONS['P' + current]['O'] = os_pattern.TCP_OPTIONS['P' + current]['O'] + os_pattern.TCP_Timestamp['P' + current] # T2-7 for c in fp.keys(): - if 'O' in fp[c].keys(): - if fp[c]['O'] != '': + if "O" in fp[c].keys(): + if fp[c]["O"] != "": # the value in the template - value = fp[c]['O'] + value = fp[c]["O"] current_probe, timestamp = split_tcp_option(value) - os_pattern.TCP_OPTIONS[c]['O'] = current_probe + os_pattern.TCP_OPTIONS[c]["O"] = current_probe os_pattern.TCP_Timestamp[c] = [] if timestamp: - os_pattern.TCP_Timestamp[c]['tsval'] = timestamp[0] - os_pattern.TCP_Timestamp[c]['tsver'] = timestamp[1] + os_pattern.TCP_Timestamp[c]["tsval"] = timestamp[0] + os_pattern.TCP_Timestamp[c]["tsver"] = timestamp[1] # os_pattern.TCP_Timestamp[c] = timestamp # os_pattern.TCP_OPTIONS[c]['O'] = os_pattern.TCP_OPTIONS[c]['O'] + os_pattern.TCP_Timestamp[c] @@ -224,18 +225,18 @@ def set_tcp_option(os_pattern, fp): def set_win_option(os_pattern, fp): # set window size for - if 'W1' in fp['WIN'].keys(): - os_pattern.TCP_OPTIONS['P1']['W'] = int(fp['WIN']['W1'], 16) - os_pattern.TCP_OPTIONS['P2']['W'] = int(fp['WIN']['W2'], 16) - os_pattern.TCP_OPTIONS['P3']['W'] = int(fp['WIN']['W3'], 16) - os_pattern.TCP_OPTIONS['P4']['W'] = int(fp['WIN']['W4'], 16) - os_pattern.TCP_OPTIONS['P5']['W'] = int(fp['WIN']['W5'], 16) - os_pattern.TCP_OPTIONS['P6']['W'] = int(fp['WIN']['W6'], 16) + if "W1" in fp["WIN"].keys(): + os_pattern.TCP_OPTIONS["P1"]["W"] = int(fp["WIN"]["W1"], 16) + os_pattern.TCP_OPTIONS["P2"]["W"] = int(fp["WIN"]["W2"], 16) + os_pattern.TCP_OPTIONS["P3"]["W"] = int(fp["WIN"]["W3"], 16) + os_pattern.TCP_OPTIONS["P4"]["W"] = int(fp["WIN"]["W4"], 16) + os_pattern.TCP_OPTIONS["P5"]["W"] = int(fp["WIN"]["W5"], 16) + os_pattern.TCP_OPTIONS["P6"]["W"] = int(fp["WIN"]["W6"], 16) for c in fp.keys(): - if 'W' in fp[c].keys(): + if "W" in fp[c].keys(): if c in os_pattern.TCP_OPTIONS.keys(): - os_pattern.TCP_OPTIONS[c]['W'] = int(fp[c]['W'], 16) + os_pattern.TCP_OPTIONS[c]["W"] = int(fp[c]["W"], 16) return os_pattern @@ -244,8 +245,8 @@ def set_win_option(os_pattern, fp): def set_ci(os_pattern, fp): for c in fp.keys(): # nmap probe for windows does not contain CI key - if 'CI' in fp[c].keys(): - os_pattern.IP_ID_CI_CNT = fp['SEQ']['CI'] + if "CI" in fp[c].keys(): + os_pattern.IP_ID_CI_CNT = fp["SEQ"]["CI"] break return os_pattern @@ -255,12 +256,12 @@ def set_ci(os_pattern, fp): def set_tcp_ttl(os_pattern, fp): # todo for c in fp.keys(): - if 'T' in fp[c].keys(): + if "T" in fp[c].keys(): # the value was added in get_os_pattern() # if the T key is not in the template, add the # TG value - os_pattern.TTL = int(fp[c]['T']) + os_pattern.TTL = int(fp[c]["T"]) # d = _str2int(fp['T1']['TG']) # os_pattern.TTL = int((fp[c]['T']),16) @@ -268,8 +269,8 @@ def set_tcp_ttl(os_pattern, fp): # os_pattern.TTL = chr(_str2int(fp['T1']['TG'])) # os_pattern.TTL = int(fp['T1']['TG'],10) break - elif 'TG' in fp[c].keys(): - os_pattern.TTL = int((fp[c]['TG']), 16) + elif "TG" in fp[c].keys(): + os_pattern.TTL = int((fp[c]["TG"]), 16) break return os_pattern @@ -278,7 +279,7 @@ def set_tcp_ttl(os_pattern, fp): def get_os_pattern(fprint_template, debug): os_pattern = OSPatternTemplate() # print os_pattern.TTL - with open(fprint_template, 'rb') as fh: + with open(fprint_template, "rb") as fh: data = fh.readlines() fp = dict() @@ -286,20 +287,20 @@ def get_os_pattern(fprint_template, debug): for line in data: line = line.strip() - category, result = line.split('(', 1) + category, result = line.split("(", 1) result = result[:-1] fp[category] = dict() - for item in result.split('%'): - key, val = item.split('=') - if '|' in val: + for item in result.split("%"): + key, val = item.split("=") + if "|" in val: # eg. GCD=FA7F|1F4FE|2EF7D|3E9FC|4E47B - val = random.choice(val.split('|')) - if '-' in val: - minVal, maxVal = val.split('-') + val = random.choice(val.split("|")) + if "-" in val: + minVal, maxVal = val.split("-") # choose random number in the range min-max # eg. SP=0-5 # os_pattern.SP_MIN, os_pattern.SP_MAX = _parse_range(fp['SEQ']['SP']) - if key == 'SP': + if key == "SP": minVal = _str2int(minVal) maxVal = _str2int(maxVal) if minVal > maxVal: @@ -308,18 +309,18 @@ def get_os_pattern(fprint_template, debug): os_pattern.SP_MAX = maxVal val = str(random.randint(minVal, maxVal)) # - if key == 'GCD': + if key == "GCD": minVal = _str2int(minVal) maxVal = _str2int(maxVal) if minVal > maxVal: minVal, maxVal = _switch(minVal, maxVal) val = str(random.randint(minVal, maxVal)) # - if key == 'ISR': + if key == "ISR": os_pattern.ISR_MIN = _str2hexint(minVal) os_pattern.ISR_MAX = _str2hexint(maxVal) - if key == 'T': + if key == "T": minD = int(minVal, 16) maxD = int(maxVal, 16) val = str(random.randint(minD, maxD)) @@ -331,13 +332,13 @@ def get_os_pattern(fprint_template, debug): print(val) print("---------") # GCD - os_pattern.GCD = _str2int(fp['SEQ']['GCD']) + os_pattern.GCD = _str2int(fp["SEQ"]["GCD"]) # TI - os_pattern.IP_ID_TI_CNT = fp['SEQ']['TI'] + os_pattern.IP_ID_TI_CNT = fp["SEQ"]["TI"] # CI os_pattern = set_ci(os_pattern, fp) # II - os_pattern.IP_ID_TI_CNT = fp['SEQ']['II'] + os_pattern.IP_ID_TI_CNT = fp["SEQ"]["II"] # set time to live os_pattern = set_tcp_ttl(os_pattern, fp) # TS timestamp @@ -353,10 +354,10 @@ def get_os_pattern(fprint_template, debug): # window size os_pattern = set_win_option(os_pattern, fp) # set IPL = IP total length - os_pattern.ICMP_IPL = int(fp['U1']['IPL']) + os_pattern.ICMP_IPL = int(fp["U1"]["IPL"]) return os_pattern -if __name__ == '__main__': - print get_os_pattern('SIMATIC_300_PLC.txt', False) +if __name__ == "__main__": + print(get_os_pattern("SIMATIC_300_PLC.txt", False)) diff --git a/oschameleon/root_fork.py b/oschameleon/root_fork.py index 7f14102..779c87b 100755 --- a/oschameleon/root_fork.py +++ b/oschameleon/root_fork.py @@ -15,13 +15,21 @@ def root_process(): - print("Child: Running as {0}/{1}.".format(pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0])) + print( + "Child: Running as {0}/{1}.".format( + pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0] + ) + ) data = OSFuscation.run(args.template) - print('OSFuscation return value', data) + print("OSFuscation return value", data) -def drop_privileges(uid_name='nobody', gid_name='nogroup'): - print("Init: Running as {0}/{1}.".format(pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0])) +def drop_privileges(uid_name="nobody", gid_name="nogroup"): + print( + "Init: Running as {0}/{1}.".format( + pwd.getpwuid(os.getuid())[0], grp.getgrgid(os.getgid())[0] + ) + ) wanted_uid = pwd.getpwnam(uid_name)[2] wanted_gid = grp.getgrnam(gid_name)[2] @@ -29,30 +37,39 @@ def drop_privileges(uid_name='nobody', gid_name='nogroup'): # print "root_fork : drop_privil : pid ",pid if pid == 0: # child - print ('starting child process') + print("starting child process") child_process = gevent.spawn(root_process) child_process.join() - print ('Child done:', child_process.successful()) + print("Child done:", child_process.successful()) flush_tables() - print ('Child exit') + print("Child exit") else: # parent os.setgid(wanted_gid) os.setuid(wanted_uid) new_uid_name = pwd.getpwuid(os.getuid())[0] new_gid_name = grp.getgrgid(os.getgid())[0] - print("Parent: Privileges dropped, running as {0}/{1}.".format(new_uid_name, new_gid_name)) + print( + "Parent: Privileges dropped, running as {0}/{1}.".format( + new_uid_name, new_gid_name + ) + ) while True: try: gevent.sleep(1) - print ('Parent: ping') + print("Parent: ping") except KeyboardInterrupt: break -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='OSChameleon sample usage') - parser.add_argument('--template', metavar='template.txt', type=str, help='path to the fingerprint template') +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="OSChameleon sample usage") + parser.add_argument( + "--template", + metavar="template.txt", + type=str, + help="path to the fingerprint template", + ) args = parser.parse_args() if args.template is None: args.template = "template/SIMATIC_300_PLC.txt" @@ -60,4 +77,4 @@ def drop_privileges(uid_name='nobody', gid_name='nogroup'): drop_privileges() except KeyboardInterrupt: flush_tables() - print ("bye") + print("bye") diff --git a/oschameleon/session/__init__.py b/oschameleon/session/__init__.py index e506ca8..7c15c3d 100644 --- a/oschameleon/session/__init__.py +++ b/oschameleon/session/__init__.py @@ -1,4 +1,3 @@ - from session import Session session_ = Session() diff --git a/oschameleon/session/ext_ip.py b/oschameleon/session/ext_ip.py index f4a3c65..b6c8dd9 100644 --- a/oschameleon/session/ext_ip.py +++ b/oschameleon/session/ext_ip.py @@ -71,4 +71,4 @@ def get_ext_ip(self, config=None, urls=None): if __name__ == "__main__": ext = Ext_IP() - print ext.get_ext_ip(urls=["http://queryip.net/ip/", "http://ifconfig.me/ip"]) + print(ext.get_ext_ip(urls=["http://queryip.net/ip/", "http://ifconfig.me/ip"])) diff --git a/oschameleon/session/log.py b/oschameleon/session/log.py index 800c2cd..32bd2aa 100644 --- a/oschameleon/session/log.py +++ b/oschameleon/session/log.py @@ -26,7 +26,7 @@ def folder_exist(self, path): def remote_logging(self, name, server): rootLogger = logging.getLogger('') rootLogger.setLevel(logging.DEBUG) - print server + print(server) socketHandler = logging.handlers.SocketHandler(server, logging.handlers.DEFAULT_TCP_LOGGING_PORT) # don't bother with a formatter, since a socket handler sends the event as # an unformatted pickle diff --git a/oschameleon/session/session.py b/oschameleon/session/session.py index d30626e..f3008d0 100644 --- a/oschameleon/session/session.py +++ b/oschameleon/session/session.py @@ -1,8 +1,8 @@ -''' +""" Created on 02.12.2016 @author: manuel -''' +""" from datetime import datetime, timedelta import logging @@ -29,7 +29,7 @@ def externalIP(self, public, interface): if public is True: self.my_ip = ext.get_ext_ip() else: - self.my_ip = ni.ifaddresses(interface)[AF_INET][0]['addr'] + self.my_ip = ni.ifaddresses(interface)[AF_INET][0]["addr"] def in_session(self, ip, debug): currenttime = datetime.now() @@ -43,7 +43,12 @@ def in_session(self, ip, debug): exists = True if currenttime > session.time: session.time = timeout - logger.info('%s : Renewed session from %s at %s', currenttimestring, ip, self.my_ip) + logger.info( + "%s : Renewed session from %s at %s", + currenttimestring, + ip, + self.my_ip, + ) if debug: print("renew " + ip) @@ -51,6 +56,8 @@ def in_session(self, ip, debug): # print "added" nsess = nmap_session(ip, timeout) self.sessions.append(nsess) - logger.info("%s : New session from %s at %s", currenttimestring, ip, self.my_ip) + logger.info( + "%s : New session from %s at %s", currenttimestring, ip, self.my_ip + ) if debug: print("new " + ip) diff --git a/oschameleon/stack_packet/ICMP_.py b/oschameleon/stack_packet/ICMP_.py index b732ef0..37d6f27 100755 --- a/oschameleon/stack_packet/ICMP_.py +++ b/oschameleon/stack_packet/ICMP_.py @@ -1,10 +1,10 @@ #!/usr/bin/python -''' +""" Created on 24.09.2016 @author: manuel -''' +""" from IP_ import ReplyPacket from helper import drop_packet @@ -16,6 +16,7 @@ class ICMPPacket(ReplyPacket): """ ICMP packet """ + def __init__(self, pkt, os_pattern, package_type): ReplyPacket.__init__(self, pkt, os_pattern) self.icmp = ICMP() @@ -56,13 +57,13 @@ def send_ICMP_reply(pkt, ICMP_type, os_pattern, TCP_OPTIONS): # create reply packet and set flags icmp_rpl = ICMPPacket(pkt, os_pattern, ICMP_type) # set ICMP header fields - icmp_rpl.set_DF(TCP_OPTIONS['DF']) + icmp_rpl.set_DF(TCP_OPTIONS["DF"]) # ICMP type = 0 =^ echo reply if ICMP_type == 0: icmp_rpl.set_IP_ID(os_pattern.IP_ID_II_CNT) # set ICMP code field - if os_pattern.ICMP_CODE == 'S': + if os_pattern.ICMP_CODE == "S": icmp_rpl.set_ICMP_code(pkt[ICMP].code) else: icmp_rpl.set_ICMP_code(os_pattern.ICMP_CODE) @@ -86,7 +87,7 @@ def send_ICMP_reply(pkt, ICMP_type, os_pattern, TCP_OPTIONS): print("icmp input packet length: ", len_packet) pad_len = os_pattern.ICMP_IPL - len_packet - 16 pad = Padding() - pad.add_payload('\x00' * pad_len) + pad.add_payload("\x00" * pad_len) icmp_rpl.pkt = icmp_rpl.pkt / pad print("icmp reply packet length: ", int(str(len(icmp_rpl.pkt)), 16)) @@ -101,12 +102,22 @@ def check_ICMP_probes(pkt, nfq_packet, os_pattern): """ if pkt[ICMP].type is 8: # Probe 1 + 2 - if (pkt[ICMP].seq == 295 and pkt[IP].flags == 0x02 and len(pkt[ICMP].payload) == 120) or (pkt[ICMP].seq == 296 and pkt[IP].tos == 0x04 and len(pkt[ICMP].payload) == 150): + if ( + pkt[ICMP].seq == 295 + and pkt[IP].flags == 0x02 + and len(pkt[ICMP].payload) == 120 + ) or ( + pkt[ICMP].seq == 296 + and pkt[IP].tos == 0x04 + and len(pkt[ICMP].payload) == 150 + ): drop_packet(nfq_packet) if os_pattern.PROBES_2_SEND["IE"]: # ICMP type = 0 =^ echo reply ICMP_type = 0 - send_ICMP_reply(pkt, ICMP_type, os_pattern, os_pattern.TCP_OPTIONS['IE']) + send_ICMP_reply( + pkt, ICMP_type, os_pattern, os_pattern.TCP_OPTIONS["IE"] + ) # print "IE Probe" else: forward_packet(nfq_packet) diff --git a/oschameleon/stack_packet/IP_.py b/oschameleon/stack_packet/IP_.py index 99b14c9..7daf2bd 100755 --- a/oschameleon/stack_packet/IP_.py +++ b/oschameleon/stack_packet/IP_.py @@ -1,9 +1,9 @@ #!/usr/bin/python -''' +""" Created on 24.09.2016 @author: manuel -''' +""" import struct @@ -15,6 +15,7 @@ class ReplyPacket(object): IP packet Setting the IP fields """ + def __init__(self, pkt, os_pattern): self.ip = IP() self.ip.src = pkt[IP].dst @@ -24,7 +25,7 @@ def __init__(self, pkt, os_pattern): def set_DF(self, df): if df: - self.ip.flags = 'DF' + self.ip.flags = "DF" def set_ToS(self, tos): self.ip.tos = tos @@ -32,15 +33,15 @@ def set_ToS(self, tos): # set IP ID according to the OS pattern def set_IP_ID(self, ip_id): - if ip_id == 'I': + if ip_id == "I": self.os_pattern.IP_ID_tmp += 1 self.ip.id = self.os_pattern.IP_ID_tmp - elif ip_id == 'RI': + elif ip_id == "RI": self.os_pattern.IP_ID_tmp += 1001 self.ip.id = self.os_pattern.IP_ID_tmp - elif ip_id == 'Z': + elif ip_id == "Z": self.os_pattern.IP_ID_tmp += 0 self.ip.id = self.os_pattern.IP_ID_tmp else: @@ -75,13 +76,13 @@ def _build_crc_tables(crc32_table, crc32_reverse): for j in range(8, 0, -1): # build normal table if (fwd & 1) == 1: - fwd = (fwd >> 1) ^ 0xedb88320 + fwd = (fwd >> 1) ^ 0xEDB88320 else: fwd >>= 1 crc32_table[i] = fwd # build reverse table =) if rev & 0x80000000 == 0x80000000: - rev = ((rev ^ 0xedb88320) << 1) | 1 + rev = ((rev ^ 0xEDB88320) << 1) | 1 else: rev <<= 1 crc32_reverse[i] = rev @@ -90,10 +91,10 @@ def _build_crc_tables(crc32_table, crc32_reverse): def crc32(s, crc32_table): - crc = 0xffffffff + crc = 0xFFFFFFFF for c in s: - crc = (crc >> 8) ^ crc32_table[crc & 0xff ^ ord(c)] - return crc ^ 0xffffffff + crc = (crc >> 8) ^ crc32_table[crc & 0xFF ^ ord(c)] + return crc ^ 0xFFFFFFFF def reverse_crc(wanted_crc): @@ -103,22 +104,22 @@ def reverse_crc(wanted_crc): crc32_table, crc32_reverse = _build_crc_tables(crc32_table, crc32_reverse) # forward calculation of CRC up to pos, sets current forward CRC state - fwd_crc = 0xffffffff + fwd_crc = 0xFFFFFFFF for c in s[:pos]: - fwd_crc = (fwd_crc >> 8) ^ crc32_table[fwd_crc & 0xff ^ ord(c)] + fwd_crc = (fwd_crc >> 8) ^ crc32_table[fwd_crc & 0xFF ^ ord(c)] # backward calculation of CRC up to pos, sets wanted backward CRC state - bkd_crc = wanted_crc ^ 0xffffffff + bkd_crc = wanted_crc ^ 0xFFFFFFFF for c in s[pos:][::-1]: - bkd_crc = ((bkd_crc << 8) & 0xffffffff) ^ crc32_reverse[bkd_crc >> 24] + bkd_crc = ((bkd_crc << 8) & 0xFFFFFFFF) ^ crc32_reverse[bkd_crc >> 24] bkd_crc ^= ord(c) # deduce the 4 bytes we need to insert - for c in struct.pack('> 24] + for c in struct.pack("> 24] bkd_crc ^= ord(c) - res = s[:pos] + struct.pack(' (self.os_pattern.SEQNr_mean + temp)): + while ( + self.os_pattern.SEQ_MAX < (self.os_pattern.SEQNr_mean + temp) + ) or (self.os_pattern.SEQ_MIN > (self.os_pattern.SEQNr_mean + temp)): temp = random.randint(0, self.os_pattern.SEQ_std_dev) ISN_delta += temp - self.os_pattern.TCP_SEQ_NR_tmp = (self.os_pattern.TCP_SEQ_NR_tmp + ISN_delta) % 2 ** 32 + self.os_pattern.TCP_SEQ_NR_tmp = ( + self.os_pattern.TCP_SEQ_NR_tmp + ISN_delta + ) % 2 ** 32 self.tcp.seq = self.os_pattern.TCP_SEQ_NR_tmp else: - self.os_pattern.TCP_SEQ_NR_tmp = (self.os_pattern.TCP_SEQ_NR_tmp + self.os_pattern.SEQNr_mean) % 2 ** 32 - self.tcp.seq = (self.os_pattern.TCP_SEQ_NR_tmp) + self.os_pattern.TCP_SEQ_NR_tmp = ( + self.os_pattern.TCP_SEQ_NR_tmp + self.os_pattern.SEQNr_mean + ) % 2 ** 32 + self.tcp.seq = self.os_pattern.TCP_SEQ_NR_tmp - elif seqn == 'A': + elif seqn == "A": self.tcp.seq = self.pkt[TCP].ack - elif seqn == 'A+': + elif seqn == "A+": self.tcp.seq = self.pkt[TCP].ack + 1 else: @@ -113,13 +165,13 @@ def set_ACK_NR(self, ack): self.tcp.ack = ack # to SEQNr + 1 - if ack == 'S': + if ack == "S": self.tcp.ack = self.pkt[TCP].seq # to the SEQNr of the probe - elif ack == 'S+': + elif ack == "S+": self.tcp.ack = self.pkt[TCP].seq + 1 # to a random value - elif ack == 'O': + elif ack == "O": self.tcp.ack = random.randint(1, 10) else: self.tcp.ack = ack @@ -143,14 +195,20 @@ def send_packet(self): send(self.ip / self.tcp, verbose=0) -def check_TCP_Nmap_match(pkt, nfq_packet, INPUT_TCP_OPTIONS, EXPECTED_TCP_flags, IP_flags="no", urgt_ptr=0): +def check_TCP_Nmap_match( + pkt, nfq_packet, INPUT_TCP_OPTIONS, EXPECTED_TCP_flags, IP_flags="no", urgt_ptr=0 +): """ Check if the packet is a Nmap probe IPflags and urgt_ptr are optional return 1 if packet is a Nmap probe """ # print pkt[TCP] - if pkt[TCP].window == EXPECTED_TCP_flags['WSZ'] and pkt[TCP].flags == EXPECTED_TCP_flags['FLGS'] and pkt[TCP].options == INPUT_TCP_OPTIONS: + if ( + pkt[TCP].window == EXPECTED_TCP_flags["WSZ"] + and pkt[TCP].flags == EXPECTED_TCP_flags["FLGS"] + and pkt[TCP].options == INPUT_TCP_OPTIONS + ): if IP_flags == "no": if urgt_ptr == 0: @@ -161,14 +219,14 @@ def check_TCP_Nmap_match(pkt, nfq_packet, INPUT_TCP_OPTIONS, EXPECTED_TCP_flags, drop_packet(nfq_packet) return 1 - elif pkt[IP].flags == IP_flags['FLGS']: + elif pkt[IP].flags == IP_flags["FLGS"]: drop_packet(nfq_packet) return 1 return 0 -def send_TCP_reply(pkt, os_pattern, TCP_OPTIONS, flags, ipid=0, seqn='O', ack='S+'): +def send_TCP_reply(pkt, os_pattern, TCP_OPTIONS, flags, ipid=0, seqn="O", ack="S+"): """ Send TCP reply packet following parameter are optional: ipid, seqn, ack, @@ -180,14 +238,14 @@ def send_TCP_reply(pkt, os_pattern, TCP_OPTIONS, flags, ipid=0, seqn='O', ack='S tcp_rpl.set_TCP_Flags(flags) # set/adjust special header fields - tcp_rpl.set_DF(TCP_OPTIONS['DF']) + tcp_rpl.set_DF(TCP_OPTIONS["DF"]) tcp_rpl.set_IN_SEQ_NR(seqn) tcp_rpl.set_ACK_NR(ack) - tcp_rpl.set_WSZ(TCP_OPTIONS['W']) + tcp_rpl.set_WSZ(TCP_OPTIONS["W"]) tcp_rpl.set_IP_ID(ipid) # set TCP options if needed - if TCP_OPTIONS['O']: + if TCP_OPTIONS["O"]: # calculate Timestamp for TCP header os_pattern.TCP_Timestamp_tmp += os_pattern.TCP_TS_CNT # print os_pattern.TCP_Timestamp_tmp @@ -198,11 +256,14 @@ def send_TCP_reply(pkt, os_pattern, TCP_OPTIONS, flags, ipid=0, seqn='O', ack='S if ele[0] == "Timestamp": tsver = ele[1][0] - tcp_rpl.set_TCP_Options(TCP_OPTIONS['O'] + [('Timestamp', (int(str(os_pattern.TCP_Timestamp_tmp), 10), tsver))]) + tcp_rpl.set_TCP_Options( + TCP_OPTIONS["O"] + + [("Timestamp", (int(str(os_pattern.TCP_Timestamp_tmp), 10), tsver))] + ) # set TCP data - if TCP_OPTIONS['RD']: - tcp_rpl.set_TCP_data(TCP_OPTIONS['RD']) + if TCP_OPTIONS["RD"]: + tcp_rpl.set_TCP_data(TCP_OPTIONS["RD"]) # send the TCP packet tcp_rpl.send_packet() @@ -219,84 +280,216 @@ def check_TCP_probes(pkt, nfq_packet, os_pattern, session, debug): # SEQ, OPS, WIN, and T1 - Sequence generation # 6 Probes sent - if check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['P1'], NMAP_PROBE_TCP_ATTR['P1']): - if os_pattern.PROBES_2_SEND['P1']: + if check_TCP_Nmap_match( + pkt, nfq_packet, NMAP_PROBE_TCP_OPTION["P1"], NMAP_PROBE_TCP_ATTR["P1"] + ): + if os_pattern.PROBES_2_SEND["P1"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['P1'], os_pattern.TCP_FLAGS['SEQ'], os_pattern.IP_ID_TI_CNT) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["P1"], + os_pattern.TCP_FLAGS["SEQ"], + os_pattern.IP_ID_TI_CNT, + ) # print "TCP Probe #1" - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['P2'], NMAP_PROBE_TCP_ATTR['P2']): - if os_pattern.PROBES_2_SEND['P2']: + elif check_TCP_Nmap_match( + pkt, nfq_packet, NMAP_PROBE_TCP_OPTION["P2"], NMAP_PROBE_TCP_ATTR["P2"] + ): + if os_pattern.PROBES_2_SEND["P2"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['P2'], os_pattern.TCP_FLAGS['SEQ'], os_pattern.IP_ID_TI_CNT) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["P2"], + os_pattern.TCP_FLAGS["SEQ"], + os_pattern.IP_ID_TI_CNT, + ) # print "TCP Probe #2" - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['P3'], NMAP_PROBE_TCP_ATTR['P3']): - if os_pattern.PROBES_2_SEND['P3']: + elif check_TCP_Nmap_match( + pkt, nfq_packet, NMAP_PROBE_TCP_OPTION["P3"], NMAP_PROBE_TCP_ATTR["P3"] + ): + if os_pattern.PROBES_2_SEND["P3"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['P3'], os_pattern.TCP_FLAGS['SEQ'], os_pattern.IP_ID_TI_CNT) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["P3"], + os_pattern.TCP_FLAGS["SEQ"], + os_pattern.IP_ID_TI_CNT, + ) # print "TCP Probe #3" - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['P4'], NMAP_PROBE_TCP_ATTR['P4']): - if os_pattern.PROBES_2_SEND['P4']: + elif check_TCP_Nmap_match( + pkt, nfq_packet, NMAP_PROBE_TCP_OPTION["P4"], NMAP_PROBE_TCP_ATTR["P4"] + ): + if os_pattern.PROBES_2_SEND["P4"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['P4'], os_pattern.TCP_FLAGS['SEQ'], os_pattern.IP_ID_TI_CNT) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["P4"], + os_pattern.TCP_FLAGS["SEQ"], + os_pattern.IP_ID_TI_CNT, + ) # print "TCP Probe #4" - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['P5'], NMAP_PROBE_TCP_ATTR['P5']): - if os_pattern.PROBES_2_SEND['P5']: + elif check_TCP_Nmap_match( + pkt, nfq_packet, NMAP_PROBE_TCP_OPTION["P5"], NMAP_PROBE_TCP_ATTR["P5"] + ): + if os_pattern.PROBES_2_SEND["P5"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['P5'], os_pattern.TCP_FLAGS['SEQ'], os_pattern.IP_ID_TI_CNT) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["P5"], + os_pattern.TCP_FLAGS["SEQ"], + os_pattern.IP_ID_TI_CNT, + ) # print "TCP Probe #5" - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['P6'], NMAP_PROBE_TCP_ATTR['P6']): - if os_pattern.PROBES_2_SEND['P6']: + elif check_TCP_Nmap_match( + pkt, nfq_packet, NMAP_PROBE_TCP_OPTION["P6"], NMAP_PROBE_TCP_ATTR["P6"] + ): + if os_pattern.PROBES_2_SEND["P6"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['P6'], os_pattern.TCP_FLAGS['SEQ'], os_pattern.IP_ID_TI_CNT) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["P6"], + os_pattern.TCP_FLAGS["SEQ"], + os_pattern.IP_ID_TI_CNT, + ) # print "TCP Probe #6" # ECN - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['ECN'], NMAP_PROBE_TCP_ATTR['ECN'],): - if os_pattern.PROBES_2_SEND['ECN']: + elif check_TCP_Nmap_match( + pkt, + nfq_packet, + NMAP_PROBE_TCP_OPTION["ECN"], + NMAP_PROBE_TCP_ATTR["ECN"], + ): + if os_pattern.PROBES_2_SEND["ECN"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['ECN'], os_pattern.TCP_FLAGS['ECN'], os_pattern.IP_ID_TI_CNT, ECN_URGT_PTR) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["ECN"], + os_pattern.TCP_FLAGS["ECN"], + os_pattern.IP_ID_TI_CNT, + ECN_URGT_PTR, + ) # print "TCP Probe #ECN" # T2-T7 - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['T2-T6'], NMAP_PROBE_TCP_ATTR['T2'], NMAP_PROBE_IP_ATTR['T2']): - if os_pattern.PROBES_2_SEND['T2']: + elif check_TCP_Nmap_match( + pkt, + nfq_packet, + NMAP_PROBE_TCP_OPTION["T2-T6"], + NMAP_PROBE_TCP_ATTR["T2"], + NMAP_PROBE_IP_ATTR["T2"], + ): + if os_pattern.PROBES_2_SEND["T2"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['T2'], os_pattern.TCP_FLAGS['T2'], 0, os_pattern.TCP_SEQ_NR['T2'], os_pattern.TCP_ACK_NR['T2']) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["T2"], + os_pattern.TCP_FLAGS["T2"], + 0, + os_pattern.TCP_SEQ_NR["T2"], + os_pattern.TCP_ACK_NR["T2"], + ) # print "TCP Probe #T2" - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['T2-T6'], NMAP_PROBE_TCP_ATTR['T3']): - if os_pattern.PROBES_2_SEND['T3']: + elif check_TCP_Nmap_match( + pkt, nfq_packet, NMAP_PROBE_TCP_OPTION["T2-T6"], NMAP_PROBE_TCP_ATTR["T3"] + ): + if os_pattern.PROBES_2_SEND["T3"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['T3'], os_pattern.TCP_FLAGS['T3'], 0, os_pattern.TCP_SEQ_NR['T3'], os_pattern.TCP_ACK_NR['T3']) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["T3"], + os_pattern.TCP_FLAGS["T3"], + 0, + os_pattern.TCP_SEQ_NR["T3"], + os_pattern.TCP_ACK_NR["T3"], + ) # print "TCP Probe #T3" - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['T2-T6'], NMAP_PROBE_TCP_ATTR['T4'], NMAP_PROBE_IP_ATTR['T4']): - if os_pattern.PROBES_2_SEND['T4']: + elif check_TCP_Nmap_match( + pkt, + nfq_packet, + NMAP_PROBE_TCP_OPTION["T2-T6"], + NMAP_PROBE_TCP_ATTR["T4"], + NMAP_PROBE_IP_ATTR["T4"], + ): + if os_pattern.PROBES_2_SEND["T4"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['T4'], os_pattern.TCP_FLAGS['T4'], 0, os_pattern.TCP_SEQ_NR['T4'], os_pattern.TCP_ACK_NR['T4']) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["T4"], + os_pattern.TCP_FLAGS["T4"], + 0, + os_pattern.TCP_SEQ_NR["T4"], + os_pattern.TCP_ACK_NR["T4"], + ) # print "TCP Probe #T4" - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['T2-T6'], NMAP_PROBE_TCP_ATTR['T5']): - if os_pattern.PROBES_2_SEND['T5']: + elif check_TCP_Nmap_match( + pkt, nfq_packet, NMAP_PROBE_TCP_OPTION["T2-T6"], NMAP_PROBE_TCP_ATTR["T5"] + ): + if os_pattern.PROBES_2_SEND["T5"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['T5'], os_pattern.TCP_FLAGS['T5'], os_pattern.IP_ID_CI_CNT, os_pattern.TCP_SEQ_NR['T5'], os_pattern.TCP_ACK_NR['T5']) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["T5"], + os_pattern.TCP_FLAGS["T5"], + os_pattern.IP_ID_CI_CNT, + os_pattern.TCP_SEQ_NR["T5"], + os_pattern.TCP_ACK_NR["T5"], + ) # print "TCP Probe #T5" - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['T2-T6'], NMAP_PROBE_TCP_ATTR['T6'], NMAP_PROBE_IP_ATTR['T6']): - if os_pattern.PROBES_2_SEND['T6']: + elif check_TCP_Nmap_match( + pkt, + nfq_packet, + NMAP_PROBE_TCP_OPTION["T2-T6"], + NMAP_PROBE_TCP_ATTR["T6"], + NMAP_PROBE_IP_ATTR["T6"], + ): + if os_pattern.PROBES_2_SEND["T6"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['T6'], os_pattern.TCP_FLAGS['T6'], os_pattern.IP_ID_CI_CNT, os_pattern.TCP_SEQ_NR['T6'], os_pattern.TCP_ACK_NR['T6']) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["T6"], + os_pattern.TCP_FLAGS["T6"], + os_pattern.IP_ID_CI_CNT, + os_pattern.TCP_SEQ_NR["T6"], + os_pattern.TCP_ACK_NR["T6"], + ) # print "TCP Probe #T6" - elif check_TCP_Nmap_match(pkt, nfq_packet, NMAP_PROBE_TCP_OPTION['T7'], NMAP_PROBE_TCP_ATTR['T7']): - if os_pattern.PROBES_2_SEND['T7']: + elif check_TCP_Nmap_match( + pkt, nfq_packet, NMAP_PROBE_TCP_OPTION["T7"], NMAP_PROBE_TCP_ATTR["T7"] + ): + if os_pattern.PROBES_2_SEND["T7"]: check_in_session(session, pkt.src, debug) - send_TCP_reply(pkt, os_pattern, os_pattern.TCP_OPTIONS['T7'], os_pattern.TCP_FLAGS['T7'], os_pattern.IP_ID_CI_CNT, os_pattern.TCP_SEQ_NR['T7'], os_pattern.TCP_ACK_NR['T7']) + send_TCP_reply( + pkt, + os_pattern, + os_pattern.TCP_OPTIONS["T7"], + os_pattern.TCP_FLAGS["T7"], + os_pattern.IP_ID_CI_CNT, + os_pattern.TCP_SEQ_NR["T7"], + os_pattern.TCP_ACK_NR["T7"], + ) # print "TCP Probe #T7" else: diff --git a/oschameleon/stack_packet/UDP_.py b/oschameleon/stack_packet/UDP_.py index 2b7ee6e..9dc2926 100755 --- a/oschameleon/stack_packet/UDP_.py +++ b/oschameleon/stack_packet/UDP_.py @@ -1,9 +1,9 @@ #!/usr/bin/python -''' +""" Created on 24.09.2016 @author: manuel -''' +""" from ICMP_ import send_ICMP_reply from helper import drop_packet @@ -16,13 +16,18 @@ def check_UDP_probe(pkt, nfq_packet, os_pattern): Identify the UDP based probe and reply with a faked reply if needed """ - if pkt[IP].id == 0x1042 and pkt[UDP].payload.load[0] == "C" and pkt[UDP].payload.load[1] == "C" and pkt[UDP].payload.load[2] == "C": + if ( + pkt[IP].id == 0x1042 + and pkt[UDP].payload.load[0] == "C" + and pkt[UDP].payload.load[1] == "C" + and pkt[UDP].payload.load[2] == "C" + ): drop_packet(nfq_packet) if os_pattern.PROBES_2_SEND["U1"]: # create reply packet (ICMP port unreachable) # ICMP type = 3 =^ destination unreable ICMP_type = 3 - send_ICMP_reply(pkt, ICMP_type, os_pattern, os_pattern.TCP_OPTIONS['U1']) + send_ICMP_reply(pkt, ICMP_type, os_pattern, os_pattern.TCP_OPTIONS["U1"]) # print "U1 Probe" else: diff --git a/oschameleon/stack_packet/helper.py b/oschameleon/stack_packet/helper.py index 3a596d5..c6b8e86 100755 --- a/oschameleon/stack_packet/helper.py +++ b/oschameleon/stack_packet/helper.py @@ -5,7 +5,7 @@ def flush_tables(): - os.system('iptables -F') + os.system("iptables -F") def forward_packet(nfq_packet): @@ -21,13 +21,29 @@ def drop_packet(nfq_packet): def rules(server): # print server # allow incoming ssh - os.system('iptables -A INPUT -p tcp -s' + server + ' --dport 63712 -m state --state NEW,ESTABLISHED -j ACCEPT') - os.system('iptables -A OUTPUT -p tcp -d' + server + ' --sport 63712 -m state --state ESTABLISHED -j ACCEPT') + os.system( + "iptables -A INPUT -p tcp -s" + + server + + " --dport 63712 -m state --state NEW,ESTABLISHED -j ACCEPT" + ) + os.system( + "iptables -A OUTPUT -p tcp -d" + + server + + " --sport 63712 -m state --state ESTABLISHED -j ACCEPT" + ) # allow outgoing ssh - os.system('iptables -A OUTPUT -p tcp -d' + server + ' --sport 63712 -m state --state NEW,ESTABLISHED -j ACCEPT') - os.system('iptables -A INPUT -p tcp -s' + server + ' --dport 63712 -m state --state ESTABLISHED -j ACCEPT') + os.system( + "iptables -A OUTPUT -p tcp -d" + + server + + " --sport 63712 -m state --state NEW,ESTABLISHED -j ACCEPT" + ) + os.system( + "iptables -A INPUT -p tcp -s" + + server + + " --dport 63712 -m state --state ESTABLISHED -j ACCEPT" + ) # Configure NFQUEUE target # Capture incoming packets and put in nfqueue 1 - os.system('iptables -A INPUT -j NFQUEUE --queue-num 0') + os.system("iptables -A INPUT -j NFQUEUE --queue-num 0") diff --git a/oschameleon/stack_packet/template.py b/oschameleon/stack_packet/template.py index e37f403..d557c7b 100755 --- a/oschameleon/stack_packet/template.py +++ b/oschameleon/stack_packet/template.py @@ -194,7 +194,7 @@ def __str__(self): for elem in self.TCP_Timestamp: timestamp = elem timestamp = timestamp + " " + str(self.TCP_Timestamp[elem]) - print timestamp + print(timestamp) return ' TTL: ' + str(self.TTL) + \ '\t\t\t GCD: ' + str(self.GCD) + \ diff --git a/setup.py b/setup.py index 5e2061f..83fd967 100644 --- a/setup.py +++ b/setup.py @@ -3,11 +3,13 @@ import oschameleon setup( - packages=["oschameleon", ], + packages=[ + "oschameleon", + ], name=oschameleon.__title__, version=oschameleon.__version__, - author='MushMush', - author_email='glaslos@gmail.com', + author="MushMush", + author_email="glaslos@gmail.com", classifiers=[ "Development Status :: 4 - Beta", "Environment :: Console", @@ -21,11 +23,11 @@ "": ["*.txt", "*.md"], }, include_package_data=True, - long_description=open('README.md').read(), - url='https://github.com/mushorg/oschameleon', - description='OS Fingerprint Obfuscation for modern Linux Kernels', - test_suite='nose.collector', + long_description=open("README.md").read(), + url="https://github.com/mushorg/oschameleon", + description="OS Fingerprint Obfuscation for modern Linux Kernels", + test_suite="nose.collector", tests_require="nose", zip_safe=False, - install_requires=open('requirements.txt').read().splitlines(), + install_requires=open("requirements.txt").read().splitlines(), )