From 6c53f19c51fa80981893bdb940341af5aab77149 Mon Sep 17 00:00:00 2001 From: Snuffy2 Date: Sat, 28 Dec 2024 12:52:19 -0500 Subject: [PATCH] Further linting/formatting fixes --- custom_components/opnsense/__init__.py | 35 +- custom_components/opnsense/binary_sensor.py | 10 +- custom_components/opnsense/config_flow.py | 92 ++---- custom_components/opnsense/coordinator.py | 4 +- custom_components/opnsense/device_tracker.py | 30 +- custom_components/opnsense/entity.py | 8 +- custom_components/opnsense/helpers.py | 2 +- .../opnsense/pyopnsense/__init__.py | 310 +++++------------- custom_components/opnsense/sensor.py | 26 +- custom_components/opnsense/services.py | 37 +-- custom_components/opnsense/switch.py | 53 +-- custom_components/opnsense/update.py | 56 +--- pyproject.toml | 39 +-- 13 files changed, 188 insertions(+), 514 deletions(-) diff --git a/custom_components/opnsense/__init__.py b/custom_components/opnsense/__init__.py index 4f598f5..df80004 100644 --- a/custom_components/opnsense/__init__.py +++ b/custom_components/opnsense/__init__.py @@ -237,18 +237,12 @@ async def async_remove_config_entry_device( """Remove OPNsense Devices that aren't Device Tracker Devices and without any linked entities.""" if device_entry.via_device_id: - _LOGGER.error( - "Remove OPNsense Device Tracker Devices via the Integration Configuration" - ) + _LOGGER.error("Remove OPNsense Device Tracker Devices via the Integration Configuration") return False entity_registry = er.async_get(hass) - for ent in er.async_entries_for_config_entry( - entity_registry, config_entry.entry_id - ): + for ent in er.async_entries_for_config_entry(entity_registry, config_entry.entry_id): if ent.device_id == device_entry.id: - _LOGGER.error( - "Cannot remove OPNsense Devices with linked entities at this time" - ) + _LOGGER.error("Cannot remove OPNsense Devices with linked entities at this time") return False return True @@ -318,8 +312,7 @@ async def _migrate_2_to_3(hass: HomeAssistant, config_entry: ConfigEntry) -> boo is_main_dev: bool = any(t[0] == "opnsense" for t in dev.identifiers) if is_main_dev: new_identifiers = { - (t[0], new_device_unique_id) if t[0] == "opnsense" else t - for t in dev.identifiers + (t[0], new_device_unique_id) if t[0] == "opnsense" else t for t in dev.identifiers } _LOGGER.debug( "[migrate_2_to_3] dev.identifiers: %s, new_identifiers: %s", @@ -339,9 +332,7 @@ async def _migrate_2_to_3(hass: HomeAssistant, config_entry: ConfigEntry) -> boo e, ) - for ent in er.async_entries_for_config_entry( - entity_registry, config_entry.entry_id - ): + for ent in er.async_entries_for_config_entry(entity_registry, config_entry.entry_id): # _LOGGER.debug(f"[migrate_2_to_3] ent: {ent}") platform = ent.entity_id.split(".")[0] try: @@ -418,9 +409,7 @@ async def _migrate_3_to_4(hass: HomeAssistant, config_entry: ConfigEntry) -> boo ) telemetry = await client.get_telemetry() - for ent in er.async_entries_for_config_entry( - entity_registry, config_entry.entry_id - ): + for ent in er.async_entries_for_config_entry(entity_registry, config_entry.entry_id): platform = ent.entity_id.split(".")[0] if platform == Platform.SENSOR: # _LOGGER.debug(f"[migrate_3_to_4] ent: {ent}") @@ -429,15 +418,11 @@ async def _migrate_3_to_4(hass: HomeAssistant, config_entry: ConfigEntry) -> boo "_telemetry_interface_", "_interface_" ) elif "_telemetry_gateway_" in ent.unique_id: - new_unique_id = ent.unique_id.replace( - "_telemetry_gateway_", "_gateway_" - ) + new_unique_id = ent.unique_id.replace("_telemetry_gateway_", "_gateway_") elif "_connected_client_count" in ent.unique_id: try: entity_registry.async_remove(ent.entity_id) - _LOGGER.debug( - "[migrate_3_to_4] removed_entity_id: %s", ent.entity_id - ) + _LOGGER.debug("[migrate_3_to_4] removed_entity_id: %s", ent.entity_id) except (KeyError, ValueError) as e: _LOGGER.error( "Error removing entity: %s. %s: %s", @@ -447,9 +432,7 @@ async def _migrate_3_to_4(hass: HomeAssistant, config_entry: ConfigEntry) -> boo ) continue elif "_telemetry_openvpn_" in ent.unique_id: - new_unique_id = ent.unique_id.replace( - "_telemetry_openvpn_", "_openvpn_" - ) + new_unique_id = ent.unique_id.replace("_telemetry_openvpn_", "_openvpn_") elif "_telemetry_filesystems_" in ent.unique_id: new_unique_id = None for filesystem in telemetry.get("filesystems", []): diff --git a/custom_components/opnsense/binary_sensor.py b/custom_components/opnsense/binary_sensor.py index 86db83e..6b4e7b8 100644 --- a/custom_components/opnsense/binary_sensor.py +++ b/custom_components/opnsense/binary_sensor.py @@ -28,9 +28,9 @@ async def async_setup_entry( ) -> None: """Set up the OPNsense binary sensors.""" - coordinator: OPNsenseDataUpdateCoordinator = hass.data[DOMAIN][ - config_entry.entry_id - ][COORDINATOR] + coordinator: OPNsenseDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id][ + COORDINATOR + ] entities: list = [] entities.extend( @@ -75,9 +75,7 @@ def __init__( ) -> None: """Initialize OPNsense Binary Sensor entity.""" name_suffix: str | None = ( - entity_description.name - if isinstance(entity_description.name, str) - else None + entity_description.name if isinstance(entity_description.name, str) else None ) unique_id_suffix: str | None = ( entity_description.key if isinstance(entity_description.key, str) else None diff --git a/custom_components/opnsense/config_flow.py b/custom_components/opnsense/config_flow.py index 54b5795..c594b27 100644 --- a/custom_components/opnsense/config_flow.py +++ b/custom_components/opnsense/config_flow.py @@ -13,12 +13,7 @@ import awesomeversion import voluptuous as vol -from homeassistant.config_entries import ( - ConfigEntry, - ConfigFlow, - ConfigFlowResult, - OptionsFlow, -) +from homeassistant.config_entries import ConfigEntry, ConfigFlow, ConfigFlowResult, OptionsFlow from homeassistant.const import ( CONF_NAME, CONF_PASSWORD, @@ -113,9 +108,7 @@ async def validate_input( message=f"Missing Device Unique ID Error. {err.__class__.__qualname__}: {err}", ) except PluginMissing: - _log_and_set_error( - errors=errors, key="plugin_missing", message="OPNsense Plugin Missing" - ) + _log_and_set_error(errors=errors, key="plugin_missing", message="OPNsense Plugin Missing") except (aiohttp.InvalidURL, InvalidURL) as err: _log_and_set_error( errors=errors, @@ -158,10 +151,7 @@ async def validate_input( ) except xmlrpc.client.ProtocolError as err: error_message = str(err) - if ( - "307 Temporary Redirect" in error_message - or "301 Moved Permanently" in error_message - ): + if "307 Temporary Redirect" in error_message or "301 Moved Permanently" in error_message: errors["base"] = "url_redirect" else: errors["base"] = "cannot_connect" @@ -218,9 +208,7 @@ async def _clean_and_parse_url(user_input: MutableMapping[str, Any]) -> None: _LOGGER.debug("[config_flow] Cleaned URL: %s", user_input[CONF_URL]) -async def _get_client( - user_input: MutableMapping[str, Any], hass: HomeAssistant -) -> OPNsenseClient: +async def _get_client(user_input: MutableMapping[str, Any], hass: HomeAssistant) -> OPNsenseClient: """Create and return the OPNsense client.""" return OPNsenseClient( url=user_input[CONF_URL], @@ -244,18 +232,14 @@ def _validate_firmware_version(firmware_version: str) -> None: raise BelowMinFirmware -async def _handle_user_input( - user_input: MutableMapping[str, Any], hass: HomeAssistant -) -> None: +async def _handle_user_input(user_input: MutableMapping[str, Any], hass: HomeAssistant) -> None: """Handle and validate the user input.""" await _clean_and_parse_url(user_input) client: OPNsenseClient = await _get_client(user_input, hass) user_input[CONF_FIRMWARE_VERSION] = await client.get_host_firmware_version() - _LOGGER.debug( - "[config_flow] Firmware Version: %s", user_input[CONF_FIRMWARE_VERSION] - ) + _LOGGER.debug("[config_flow] Firmware Version: %s", user_input[CONF_FIRMWARE_VERSION]) try: _validate_firmware_version(user_input[CONF_FIRMWARE_VERSION]) @@ -272,17 +256,13 @@ async def _handle_user_input( user_input[CONF_NAME] = system_info.get("name") or "OPNsense" user_input[CONF_DEVICE_UNIQUE_ID] = await client.get_device_unique_id() - _LOGGER.debug( - "[config_flow] Device Unique ID: %s", user_input[CONF_DEVICE_UNIQUE_ID] - ) + _LOGGER.debug("[config_flow] Device Unique ID: %s", user_input[CONF_DEVICE_UNIQUE_ID]) if not user_input.get(CONF_DEVICE_UNIQUE_ID): raise MissingDeviceUniqueID -def _log_and_set_error( - errors: MutableMapping[str, Any], key: str, message: str -) -> None: +def _log_and_set_error(errors: MutableMapping[str, Any], key: str, message: str) -> None: """Log the error and set it in the errors dictionary.""" _LOGGER.error(message) errors["base"] = key @@ -303,9 +283,7 @@ async def async_step_user( errors: MutableMapping[str, Any] = {} firmware: str = "Unknown" if user_input is not None: - errors = await validate_input( - hass=self.hass, user_input=user_input, errors=errors - ) + errors = await validate_input(hass=self.hass, user_input=user_input, errors=errors) firmware = user_input.get(CONF_FIRMWARE_VERSION, "Unknown") if not errors: # https://developers.home-assistant.io/docs/config_entries_config_flow_handler#unique-ids @@ -327,9 +305,7 @@ async def async_step_user( user_input = {} schema = vol.Schema( { - vol.Required( - CONF_URL, default=user_input.get(CONF_URL, "https://") - ): str, + vol.Required(CONF_URL, default=user_input.get(CONF_URL, "https://")): str, vol.Optional( CONF_VERIFY_SSL, default=user_input.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL), @@ -338,9 +314,7 @@ async def async_step_user( CONF_USERNAME, default=user_input.get(CONF_USERNAME, ""), ): str, - vol.Required( - CONF_PASSWORD, default=user_input.get(CONF_PASSWORD, "") - ): str, + vol.Required(CONF_PASSWORD, default=user_input.get(CONF_PASSWORD, "")): str, vol.Optional(CONF_NAME, default=user_input.get(CONF_NAME, "")): str, } ) @@ -365,9 +339,7 @@ async def async_step_reconfigure( firmware: str = "Unknown" if user_input is not None: user_input[CONF_NAME] = prev_data.get(CONF_NAME, "") - errors = await validate_input( - hass=self.hass, user_input=user_input, errors=errors - ) + errors = await validate_input(hass=self.hass, user_input=user_input, errors=errors) firmware = user_input.get(CONF_FIRMWARE_VERSION, "Unknown") if not errors: # https://developers.home-assistant.io/docs/config_entries_config_flow_handler#unique-ids @@ -391,9 +363,7 @@ async def async_step_reconfigure( { vol.Required( CONF_URL, - default=user_input.get( - CONF_URL, prev_data.get(CONF_URL, "https://") - ), + default=user_input.get(CONF_URL, prev_data.get(CONF_URL, "https://")), ): str, vol.Optional( CONF_VERIFY_SSL, @@ -404,15 +374,11 @@ async def async_step_reconfigure( ): bool, vol.Required( CONF_USERNAME, - default=user_input.get( - CONF_USERNAME, prev_data.get(CONF_USERNAME, "") - ), + default=user_input.get(CONF_USERNAME, prev_data.get(CONF_USERNAME, "")), ): str, vol.Required( CONF_PASSWORD, - default=user_input.get( - CONF_PASSWORD, prev_data.get(CONF_PASSWORD, "") - ), + default=user_input.get(CONF_PASSWORD, prev_data.get(CONF_PASSWORD, "")), ): str, } ) @@ -455,9 +421,7 @@ async def async_step_init(self, user_input=None) -> ConfigFlowResult: return await self.async_step_device_tracker() return self.async_create_entry(title="", data=user_input) - scan_interval = self.config_entry.options.get( - CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL - ) + scan_interval = self.config_entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) device_tracker_enabled = self.config_entry.options.get( CONF_DEVICE_TRACKER_ENABLED, DEFAULT_DEVICE_TRACKER_ENABLED ) @@ -472,9 +436,7 @@ async def async_step_init(self, user_input=None) -> ConfigFlowResult: vol.Optional(CONF_SCAN_INTERVAL, default=scan_interval): vol.All( vol.Coerce(int), vol.Clamp(min=10, max=300) ), - vol.Optional( - CONF_DEVICE_TRACKER_ENABLED, default=device_tracker_enabled - ): bool, + vol.Optional(CONF_DEVICE_TRACKER_ENABLED, default=device_tracker_enabled): bool, vol.Optional( CONF_DEVICE_TRACKER_SCAN_INTERVAL, default=device_tracker_scan_interval ): vol.All(vol.Coerce(int), vol.Clamp(min=30, max=300)), @@ -490,9 +452,7 @@ async def async_step_device_tracker(self, user_input=None) -> ConfigFlowResult: url = self.config_entry.data[CONF_URL].strip() username: str = self.config_entry.data[CONF_USERNAME] password: str = self.config_entry.data[CONF_PASSWORD] - verify_ssl: bool = self.config_entry.data.get( - CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL - ) + verify_ssl: bool = self.config_entry.data.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL) client = OPNsenseClient( url=url, username=username, @@ -540,9 +500,9 @@ async def async_step_device_tracker(self, user_input=None) -> ConfigFlowResult: step_id="device_tracker", data_schema=vol.Schema( { - vol.Optional( - CONF_DEVICES, default=selected_devices - ): cv.multi_select(dict(sorted_entries)), + vol.Optional(CONF_DEVICES, default=selected_devices): cv.multi_select( + dict(sorted_entries) + ), vol.Optional(CONF_MANUAL_DEVICES): selector.TextSelector( selector.TextSelectorConfig() ), @@ -552,9 +512,9 @@ async def async_step_device_tracker(self, user_input=None) -> ConfigFlowResult: if user_input: _LOGGER.debug("[options_flow device_tracker] user_input: %s", user_input) macs: list = [] - if isinstance( - user_input.get(CONF_MANUAL_DEVICES, None), str - ) and user_input.get(CONF_MANUAL_DEVICES, None): + if isinstance(user_input.get(CONF_MANUAL_DEVICES, None), str) and user_input.get( + CONF_MANUAL_DEVICES, None + ): for item in user_input.get(CONF_MANUAL_DEVICES).split(","): if not isinstance(item, str) or not item: continue @@ -562,9 +522,7 @@ async def async_step_device_tracker(self, user_input=None) -> ConfigFlowResult: if is_valid_mac_address(item): macs.append(item) _LOGGER.debug("[async_step_device_tracker] Manual Devices: %s", macs) - _LOGGER.debug( - "[async_step_device_tracker] Devices: %s", user_input.get(CONF_DEVICES) - ) + _LOGGER.debug("[async_step_device_tracker] Devices: %s", user_input.get(CONF_DEVICES)) self.new_options[CONF_DEVICES] = user_input.get(CONF_DEVICES) + macs return self.async_create_entry(title="", data=self.new_options) diff --git a/custom_components/opnsense/coordinator.py b/custom_components/opnsense/coordinator.py index 869de1a..5d0baca 100644 --- a/custom_components/opnsense/coordinator.py +++ b/custom_components/opnsense/coordinator.py @@ -155,9 +155,7 @@ async def _async_update_data(self): self._state.update(await self._get_states(categories)) if self._state.get("device_unique_id") is None: - _LOGGER.warning( - "Coordinator failed to confirm OPNsense Router Unique ID. Will retry" - ) + _LOGGER.warning("Coordinator failed to confirm OPNsense Router Unique ID. Will retry") return {} if self._state.get("device_unique_id") != self._device_unique_id: _LOGGER.debug( diff --git a/custom_components/opnsense/device_tracker.py b/custom_components/opnsense/device_tracker.py index b982bae..0a533ce 100644 --- a/custom_components/opnsense/device_tracker.py +++ b/custom_components/opnsense/device_tracker.py @@ -84,9 +84,7 @@ async def async_setup_entry( pass devices.append(device) - elif config_entry.options.get( - CONF_DEVICE_TRACKER_ENABLED, DEFAULT_DEVICE_TRACKER_ENABLED - ): + elif config_entry.options.get(CONF_DEVICE_TRACKER_ENABLED, DEFAULT_DEVICE_TRACKER_ENABLED): for arp_entry in arp_entries: mac_address = arp_entry.get("mac", None) if mac_address and mac_address not in mac_addresses: @@ -115,9 +113,7 @@ async def async_setup_entry( # Get the MACs that need to be removed and remove their devices for mac_address in list(set(previous_mac_addresses) - set(mac_addresses)): - rem_device = dev_reg.async_get_device( - connections={(CONNECTION_NETWORK_MAC, mac_address)} - ) + rem_device = dev_reg.async_get_device(connections={(CONNECTION_NETWORK_MAC, mac_address)}) if rem_device: dev_reg.async_remove_device(rem_device.id) @@ -211,9 +207,7 @@ def _handle_coordinator_update(self) -> None: entry = {} # _LOGGER.debug(f"[OPNsenseScannerEntity handle_coordinator_update] entry: {entry}") try: - self._attr_ip_address = ( - entry.get("ip") if len(entry.get("ip", 0)) > 0 else None - ) + self._attr_ip_address = entry.get("ip") if len(entry.get("ip", 0)) > 0 else None except (TypeError, KeyError, AttributeError): self._attr_ip_address = None @@ -232,11 +226,7 @@ def _handle_coordinator_update(self) -> None: if self._attr_hostname: self._last_known_hostname = self._attr_hostname - if ( - not isinstance(entry, MutableMapping) - or not entry - or entry.get("expired", False) - ): + if not isinstance(entry, MutableMapping) or not entry or entry.get("expired", False): if self._last_known_ip: # force a ping to _last_known_ip to possibly recreate arp entry? pass @@ -248,9 +238,7 @@ def _handle_coordinator_update(self) -> None: if device_tracker_consider_home > 0 and isinstance( self._last_known_connected_time, datetime ): - elapsed: timedelta = ( - datetime.now().astimezone() - self._last_known_connected_time - ) + elapsed: timedelta = datetime.now().astimezone() - self._last_known_connected_time if elapsed.total_seconds() < device_tracker_consider_home: self._is_connected = True @@ -287,9 +275,7 @@ def _handle_coordinator_update(self) -> None: pass if self._attr_hostname is None and self._last_known_hostname: - self._attr_extra_state_attributes["last_known_hostname"] = ( - self._last_known_hostname - ) + self._attr_extra_state_attributes["last_known_hostname"] = self._last_known_hostname else: self._attr_extra_state_attributes.pop("last_known_hostname", None) @@ -304,9 +290,7 @@ def _handle_coordinator_update(self) -> None: ) try: - self._attr_icon = ( - "mdi:lan-connect" if self.is_connected else "mdi:lan-disconnect" - ) + self._attr_icon = "mdi:lan-connect" if self.is_connected else "mdi:lan-disconnect" except (TypeError, KeyError, AttributeError): self._attr_icon = "mdi:lan-disconnect" diff --git a/custom_components/opnsense/entity.py b/custom_components/opnsense/entity.py index 53d4555..f7bc072 100644 --- a/custom_components/opnsense/entity.py +++ b/custom_components/opnsense/entity.py @@ -32,13 +32,9 @@ def __init__( self.coordinator: OPNsenseDataUpdateCoordinator = coordinator self._device_unique_id: str = config_entry.data[CONF_DEVICE_UNIQUE_ID] if unique_id_suffix: - self._attr_unique_id: str = slugify( - f"{self._device_unique_id}_{unique_id_suffix}" - ) + self._attr_unique_id: str = slugify(f"{self._device_unique_id}_{unique_id_suffix}") if name_suffix: - self._attr_name: str | None = ( - f"{self.opnsense_device_name or 'OPNsense'} {name_suffix}" - ) + self._attr_name: str | None = f"{self.opnsense_device_name or 'OPNsense'} {name_suffix}" self._client: OPNsenseClient | None = None self._attr_extra_state_attributes: dict[str, Any] = {} self._available: bool = False diff --git a/custom_components/opnsense/helpers.py b/custom_components/opnsense/helpers.py index 7eca701..805de9f 100644 --- a/custom_components/opnsense/helpers.py +++ b/custom_components/opnsense/helpers.py @@ -15,7 +15,7 @@ def dict_get(data: MutableMapping[str, Any], path: str, default=None) -> Any | N for key in pathList: if key.isnumeric(): key = int(key) - if isinstance(result, (MutableMapping, list)) and key in result: + if isinstance(result, MutableMapping | list) and key in result: result = result[key] else: result = default diff --git a/custom_components/opnsense/pyopnsense/__init__.py b/custom_components/opnsense/pyopnsense/__init__.py index e24a94a..f11f04f 100644 --- a/custom_components/opnsense/pyopnsense/__init__.py +++ b/custom_components/opnsense/pyopnsense/__init__.py @@ -2,7 +2,7 @@ from abc import ABC import asyncio -from collections.abc import MutableMapping +from collections.abc import Callable, MutableMapping from datetime import datetime, timedelta, timezone from functools import partial import inspect @@ -13,7 +13,7 @@ import socket import ssl import traceback -from typing import Any, Callable +from typing import Any from urllib.parse import quote, quote_plus, urlparse import xmlrpc.client @@ -36,9 +36,7 @@ async def inner(self, *args, **kwargs): except asyncio.CancelledError: raise except (TimeoutError, aiohttp.ServerTimeoutError) as e: - _LOGGER.warning( - "Timeout Error in %s. Will retry. %s", func.__name__.strip("_"), e - ) + _LOGGER.warning("Timeout Error in %s. Will retry. %s", func.__name__.strip("_"), e) if self._initial: raise except Exception as e: @@ -111,9 +109,7 @@ def get_ip_key(item) -> tuple: # If the address is empty, place it at the end return (3, "") try: - ip_obj: ipaddress.IPv4Address | ipaddress.IPv6Address = ipaddress.ip_address( - address - ) + ip_obj: ipaddress.IPv4Address | ipaddress.IPv6Address = ipaddress.ip_address(address) except ValueError: return (2, "") else: @@ -128,7 +124,7 @@ def dict_get(data: MutableMapping[str, Any], path: str, default=None): for key in pathList: if key.isnumeric(): key = int(key) - if isinstance(result, (MutableMapping, list)) and key in result: + if isinstance(result, MutableMapping | list) and key in result: result = result[key] else: result = default @@ -164,7 +160,9 @@ def __init__( self._verify_ssl: bool = self._opts.get("verify_ssl", True) parts = urlparse(url.rstrip("/")) self._url: str = f"{parts.scheme}://{parts.netloc}" - self._xmlrpc_url: str = f"{parts.scheme}://{quote_plus(username)}:{quote_plus(password)}@{parts.netloc}" + self._xmlrpc_url: str = ( + f"{parts.scheme}://{quote_plus(username)}:{quote_plus(password)}@{parts.netloc}" + ) self._scheme: str = parts.scheme self._session: aiohttp.ClientSession = session self._initial = initial @@ -218,9 +216,7 @@ async def _get_config_section(self, section) -> MutableMapping[str, Any]: @_xmlrpc_timeout async def _restore_config_section(self, section_name, data): params = {section_name: data} - proxy_method = partial( - self._get_proxy().opnsense.restore_config_section, params - ) + proxy_method = partial(self._get_proxy().opnsense.restore_config_section, params) return await self._loop.run_in_executor(None, proxy_method) @_xmlrpc_timeout @@ -247,9 +243,7 @@ async def _exec_php(self, script) -> MutableMapping[str, Any]: return json.loads(response.get("real", "")) except TypeError as e: stack = inspect.stack() - calling_function = ( - stack[1].function.strip("_") if len(stack) > 1 else "Unknown" - ) + calling_function = stack[1].function.strip("_") if len(stack) > 1 else "Unknown" _LOGGER.error( "Invalid data returned from exec_php for %s. %s: %s. Called from %s", calling_function, @@ -259,9 +253,7 @@ async def _exec_php(self, script) -> MutableMapping[str, Any]: ) except xmlrpc.client.Fault as e: stack = inspect.stack() - calling_function = ( - stack[1].function.strip("_") if len(stack) > 1 else "Unknown" - ) + calling_function = stack[1].function.strip("_") if len(stack) > 1 else "Unknown" _LOGGER.error( "Error running exec_php script for %s. %s: %s. Ensure the 'os-homeassistant-maxit' plugin has been installed on OPNsense", calling_function, @@ -270,9 +262,7 @@ async def _exec_php(self, script) -> MutableMapping[str, Any]: ) except socket.gaierror as e: stack = inspect.stack() - calling_function = ( - stack[1].function.strip("_") if len(stack) > 1 else "Unknown" - ) + calling_function = stack[1].function.strip("_") if len(stack) > 1 else "Unknown" _LOGGER.warning( "Connection Error running exec_php script for %s. %s: %s. Will retry", calling_function, @@ -281,9 +271,7 @@ async def _exec_php(self, script) -> MutableMapping[str, Any]: ) except ssl.SSLError as e: stack = inspect.stack() - calling_function = ( - stack[1].function.strip("_") if len(stack) > 1 else "Unknown" - ) + calling_function = stack[1].function.strip("_") if len(stack) > 1 else "Unknown" _LOGGER.warning( "SSL Connection Error running exec_php script for %s. %s: %s. Will retry", calling_function, @@ -325,9 +313,7 @@ async def is_plugin_installed(self) -> bool: return True return False - async def _get_from_stream( - self, path: str - ) -> MutableMapping[str, Any] | list | None: + async def _get_from_stream(self, path: str) -> MutableMapping[str, Any] | list | None: self._rest_api_query_count += 1 url: str = f"{self._url}{path}" _LOGGER.debug("[get_from_stream] url: %s", url) @@ -360,9 +346,9 @@ async def _get_from_stream( message_count += 1 if message_count == 2: response_str: str = line[len("data:") :].strip() - response_json: ( - MutableMapping[str, Any] | list - ) = json.loads(response_str) + response_json: MutableMapping[str, Any] | list = json.loads( + response_str + ) # _LOGGER.debug(f"[get_from_stream] response_json ({type(response_json).__name__}): {response_json}") return response_json # Exit after processing the second message @@ -370,9 +356,7 @@ async def _get_from_stream( if response.status == 403: stack = inspect.stack() calling_function = ( - stack[1].function.strip("_") - if len(stack) > 1 - else "Unknown" + stack[1].function.strip("_") if len(stack) > 1 else "Unknown" ) _LOGGER.error( "Permission Error in %s. Path: %s. Ensure the OPNsense user connected to HA has full Admin access", @@ -382,9 +366,7 @@ async def _get_from_stream( else: stack = inspect.stack() calling_function = ( - stack[1].function.strip("_") - if len(stack) > 1 - else "Unknown" + stack[1].function.strip("_") if len(stack) > 1 else "Unknown" ) _LOGGER.error( "Error in %s. Path: %s. Response %s: %s", @@ -422,15 +404,13 @@ async def _get(self, path: str) -> MutableMapping[str, Any] | list | None: ) as response: _LOGGER.debug("[get] Response %s: %s", response.status, response.reason) if response.ok: - response_json: ( - MutableMapping[str, Any] | list - ) = await response.json(content_type=None) + response_json: MutableMapping[str, Any] | list = await response.json( + content_type=None + ) return response_json if response.status == 403: stack = inspect.stack() - calling_function = ( - stack[1].function.strip("_") if len(stack) > 1 else "Unknown" - ) + calling_function = stack[1].function.strip("_") if len(stack) > 1 else "Unknown" _LOGGER.error( "Permission Error in %s. Path: %s. Ensure the OPNsense user connected to HA has full Admin access", calling_function, @@ -438,9 +418,7 @@ async def _get(self, path: str) -> MutableMapping[str, Any] | list | None: ) else: stack = inspect.stack() - calling_function = ( - stack[1].function.strip("_") if len(stack) > 1 else "Unknown" - ) + calling_function = stack[1].function.strip("_") if len(stack) > 1 else "Unknown" _LOGGER.error( "Error in %s. Path: %s. Response %s: %s", calling_function, @@ -463,9 +441,7 @@ async def _get(self, path: str) -> MutableMapping[str, Any] | list | None: return None - async def _post( - self, path: str, payload=None - ) -> MutableMapping[str, Any] | list | None: + async def _post(self, path: str, payload=None) -> MutableMapping[str, Any] | list | None: # /api////[/[/...]] self._rest_api_query_count += 1 url: str = f"{self._url}{path}" @@ -479,19 +455,15 @@ async def _post( timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT), ssl=self._verify_ssl, ) as response: - _LOGGER.debug( - "[post] Response %s: %s", response.status, response.reason - ) + _LOGGER.debug("[post] Response %s: %s", response.status, response.reason) if response.ok: - response_json: ( - MutableMapping[str, Any] | list - ) = await response.json(content_type=None) + response_json: MutableMapping[str, Any] | list = await response.json( + content_type=None + ) return response_json if response.status == 403: stack = inspect.stack() - calling_function = ( - stack[1].function.strip("_") if len(stack) > 1 else "Unknown" - ) + calling_function = stack[1].function.strip("_") if len(stack) > 1 else "Unknown" _LOGGER.error( "Permission Error in %s. Path: %s. Ensure the OPNsense user connected to HA has full Admin access", calling_function, @@ -499,9 +471,7 @@ async def _post( ) else: stack = inspect.stack() - calling_function = ( - stack[1].function.strip("_") if len(stack) > 1 else "Unknown" - ) + calling_function = stack[1].function.strip("_") if len(stack) > 1 else "Unknown" _LOGGER.error( "Error in %s. Path: %s. Response %s: %s", calling_function, @@ -541,20 +511,14 @@ async def get_device_unique_id(self) -> str | None: return None mac_addresses = [ - d.get("macaddr_hw") - for d in instances - if d.get("is_physical") and "macaddr_hw" in d + d.get("macaddr_hw") for d in instances if d.get("is_physical") and "macaddr_hw" in d ] unique_mac_addresses: list = sorted(set(mac_addresses)) - device_unique_id: str | None = ( - unique_mac_addresses[0] if unique_mac_addresses else None - ) + device_unique_id: str | None = unique_mac_addresses[0] if unique_mac_addresses else None if device_unique_id: device_unique_id_fmt = device_unique_id.replace(":", "_").strip() - _LOGGER.debug( - "[get_device_unique_id] device_unique_id: %s", device_unique_id_fmt - ) + _LOGGER.debug("[get_device_unique_id] device_unique_id: %s", device_unique_id_fmt) return device_unique_id_fmt _LOGGER.debug("[get_device_unique_id] device_unique_id: None") return None @@ -593,9 +557,7 @@ async def _get_system_info_legacy(self) -> MutableMapping[str, Any]: response: MutableMapping[str, Any] = await self._exec_php(script) if not isinstance(response, MutableMapping): return {} - response["name"] = ( - f"{response.pop('hostname', '')}.{response.pop('domain', '')}" - ) + response["name"] = f"{response.pop('hostname', '')}.{response.pop('domain', '')}" return response @_log_errors @@ -629,14 +591,10 @@ async def get_firmware_update_info(self): # "last_check": "Sun Jan 15 22:05:55 UTC 2023" # format = "%a %b %d %H:%M:%S %Z %Y" try: - last_check: datetime = parse( - status.get("last_check", 0), tzinfos=AMBIGUOUS_TZINFOS - ) + last_check: datetime = parse(status.get("last_check", 0), tzinfos=AMBIGUOUS_TZINFOS) if last_check.tzinfo is None: last_check = last_check.replace( - tzinfo=timezone( - datetime.now().astimezone().utcoffset() or timedelta() - ) + tzinfo=timezone(datetime.now().astimezone().utcoffset() or timedelta()) ) last_check_timestamp: float = last_check.timestamp() @@ -851,9 +809,9 @@ async def get_service_is_running(self, service: str) -> bool: if services is None or not isinstance(services, list): return False for svc in services: - if ( - svc.get("name", None) == service or svc.get("id", None) == service - ) and svc.get("status", False): + if (svc.get("name", None) == service or svc.get("id", None) == service) and svc.get( + "status", False + ): return True return False @@ -862,13 +820,8 @@ async def _manage_service(self, action: str, service: str) -> bool: return False api_addr: str = f"/api/core/service/{action}/{service}" response = await self._post(api_addr) - _LOGGER.debug( - "[%s_service] service: %s, response: %s", action, service, response - ) - return ( - isinstance(response, MutableMapping) - and response.get("result", "failed") == "ok" - ) + _LOGGER.debug("[%s_service] service: %s, response: %s", action, service, response) + return isinstance(response, MutableMapping) and response.get("result", "failed") == "ok" @_log_errors async def start_service(self, service: str) -> bool: @@ -921,9 +874,7 @@ async def get_dhcp_leases(self) -> MutableMapping[str, Any]: sorted_lease_interfaces: MutableMapping[str, Any] = { key: lease_interfaces[key] for key in sorted(lease_interfaces) } - sorted_leases: MutableMapping[str, Any] = { - key: leases[key] for key in sorted(leases) - } + sorted_leases: MutableMapping[str, Any] = {key: leases[key] for key in sorted(leases)} for if_subnet in sorted_leases.values(): sorted_if: list = sorted(if_subnet, key=get_ip_key) if_subnet = sorted_if @@ -942,9 +893,7 @@ async def _get_kea_interfaces(self) -> MutableMapping[str, Any]: if not isinstance(response, MutableMapping): return {} lease_interfaces: MutableMapping[str, Any] = {} - general: MutableMapping[str, Any] = response.get("dhcpv4", {}).get( - "general", {} - ) + general: MutableMapping[str, Any] = response.get("dhcpv4", {}).get("general", {}) if general.get("enabled", "0") != "1": return {} for if_name, iface in general.get("interfaces", {}).items(): @@ -998,8 +947,7 @@ async def _get_kea_dhcpv4_leases(self) -> list: if ( lease_info.get("hwaddr", None) and lease_info.get("hwaddr") in reservations - and reservations[lease_info.get("hwaddr")] - == lease_info.get("address", None) + and reservations[lease_info.get("hwaddr")] == lease_info.get("address", None) ): lease["type"] = "static" else: @@ -1049,13 +997,9 @@ async def _get_isc_dhcpv4_leases(self) -> list: lease["type"] = lease_info.get("type", None) lease["mac"] = lease_info.get("mac", None) if lease_info.get("ends", None): - dt: datetime = datetime.strptime( - lease_info.get("ends", None), "%Y/%m/%d %H:%M:%S" - ) + dt: datetime = datetime.strptime(lease_info.get("ends", None), "%Y/%m/%d %H:%M:%S") lease["expires"] = dt.replace( - tzinfo=timezone( - datetime.now().astimezone().utcoffset() or timedelta() - ) + tzinfo=timezone(datetime.now().astimezone().utcoffset() or timedelta()) ) if lease["expires"] < datetime.now().astimezone(): continue @@ -1096,13 +1040,9 @@ async def _get_isc_dhcpv6_leases(self) -> list: lease["type"] = lease_info.get("type", None) lease["mac"] = lease_info.get("mac", None) if lease_info.get("ends", None): - dt: datetime = datetime.strptime( - lease_info.get("ends", None), "%Y/%m/%d %H:%M:%S" - ) + dt: datetime = datetime.strptime(lease_info.get("ends", None), "%Y/%m/%d %H:%M:%S") lease["expires"] = dt.replace( - tzinfo=timezone( - datetime.now().astimezone().utcoffset() or timedelta() - ) + tzinfo=timezone(datetime.now().astimezone().utcoffset() or timedelta()) ) if lease["expires"] < datetime.now().astimezone(): continue @@ -1148,10 +1088,7 @@ async def get_carp_interfaces(self) -> list: continue for status in vip_status: - if ( - vip.get("interface", "").lower() - == status.get("interface", "_").lower() - ): + if vip.get("interface", "").lower() == status.get("interface", "_").lower(): vip["status"] = status.get("status", None) break if "status" not in vip or not vip.get("status"): @@ -1182,9 +1119,7 @@ async def system_halt(self) -> None: @_log_errors async def send_wol(self, interface, mac) -> bool: """Send a wake on lan packet to the specified MAC address.""" - payload: MutableMapping[str, Any] = { - "wake": {"interface": interface, "mac": mac} - } + payload: MutableMapping[str, Any] = {"wake": {"interface": interface, "mac": mac}} _LOGGER.debug("[send_wol] payload: %s", payload) response = await self._post("/api/wol/wol/set", payload) _LOGGER.debug("[send_wol] response: %s", response) @@ -1246,10 +1181,7 @@ async def get_interfaces(self) -> MutableMapping[str, Any]: interfaces: MutableMapping[str, Any] = {} for ifinfo in interface_info: interface: MutableMapping[str, Any] = {} - if ( - not isinstance(ifinfo, MutableMapping) - or ifinfo.get("identifier", "") == "" - ): + if not isinstance(ifinfo, MutableMapping) or ifinfo.get("identifier", "") == "": continue interface["inpkts"] = OPNsenseClient._try_to_int( ifinfo.get("statistics", {}).get("packets received", None) @@ -1291,10 +1223,7 @@ async def get_interfaces(self) -> MutableMapping[str, Any]: interface["gateways"] = ifinfo.get("gateways", []) interface["routes"] = ifinfo.get("routes", []) interface["device"] = ifinfo.get("device", None) - if ( - ifinfo.get("macaddr", None) - and ifinfo.get("macaddr", None) != "00:00:00:00:00:00" - ): + if ifinfo.get("macaddr", None) and ifinfo.get("macaddr", None) != "00:00:00:00:00:00": interface["mac"] = ifinfo.get("macaddr", None) interface["enabled"] = ifinfo.get("enabled", None) interface["vlan_tag"] = ifinfo.get("vlan_tag", None) @@ -1354,9 +1283,7 @@ async def _get_telemetry_memory(self) -> MutableMapping[str, Any]: memory["physmem"] = OPNsenseClient._try_to_int( memory_info.get("memory", {}).get("total", None) ) - memory["used"] = OPNsenseClient._try_to_int( - memory_info.get("memory", {}).get("used", None) - ) + memory["used"] = OPNsenseClient._try_to_int(memory_info.get("memory", {}).get("used", None)) memory["used_percent"] = ( round(memory["used"] / memory["physmem"] * 100) if isinstance(memory["used"], int) @@ -1376,9 +1303,7 @@ async def _get_telemetry_memory(self) -> MutableMapping[str, Any]: memory["swap_total"] = OPNsenseClient._try_to_int( swap_info.get("swap", [])[0].get("total", None) ) - memory["swap_reserved"] = OPNsenseClient._try_to_int( - swap_info["swap"][0].get("used", None) - ) + memory["swap_reserved"] = OPNsenseClient._try_to_int(swap_info["swap"][0].get("used", None)) memory["swap_used_percent"] = ( round(memory["swap_reserved"] / memory["swap_total"] * 100) if isinstance(memory["swap_reserved"], int) @@ -1436,28 +1361,18 @@ async def _get_telemetry_cpu(self) -> MutableMapping[str, Any]: return {} cpu: MutableMapping[str, Any] = {} cores_match = re.search(r"\((\d+) cores", cputype_info[0]) - cpu["count"] = ( - OPNsenseClient._try_to_int(cores_match.group(1)) if cores_match else 0 - ) + cpu["count"] = OPNsenseClient._try_to_int(cores_match.group(1)) if cores_match else 0 - cpustream_info = await self._get_from_stream( - "/api/diagnostics/cpu_usage/stream" - ) + cpustream_info = await self._get_from_stream("/api/diagnostics/cpu_usage/stream") # {"total":29,"user":2,"nice":0,"sys":27,"intr":0,"idle":70} # _LOGGER.debug(f"[get_telemetry_cpu] cpustream_info: {cpustream_info}") if not isinstance(cpustream_info, MutableMapping): return cpu - cpu["usage_total"] = OPNsenseClient._try_to_int( - cpustream_info.get("total", None) - ) + cpu["usage_total"] = OPNsenseClient._try_to_int(cpustream_info.get("total", None)) cpu["usage_user"] = OPNsenseClient._try_to_int(cpustream_info.get("user", None)) cpu["usage_nice"] = OPNsenseClient._try_to_int(cpustream_info.get("nice", None)) - cpu["usage_system"] = OPNsenseClient._try_to_int( - cpustream_info.get("sys", None) - ) - cpu["usage_interrupt"] = OPNsenseClient._try_to_int( - cpustream_info.get("intr", None) - ) + cpu["usage_system"] = OPNsenseClient._try_to_int(cpustream_info.get("sys", None)) + cpu["usage_interrupt"] = OPNsenseClient._try_to_int(cpustream_info.get("intr", None)) cpu["usage_idle"] = OPNsenseClient._try_to_int(cpustream_info.get("idle", None)) # _LOGGER.debug(f"[get_telemetry_cpu] cpu: {cpu}") return cpu @@ -1510,10 +1425,7 @@ async def get_openvpn(self) -> MutableMapping[str, Any]: or instance.get("role", "").lower() != "server" ): continue - if ( - instance.get("uuid", None) - and instance.get("uuid", None) not in openvpn["servers"] - ): + if instance.get("uuid", None) and instance.get("uuid", None) not in openvpn["servers"]: openvpn["servers"][instance.get("uuid")] = { "uuid": instance.get("uuid"), "name": instance.get("description"), @@ -1556,9 +1468,7 @@ async def get_openvpn(self) -> MutableMapping[str, Any]: elif session.get("status", None) == "failed": openvpn["servers"][server_id].update({"status": "failed"}) elif isinstance(session.get("status", None), str): - openvpn["servers"][server_id].update( - {"status": session.get("status")} - ) + openvpn["servers"][server_id].update({"status": session.get("status")}) else: openvpn["servers"][server_id].update({"status": "down"}) else: @@ -1567,9 +1477,7 @@ async def get_openvpn(self) -> MutableMapping[str, Any]: "status": "up", "latest_handshake": datetime.fromtimestamp( int(session.get("connected_since__time_t_")), - tz=timezone( - datetime.now().astimezone().utcoffset() or timedelta() - ), + tz=timezone(datetime.now().astimezone().utcoffset() or timedelta()), ), "total_bytes_recv": OPNsenseClient._try_to_int( session.get("bytes_received", 0), 0 @@ -1594,9 +1502,7 @@ async def get_openvpn(self) -> MutableMapping[str, Any]: "tunnel_addresses": [route.get("virtual_address")], "latest_handshake": datetime.fromtimestamp( int(route.get("last_ref__time_t_", 0)), - tz=timezone( - datetime.now().astimezone().utcoffset() or timedelta() - ), + tz=timezone(datetime.now().astimezone().utcoffset() or timedelta()), ), } ) @@ -1648,9 +1554,7 @@ async def get_gateways(self) -> MutableMapping[str, Any]: if isinstance(gw_info, MutableMapping) and "name" in gw_info: gateways[gw_info["name"]] = gw_info for gateway in gateways.values(): - gateway["status"] = gateway.pop( - "status_translated", gateway.get("status", "") - ).lower() + gateway["status"] = gateway.pop("status_translated", gateway.get("status", "")).lower() # _LOGGER.debug(f"[get_gateways] gateways: {gateways}") return gateways @@ -1663,9 +1567,7 @@ async def _get_telemetry_temps(self) -> MutableMapping[str, Any]: temps: MutableMapping[str, Any] = {} for i, temp_info in enumerate(temps_info): temp: MutableMapping[str, Any] = {} - temp["temperature"] = OPNsenseClient._try_to_float( - temp_info.get("temperature", 0), 0 - ) + temp["temperature"] = OPNsenseClient._try_to_float(temp_info.get("temperature", 0), 0) temp["name"] = ( f"{temp_info.get('type_translated', 'Num')} {temp_info.get('device_seq', i)}" ) @@ -1745,9 +1647,7 @@ async def _get_telemetry_legacy(self) -> MutableMapping[str, Any]: for filesystem in telemetry.get("filesystems", []): filesystem["blocks"] = filesystem.pop("size", None) try: - filesystem["used_pct"] = int( - filesystem.pop("capacity", "").strip("%") - ) + filesystem["used_pct"] = int(filesystem.pop("capacity", "").strip("%")) except ValueError: filesystem.pop("capacity", None) # _LOGGER.debug(f"[get_telemetry_legacy] telemetry: {telemetry}") @@ -1773,10 +1673,7 @@ async def get_notices(self) -> MutableMapping[str, Any]: "created_at": ( datetime.fromtimestamp( int(notice.get("timestamp", 0)), - tz=timezone( - datetime.now().astimezone().utcoffset() - or timedelta() - ), + tz=timezone(datetime.now().astimezone().utcoffset() or timedelta()), ) if notice.get("timestamp", None) else None @@ -1814,14 +1711,9 @@ async def close_notice(self, id) -> bool: ): success = False else: - dismiss = await self._post( - "/api/core/system/dismissStatus", payload={"subject": id} - ) + dismiss = await self._post("/api/core/system/dismissStatus", payload={"subject": id}) _LOGGER.debug("[close_notice] id: %s, dismiss: %s", id, dismiss) - if ( - not isinstance(dismiss, MutableMapping) - or dismiss.get("status", "failed") != "ok" - ): + if not isinstance(dismiss, MutableMapping) or dismiss.get("status", "failed") != "ok": success = False _LOGGER.debug("[close_notice] success: %s", success) return success @@ -1847,8 +1739,7 @@ async def get_unbound_blocklist(self) -> MutableMapping[str, Any]: [ key for key, value in dnsbl_settings.get(attr, {}).items() - if isinstance(value, MutableMapping) - and value.get("selected", 0) == 1 + if isinstance(value, MutableMapping) and value.get("selected", 0) == 1 ] ) else: @@ -1993,10 +1884,7 @@ async def get_wireguard(self) -> MutableMapping[str, Any]: ): match_cl: MutableMapping[str, Any] = {} for cl in server.get("clients", {}): - if ( - isinstance(cl, MutableMapping) - and cl.get("uuid", None) == uid - ): + if isinstance(cl, MutableMapping) and cl.get("uuid", None) == uid: match_cl = cl break if match_cl: @@ -2009,10 +1897,7 @@ async def get_wireguard(self) -> MutableMapping[str, Any]: clients[uid] = client for entry in summary: - if ( - isinstance(entry, MutableMapping) - and entry.get("type", "") == "interface" - ): + if isinstance(entry, MutableMapping) and entry.get("type", "") == "interface": for server in servers.values(): if ( isinstance(server, MutableMapping) @@ -2051,8 +1936,7 @@ async def get_wireguard(self) -> MutableMapping[str, Any]: srv["latest_handshake"] = datetime.fromtimestamp( int(entry.get("latest-handshake", 0)), tz=timezone( - datetime.now().astimezone().utcoffset() - or timedelta() + datetime.now().astimezone().utcoffset() or timedelta() ), ) srv["connected"] = wireguard_is_connected( @@ -2060,14 +1944,10 @@ async def get_wireguard(self) -> MutableMapping[str, Any]: ) if srv["connected"]: client["connected_servers"] += 1 - if client.get( - "latest_handshake", None - ) is None or client.get( + if client.get("latest_handshake", None) is None or client.get( "latest_handshake" ) < srv.get("latest_handshake", 0): - client["latest_handshake"] = srv.get( - "latest_handshake" - ) + client["latest_handshake"] = srv.get("latest_handshake") else: srv["connected"] = False @@ -2100,8 +1980,7 @@ async def get_wireguard(self) -> MutableMapping[str, Any]: clnt["latest_handshake"] = datetime.fromtimestamp( int(entry.get("latest-handshake", 0)), tz=timezone( - datetime.now().astimezone().utcoffset() - or timedelta() + datetime.now().astimezone().utcoffset() or timedelta() ), ) clnt["connected"] = wireguard_is_connected( @@ -2109,14 +1988,10 @@ async def get_wireguard(self) -> MutableMapping[str, Any]: ) if clnt["connected"]: server["connected_clients"] += 1 - if server.get( - "latest_handshake", None - ) is None or server.get( + if server.get("latest_handshake", None) is None or server.get( "latest_handshake" ) < clnt.get("latest_handshake", 0): - server["latest_handshake"] = clnt.get( - "latest_handshake" - ) + server["latest_handshake"] = clnt.get("latest_handshake") else: clnt["connected"] = False @@ -2124,15 +1999,11 @@ async def get_wireguard(self) -> MutableMapping[str, Any]: _LOGGER.debug("[get_wireguard] wireguard: %s", wireguard) return wireguard - async def toggle_vpn_instance( - self, vpn_type: str, clients_servers: str, uuid: str - ) -> bool: + async def toggle_vpn_instance(self, vpn_type: str, clients_servers: str, uuid: str) -> bool: """Toggle the specified VPN instance on or off.""" if vpn_type == "openvpn": success = await self._post(f"/api/openvpn/instances/toggle/{uuid}") - if not isinstance(success, MutableMapping) or not success.get( - "changed", False - ): + if not isinstance(success, MutableMapping) or not success.get("changed", False): return False reconfigure = await self._post("/api/openvpn/service/reconfigure") if isinstance(reconfigure, MutableMapping): @@ -2142,9 +2013,7 @@ async def toggle_vpn_instance( success = await self._post(f"/api/wireguard/client/toggleClient/{uuid}") elif clients_servers == "servers": success = await self._post(f"/api/wireguard/server/toggleServer/{uuid}") - if not isinstance(success, MutableMapping) or not success.get( - "changed", False - ): + if not isinstance(success, MutableMapping) or not success.get("changed", False): return False reconfigure = await self._post("/api/wireguard/service/reconfigure") if isinstance(reconfigure, MutableMapping): @@ -2192,9 +2061,7 @@ async def generate_vouchers(self, data: MutableMapping[str, Any]) -> list: else: servers = await self._get("/api/captiveportal/voucher/listProviders") if not isinstance(servers, list): - raise VoucherServerError( - f"Error getting list of voucher servers: {servers}" - ) + raise VoucherServerError(f"Error getting list of voucher servers: {servers}") if len(servers) == 0: raise VoucherServerError("No voucher servers exist") if len(servers) != 1: @@ -2225,9 +2092,7 @@ async def generate_vouchers(self, data: MutableMapping[str, Any]) -> list: ] for voucher in vouchers: if voucher.get("validity", None): - voucher["validity_str"] = human_friendly_duration( - voucher.get("validity") - ) + voucher["validity_str"] = human_friendly_duration(voucher.get("validity")) if voucher.get("expirytime", None): voucher["expiry_timestamp"] = voucher.get("expirytime") voucher["expirytime"] = datetime.fromtimestamp( @@ -2287,7 +2152,7 @@ async def toggle_alias(self, alias, toggle_on_off) -> bool: payload=payload, ) _LOGGER.debug( - "[toggle_alias] alias: %s, uuid: %s, action: %s, " "url: %s, response: %s", + "[toggle_alias] alias: %s, uuid: %s, action: %s, url: %s, response: %s", alias, uuid, toggle_on_off, @@ -2302,10 +2167,7 @@ async def toggle_alias(self, alias, toggle_on_off) -> bool: return False set_resp = await self._post("/api/firewall/alias/set") - if ( - not isinstance(set_resp, MutableMapping) - or set_resp.get("result") != "saved" - ): + if not isinstance(set_resp, MutableMapping) or set_resp.get("result") != "saved": return False reconfigure_resp = await self._post("/api/firewall/alias/reconfigure") diff --git a/custom_components/opnsense/sensor.py b/custom_components/opnsense/sensor.py index 9d9ffca..ed4e739 100644 --- a/custom_components/opnsense/sensor.py +++ b/custom_components/opnsense/sensor.py @@ -66,9 +66,7 @@ async def _compile_filesystem_sensors( entities: list = [] for filesystem in dict_get(state, "telemetry.filesystems", []) or []: - filesystem_slug: str = slugify_filesystem_mountpoint( - filesystem.get("mountpoint", None) - ) + filesystem_slug: str = slugify_filesystem_mountpoint(filesystem.get("mountpoint", None)) enabled_default = False if filesystem_slug == "root": enabled_default = True @@ -386,9 +384,7 @@ async def _compile_vpn_sensors( properties.append("connected_servers") for prop_name in properties: state_class: SensorStateClass | None = None - native_unit_of_measurement: ( - UnitOfDataRate | UnitOfInformation | None - ) = None + native_unit_of_measurement: UnitOfDataRate | UnitOfInformation | None = None device_class: SensorDeviceClass | None = None enabled_default = False suggested_display_precision = None @@ -449,9 +445,9 @@ async def async_setup_entry( ) -> None: """Set up the OPNsense sensors.""" - coordinator: OPNsenseDataUpdateCoordinator = hass.data[DOMAIN][ - config_entry.entry_id - ][COORDINATOR] + coordinator: OPNsenseDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id][ + COORDINATOR + ] state: MutableMapping[str, Any] = coordinator.data if not isinstance(state, MutableMapping): _LOGGER.error("Missing state data in sensor async_setup_entry") @@ -512,9 +508,7 @@ def __init__( ) -> None: """Initialize the sensor.""" name_suffix: str | None = ( - entity_description.name - if isinstance(entity_description.name, str) - else None + entity_description.name if isinstance(entity_description.name, str) else None ) unique_id_suffix: str | None = ( entity_description.key if isinstance(entity_description.key, str) else None @@ -925,9 +919,7 @@ def _handle_coordinator_update(self) -> None: return sensor_temp_device: str = self.entity_description.key.split(".")[2] temp: MutableMapping[str, Any] = {} - for temp_device, temp_temp in ( - state.get("telemetry", {}).get("temps", {}).items() - ): + for temp_device, temp_temp in state.get("telemetry", {}).get("temps", {}).items(): if temp_device == sensor_temp_device: temp = temp_temp break @@ -981,9 +973,7 @@ def _handle_coordinator_update(self) -> None: try: for ifn, if_descr in lease_interfaces.items(): if_count: int = sum( - 1 - for d in leases.get(ifn, []) - if d.get("address") not in {None, ""} + 1 for d in leases.get(ifn, []) if d.get("address") not in {None, ""} ) lease_counts[if_descr] = f"{if_count} leases" total_lease_count += if_count diff --git a/custom_components/opnsense/services.py b/custom_components/opnsense/services.py index 9e7ded1..f08e996 100644 --- a/custom_components/opnsense/services.py +++ b/custom_components/opnsense/services.py @@ -7,12 +7,7 @@ import voluptuous as vol -from homeassistant.core import ( - HomeAssistant, - ServiceCall, - ServiceResponse, - SupportsResponse, -) +from homeassistant.core import HomeAssistant, ServiceCall, ServiceResponse, SupportsResponse from homeassistant.exceptions import HomeAssistantError, ServiceValidationError from homeassistant.helpers import ( config_validation as cv, @@ -213,7 +208,11 @@ async def async_setup_services(hass: HomeAssistant) -> None: { vol.Required("alias"): vol.Any(cv.string), vol.Required("toggle_on_off", default="toggle"): vol.In( - {"toggle": "Toggle", "on": "On", "off": "Off"} + { + "toggle": "Toggle", + "on": "On", + "off": "Off", + } ), vol.Optional("device_id"): vol.Any(cv.string), vol.Optional("entity_id"): vol.Any(cv.string), @@ -435,14 +434,10 @@ async def _service_reload_interface(hass: HomeAssistant, call: ServiceCall) -> N if success is None or success: success = response if success is None or not success: - raise ServiceValidationError( - f"Reload Interface Failed: {call.data.get('interface')}" - ) + raise ServiceValidationError(f"Reload Interface Failed: {call.data.get('interface')}") -async def _service_generate_vouchers( - hass: HomeAssistant, call: ServiceCall -) -> ServiceResponse: +async def _service_generate_vouchers(hass: HomeAssistant, call: ServiceCall) -> ServiceResponse: clients: list = await _get_clients( hass=hass, opndevice_id=call.data.get("device_id", []), @@ -454,9 +449,7 @@ async def _service_generate_vouchers( vouchers: list = await client.generate_vouchers(call.data) except VoucherServerError as e: _LOGGER.error("Error getting vouchers from %s. %s", client.name, e) - raise ServiceValidationError( - f"Error getting vouchers from {client.name}. {e}" - ) from e + raise ServiceValidationError(f"Error getting vouchers from {client.name}. {e}") from e _LOGGER.debug( "[service_generate_vouchers] client: %s, data: %s, vouchers: %s", client.name, @@ -476,9 +469,7 @@ async def _service_generate_vouchers( return final_vouchers -async def _service_kill_states( - hass: HomeAssistant, call: ServiceCall -) -> ServiceResponse: +async def _service_kill_states(hass: HomeAssistant, call: ServiceCall) -> ServiceResponse: clients: list = await _get_clients( hass=hass, opndevice_id=call.data.get("device_id", []), @@ -487,9 +478,7 @@ async def _service_kill_states( success: bool | None = None response_list: list = [] for client in clients: - response: MutableMapping[str, Any] = await client.kill_states( - call.data.get("ip_addr") - ) + response: MutableMapping[str, Any] = await client.kill_states(call.data.get("ip_addr")) _LOGGER.debug( "[service_kill_states] client: %s, ip_addr: %s, response: %s", client.name, @@ -522,9 +511,7 @@ async def _service_toggle_alias(hass: HomeAssistant, call: ServiceCall) -> None: ) success: bool | None = None for client in clients: - response = await client.toggle_alias( - call.data.get("alias"), call.data.get("toggle_on_off") - ) + response = await client.toggle_alias(call.data.get("alias"), call.data.get("toggle_on_off")) _LOGGER.debug( "[service_toggle_alias] client: %s, alias: %s, response: %s", client.name, diff --git a/custom_components/opnsense/switch.py b/custom_components/opnsense/switch.py index 36e0771..a2be2c2 100644 --- a/custom_components/opnsense/switch.py +++ b/custom_components/opnsense/switch.py @@ -6,11 +6,7 @@ import traceback from typing import Any -from homeassistant.components.switch import ( - SwitchDeviceClass, - SwitchEntity, - SwitchEntityDescription, -) +from homeassistant.components.switch import SwitchDeviceClass, SwitchEntity, SwitchEntityDescription from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, callback from homeassistant.helpers import entity_platform @@ -34,9 +30,7 @@ async def _compile_filter_switches( coordinator: OPNsenseDataUpdateCoordinator, state: MutableMapping[str, Any], ) -> list: - if not isinstance(state, MutableMapping) or not isinstance( - state.get("config"), MutableMapping - ): + if not isinstance(state, MutableMapping) or not isinstance(state.get("config"), MutableMapping): return [] entities: list = [] # filter rules @@ -81,9 +75,7 @@ async def _compile_port_forward_switches( coordinator: OPNsenseDataUpdateCoordinator, state: MutableMapping[str, Any], ) -> list: - if not isinstance(state, MutableMapping) or not isinstance( - state.get("config"), MutableMapping - ): + if not isinstance(state, MutableMapping) or not isinstance(state.get("config"), MutableMapping): return [] entities: list = [] @@ -121,9 +113,7 @@ async def _compile_nat_outbound_switches( coordinator: OPNsenseDataUpdateCoordinator, state: MutableMapping[str, Any], ) -> list: - if not isinstance(state, MutableMapping) or not isinstance( - state.get("config"), MutableMapping - ): + if not isinstance(state, MutableMapping) or not isinstance(state.get("config"), MutableMapping): return [] entities: list = [] # nat outbound rules @@ -164,9 +154,7 @@ async def _compile_service_switches( coordinator: OPNsenseDataUpdateCoordinator, state: MutableMapping[str, Any], ) -> list: - if not isinstance(state, MutableMapping) or not isinstance( - state.get("services"), list - ): + if not isinstance(state, MutableMapping) or not isinstance(state.get("services"), list): return [] entities: list = [] @@ -201,9 +189,7 @@ async def _compile_vpn_switches( for clients_servers in ("clients", "servers"): if not isinstance(state, MutableMapping): return [] - for uuid, instance in ( - state.get(vpn_type, {}).get(clients_servers, {}).items() - ): + for uuid, instance in state.get(vpn_type, {}).get(clients_servers, {}).items(): if ( not isinstance(instance, MutableMapping) or instance.get("enabled", None) is None @@ -215,7 +201,7 @@ async def _compile_vpn_switches( coordinator=coordinator, entity_description=SwitchEntityDescription( key=f"{vpn_type}.{clients_servers}.{uuid}", - name=f"{"OpenVPN" if vpn_type == "openvpn" else vpn_type.title()} {clients_servers.title().rstrip('s')} {instance['name']}", + name=f"{'OpenVPN' if vpn_type == 'openvpn' else vpn_type.title()} {clients_servers.title().rstrip('s')} {instance['name']}", icon="mdi:folder-key-network-outline", # entity_category=ENTITY_CATEGORY_CONFIG, device_class=SwitchDeviceClass.SWITCH, @@ -257,9 +243,9 @@ async def async_setup_entry( async_add_entities: entity_platform.AddEntitiesCallback, ) -> None: """Set up the OPNsense switches.""" - coordinator: OPNsenseDataUpdateCoordinator = hass.data[DOMAIN][ - config_entry.entry_id - ][COORDINATOR] + coordinator: OPNsenseDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id][ + COORDINATOR + ] state: MutableMapping[str, Any] = coordinator.data if not isinstance(state, MutableMapping): _LOGGER.error("Missing state data in switch async_setup_entry") @@ -300,9 +286,7 @@ def __init__( ) -> None: """Initialize OPNsense Switch entities.""" name_suffix: str | None = ( - entity_description.name - if isinstance(entity_description.name, str) - else None + entity_description.name if isinstance(entity_description.name, str) else None ) unique_id_suffix: str | None = ( entity_description.key if isinstance(entity_description.key, str) else None @@ -426,12 +410,7 @@ def _opnsense_get_rule(self): if self._rule_type == ATTR_NAT_PORT_FORWARD: rules = state.get("config", {}).get("nat", {}).get("rule", []) if self._rule_type == ATTR_NAT_OUTBOUND: - rules = ( - state.get("config", {}) - .get("nat", {}) - .get("outbound", {}) - .get("rule", []) - ) + rules = state.get("config", {}).get("nat", {}).get("outbound", {}).get("rule", []) for rule in rules: if dict_get(rule, "created.time") == self._tracker: @@ -536,9 +515,7 @@ def _handle_coordinator_update(self) -> None: self._available = True self._attr_extra_state_attributes = {} for attr in ("id", "name"): - self._attr_extra_state_attributes[f"service_{attr}"] = self._service.get( - attr, None - ) + self._attr_extra_state_attributes[f"service_{attr}"] = self._service.get(attr, None) self.async_write_ha_state() # _LOGGER.debug(f"[OPNsenseServiceSwitch handle_coordinator_update] Name: {self.name}, available: {self.available}, is_on: {self.is_on}, extra_state_attributes: {self.extra_state_attributes}") @@ -643,9 +620,7 @@ def _handle_coordinator_update(self) -> None: self.async_write_ha_state() return instance: MutableMapping[str, Any] = ( - state.get(self._vpn_type, {}) - .get(self._clients_servers, {}) - .get(self._uuid, {}) + state.get(self._vpn_type, {}).get(self._clients_servers, {}).get(self._uuid, {}) ) if not isinstance(instance, MutableMapping): self._available = False diff --git a/custom_components/opnsense/update.py b/custom_components/opnsense/update.py index f265b00..becdb95 100644 --- a/custom_components/opnsense/update.py +++ b/custom_components/opnsense/update.py @@ -5,11 +5,7 @@ import logging from typing import Any -from homeassistant.components.update import ( - UpdateDeviceClass, - UpdateEntity, - UpdateEntityDescription, -) +from homeassistant.components.update import UpdateDeviceClass, UpdateEntity, UpdateEntityDescription from homeassistant.components.update.const import UpdateEntityFeature from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, callback @@ -31,9 +27,9 @@ async def async_setup_entry( async_add_entities: entity_platform.AddEntitiesCallback, ) -> None: """Set up the OPNsense update entities.""" - coordinator: OPNsenseDataUpdateCoordinator = hass.data[DOMAIN][ - config_entry.entry_id - ][COORDINATOR] + coordinator: OPNsenseDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id][ + COORDINATOR + ] entities: list = [] entity = OPNsenseFirmwareUpdatesAvailableUpdate( config_entry=config_entry, @@ -63,9 +59,7 @@ def __init__( """Initialize update entity.""" name_suffix: str | None = ( - entity_description.name - if isinstance(entity_description.name, str) - else None + entity_description.name if isinstance(entity_description.name, str) else None ) unique_id_suffix: str | None = ( entity_description.key if isinstance(entity_description.key, str) else None @@ -116,12 +110,8 @@ def _handle_coordinator_update(self) -> None: self._attr_installed_version = None try: - product_version = dict_get( - state, "firmware_update_info.product.product_version" - ) - product_latest = dict_get( - state, "firmware_update_info.product.product_latest" - ) + product_version = dict_get(state, "firmware_update_info.product.product_version") + product_latest = dict_get(state, "firmware_update_info.product.product_latest") if product_version is None or product_latest is None: self._attr_latest_version = None @@ -132,9 +122,7 @@ def _handle_coordinator_update(self) -> None: product_latest = f"{product_latest}+" if dict_get(state, "firmware_update_info.status") == "upgrade": - product_latest = dict_get( - state, "firmware_update_info.upgrade_major_version" - ) + product_latest = dict_get(state, "firmware_update_info.upgrade_major_version") self._attr_latest_version = product_latest except (TypeError, KeyError, AttributeError): @@ -147,18 +135,10 @@ def _handle_coordinator_update(self) -> None: summary: str | None = None try: if dict_get(state, "firmware_update_info.status") == "update": - product_name = dict_get( - state, "firmware_update_info.product.product_name" - ) - product_nickname = dict_get( - state, "firmware_update_info.product.product_nickname" - ) - product_version = dict_get( - state, "firmware_update_info.product.product_version" - ) - product_latest = dict_get( - state, "firmware_update_info.product.product_latest" - ) + product_name = dict_get(state, "firmware_update_info.product.product_name") + product_nickname = dict_get(state, "firmware_update_info.product.product_nickname") + product_version = dict_get(state, "firmware_update_info.product.product_version") + product_latest = dict_get(state, "firmware_update_info.product.product_latest") status_msg = dict_get(state, "firmware_update_info.status_msg") needs_reboot: bool = ( @@ -168,9 +148,7 @@ def _handle_coordinator_update(self) -> None: ) total_package_count: int = len( - ( - dict_get(state, "firmware_update_info.all_packages", {}) or {} - ).keys() + (dict_get(state, "firmware_update_info.all_packages", {}) or {}).keys() ) new_package_count: int = len( dict_get(state, "firmware_update_info.new_packages", []) or [] @@ -198,12 +176,8 @@ def _handle_coordinator_update(self) -> None: - upgraded packages: {upgrade_package_count} """ if dict_get(state, "firmware_update_info.status") == "upgrade": - product_name = dict_get( - state, "firmware_update_info.product.product_name" - ) - product_version = dict_get( - state, "firmware_update_info.upgrade_major_version" - ) + product_name = dict_get(state, "firmware_update_info.product.product_name") + product_version = dict_get(state, "firmware_update_info.upgrade_major_version") status_msg = dict_get(state, "firmware_update_info.status_msg") upgrade_needs_reboot: bool = ( diff --git a/pyproject.toml b/pyproject.toml index de875ad..0a565ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,15 +50,18 @@ timeout = 30 addopts = "-rA" [tool.ruff] +line-length = 100 +indent-width = 4 fix = true force-exclude = true +target-version = "py313" required-version = ">=0.8.0" [tool.ruff.format] quote-style = "double" indent-style = "space" skip-magic-trailing-comma = false -line-ending = "auto" +line-ending = "lf" docstring-code-format = true docstring-code-line-length = "dynamic" @@ -284,37 +287,3 @@ max-complexity = 25 [tool.ruff.lint.pydocstyle] property-decorators = ["propcache.cached_property"] - -[tool.tox.gh-actions] -python = """ - 3.11: py311 - 3.12: py312 - 3.13: py313, lint, mypy -""" - -[tool.tox] -skipsdist = true -requires = ["tox>=4.19"] -env_list = ["py313", "lint", "mypy"] -skip_missing_interpreters = true - -[tool.tox.env_run_base] -description = "Run pytest under {base_python}" -commands = [[ "pytest", "tests", { replace = "posargs", extend = true} ]] -deps = ["-rrequirements.txt"] -ignore_errors = true - -[tool.tox.env.lint] -description = "Lint code using ruff under {base_python}" -ignore_errors = true -commands = [ - ["ruff", "check", "custom_components{/}"], - ["ruff", "check", "tests{/}"], -] -deps = ["-rrequirements.txt"] - -[tool.tox.env.mypy] -description = "Run mypy for type-checking under {base_python}" -ignore_errors = true -commands = [["mypy", "custom_components{/}opnsense"]] -deps = ["-rrequirements.txt"]