diff --git a/bin/kamstrup_prober.py b/bin/kamstrup_prober.py index 836870ab..50d5aa5b 100644 --- a/bin/kamstrup_prober.py +++ b/bin/kamstrup_prober.py @@ -84,7 +84,9 @@ def get_register(self, register): received_data = self._sock.recv(1024) received_data = bytearray(received_data) except socket.error as socket_err: - logger.exception("Error while communicating: {0}".format(str(socket_err))) + logger.exception( + "Error while communicating: {0}".format(str(socket_err)) + ) self._connect() data_length = len(received_data) @@ -151,9 +153,15 @@ def registers_from_candidates(candidate_registers_values, args): "value_length": length, "unknown": unknown, } - logger.info("Found register value at {0}:{1}".format(hex(register_id), register_value)) + logger.info( + "Found register value at {0}:{1}".format( + hex(register_id), register_value + ) + ) with open(dumpfile, "w") as json_file: - json_file.write(json.dumps(found_registers, indent=4, default=json_default)) + json_file.write( + json.dumps(found_registers, indent=4, default=json_default) + ) else: not_found_counts += 1 if not_found_counts % 10 == 0: @@ -196,8 +204,10 @@ def json_default(obj): return None -if __name__ == '__main__': - parser = argparse.ArgumentParser(description="Probes kamstrup_meter meter registers.") +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Probes kamstrup_meter meter registers." + ) parser.add_argument("host", help="Hostname or IP or Kamstrup meter") parser.add_argument("port", type=int, help="TCP port") parser.add_argument( @@ -205,6 +215,8 @@ def json_default(obj): dest="registerfile", help="Reads registers from previous dumps files instead of bruteforcing the meter.", ) - parser.add_argument("--comm-addr", dest="communication_address", default=default_comm_port) + parser.add_argument( + "--comm-addr", dest="communication_address", default=default_comm_port + ) find_registers_in_candidates(parser.parse_args()) diff --git a/conpot/protocols/s7comm/s7.py b/conpot/protocols/s7comm/s7.py index 7fe47b0b..1d8470ab 100644 --- a/conpot/protocols/s7comm/s7.py +++ b/conpot/protocols/s7comm/s7.py @@ -16,7 +16,6 @@ # S7 packet class S7(object): - ssl_lists = {} def __init__( @@ -68,7 +67,6 @@ def __init__( self.data_bus = conpot_core.get_databus() def __len__(self): - if self.pdu_type in (2, 3): return 12 + int(self.param_length) + int(self.data_length) else: @@ -85,7 +83,6 @@ def request_not_implemented(self): raise ParseException("s7comm", "request not implemented in honeypot yet.") def pack(self): - if self.pdu_type not in self.pdu_mapping: raise AssembleException("s7comm", "invalid or unsupported pdu type") elif self.pdu_type in (2, 3): @@ -120,7 +117,6 @@ def pack(self): ) def parse(self, packet): - # dissect fixed header try: fixed_header = unpack("!BBHHHH", packet[:10]) @@ -176,7 +172,6 @@ def plc_stop_signal(self, current_client): return str_to_bytes("0x00"), str_to_bytes("0x29") def request_diagnostics(self): - # semi-check try: unpack("!BBBBBBBB", self.parameters[:8]) @@ -224,7 +219,6 @@ def request_diagnostics(self): # W#16#xy11 - module identification def request_ssl_17(self, data_ssl_index): - # just for convenience current_ssl = S7.ssl_lists["W#16#xy11"] @@ -254,7 +248,6 @@ def request_ssl_17(self, data_ssl_index): ) # 1 WORD ( Length of following data ) elif data_ssl_index == 6: # 0x0006 - hardware identification - ssl_index_description = "Hardware identification" ssl_resp_data = pack( @@ -279,7 +272,6 @@ def request_ssl_17(self, data_ssl_index): ) # 1 WORD ( Length of following data ) elif data_ssl_index == 7: # 0x0007 - firmware identification - ssl_index_description = "Firmware identification" ssl_resp_data = pack( @@ -301,7 +293,6 @@ def request_ssl_17(self, data_ssl_index): 0x09, # 1 BYTE ( Data Type. 0x09 = Char/String ) len(ssl_resp_data), ) # 1 WORD ( Length of following data ) - else: ssl_index_description = "UNKNOWN / UNDEFINED / RESERVED {0}".format( hex(data_ssl_index) @@ -324,7 +315,6 @@ def request_ssl_17(self, data_ssl_index): # W#16#011C def request_ssl_28(self, data_ssl_index): - # just for convenience current_ssl = S7.ssl_lists["W#16#xy1C"] # initiate header for mass component block