diff --git a/napalm_panos/__init__.py b/napalm_panos/__init__.py index 37fa85f..2d74243 100644 --- a/napalm_panos/__init__.py +++ b/napalm_panos/__init__.py @@ -21,8 +21,8 @@ from napalm_panos.panos import PANOSDriver try: - __version__ = pkg_resources.get_distribution('napalm-panos').version + __version__ = pkg_resources.get_distribution("napalm-panos").version except pkg_resources.DistributionNotFound: __version__ = "Not installed" -__all__ = ('PANOSDriver',) +__all__ = ("PANOSDriver",) diff --git a/napalm_panos/panos.py b/napalm_panos/panos.py index c100596..fe5ddd3 100644 --- a/napalm_panos/panos.py +++ b/napalm_panos/panos.py @@ -51,7 +51,6 @@ class PANOSDriver(NetworkDriver): - def __init__(self, hostname, username, password, timeout=60, optional_args=None): self.hostname = hostname self.username = username @@ -68,21 +67,21 @@ def __init__(self, hostname, username, password, timeout=60, optional_args=None) optional_args = {} netmiko_argument_map = { - 'port': None, - 'verbose': False, - 'use_keys': False, - 'key_file': None, - 'ssh_strict': False, - 'system_host_keys': False, - 'alt_host_keys': False, - 'alt_key_file': '', - 'ssh_config_file': None, + "port": None, + "verbose": False, + "use_keys": False, + "key_file": None, + "ssh_strict": False, + "system_host_keys": False, + "alt_host_keys": False, + "alt_key_file": "", + "ssh_config_file": None, } if parse_version(netmiko_version) >= parse_version("2.0.0"): - netmiko_argument_map['allow_agent'] = False + netmiko_argument_map["allow_agent"] = False elif parse_version(netmiko_version) >= parse_version("1.1.0"): - netmiko_argument_map['allow_agent'] = False + netmiko_argument_map["allow_agent"] = False # Build dict of any optional Netmiko args self.netmiko_optional_args = {} @@ -91,27 +90,32 @@ def __init__(self, hostname, username, password, timeout=60, optional_args=None) self.netmiko_optional_args[k] = optional_args[k] except KeyError: pass - self.api_key = optional_args.get('api_key', '') + self.api_key = optional_args.get("api_key", "") def open(self): try: if self.api_key: - self.device = pan.xapi.PanXapi(hostname=self.hostname, - api_key=self.api_key) + self.device = pan.xapi.PanXapi( + hostname=self.hostname, api_key=self.api_key + ) else: - self.device = pan.xapi.PanXapi(hostname=self.hostname, - api_username=self.username, - api_password=self.password) + self.device = pan.xapi.PanXapi( + hostname=self.hostname, + api_username=self.username, + api_password=self.password, + ) except ConnectionException as e: raise ConnectionException(str(e)) def _open_ssh(self): try: - self.ssh_device = ConnectHandler(device_type='paloalto_panos', - ip=self.hostname, - username=self.username, - password=self.password, - **self.netmiko_optional_args) + self.ssh_device = ConnectHandler( + device_type="paloalto_panos", + ip=self.hostname, + username=self.username, + password=self.password, + **self.netmiko_optional_args + ) except ConnectionException as e: raise ConnectionException(str(e)) @@ -130,35 +134,29 @@ def _import_file(self, filename): else: key = self.api_key - params = { - 'type': 'import', - 'category': 'configuration', - 'key': key - } + params = {"type": "import", "category": "configuration", "key": key} path = os.path.basename(filename) mef = requests_toolbelt.MultipartEncoder( - fields={ - 'file': (path, open(filename, 'rb'), 'application/octet-stream') - } + fields={"file": (path, open(filename, "rb"), "application/octet-stream")} ) requests.packages.urllib3.disable_warnings(InsecureRequestWarning) - url = 'https://{0}/api/'.format(self.hostname) + url = "https://{0}/api/".format(self.hostname) request = requests.post( url, verify=False, params=params, - headers={'Content-Type': mef.content_type}, - data=mef + headers={"Content-Type": mef.content_type}, + data=mef, ) # if something goes wrong just raise an exception request.raise_for_status() response = xml.etree.ElementTree.fromstring(request.content) - if response.attrib['status'] == 'error': + if response.attrib["status"] == "error": return False else: return path @@ -171,7 +169,7 @@ def is_alive(self): is_alive = True else: is_alive = False - return {'is_alive': is_alive} + return {"is_alive": is_alive} def load_replace_candidate(self, filename=None, config=None): if config: @@ -180,7 +178,7 @@ def load_replace_candidate(self, filename=None, config=None): elif filename: if self.loaded is False: if self._save_backup() is False: - raise ReplaceConfigException('Error while storing backup config') + raise ReplaceConfigException("Error while storing backup config") path = self._import_file(filename) if path is False: @@ -188,24 +186,28 @@ def load_replace_candidate(self, filename=None, config=None): raise ReplaceConfigException(msg) # Let's load the config. - cmd = '{0}'.format(path) + cmd = "{0}".format(path) self.device.op(cmd=cmd) - if self.device.status == 'success': + if self.device.status == "success": self.loaded = True else: - raise ReplaceConfigException('Error while loading config from {0}').format(path) + raise ReplaceConfigException( + "Error while loading config from {0}" + ).format(path) else: raise ReplaceConfigException("This method requires a config file.") def _get_file_content(self, filename): try: - with open(filename, 'r') as f: + with open(filename, "r") as f: content = f.read() except IOError: - raise MergeConfigException('Error while opening {0}. Make sure ' - 'filename is correct.'.format(filename)) + raise MergeConfigException( + "Error while opening {0}. Make sure " + "filename is correct.".format(filename) + ) return content def _send_merge_commands(self, config, file_config): @@ -214,8 +216,7 @@ def _send_merge_commands(self, config, file_config): """ if self.loaded is False: if self._save_backup() is False: - raise MergeConfigException('Error while storing backup ' - 'config.') + raise MergeConfigException("Error while storing backup " "config.") if self.ssh_connection is False: self._open_ssh() @@ -231,7 +232,7 @@ def _send_merge_commands(self, config, file_config): self.merge_config = True def _get_candidate(self): - candidate_command = '' + candidate_command = "" self.device.op(cmd=candidate_command) candidate = str(self.device.xml_root()) return candidate @@ -241,23 +242,31 @@ def _get_running(self): running = str(self.device.xml_root()) return running - def get_config(self, retrieve='all'): + def get_config(self, retrieve="all", full=False): + """ + Full is not supported, need to apply to pass tests. It + is not clear to me if this construct exists in panos + """ + if full: + raise NotImplementedError( + "Full config is not implemented for this platform" + ) configs = {} - running = py23_compat.text_type('') - candidate = py23_compat.text_type('') - startup = py23_compat.text_type('') + running = py23_compat.text_type("") + candidate = py23_compat.text_type("") + startup = py23_compat.text_type("") - if retrieve == 'all': + if retrieve == "all": running = py23_compat.text_type(self._get_running()) candidate = py23_compat.text_type(self._get_candidate()) - elif retrieve == 'running': + elif retrieve == "running": running = py23_compat.text_type(self._get_running()) - elif retrieve == 'candidate': + elif retrieve == "candidate": candidate = py23_compat.text_type(self._get_candidate()) - configs['running'] = running - configs['candidate'] = candidate - configs['startup'] = startup + configs["running"] = running + configs["candidate"] = candidate + configs["startup"] = startup return configs @@ -273,8 +282,9 @@ def load_merge_candidate(self, filename=None, config=None): self._send_merge_commands(config, file_config) else: - raise MergeConfigException('You must provide either a file ' - 'or a set-format string') + raise MergeConfigException( + "You must provide either a file " "or a set-format string" + ) def compare_config(self): """ @@ -289,20 +299,28 @@ def compare_config(self): return diff.strip() def _save_backup(self): - self.backup_file = 'config_{0}.xml'.format(str(datetime.now().date()).replace(' ', '_')) - backup_command = '{0}'.format(self.backup_file) + self.backup_file = "config_{0}.xml".format( + str(datetime.now().date()).replace(" ", "_") + ) + backup_command = "{0}".format( + self.backup_file + ) self.device.op(cmd=backup_command) - if self.device.status == 'success': + if self.device.status == "success": return True else: return False - def commit_config(self): + def commit_config(self, message=""): """ Netmiko is being used to commit the configuration because it takes a better care of results compared to pan-python. """ + if message: + raise NotImplementedError( + "Commit message not implemented for this platform" + ) if self.loaded: if self.ssh_connection is False: self._open_ssh() @@ -311,20 +329,22 @@ def commit_config(self): time.sleep(3) self.loaded = False self.changed = True - except: # noqa + except: # noqa if self.merge_config: - raise MergeConfigException('Error while commiting config') + raise MergeConfigException("Error while commiting config") else: - raise ReplaceConfigException('Error while commiting config') + raise ReplaceConfigException("Error while commiting config") else: - raise ReplaceConfigException('No config loaded.') + raise ReplaceConfigException("No config loaded.") def discard_config(self): if self.loaded: - discard_cmd = '{0}'.format(self.backup_file) + discard_cmd = "{0}".format( + self.backup_file + ) self.device.op(cmd=discard_cmd) - if self.device.status == 'success': + if self.device.status == "success": self.loaded = False self.merge_config = False else: @@ -336,7 +356,9 @@ def rollback(self): it takes a better care of results compared to pan-python. """ if self.changed: - rollback_cmd = '{0}'.format(self.backup_file) + rollback_cmd = "{0}".format( + self.backup_file + ) self.device.op(cmd=rollback_cmd) time.sleep(5) @@ -347,13 +369,13 @@ def rollback(self): self.loaded = False self.changed = False self.merge_config = False - except: # noqa + except: # noqa ReplaceConfigException("Error while loading backup config") def _extract_interface_list(self): - self.device.op(cmd='all') + self.device.op(cmd="all") interfaces_xml = xmltodict.parse(self.device.xml_root()) - interfaces_json = json.dumps(interfaces_xml['response']['result']) + interfaces_json = json.dumps(interfaces_xml["response"]["result"]) interfaces = json.loads(interfaces_json) interface_set = set() @@ -365,7 +387,7 @@ def _extract_interface_list(self): # it returns a list of dictionaries. entry_contents = [entry_contents] for intf in entry_contents: - interface_set.add(intf['name']) + interface_set.add(intf["name"]) return list(interface_set) @@ -373,24 +395,26 @@ def get_facts(self): facts = {} try: - self.device.op(cmd='') + self.device.op(cmd="") system_info_xml = xmltodict.parse(self.device.xml_root()) - system_info_json = json.dumps(system_info_xml['response']['result']['system']) + system_info_json = json.dumps( + system_info_xml["response"]["result"]["system"] + ) system_info = json.loads(system_info_json) except AttributeError: system_info = {} if system_info: - facts['hostname'] = system_info['hostname'] - facts['vendor'] = py23_compat.text_type('Palo Alto Networks') - facts['uptime'] = int(convert_uptime_string_seconds(system_info['uptime'])) - facts['os_version'] = system_info['sw-version'] - facts['serial_number'] = system_info['serial'] - facts['model'] = system_info['model'] - facts['fqdn'] = py23_compat.text_type('N/A') - facts['interface_list'] = self._extract_interface_list() + facts["hostname"] = system_info["hostname"] + facts["vendor"] = py23_compat.text_type("Palo Alto Networks") + facts["uptime"] = int(convert_uptime_string_seconds(system_info["uptime"])) + facts["os_version"] = system_info["sw-version"] + facts["serial_number"] = system_info["serial"] + facts["model"] = system_info["model"] + facts["fqdn"] = py23_compat.text_type("N/A") + facts["interface_list"] = self._extract_interface_list() - facts['interface_list'].sort() + facts["interface_list"].sort() return facts @@ -399,41 +423,41 @@ def get_lldp_neighbors(self): neighbors = {} - cmd = 'all' + cmd = "all" try: self.device.op(cmd=cmd) lldp_table_xml = xmltodict.parse(self.device.xml_root()) - lldp_table_json = json.dumps(lldp_table_xml['response']['result']['entry']) + lldp_table_json = json.dumps(lldp_table_xml["response"]["result"]["entry"]) lldp_table = json.loads(lldp_table_json) except AttributeError: lldp_table = [] - if isinstance(lldp_table,dict): + if isinstance(lldp_table, dict): # If only 1 interface is listed, xmltodict returns a dictionary, otherwise # it returns a list of dictionaries. lldp_table = [lldp_table] for lldp_item in lldp_table: - local_int = lldp_item['@name'] + local_int = lldp_item["@name"] if local_int not in neighbors.keys(): neighbors[local_int] = [] try: - lldp_neighs = lldp_item.get('neighbors').get('entry') + lldp_neighs = lldp_item.get("neighbors").get("entry") except AttributeError: - lldp_neighs = '' + lldp_neighs = "" if isinstance(lldp_neighs, dict): lldp_neighs = [lldp_neighs] for neighbor in lldp_neighs: n = {} - n['hostname'] = neighbor['system-name'] - n['port'] = neighbor['port-id'] + n["hostname"] = neighbor["system-name"] + n["port"] = neighbor["port-id"] neighbors[local_int].append(n) return neighbors - def get_route_to(self, destination='', protocol=''): + def get_route_to(self, destination="", protocol=""): """Return route details to a specific destination, learned from a certain protocol.""" # Note, it should be possible to query the FIB: @@ -446,11 +470,15 @@ def get_route_to(self, destination='', protocol=''): if protocol: protocol = "{0}".format(protocol) - cmd = "{0}{1}".format(protocol, destination) + cmd = "{0}{1}".format( + protocol, destination + ) try: self.device.op(cmd=cmd) routes_table_xml = xmltodict.parse(self.device.xml_root()) - routes_table_json = json.dumps(routes_table_xml['response']['result']['entry']) + routes_table_json = json.dumps( + routes_table_xml["response"]["result"]["entry"] + ) routes_table = json.loads(routes_table_json) except (AttributeError, KeyError): routes_table = [] @@ -460,49 +488,49 @@ def get_route_to(self, destination='', protocol=''): for route in routes_table: d = { - 'current_active': False, - 'last_active': False, - 'age': -1, - 'next_hop': u'', - 'protocol': u'', - 'outgoing_interface': u'', - 'preference': -1, - 'inactive_reason': u'', - 'routing_table': u'default', - 'selected_next_hop': False, - 'protocol_attributes': {} + "current_active": False, + "last_active": False, + "age": -1, + "next_hop": "", + "protocol": "", + "outgoing_interface": "", + "preference": -1, + "inactive_reason": "", + "routing_table": "default", + "selected_next_hop": False, + "protocol_attributes": {}, } - destination = route['destination'] - flags = route['flags'] + destination = route["destination"] + flags = route["flags"] - if 'A' in flags: - d['current_active'] = True + if "A" in flags: + d["current_active"] = True else: - d['current_active'] = False - if 'C' in flags: - d['protocol'] = "connect" - if 'S' in flags: - d['protocol'] = "static" - if 'R' in flags: - d['protocol'] = "rip" - if 'R' in flags: - d['protocol'] = "rip" - if 'O' in flags: - d['protocol'] = "ospf" - if 'B' in flags: - d['protocol'] = "bgp" - if 'H' in flags: - d['protocol'] = "host" - if route['age'] is not None: - d['age'] = int(route['age']) - if route['nexthop'] is not None: - d['next_hop'] = route['nexthop'] - if route['interface'] is not None: - d['outgoing_interface'] = route['interface'] - if route['metric'] is not None: - d['preference'] = int(route['metric']) - if route['virtual-router'] is not None: - d['routing_table'] = route['virtual-router'] + d["current_active"] = False + if "C" in flags: + d["protocol"] = "connect" + if "S" in flags: + d["protocol"] = "static" + if "R" in flags: + d["protocol"] = "rip" + if "R" in flags: + d["protocol"] = "rip" + if "O" in flags: + d["protocol"] = "ospf" + if "B" in flags: + d["protocol"] = "bgp" + if "H" in flags: + d["protocol"] = "host" + if route["age"] is not None: + d["age"] = int(route["age"]) + if route["nexthop"] is not None: + d["next_hop"] = route["nexthop"] + if route["interface"] is not None: + d["outgoing_interface"] = route["interface"] + if route["metric"] is not None: + d["preference"] = int(route["metric"]) + if route["virtual-router"] is not None: + d["routing_table"] = route["virtual-router"] if destination not in routes.keys(): routes[destination] = [] @@ -512,15 +540,17 @@ def get_route_to(self, destination='', protocol=''): def get_interfaces(self): SUBIF_DEFAULTS = { - 'is_up': True, - 'is_enabled': True, - 'speed': 0, - 'last_flapped': -1.0, - 'mac_address': '', - 'description': 'N/A' + "is_up": True, + "is_enabled": True, + "speed": 0, + "last_flapped": -1.0, + "mac_address": "", + "mtu": 0, + "description": "N/A", } interface_pattern = re.compile( - r"(ethernet\d+/\d+\.\d+)|(ae\d+\.\d+)|(loopback\.)|(tunnel\.)|(vlan\.)") + r"(ethernet\d+/\d+\.\d+)|(ae\d+\.\d+)|(loopback\.)|(tunnel\.)|(vlan\.)" + ) interface_dict = {} interface_list = self._extract_interface_list() @@ -531,44 +561,49 @@ def get_interfaces(self): try: self.device.op(cmd=cmd) interface_info_xml = xmltodict.parse(self.device.xml_root()) - interface_info_json = json.dumps(interface_info_xml['response']['result']['hw']) + interface_info_json = json.dumps( + interface_info_xml["response"]["result"]["hw"] + ) interface_info = json.loads(interface_info_json) except KeyError as err: - if interface_pattern.search(intf) and 'hw' in str(err): + if interface_pattern.search(intf) and "hw" in str(err): # physical/ae/tunnel/loopback sub-ifs don't return a 'hw' key interface_dict[intf] = SUBIF_DEFAULTS continue raise - interface['is_up'] = interface_info.get('state') == 'up' + interface["is_up"] = interface_info.get("state") == "up" - conf_state = interface_info.get('state_c') - if conf_state == 'down': - interface['is_enabled'] = False - elif conf_state in ('up', 'auto'): - interface['is_enabled'] = True + conf_state = interface_info.get("state_c") + if conf_state == "down": + interface["is_enabled"] = False + elif conf_state in ("up", "auto"): + interface["is_enabled"] = True else: - msg = 'Unknown configured state {} for interface {}'.format(conf_state, intf) + msg = "Unknown configured state {} for interface {}".format( + conf_state, intf + ) raise RuntimeError(msg) - interface['last_flapped'] = -1.0 - interface['speed'] = interface_info.get('speed') + interface["last_flapped"] = -1.0 + interface["mtu"] = 0 + interface["speed"] = interface_info.get("speed") # Loopback and down interfaces - if interface['speed'] in ('[n/a]', 'unknown'): - interface['speed'] = 0 + if interface["speed"] in ("[n/a]", "unknown"): + interface["speed"] = 0 else: - interface['speed'] = int(interface['speed']) - interface['mac_address'] = standardize_mac(interface_info.get('mac')) - interface['description'] = py23_compat.text_type('N/A') + interface["speed"] = int(interface["speed"]) + interface["mac_address"] = standardize_mac(interface_info.get("mac")) + interface["description"] = py23_compat.text_type("N/A") interface_dict[intf] = interface return interface_dict def get_interfaces_ip(self): - '''Return IP interface data.''' + """Return IP interface data.""" def extract_ip_info(parsed_intf_dict): - ''' + """ IPv4: - Primary IP is in the '' tag. If no v4 is configured the return value is 'N/A'. - Secondary IP's are in ''. If no secondaries, this field is not returned by @@ -607,36 +642,42 @@ def extract_ip_info(parsed_intf_dict): - ''' - intf = parsed_intf_dict['name'] + """ + intf = parsed_intf_dict["name"] _ip_info = {intf: {}} - v4_ip = parsed_intf_dict.get('ip') - secondary_v4_ip = parsed_intf_dict.get('addr') - v6_ip = parsed_intf_dict.get('addr6') + v4_ip = parsed_intf_dict.get("ip") + secondary_v4_ip = parsed_intf_dict.get("addr") + v6_ip = parsed_intf_dict.get("addr6") - if v4_ip != 'N/A': - address, pref = v4_ip.split('/') - _ip_info[intf].setdefault('ipv4', {})[address] = {'prefix_length': int(pref)} + if v4_ip != "N/A": + address, pref = v4_ip.split("/") + _ip_info[intf].setdefault("ipv4", {})[address] = { + "prefix_length": int(pref) + } if secondary_v4_ip is not None: - members = secondary_v4_ip['member'] + members = secondary_v4_ip["member"] if not isinstance(members, list): # If only 1 secondary IP is present, xmltodict converts field to a string, else # it converts it to a list of strings. members = [members] for address in members: - address, pref = address.split('/') - _ip_info[intf].setdefault('ipv4', {})[address] = {'prefix_length': int(pref)} + address, pref = address.split("/") + _ip_info[intf].setdefault("ipv4", {})[address] = { + "prefix_length": int(pref) + } if v6_ip is not None: - members = v6_ip['member'] + members = v6_ip["member"] if not isinstance(members, list): # Same "1 vs many -> string vs list of strings" comment. members = [members] for address in members: - address, pref = address.split('/') - _ip_info[intf].setdefault('ipv6', {})[address] = {'prefix_length': int(pref)} + address, pref = address.split("/") + _ip_info[intf].setdefault("ipv6", {})[address] = { + "prefix_length": int(pref) + } # Reset dictionary if no addresses were found. if _ip_info == {intf: {}}: @@ -650,7 +691,7 @@ def extract_ip_info(parsed_intf_dict): self.device.op(cmd=cmd) interface_info_xml = xmltodict.parse(self.device.xml_root()) interface_info_json = json.dumps( - interface_info_xml['response']['result']['ifnet']['entry'] + interface_info_xml["response"]["result"]["ifnet"]["entry"] ) interface_info = json.loads(interface_info_json) diff --git a/requirements-dev.txt b/requirements-dev.txt index 9fcae16..e8e7466 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,4 +1,5 @@ coveralls +pydocstyle=3.0.0 pytest pytest-cov pytest-json diff --git a/setup.py b/setup.py index 1e6cb5d..48d8032 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages -__author__ = 'Gabriele Gerbino ' +__author__ = "Gabriele Gerbino " with open("requirements.txt", "r") as fs: reqs = [r for r in fs.read().splitlines() if (len(r) > 0 and not r.startswith("#"))] @@ -15,12 +15,12 @@ author_email="gabriele@networktocode.com", description="Network Automation and Programmability Abstraction Layer with Multivendor support", classifiers=[ - 'Topic :: Utilities', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', - 'Operating System :: POSIX :: Linux', - 'Operating System :: MacOS', + "Topic :: Utilities", + "Programming Language :: Python", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 2.7", + "Operating System :: POSIX :: Linux", + "Operating System :: MacOS", ], url="https://github.com/napalm-automation/napalm-panos", include_package_data=True, diff --git a/test/unit/TestDriver.py b/test/unit/TestDriver.py index 2304fd5..06e2ada 100644 --- a/test/unit/TestDriver.py +++ b/test/unit/TestDriver.py @@ -24,13 +24,13 @@ class TestConfigDriver(unittest.TestCase, TestConfigNetworkDriver): @classmethod def setUpClass(cls): - hostname = '1.2.3.4' - username = 'test' - password = 'test' - cls.vendor = 'panos' + hostname = "1.2.3.4" + username = "test" + password = "test" + cls.vendor = "panos" cls.device = panos.PANOSDriver(hostname, username, password, timeout=60) cls.device.open() - cls.device.load_replace_candidate(filename='%s/initial.conf' % cls.vendor) + cls.device.load_replace_candidate(filename="%s/initial.conf" % cls.vendor) cls.device.commit_config() diff --git a/test/unit/conftest.py b/test/unit/conftest.py index d188320..b864cb8 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -8,16 +8,18 @@ from napalm_panos import PANOSDriver as OriginalDriver -@pytest.fixture(scope='class') +@pytest.fixture(scope="class") def set_device_parameters(request): """Set up the class.""" + def fin(): request.cls.device.close() + request.addfinalizer(fin) request.cls.driver = OriginalDriver request.cls.patched_driver = PatchedDriver - request.cls.vendor = 'panos' + request.cls.vendor = "panos" parent_conftest.set_device_parameters(request) @@ -28,9 +30,10 @@ def pytest_generate_tests(metafunc): class PatchedDriver(OriginalDriver): """Patched Driver.""" + def __init__(self, hostname, username, password, timeout=60, optional_args=None): super().__init__(hostname, username, password, timeout, optional_args) - self.patched_attrs = ['device'] + self.patched_attrs = ["device"] self.device = FakeDevice() def open(self): @@ -40,14 +43,14 @@ def close(self): pass def is_alive(self): - return({'is_alive': True}) + return {"is_alive": True} class FakeDevice(BaseTestDouble): """Device test double.""" def __init__(self): - self.cmd = '' + self.cmd = "" @staticmethod def read_txt_file(filename): @@ -56,16 +59,16 @@ def read_txt_file(filename): return data_file.read() def xml_root(self): - tmp_str = self.cmd.replace('<', '_').replace('>', '_') - filename = tmp_str.replace('/', '_').replace('\n', '').replace(' ', '') - full_path = self.find_file('{}.xml'.format(filename)) + tmp_str = self.cmd.replace("<", "_").replace(">", "_") + filename = tmp_str.replace("/", "_").replace("\n", "").replace(" ", "") + full_path = self.find_file("{}.xml".format(filename)) xml_string = self.read_txt_file(full_path) return xml_string - def op(self, cmd=''): + def op(self, cmd=""): self.cmd = cmd return True - def show(self, cmd=''): - self.cmd = 'running_config' + def show(self, cmd=""): + self.cmd = "running_config" return True diff --git a/test/unit/mocked_data/test_get_interfaces/normal/expected_result.json b/test/unit/mocked_data/test_get_interfaces/normal/expected_result.json index 8f28041..e085c59 100644 --- a/test/unit/mocked_data/test_get_interfaces/normal/expected_result.json +++ b/test/unit/mocked_data/test_get_interfaces/normal/expected_result.json @@ -5,6 +5,7 @@ "mac_address": "BA:DB:EE:FB:AD:12", "speed": 1000, "description": "N/A", + "mtu": 0, "last_flapped": -1.0 }, "ethernet1/4": { @@ -13,6 +14,7 @@ "mac_address": "BA:DB:EE:FB:AD:13", "speed": 1000, "description": "N/A", + "mtu": 0, "last_flapped": -1.0 }, "ethernet1/2": { @@ -21,6 +23,7 @@ "mac_address": "BA:DB:EE:FB:AD:11", "speed": 1000, "description": "N/A", + "mtu": 0, "last_flapped": -1.0 }, "loopback": { @@ -29,6 +32,7 @@ "mac_address": "BA:DB:EE:FB:AD:03", "speed": 0, "description": "N/A", + "mtu": 0, "last_flapped": -1.0 }, "ethernet1/1": { @@ -37,6 +41,7 @@ "mac_address": "00:50:56:11:22:33", "speed": 10000, "description": "N/A", + "mtu": 0, "last_flapped": -1.0 }, "tunnel": { @@ -45,6 +50,7 @@ "mac_address": "E4:A7:49:AA:BB:CC", "speed": 0, "description": "N/A", + "mtu": 0, "last_flapped": -1.0 }, "loopback.100": { @@ -53,6 +59,7 @@ "mac_address": "", "speed": 0, "description": "N/A", + "mtu": 0, "is_up": true }, "tunnel.100": { @@ -61,6 +68,7 @@ "mac_address": "", "speed": 0, "description": "N/A", + "mtu": 0, "is_up": true }, "ethernet1/1.143": { @@ -69,6 +77,7 @@ "mac_address": "", "speed": 0, "description": "N/A", + "mtu": 0, "is_up": true }, @@ -78,6 +87,7 @@ "mac_address": "00:11:22:33:44:55", "speed": 0, "description": "N/A", + "mtu": 0, "is_up": true }, @@ -87,7 +97,8 @@ "mac_address": "", "speed": 0, "description": "N/A", + "mtu": 0, "is_up": true } -} \ No newline at end of file +}