diff --git a/Dockerfile b/Dockerfile index ca10c00..89806a2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Use the official Python image as the base image -FROM python:3.12.3-slim +FROM python:3.12.6-slim # Set the working directory inside the container WORKDIR /app/kasa_collector diff --git a/README.md b/README.md index 2b3f3a1..f02ee12 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ + + ## About The Project
@@ -20,7 +22,7 @@ The project uses a Docker container that may be configured to accept different s ## Supported Kasa Smart Plugs -This project currently supports collecting data from the Kasa [KP115](https://www.kasasmart.com/us/products/smart-plugs/kasa-smart-plug-slim-energy-monitoring-kp115) smart plug device and Kasa [HS300](https://www.kasasmart.com/us/products/smart-plugs/kasa-smart-wi-fi-power-strip-hs300) power strip. These are the two devices that I have - others may be supported based on the "energy" tag found during device disovery. +This project currently supports collecting data from the Kasa [KP115](https://www.kasasmart.com/us/products/smart-plugs/kasa-smart-plug-slim-energy-monitoring-kp115) smart plug device, Kasa [HS300](https://www.kasasmart.com/us/products/smart-plugs/kasa-smart-wi-fi-power-strip-hs300) power strip, and the Kasa [KP125M](https://www.tp-link.com/us/home-networking/smart-plug/kp125m/). I have personally tested these three devices - others may be supported based on the "energy" tag found during device discovery. ## TPLink Smartplug Open Source Project @@ -31,7 +33,7 @@ This project uses the python-kasa module to discover and connect to Kasa devices Use the following [Docker container](https://hub.docker.com/r/lux4rd0/kasa-collector): ```plaintext -lux4rd0/kasa-collector:2.0.06 +lux4rd0/kasa-collector:2.0.07 lux4rd0/kasa-collector:latest ``` @@ -50,6 +52,10 @@ services: KASA_COLLECTOR_INFLUXDB_ORG: Lux4rd0 KASA_COLLECTOR_INFLUXDB_TOKEN: TOKEN KASA_COLLECTOR_INFLUXDB_URL: http://influxdb.lux4rd0.com:8086 + KASA_COLLECTOR_TPLINK_USERNAME: user@example.com # Optional + KASA_COLLECTOR_TPLINK_PASSWORD: yourpassword # Optional + KASA_COLLECTOR_DEVICE_HOSTS: "10.50.0.101,10.50.0.102" # Optional + KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY: false # Optional TZ: America/Chicago image: lux4rd0/kasa-collector:latest network_mode: host @@ -65,6 +71,10 @@ docker run -d \ -e KASA_COLLECTOR_INFLUXDB_ORG=Lux4rd0 \ -e KASA_COLLECTOR_INFLUXDB_TOKEN=TOKEN \ -e KASA_COLLECTOR_INFLUXDB_URL=http://influxdb.lux4rd0.com:8086 \ + -e KASA_COLLECTOR_TPLINK_USERNAME=user@example.com \ # Optional + -e KASA_COLLECTOR_TPLINK_PASSWORD=yourpassword \ # Optional + -e KASA_COLLECTOR_DEVICE_HOSTS="10.50.0.101,10.50.0.102" \ # Optional + -e KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY=false \ # Optional -e TZ=America/Chicago \ --restart always \ --network host \ @@ -75,111 +85,155 @@ Be sure to change your InfluxDB details and timezone in the environmental variab Running `docker-compose up -d` or the `docker run` command will download and start up the kasa-collector container. -## Environmental Flags +## How It Works -Kasa Collector may be configured with additional environment flags to control its behaviors. They are described below: +The Kasa Collector is designed to automate data collection from Kasa Smart Plugs, providing a streamlined approach to gathering, storing, and visualizing device information. It supports both automatic device discovery and manual device configuration, allowing for flexible integration in various environments. Below is a high-level overview of its capabilities: -`KASA_COLLECTOR_DATA_FETCH_INTERVAL` - OPTIONAL +### Automatic Device Discovery -How frequently does the Collector poll your devices to collect measurements in seconds? Defaults to 15 (seconds) if it's not set. +The Kasa Collector can automatically discover compatible Kasa devices on your network without requiring manual configuration. By default, the collector sends discovery packets regularly (`KASA_COLLECTOR_DEVICE_DISCOVERY_INTERVAL`) and identifies devices supporting energy monitoring features. The `KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY` environment variable controls this behavior: -- integer (in seconds) +- **To Enable Auto-Discovery:** Ensure that `KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY` is set to `true` (default behavior). +- **To Disable Auto-Discovery:** Set `KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY` to `false`. This prevents automatic discovery and only uses manually specified devices. -`KASA_COLLECTOR_DEVICE_DISCOVERY_INTERVAL` - OPTIONAL +### Manual Device Configuration -How frequently the Collector discovers devices in seconds. Defaults to 300 (seconds) if it's not set. +In cases where devices are not automatically discovered or need specific attention, you can manually specify device IPs or hostnames using the `KASA_COLLECTOR_DEVICE_HOSTS` variable. This variable accepts a comma-separated list of device IPs/hostnames. Manually added devices take precedence over auto-discovered devices and are continuously monitored. -- integer (in seconds) +- **Example:** `KASA_COLLECTOR_DEVICE_HOSTS="10.50.0.101,10.50.0.102"` -`KASA_COLLECTOR_DISCOVERY_TIMEOUT` - OPTIONAL +### TP-Link Account Configuration -Timeout for discovering devices in seconds. Defaults to 5 (seconds) if it's not set. +For newer Kasa devices that require TP-Link account authentication, you can provide your account credentials using the `KASA_COLLECTOR_TPLINK_USERNAME` and `KASA_COLLECTOR_TPLINK_PASSWORD` variables. These credentials enable the collector to authenticate with devices linked to your TP-Link account. -- integer (in seconds) +- **Example Configuration:** + ```yaml + KASA_COLLECTOR_TPLINK_USERNAME: user@example.com + KASA_COLLECTOR_TPLINK_PASSWORD: yourpassword + ``` -`KASA_COLLECTOR_DISCOVERY_PACKETS` - OPTIONAL +When configured, the collector will attempt to authenticate and control these devices, providing extended functionality for devices that need TP-Link cloud authentication. -Number of packets sent for device discovery. Defaults to 3 if it's not set. +## Environmental Flags -- integer +Kasa Collector may be configured with additional environment flags to control its behaviors. They are described below: -`KASA_COLLECTOR_FETCH_MAX_RETRIES` - OPTIONAL +### Required Variables -Maximum number of retries for fetching data. Defaults to 3 if it's not set. +`KASA_COLLECTOR_INFLUXDB_URL` +The URL of the InfluxDB instance. -- integer +- Example: `http://influxdb.lux4rd0.com:8086` -`KASA_COLLECTOR_FETCH_RETRY_DELAY` - OPTIONAL +`KASA_COLLECTOR_INFLUXDB_TOKEN` +The token for the InfluxDB instance. -The delay between retries for fetching data in seconds. If it's not set, it defaults to 1 (second). +- Example: `your-influxdb-token` -- integer (in seconds) +`KASA_COLLECTOR_INFLUXDB_ORG` +The organization for the InfluxDB instance. -`KASA_COLLECTOR_SYSINFO_FETCH_INTERVAL` - OPTIONAL +- Example: `Lux4rd0` -How frequently the Collector gathers system information in seconds. Defaults to 60 (seconds) if it's not set. +`KASA_COLLECTOR_INFLUXDB_BUCKET` +The bucket for the InfluxDB instance. -- integer (in seconds) +- Example: `kasa` -`KASA_COLLECTOR_WRITE_TO_FILE` - OPTIONAL +### Optional Variables -Indicates whether to write data to JSON files. Defaults to False. +`KASA_COLLECTOR_DATA_FETCH_INTERVAL` +How frequently the Collector polls your devices to collect measurements in seconds. Defaults to `15` (seconds) if not set. -- true -- false +- Example: `15` +- Type: Integer (seconds) -`KASA_COLLECTOR_KEEP_MISSING_DEVICES` - OPTIONAL +`KASA_COLLECTOR_DEVICE_DISCOVERY_INTERVAL` +How frequently the Collector discovers devices in seconds. Defaults to `300` (seconds) if not set. -Indicates whether to keep missing devices in the collection. Defaults to True. +- Example: `300` +- Type: Integer (seconds) -- true -- false +`KASA_COLLECTOR_DISCOVERY_TIMEOUT` +Timeout for discovering devices in seconds. Defaults to `5` (seconds) if not set. -`KASA_COLLECTOR_INFLUXDB_URL` - REQUIRED +- Example: `5` +- Type: Integer (seconds) -The URL of the InfluxDB instance. +`KASA_COLLECTOR_DISCOVERY_PACKETS` +Number of packets sent for device discovery. Defaults to `3` if not set. -`KASA_COLLECTOR_INFLUXDB_TOKEN` - REQUIRED +- Example: `3` +- Type: Integer -The token for the InfluxDB instance. +`KASA_COLLECTOR_FETCH_MAX_RETRIES` +Maximum number of retries for fetching data. Defaults to `3` if not set. -`KASA_COLLECTOR_INFLUXDB_ORG` - REQUIRED +- Example: `3` +- Type: Integer -The organization for the InfluxDB instance. +`KASA_COLLECTOR_FETCH_RETRY_DELAY` +The delay between retries for fetching data in seconds. Defaults to `1` (second) if not set. -`KASA_COLLECTOR_INFLUXDB_BUCKET` - REQUIRED +- Example: `1` +- Type: Integer (seconds) -The bucket for the InfluxDB instance. +`KASA_COLLECTOR_SYSINFO_FETCH_INTERVAL` +How frequently the Collector gathers system information in seconds. Defaults to `60` (seconds) if not set. -`KASA_COLLECTOR_LOG_LEVEL_KASA_API` - OPTIONAL +- Example: `60` +- Type: Integer (seconds) -Log level for Kasa API. Defaults to "INFO". +`KASA_COLLECTOR_WRITE_TO_FILE` +Indicates whether to write data to JSON files. Defaults to `false` if not set. -- DEBUG -- INFO -- WARNING -- ERROR -- CRITICAL +- Example: `true` or `false` +- Type: Boolean -`KASA_COLLECTOR_LOG_LEVEL_INFLUXDB_STORAGE` - OPTIONAL +`KASA_COLLECTOR_KEEP_MISSING_DEVICES` +Indicates whether to keep missing devices in the collection. Defaults to `true` if not set. -Log level for InfluxDB Storage. Defaults to "INFO". +- Example: `true` or `false` +- Type: Boolean -- DEBUG -- INFO -- WARNING -- ERROR -- CRITICAL +`KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY` +Enables or disables automatic device discovery. Defaults to `true` if not set. -`KASA_COLLECTOR_LOG_LEVEL_KASA_COLLECTOR` - OPTIONAL +- Example: `true` or `false` +- Type: Boolean -Log level for Kasa Collector. Defaults to "INFO". +`KASA_COLLECTOR_TPLINK_USERNAME` +Your TP-Link Kasa account username (email) is used to authenticate with devices. If set, manual device control is enabled. -- DEBUG -- INFO -- WARNING -- ERROR -- CRITICAL +- Example: `user@example.com` +- Type: String + +`KASA_COLLECTOR_TPLINK_PASSWORD` +Your TP-Link Kasa account password for authenticating with devices. Required if `KASA_COLLECTOR_TPLINK_USERNAME` is set. + +- Example: `yourpassword` +- Type: String + +`KASA_COLLECTOR_DEVICE_HOSTS` +Comma-separated list of IP addresses or hostnames of specific Kasa devices to monitor manually. + +- Example: `"10.50.0.101,10.50.0.102"` +- Type: String + +`KASA_COLLECTOR_LOG_LEVEL_KASA_API` +Log level for Kasa API. Defaults to `INFO`. + +- Example: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` + +`KASA_COLLECTOR_LOG_LEVEL_INFLUXDB_STORAGE` +Log level for InfluxDB Storage. Defaults to `INFO`. + +- Example: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` + +`KASA_COLLECTOR_LOG_LEVEL_KASA_COLLECTOR` +Log level for Kasa Collector. Defaults to `INFO`. + +- Example: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` ## Collector Details @@ -192,7 +246,7 @@ Kasa Collector is the primary data collector and is responsible for gathering de * Power (milliwatts) * Total Watt Hours -Additional details like Wifi RSSI signal strength, device and plug names, and device details are also collected. +Additional details, such as Wi-Fi RSSI signal strength, device and plug names, and device details, are also collected. Not all devices provide all the details. ## Grafana Dashboards @@ -224,7 +278,7 @@ This collector uses InfluxQL, and for the dashboards to function, you need to cr https://docs.influxdata.com/influxdb/v2/tools/grafana/?t=InfluxQL#configure-your-influxdb-connection -The biggest change here is: +The most significant change here is: - Configure InfluxDB authentication: @@ -246,7 +300,6 @@ Sometimes, you'll see the following error message from the Kasa Collector: ```plaintext kasa_request_info: malformed JSON, retrying ``` - This is because some devices that respond to the collector might be malformed data. The collector will try again until a good response is received. ## Roadmap @@ -255,7 +308,7 @@ See the open issues for a list of proposed features (and known issues). ## Contact -Dave Schmid: [@lux4rd0](https://twitter.com/lux4rd0) - dave@pulpfree.org +Dave Schmid: dave@pulpfree.org Project Link: https://github.com/lux4rd0/kasa-collector @@ -264,4 +317,3 @@ Project Link: https://github.com/lux4rd0/kasa-collector - Grafana Labs - [https://grafana.com/](https://grafana.com/) - Grafana - [https://grafana.com/oss/grafana/](https://grafana.com/oss/grafana/) - Grafana Dashboard Community - [https://grafana.com/grafana/dashboards/](https://grafana.com/grafana/dashboards/) - diff --git a/requirements.txt b/requirements.txt index d3a37f6..a1ff866 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ aiofiles==24.1.0 -influxdb_client==1.40.0 -python-kasa==0.7.0.1 +influxdb_client==1.46.0 +python-kasa==0.7.4 diff --git a/src/config.py b/src/config.py index 15fd3e5..51799e8 100644 --- a/src/config.py +++ b/src/config.py @@ -1,6 +1,5 @@ import os - class Config: # Whether to write data to file. Expected values are "true" or "false". # Default is "False". @@ -88,3 +87,15 @@ class Config: KASA_COLLECTOR_LOG_LEVEL_KASA_COLLECTOR = os.getenv( "KASA_COLLECTOR_LOG_LEVEL_KASA_COLLECTOR", "INFO" ).upper() + + # Comma-separated list of device hosts (IPs) for manual configuration. Default is None. + KASA_COLLECTOR_DEVICE_HOSTS = os.getenv("KASA_COLLECTOR_DEVICE_HOSTS", None) + + # TP-Link account credentials for devices that require login. Default is None. + KASA_COLLECTOR_TPLINK_USERNAME = os.getenv("KASA_COLLECTOR_TPLINK_USERNAME", None) + KASA_COLLECTOR_TPLINK_PASSWORD = os.getenv("KASA_COLLECTOR_TPLINK_PASSWORD", None) + + # Flag to enable/disable auto-discovery. Default is True. + KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY = ( + os.getenv("KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY", "True").lower() == "true" + ) diff --git a/src/holder.txt b/src/holder.txt deleted file mode 100644 index 8b13789..0000000 --- a/src/holder.txt +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/influxdb_storage.py b/src/influxdb_storage.py index ab311d9..8e0d2f7 100644 --- a/src/influxdb_storage.py +++ b/src/influxdb_storage.py @@ -53,7 +53,7 @@ async def write_data(self, measurement, data, tags=None): point = point.tag(k, v) self.write_api.write(bucket=self.bucket, org=self.client.org, record=point) - self.logger.info( + self.logger.debug( f"Wrote data to InfluxDB: {measurement}, Tags: {tags}, Data: {data}" ) @@ -68,7 +68,7 @@ async def write_to_json(self, data, output_dir): filename = f"{output_dir}/{datetime.utcnow().isoformat()}.json" async with aiofiles.open(filename, "w") as f: await f.write(json.dumps(data, indent=4)) - self.logger.info(f"Wrote data to JSON file: {filename}") + self.logger.debug(f"Wrote data to JSON file: {filename}") def _dns_lookup(self, ip): """ @@ -230,18 +230,37 @@ async def process_sysinfo_data(self, device_data): try: points = [] for ip, data in device_data.items(): - sysinfo = data.get("sysinfo", {}) - alias = sysinfo.get("alias", "unknown") - dns_name = self._dns_lookup(ip) + # Normalize sysinfo data before processing + normalized_sysinfo = self.normalize_sysinfo(data.get("sysinfo", {})) + alias = ( + data.get("device_alias") or data.get("alias") or ip + ) # Check for device_alias, alias, then fallback to IP + self.logger.debug( + f"Processing sysinfo for IP: {ip}, Alias: {alias}, Hostname: {data.get('dns_name')}" + ) + + # Log the full normalized sysinfo data being processed + self.logger.debug(f"Full sysinfo data: {normalized_sysinfo}") - for key, value in sysinfo.items(): - point = Point("sysinfo").tag("ip", ip).tag("dns_name", dns_name) + for key, value in normalized_sysinfo.items(): + # Logging the individual field values + self.logger.debug(f"Processing key: {key}, value: {value}") + + # Create the InfluxDB point + point = ( + Point("sysinfo") + .tag("ip", ip) + .tag("dns_name", data.get("dns_name")) + .tag("device_alias", alias) + ) point = point.field(key, self._format_value(value)) - point = point.tag("device_alias", alias) + self.logger.debug( + f"Created InfluxDB point for key: {key} with alias: {alias}, value: {value}" + ) point = point.time(datetime.now(timezone.utc)) points.append(point) - self.logger.debug(f"Processing sysinfo data for InfluxDB: {points}") + self.logger.debug(f"Collected points for InfluxDB: {points}") await self.send_to_influxdb(points) await self._write_to_file( Config.KASA_COLLECTOR_SYSINFO_OUTPUT_FILE, device_data @@ -250,6 +269,34 @@ async def process_sysinfo_data(self, device_data): except Exception as e: self.logger.error(f"Error processing sysinfo data for InfluxDB: {e}") + def normalize_sysinfo(self, sysinfo): + """ + Normalize sysinfo data to standardize fields and handle variations. + + This method addresses variations in the sysinfo fields across different device models. + For example, some devices report 'sw_ver' (software version) while others report 'fw_ver' (firmware version). + This normalization ensures that data is stored consistently in InfluxDB. + + Parameters: + - sysinfo (dict): The original sysinfo data from the device. + + Returns: + - dict: The normalized sysinfo data. + """ + normalized = {} + for key, value in sysinfo.items(): + # Merge 'fw_ver' into 'sw_ver' + if key == "fw_ver": + normalized["sw_ver"] = value + self.logger.debug( + f"Normalized 'fw_ver' to 'sw_ver' with value: {value}" + ) + else: + normalized[key] = value + + # Return the normalized dictionary + return normalized + async def send_to_influxdb(self, points): """ Send data points to InfluxDB. diff --git a/src/kasa_api.py b/src/kasa_api.py index 5c477ac..6ac3019 100644 --- a/src/kasa_api.py +++ b/src/kasa_api.py @@ -1,5 +1,5 @@ import asyncio -from kasa import Discover, SmartStrip +from kasa import Discover, SmartDevice, DeviceConfig import socket import logging from config import Config @@ -27,15 +27,69 @@ async def discover_devices(): discovery_timeout=discovery_timeout, discovery_packets=discovery_packets ) logger.info(f"Discovered {len(devices)} devices") + # Filter and return only devices that have emeter functionality return {ip: device for ip, device in devices.items() if device.has_emeter} + @staticmethod + async def get_device(ip_or_hostname, username=None, password=None): + """ + Get a Kasa device by IP address or hostname. Attempt to use credentials if provided. + + Parameters: + - ip_or_hostname (str): IP address or hostname of the device. + - username (str): Username for devices that require authentication. + - password (str): Password for devices that require authentication. + + Returns: + - device (SmartDevice): The initialized device, with authentication if applicable. + """ + try: + # Resolve hostname to IP if necessary + ip = socket.gethostbyname(ip_or_hostname) + except socket.gaierror: + logger.error(f"Failed to resolve hostname: {ip_or_hostname}") + raise + + try: + # Discover the device + if username and password: + # Discover with credentials for devices that require authentication + device = await Discover.discover_single( + ip, username=username, password=password + ) + logger.info( + f"Discovered and authenticated device: {device.alias if device.alias else device.model} (IP: {ip})" + ) + else: + # Discover without credentials for devices that do not require authentication + device = await Discover.discover_single(ip) + logger.info( + f"Discovered device: {device.alias if device.alias else device.model} (IP: {ip})" + ) + except Exception as e: + logger.error(f"Failed to discover or authenticate device {ip}: {e}") + raise + + # Check capabilities after discovery + if not device.has_emeter and not hasattr(device, "sys_info"): + logger.warning( + f"Device {ip} does not support emeter or sysinfo capabilities." + ) + + return device + @staticmethod async def fetch_emeter_data(device): """ Fetch emeter data from a device. """ await device.update() - logger.info(f"Fetched emeter data for device {device.alias}") + if not device.has_emeter: + logger.warning( + f"Device {device.model} does not support emeter functionality." + ) + return {} + logger.info(f"Fetched emeter data for device {device.model}") return device.emeter_realtime @staticmethod @@ -44,7 +98,12 @@ async def fetch_sysinfo(device): Fetch system information from a device. """ await device.update() - logger.info(f"Fetched sysinfo for device {device.alias}") + if not hasattr(device, "sys_info"): + logger.warning( + f"Device {device.model} does not support sysinfo functionality." + ) + return {} + logger.info(f"Fetched sysinfo for device {device.model}") return device.sys_info @staticmethod @@ -53,8 +112,13 @@ async def fetch_device_data(device): Fetch both emeter and system information from a device. """ await device.update() - logger.info(f"Fetched device data for {device.alias}") - return {"emeter": device.emeter_realtime, "sys_info": device.sys_info} + logger.info(f"Fetched device data for {device.model}") + data = {} + if device.has_emeter: + data["emeter"] = device.emeter_realtime + if hasattr(device, "sys_info"): + data["sys_info"] = device.sys_info + return data @staticmethod async def _async_dns_lookup(ip): @@ -62,22 +126,39 @@ async def _async_dns_lookup(ip): Perform an asynchronous DNS lookup to get the hostname for an IP address. """ loop = asyncio.get_event_loop() - return await loop.getnameinfo((ip, 0), socket.NI_NAMEREQD) + try: + return await loop.getnameinfo((ip, 0), socket.NI_NAMEREQD) + except Exception as e: + logger.warning(f"DNS lookup failed for {ip}: {e}") + return "unknown" @staticmethod async def get_device_info(device): """ - Get the IP address, alias, and DNS name of a device. + Get the IP address, alias, and DNS name of a device, with fallback options. + + Parameters: + - device (SmartDevice): The device to fetch information from. + + Returns: + - dict: Dictionary containing device information. """ ip = device.host + try: + # Attempt DNS lookup dns_name = await KasaAPI._async_dns_lookup(ip) except Exception as e: logger.warning(f"DNS lookup failed for {ip}: {e}") dns_name = "unknown" try: - alias = device.alias + # Use alias if available, otherwise fallback to model name or "unknown" + alias = ( + device.alias + if device.alias + else (device.model if device.model else "Unknown") + ) except Exception as e: logger.warning(f"Failed to get alias for {ip}: {e}") alias = "unknown" diff --git a/src/kasa_collector.py b/src/kasa_collector.py index 2441df7..15fabdb 100644 --- a/src/kasa_collector.py +++ b/src/kasa_collector.py @@ -27,12 +27,88 @@ def __init__(self): self.storage = InfluxDBStorage() self.devices = {} + # Initialize manual devices if provided + self.device_hosts = [] + if Config.KASA_COLLECTOR_DEVICE_HOSTS: + self.device_hosts = [ + ip.strip() for ip in Config.KASA_COLLECTOR_DEVICE_HOSTS.split(",") + ] + + # Initialize credentials if provided + self.tplink_username = Config.KASA_COLLECTOR_TPLINK_USERNAME + self.tplink_password = Config.KASA_COLLECTOR_TPLINK_PASSWORD + + async def initialize_manual_devices(self): + """ + Initialize manual devices based on the IPs or hostnames specified in the configuration. + """ + for ip in self.device_hosts: + try: + device = await KasaAPI.get_device( + ip, self.tplink_username, self.tplink_password + ) + + # Log detailed device information + self.logger.debug(f"Device details for {ip}: {device.__dict__}") + + # Check and set alias, log for clarity + device_alias = device.alias if device.alias else device.host + self.devices[ip] = device + self.logger.info( + f"Manually added device: {device_alias} (IP/Hostname: {ip}, Hostname: {socket.getfqdn(ip)})" + ) + except Exception as e: + self.logger.error(f"Failed to add manual device {ip}: {e}") + async def discover_devices(self): """ - Discover Kasa devices on the network and store them in the devices attribute. + Discover Kasa devices on the network and merge with existing devices. + """ + if not Config.KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY: + self.logger.info("Auto-discovery is disabled. Skipping device discovery.") + return + + discovered_devices = await KasaAPI.discover_devices() + self.logger.info( + f"Discovered {len(discovered_devices)} devices: {discovered_devices}" + ) + + # Merge auto-discovered devices without overwriting manual devices + for ip, device in discovered_devices.items(): + if ip not in self.devices: + self.devices[ip] = device + self.logger.info( + f"Auto-discovered new device: {device.alias} (IP: {ip}, Hostname: {socket.getfqdn(ip)})" + ) + else: + self.logger.info( + f"Device {device.alias} (IP: {ip}) already initialized manually." + ) + + async def start(self): + """ + Start the KasaCollector by initializing manual devices, discovering devices, + and starting periodic tasks. """ - self.devices = await KasaAPI.discover_devices() - self.logger.info(f"Discovered {len(self.devices)} devices") + # Initialize manual devices first + await self.initialize_manual_devices() + + # Perform device discovery if auto-discovery is enabled + if Config.KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY: + await self.discover_devices() + + # Start periodic tasks + asyncio.create_task(self.periodic_fetch()) + asyncio.create_task(self.periodic_discover()) + asyncio.create_task(self.periodic_sysinfo_fetch()) + + async def periodic_discover(self): + """ + Periodically discover devices on the network, respecting existing manual devices. + """ + while True: + await self.discover_devices() + await asyncio.sleep(Config.KASA_COLLECTOR_DEVICE_DISCOVERY_INTERVAL) async def fetch_and_store_data(self): """ @@ -40,10 +116,34 @@ async def fetch_and_store_data(self): """ for ip, device in self.devices.items(): try: + # Fetch device data and log the response data = await KasaAPI.fetch_device_data(device) - await self.storage.write_data("kasa_device", data["emeter"], {"ip": ip}) + self.logger.debug(f"Fetched data for device {ip}: {data}") + + # Log current aliases before using them + self.logger.debug(f"Pre-fetch device alias: {device.alias}, IP: {ip}") + + # Re-fetch alias to ensure it's updated correctly + updated_alias = device.alias if device.alias else device.host + self.logger.debug(f"Post-fetch device alias: {updated_alias}, IP: {ip}") + + # Store emeter and sysinfo data + self.logger.debug( + f"Storing emeter data for {updated_alias}: {data['emeter']}" + ) + await self.storage.write_data( + "kasa_device", + data["emeter"], + {"ip": ip, "device_alias": updated_alias}, + ) + + self.logger.debug( + f"Storing sysinfo data for {updated_alias}: {data['sys_info']}" + ) await self.storage.write_data( - "kasa_sysinfo", data["sys_info"], {"ip": ip} + "kasa_sysinfo", + data["sys_info"], + {"ip": ip, "device_alias": updated_alias}, ) if Config.KASA_COLLECTOR_WRITE_TO_FILE: @@ -61,37 +161,13 @@ async def periodic_fetch(self): await self.fetch_and_store_data() await asyncio.sleep(Config.KASA_COLLECTOR_DATA_FETCH_INTERVAL) - async def periodic_discover(self): - """ - Periodically discover devices on the network. - """ - while True: - await self.discover_devices() - await asyncio.sleep(Config.KASA_COLLECTOR_DEVICE_DISCOVERY_INTERVAL) - - async def start(self): - """ - Start the KasaCollector by discovering devices and starting periodic tasks. - """ - await self.discover_devices() - asyncio.create_task(self.periodic_fetch()) - asyncio.create_task(self.periodic_discover()) - async def fetch_and_send_emeter_data(self, ip, device): """ Fetch and send emeter data from a device. Retry if necessary. """ retries = 0 - alias = "Unknown" - hostname = "Unknown" - - try: - alias = device.alias - hostname = socket.getfqdn(ip) - except Exception as e: - self.logger.error( - f"Error retrieving alias or hostname for device {ip}: {e}" - ) + alias = device.alias if device.alias else device.host + hostname = socket.getfqdn(ip) while retries < Config.KASA_COLLECTOR_FETCH_MAX_RETRIES: try: @@ -150,9 +226,11 @@ async def process_device_data(self, ip, device): Process emeter data for a device. """ emeter_data = {key: int(value) for key, value in device.emeter_realtime.items()} + device_alias = device.alias if device.alias else device.host + device_data = { "emeter": emeter_data, - "alias": device.alias, + "alias": device_alias, "dns_name": socket.getfqdn(ip), "ip": ip, "equipment_type": "device", @@ -164,23 +242,37 @@ async def fetch_and_send_sysinfo(self, device): Fetch and send system info data from a device. Retry if necessary. """ ip = device.host - alias = "Unknown" - hostname = "Unknown" + alias = ( + device.alias if device.alias else device.host + ) # Use alias or host as fallback + hostname = None # Initialize hostname to avoid uninitialized variable error retries = 0 - try: - alias = device.alias - hostname = socket.getfqdn(ip) - except Exception as e: - self.logger.error( - f"Error retrieving alias or hostname for device {ip}: {e}" - ) - while retries < Config.KASA_COLLECTOR_FETCH_MAX_RETRIES: try: await device.update() - sysinfo = {"sysinfo": device.sys_info} - await self.storage.process_sysinfo_data({ip: sysinfo}) + hostname = socket.getfqdn( + ip + ) # Get the hostname after successful update + + # Log the sysinfo and alias + self.logger.debug(f"Fetched sysinfo for device {ip}: {device.sys_info}") + self.logger.debug(f"Pre-fetch Alias: {alias}, Hostname: {hostname}") + + # Re-fetch alias to ensure it's updated correctly + alias = device.alias if device.alias else device.host + self.logger.debug(f"Post-fetch Alias: {alias}, Hostname: {hostname}") + + sysinfo_data = { + "sysinfo": device.sys_info, + "device_alias": alias, # Use the updated alias + "dns_name": hostname, + "ip": ip, + "equipment_type": "device", + } + # Log the data being sent to InfluxDBStorage + self.logger.debug(f"Storing sysinfo data for {ip}: {sysinfo_data}") + await self.storage.process_sysinfo_data({ip: sysinfo_data}) break # Break the loop if successful except Exception as e: self.logger.error( @@ -203,6 +295,7 @@ async def periodic_fetch(self): device_count = len(self.devices) self.logger.info(f"Starting periodic_fetch for {device_count} devices") + # Use asyncio.gather to concurrently fetch data from all devices await asyncio.gather( *[ self.fetch_and_send_emeter_data(ip, device) @@ -212,18 +305,25 @@ async def periodic_fetch(self): end_time = datetime.now() elapsed = (end_time - start_time).total_seconds() + + # Check if the fetch operation took longer than the defined interval if elapsed > Config.KASA_COLLECTOR_DATA_FETCH_INTERVAL: self.logger.warning( f"Fetch operation took {format_duration(elapsed)}, which is longer than the set interval of {Config.KASA_COLLECTOR_DATA_FETCH_INTERVAL} seconds." ) + # Calculate next run time and log the details next_run = end_time + timedelta( seconds=max(0, Config.KASA_COLLECTOR_DATA_FETCH_INTERVAL - elapsed) ) self.logger.info( - f"Finished periodic_fetch for {device_count} devices. Duration: {format_duration(elapsed)}. Next run in {format_duration(Config.KASA_COLLECTOR_DATA_FETCH_INTERVAL - elapsed)} at {next_run.strftime('%Y-%m-%d %H:%M:%S')}" + f"Finished periodic_fetch for {device_count} devices. " + f"Duration: {format_duration(elapsed)}. Next run in " + f"{format_duration(Config.KASA_COLLECTOR_DATA_FETCH_INTERVAL - elapsed)} " + f"at {next_run.strftime('%Y-%m-%d %H:%M:%S')}" ) + # Sleep until the next scheduled fetch time await asyncio.sleep( max(0, Config.KASA_COLLECTOR_DATA_FETCH_INTERVAL - elapsed) ) @@ -345,37 +445,36 @@ async def main(): lock = asyncio.Lock() logger.info("Starting Kasa Collector") + # Initialize the KasaCollector instance + collector = KasaCollector() + try: - logger.info("Starting Kasa device discovery...") - start_time_discovery = datetime.now() - try: - devices = await KasaAPI.discover_devices() - except Exception as e: - logger.error(f"Error during device discovery: {e}") - devices = {} - - if devices: - device_details = [ - f"\t{device.alias} (IP: {ip}, Hostname: {socket.getfqdn(ip)})" - for ip, device in devices.items() - ] - device_details_str = "\n".join(device_details) + # Initialize manual devices first + await collector.initialize_manual_devices() + + # Perform initial device discovery if auto-discovery is enabled + if Config.KASA_COLLECTOR_ENABLE_AUTO_DISCOVERY: + logger.info("Starting initial Kasa device discovery...") + start_time_discovery = datetime.now() + try: + await collector.discover_devices() + except Exception as e: + logger.error(f"Error during initial device discovery: {e}") + + end_time_discovery = datetime.now() + elapsed_discovery = ( + end_time_discovery - start_time_discovery + ).total_seconds() logger.info( - f"Initial device discovery found {len(devices)} devices:\n{device_details_str}" + f"Initial device discovery completed in {format_duration(elapsed_discovery)}" ) else: - logger.info("No devices found in initial discovery") - - end_time_discovery = datetime.now() - elapsed_discovery = (end_time_discovery - start_time_discovery).total_seconds() - logger.info( - f"Device discovery at startup took {format_duration(elapsed_discovery)}" - ) - - collector = KasaCollector() - collector.devices = devices + logger.info( + "Auto-discovery is disabled. Only using manually specified devices." + ) - discovery_task = asyncio.create_task(collector.periodic_discover_devices(lock)) + # Start periodic tasks + discovery_task = asyncio.create_task(collector.periodic_discover()) data_fetch_task = asyncio.create_task(collector.periodic_fetch()) sysinfo_fetch_task = asyncio.create_task(collector.periodic_sysinfo_fetch())