diff --git a/.env.example b/.env.example index 0848da2..9b8ec2b 100644 --- a/.env.example +++ b/.env.example @@ -3,87 +3,58 @@ # Timezone TIMEZONE='Europe/Amsterdam' -# Docker version and container repo credentials -VERSION="$(TZ=${TIMEZONE} date +%Y.%m.%d.%H)" -CR_PAT="" - -# Victron -CERBOGX_IP="0.0.0.0" -VRM_PORTAL_ID='XXXXXXXXXXXXXXXX' - -# Domoticz -DZ_URL_PREFIX="http://:80/json.htm?type=command¶m=udevice&idx=" +# Your Home consumes roughyl this amount of energy (in kWh) per 24 hours. +DAILY_HOME_ENERGY_CONSUMPTION=16.0 # Module/Feature toggles # You can select one or more modules to run but make sure that `mqtt_client` is always enabled and is always the # module defined. This is because the `mqtt_client` module is the one that is responsible for the MQTT connection # and is a blocking function which will not return while the other modules run in their own threads. -ACTIVE_MODULES='[{ - "sync": { - "ev_charge_controller": false, - "energy_broker": false - }, +ACTIVE_MODULES='[{"sync": {"ev_charge_controller": false, "energy_broker": false }, "async": {"mqtt_client": true, "tibber_api": false }}]' - "async": { - "mqtt_client": true, - "tibber_api": false - } - }]' +# Enable / disable appliance run scheduling at lowest prices (requires a homeconnect2mqtt bridge in local network) +HOME_CONNECT_APPLIANCE_SCHEDULING=False # Enable / disable dynamic buy and sell decisions DYNAMIC_ESS_NET_METERING_ENABLED=False # Rate in Watts to export energy to the grid from the ESS -ESS_EXPORT_AC_SETPOINT=-8000.0 +ESS_EXPORT_AC_SETPOINT=-10000.0 # Percentage of battery capacity to retain for own use (ie. stop energy sale at 65% battery state of charge) -DYNAMIC_ESS_BATT_MIN_SOC=80.0 +DYNAMIC_ESS_BATT_MIN_SOC=50.0 -VICTRON_OPTIMIZED_CHARGING=0 +VICTRON_OPTIMIZED_CHARGING=1 TIBBER_UPDATES_ENABLED=0 -# the max amount you want to pay in cents per kWh from Tibber (energy supplier) -MAX_TIBBER_BUY_PRICE=0.30 +# the max amount you want to pay in cents per kWh from Tibber (energy supplier) when charging the ESS +MAX_TIBBER_BUY_PRICE=0.40 + +# !!! Warning !!! +# Unless mitigated in some other way, if the grid is lost when using this mode, the loads will also +# lose power until grid power is restored or manual intervention is done (switch back to inverter mode on). +# Use this carefully. +# +# If energy prices are equal or lower to this, switch to grid consumption (victron inverter pass-through mode) +SWITCH_TO_GRID_PRICE_THRESHOLD=0.22 ### ev_charge_controller ESS options # LOAD_RESERVATION defines the amount of solar energy produced in Watts that you want to reserve for charging your ESS # and running house (and other) loads. When the MINIMUM_ESS_SOC percentage is reached, this amount will be reduced with # a division of the LOAD_RESERVATION by the LOAD_REDUCTION_FACTOR to begin favoring the charge of your electric vehicle. -LOAD_RESERVATION=400 -LOAD_REDUCTION_FACTOR=2 +LOAD_RESERVATION=1 +LOAD_REDUCTION_FACTOR=1 # battery will charge at this voltage when under MIMIMUM_ESS_SOC -BATTERY_ABSORPTION_VOLTAGE=55.0 +BATTERY_ABSORPTION_VOLTAGE=57.0 # battery charge voltage will be reduced to this voltage WHEN MIMIMUM_ESS_SOC is reached -BATTERY_FLOAT_VOLTAGE=54.8 -MINIMUM_ESS_SOC=95 +BATTERY_FLOAT_VOLTAGE=57.0 +MINIMUM_ESS_SOC=90 # batter max voltage will drop to this when MAXIMUM_ESS_SOC is reached -BATTERY_FULL_VOLTAGE=54.0 -MAXIMUM_ESS_SOC=98 +BATTERY_FULL_VOLTAGE=55.8 +MAXIMUM_ESS_SOC=95 ### ABB B2x kWh Meter integration # if you have integrated an ABB B23/B24 RS-485 meter into your venusOS system you can toggle this option on # and configure the topic it will read/write to in conf.py. This will allow for bypassing the load reservation # functionality and result in more precise and accurate surplus power calculations ABB_METER_INTEGRATION=0 - -# tesla credentials and home address LAT and LONG -TESLA_EMAIL=elon@tesla.com -HOME_ADDRESS_LAT=54.1345 -HOME_ADDRESS_LONG=4.1234 - -# HomeConnect options -CLIENTID="" -CLIENTSECRET="" -REDIRECTURI="" - -# Tibber options -TIBBER_ACCESS_TOKEN="XXXXXXXXXXXXX" - - -# Pushover API Access -PO_USER_ID="" -PO_API_KEY="" - -# AWS Credentials -AWS_ACCESS_KEY="" -AWS_SECRET_KEY="" diff --git a/README.md b/README.md index be13f96..6b89d15 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ Do you have one or more of the following devices in your home? - [x] Home Energy Storage System with canbus or serial control and working well with your Victron system - [x] an ABB B21/23/24 Kilowatt meter (optional) - [x] a Domoticz based Home Automation system (optional) +- [x] HomeConnect enabled smart appliances (optional and currently requires you to run [https://github.com/hcpy2-0/hcpy](https://github.com/hcpy2-0/hcpy) as an additional service.) If so, this project might be something you will find interesting. Have a look at what this project offers by reading more below. Also, many of the cool features the modules in this project offer are visualized and controllable @@ -39,7 +40,8 @@ a Domoticz server via its REST API for monitoring and historic tracking - deep integration with Victron system for monitoring and control via the cerbo Gx MQTT broker - Creates, exports, and updates a number of custom metrics to the victron MQTT broker for consumption by the [venus-nextgen Energy Dashboard](https://github.com/JoshuaDodds/venus-nextgen) - dynamic ESS algorithms for automated buy and sell of energy -- solar forecasting data specific to your installation using ML models and AI for quite accurate current day production forecasts (courtesy of new VRM API features developed by Victron Energy). Note: A Victron VRM portal account is needed for this feature. +- solar forecasting data specific to your installation using ML models and AI for quite accurate current day production forecasts (courtesy of new VRM API features developed by Victron Energy). Note: A Victron VRM portal account is needed for this feature. +- HomeConnect supported appliance control. Schedules appliances to run at cheapest time of day without user intervention Configuration for your CerboGX IP Address, VRM instance ID, and Domoticz IP/Port are configured in the ```.env``` configuration file. @@ -60,7 +62,6 @@ for the things you will need to adjust in your own fork of this repo. **TODO: handle this issue automatically in a universal container build** - ### Running from CLI ```python3 main.py``` @@ -75,4 +76,4 @@ Finally, use the build.sh script as a template for building an arm64 image and p --------------- (This package is in its infancy, but contributions and collaborations are welcome.) -Copyright 2022, 2023, 2024 Joshua Dodds - All Rights Reserved. +Copyright 2022, 2023, 2024, 2025 Joshua Dodds - All Rights Reserved. diff --git a/lib/__init__.py b/lib/__init__.py index 5d93d7d..696808a 100644 --- a/lib/__init__.py +++ b/lib/__init__.py @@ -15,4 +15,5 @@ 'global_state', 'config_retrieval', 'config_change_handler', + 'event_handler_appliances', ] diff --git a/lib/clients/mqtt_client_factory.py b/lib/clients/mqtt_client_factory.py index 919f9df..ff56468 100644 --- a/lib/clients/mqtt_client_factory.py +++ b/lib/clients/mqtt_client_factory.py @@ -94,7 +94,11 @@ def _on_message(_client, _userdata, msg): try: # grab topic and payload from message topic = msg.topic - value = json.loads(msg.payload.decode("utf-8"))['value'] + payload = json.loads(msg.payload.decode("utf-8")) + + # Attempt to extract 'value', fall back to entire payload if 'value' is not present + value = payload.get('value', payload) + # format a logging message logmsg = f"{' '.join(topic.rsplit('/', 3)[1:3])}: {value}" logging.debug(logmsg) @@ -108,4 +112,4 @@ def _on_message(_client, _userdata, msg): Event(topic, value, logmsg).dispatch() except Exception as E: - logging.info(E) + logging.info(f"mqtt_client_factory: error processing new message: {E}") diff --git a/lib/constants.py b/lib/constants.py index 5cae87c..6a2fa35 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -81,6 +81,10 @@ "tesla_charge_requested": f"Tesla/vehicle0/control/charge_requested", "tesla_battery_soc": f"Tesla/vehicle0/battery_soc", "tesla_battery_soc_setpoint": f"Tesla/vehicle0/battery_soc_setpoint", + + # Home Connect Appliance topics + "dryer_state": f"Cerbomoticzgx/homeconnect/dryer/state", + "dishwasher_state": f"Cerbomoticzgx/homeconnect/dishwasher/state", } }) diff --git a/lib/event_handler.py b/lib/event_handler.py index 55fe87e..9c14d01 100644 --- a/lib/event_handler.py +++ b/lib/event_handler.py @@ -8,6 +8,7 @@ from lib.victron_integration import regulate_battery_max_voltage, ac_power_setpoint from lib.global_state import GlobalStateClient from lib.notifications import pushover_notification_critical +from lib.event_handler_appliances import handle_dryer_event, handle_dishwasher_event from lib.energy_broker import ( manage_sale_of_stored_energy_to_the_grid, set_charging_schedule, @@ -19,7 +20,7 @@ LOAD_RESERVATION = int(retrieve_setting("LOAD_RESERVATION")) or 0 LOAD_RESERVATION_REDUCTION_FACTOR = float(retrieve_setting("LOAD_REDUCTION_FACTOR")) or 1 MINIMUM_ESS_SOC = int(retrieve_setting("MINIMUM_ESS_SOC")) or 100 - +HOME_CONNECT_APPLIANCE_SCHEDULING = bool(retrieve_setting("HOME_CONNECT_APPLIANCE_SCHEDULING")) or False class Event: @@ -65,6 +66,14 @@ def ac_in_connected(self): elif event == 1: logging.info("AC Input: Grid is online.") + def dryer_state(self): + if HOME_CONNECT_APPLIANCE_SCHEDULING: + handle_dryer_event(self.value) + + def dishwasher_state(self): + if HOME_CONNECT_APPLIANCE_SCHEDULING: + handle_dishwasher_event(self.value) + def ac_power_setpoint(self): if float(self.value) > 0 or float(self.value) < 0: logging.debug(f"AC Power Setpoint changed to {self.value}") diff --git a/lib/event_handler_appliances.py b/lib/event_handler_appliances.py new file mode 100644 index 0000000..c963bbd --- /dev/null +++ b/lib/event_handler_appliances.py @@ -0,0 +1,381 @@ +import json +import threading +import time +from datetime import datetime, timedelta + +from lib.constants import logging +from lib.global_state import GlobalStateClient +from lib.helpers import publish_message +from lib.tibber_api import lowest_24h_prices, lowest_48h_prices + +gs_client = GlobalStateClient() + +global_ready_flags = { + "Dishwasher": False, + "Dryer": False, +} + +TRACKED_KEYS = [ + "SelectedProgram", + "RemoteControlStartAllowed", + "DoorState", + "PowerState", + "FinishInRelative", + "OperationState", + "DryingTarget", + "RemoteControlLevel", + "RemoteControlActive", +] + + +def send_delayed_start_to_dishwasher(): + logging.info("Sending delayed start command to Dishwasher...") + + delay_seconds = determine_optimal_run_time() + + # Send the delayed start program command + delayed_start_command = {"program": 8203, "options": [{"uid": 558, "value": delay_seconds}]} + topic = "Cerbomoticzgx/homeconnect/dishwasher/activeProgram" + publish_message( + topic=topic, + payload=json.dumps(delayed_start_command) + ) + logging.info(f"Sent start command to Dishwasher.") + + +def send_delayed_start_to_dryer(): + logging.info("Sending delayed start command to Dryer...") + + delay_seconds = determine_optimal_run_time() + + selected_program = int(gs_client.get('Dryer_SelectedProgram')) + selected_program_runtime = int(gs_client.get('Dryer_FinishInRelative')) + + # Adjust to match dryer step size requirement for this value + delay_seconds = round(delay_seconds / 60) * 60 + selected_program_runtime + + # Calculate the absolute start time + current_time = datetime.now() + start_time = current_time + timedelta(seconds=delay_seconds) + + # Check if start time is later than 8:30 PM + if start_time.hour > 20 or (start_time.hour == 20 and start_time.minute > 30): + logging.info("Calculated start time for Dryer is after 8:30 PM. Enforcing SilentDry programme.") + # change program + select_program_command = {"program": 32068} # SilentDry program UID + publish_message("Cerbomoticzgx/homeconnect/dryer/selectedProgram", payload=select_program_command) + + selected_program = int(gs_client.get('Dryer_SelectedProgram')) + silent_dry_runtime = int(gs_client.get('Dryer_FinishInRelative')) + delay_seconds = round(determine_optimal_run_time() / 60) * 60 + silent_dry_runtime + + # Send the delayed start program command + delayed_start_command = {"program": selected_program, "options": [{"uid": 551, "value": delay_seconds}]} + topic = "Cerbomoticzgx/homeconnect/dryer/activeProgram" + publish_message( + topic=topic, + payload=json.dumps(delayed_start_command) + ) + logging.info(f"Sent start command to Dryer.") + + +def handle_dryer_running_state(): + if gs_client.get('Dryer_RemoteControlStartAllowed'): + try: + # Abort the current program + logging.info("Dryer is running. Sending abort command...") + abort_command = {"uid": 512, "value": True} # AbortProgram command + publish_message( + topic="Cerbomoticzgx/homeconnect/dryer/set", + payload=json.dumps(abort_command) + ) + logging.info("Sent abort command to Dryer.") + + # Start monitoring thread to wait for 'Ready' state + wait_for_ready_state("Dryer", send_delayed_start_to_dryer) + + except Exception as e: + logging.error(f"Unexpected error in handle_dryer_running_state(): {e}") + + +def handle_dishwasher_running_state(): + try: + # Abort the current program + logging.info("Dishwasher is running. Sending abort command...") + abort_command = {"uid": 512, "value": True} # AbortProgram command + publish_message( + topic="Cerbomoticzgx/homeconnect/dishwasher/set", + payload=json.dumps(abort_command) + ) + logging.info("Sent abort command to Dishwasher.") + + # Start monitoring thread to wait for 'Ready' state + wait_for_ready_state("Dishwasher", send_delayed_start_to_dishwasher) + + except Exception as e: + logging.error(f"Unexpected error in handle_dishwasher_running_state(): {e}") + + +def handle_dryer_event(payload): + """ + Handles events for the dryer by processing the state and tracking relevant keys. + :param payload: The payload containing the new state data. + """ + try: + new_state = payload if isinstance(payload, dict) else json.loads(payload) + detect_changed_state_values("Dryer", new_state) + store_appliance_state("Dryer", new_state) + + except Exception as e: + logging.error(f"Unexpected error while handling dryer event: {e}") + + +def handle_dishwasher_event(payload): + try: + new_state = payload if isinstance(payload, dict) else json.loads(payload) + detect_changed_state_values("Dishwasher", new_state) + store_appliance_state("Dishwasher", new_state) + + except Exception as e: + logging.error(f"Error in handle_dishwasher_event: {e}") + + +def handle_user_intervention(device, current_state, new_state): + current_operation = current_state.get("OperationState") + new_operation = new_state.get("OperationState") + + # User cancels a delayed start + if current_operation == "DelayedStart" and new_operation == "Ready": + logging.info(f"{device} delayed start was canceled by user.") + user_intervention_count = int(gs_client.get(f"{device}_UserInterventionCount") or 0) + user_intervention_count += 1 + gs_client.set(f"{device}_UserInterventionCount", user_intervention_count) + logging.debug(f"{device} UserInterventionCount: {user_intervention_count}") + return False # Indicating further handling is not needed. + + # User starts the appliance from Ready to Run + if current_operation == "Ready" and new_operation == "Run": + logging.info(f"{device} was manually started by user.") + user_intervention_count = int(gs_client.get(f"{device}_UserInterventionCount") or 0) + user_intervention_count += 1 + gs_client.set(f"{device}_UserInterventionCount", user_intervention_count) + logging.debug(f"{device} UserInterventionCount: {user_intervention_count}") + + # Allow immediate run if the user intervened multiple times + if user_intervention_count >= 2: + logging.info(f"{device} user intervened multiple times. Allowing immediate run.") + gs_client.set(f"{device}_UserInterventionCount", 0) # Reset count + return True # Allow the run. + + # Reset the count ONLY after a successful uninterrupted delayed start run + if current_operation == "DelayedStart" and new_operation == "Run": + logging.info(f"{device} is starting a scheduled operation. Resetting intervention count.") + gs_client.set(f"{device}_UserInterventionCount", 0) + return False # Normal operation; no special handling. + + return False # No special handling needed. + + +def detect_changed_state_values(device, new_state): + current_state = retrieve_appliance_state(device) + changes = {} + + logging.debug(f"Current state for {device}: {current_state}") + logging.debug(f"New state for {device}: {new_state}") + + for key in TRACKED_KEYS: + if key not in new_state or key not in current_state: + logging.debug(f"Skipping comparison for {key} as it's missing in either state.") + continue + + new_value = new_state[key] + current_value = current_state[key] + + logging.debug(f"Comparing {key}: current={current_value}, new={new_value}") + + if str(current_value) != str(new_value): + changes[key] = new_value + logging.debug(f"{device} state change detected: {key}: {current_value} -> {new_value}") + + # Reset user intervention counter if PowerState changes to Off (todo: this might be confusing. Its on hold for now) + # if key == "PowerState" and new_value == "Off": + # logging.info(f"{device} power turned off. Resetting UserInterventionCount.") + # gs_client.set(f"{device}_UserInterventionCount", 0) + # continue + + # Call user intervention handler + if key == "OperationState" and handle_user_intervention(device, current_state, new_state): + logging.info(f"Allowing Immediate run for {device}.") + return # Skip further handling to allow immediate run. + + # Detect and handle appliances that start to run without a schedule set + if key == "OperationState" and new_value == "Run": + if not ((current_value in ["0", 0, "DelayedStart", "Pause"]) and new_value == "Run"): + logging.info(f"{device} has started without scheduling. Checking if there is a better time to run...") + if device == "Dishwasher": + handle_dishwasher_running_state() + if device == "Dryer": + handle_dryer_running_state() + + if changes: + store_appliance_state(device, changes) + else: + logging.debug(f"No changes detected for {device}.") + + +def store_appliance_state(device, state): + """ + Stores only the relevant keys from the appliance state in GlobalState. + :param device: The name of the device (e.g., "Dryer", "Dishwasher"). + :param state: Dictionary of the state to store. + """ + try: + for key in TRACKED_KEYS: # Iterate only over tracked keys + if key in state: # Store the key only if it exists in the incoming payload + value = state[key] + gs_client.set(f"{device}_{key}", value) + logging.debug(f"Stored {device}_{key} = {value}") + except Exception as e: + logging.error(f"Failed to store state for {device}: {e}") + + +def retrieve_appliance_state(device): + """ + Retrieves the tracked appliance state from GlobalState. + :param device: The name of the device (e.g., "Dryer", "Dishwasher"). + :return: A dictionary containing the tracked state. + """ + state = {} + + for key in TRACKED_KEYS: + value = gs_client.get(f"{device}_{key}") + if value is not None: + state[key] = value # The value is already correctly typecast by gs.get() + else: + logging.debug(f"{device}_{key} not found in GlobalState. Assuming key not initialized.") + + logging.debug(f"Retrieved state for {device}: {state}") + return state + + +def calculate_delay_in_seconds(optimal_time): + """ + Calculates the delay in seconds from the current time to the target time. + :param optimal_time: List containing [day, hour, level, price]. + :return: Delay in seconds as an integer. + """ + current_time = datetime.now() + target_day, target_hour, _, _ = optimal_time # Unpack the target day and hour + + # Determine target date and time + target_date = current_time + timedelta(days=target_day) + target_datetime = datetime( + year=target_date.year, + month=target_date.month, + day=target_date.day, + hour=target_hour, + minute=0, + second=0 + ) + + # Calculate delay in seconds + delay_seconds = (target_datetime - current_time).total_seconds() + if delay_seconds < 0: + raise ValueError("Calculated delay time is negative. Check optimal_time.") + + return int(delay_seconds) + + +def check_optimal_run_time(prices): + """ + Finds the optimal time to run based on the pricing data. + :param prices: List of price slots in the format [day, hour, level, price]. + :return: Optimal slot [day, hour, level, price] or None. + """ + try: + for price_data in prices: + # Validate the structure of the price data + if len(price_data) != 4: + logging.error(f"Invalid price data format: {price_data}. Skipping.") + continue + + day, hour, level, price = price_data + + # Add any additional validation here if needed + if price >= 0: # Example condition + return price_data + + logging.info("No optimal time found in pricing data.") + return None + except Exception as e: + logging.error(f"Unexpected error in check_optimal_run_time: {e}") + return None + + +def determine_optimal_run_time(price_cap=0.38): + """ + Determines the optimal time to run based on pricing and time of day. + Returns the optimal time slot and calculated delay seconds. + """ + logging.debug("Determining optimal run time...") + current_time = datetime.now().hour + current_datetime = datetime.now() + + # Filter pricing based on time of day + # Before 7 PM: find the cheapest in the next 4-5 hours + if current_time < 19: + combined_prices = lowest_24h_prices(price_cap=price_cap, max_items=8) + filtered_prices = [slot for slot in combined_prices if + 0 <= (slot[0] * 24 + slot[1] - current_datetime.hour) <= 5] + # After 7 PM: find the cheapest until 5:30 AM the next day + else: + combined_prices = lowest_48h_prices(price_cap=price_cap, max_items=8) + filtered_prices = [slot for slot in combined_prices if + 0 <= (slot[0] * 24 + slot[1] - current_datetime.hour) <= 10.5] + + # Find the cheapest time + optimal_time = check_optimal_run_time(prices=filtered_prices) + + if not optimal_time: + logging.info("No optimal time found. Scheduling immediate run.") + return 0 # Immediate run + else: + delay_seconds = calculate_delay_in_seconds(optimal_time) + return delay_seconds + + +def wait_for_ready_state(device, callback): + """ + Waits for the appliance to transition to the 'Ready' state and executes a callback. + Runs in a separate thread to avoid blocking the main process. + + :param device: The name of the device (e.g., "Dishwasher"). + :param callback: Function to call when the device is in the 'Ready' state. + """ + def monitor_ready_state(): + logging.info(f"Starting monitoring thread for {device} readiness...") + while True: + try: + operation_state = retrieve_appliance_state(device).get("OperationState") + logging.debug(f"Current {device} OperationState: {operation_state}") + + if operation_state == "Ready": + logging.info(f"{device} is now in the 'Ready' state and should be ready for commands.") + global_ready_flags[device] = True + callback() + break + + if operation_state not in ["Aborting", "Run"]: + logging.warning(f"Unexpected state detected for {device}: {operation_state}. Exiting monitoring.") + break + + time.sleep(2) # Check again after 2 seconds + except Exception as e: + logging.error(f"Error while monitoring {device} state: {e}") + break + + logging.debug(f"Monitoring thread for {device} readiness has exited.") + + # Start the thread + monitoring_thread = threading.Thread(target=monitor_ready_state, daemon=True) + monitoring_thread.start() diff --git a/main.py b/main.py index 1275fa7..bd17c3e 100644 --- a/main.py +++ b/main.py @@ -26,6 +26,7 @@ ACTIVE_MODULES = json.loads(retrieve_setting('ACTIVE_MODULES')) ESS_NET_METERING = bool(retrieve_setting('TIBBER_UPDATES_ENABLED')) or False +HOME_CONNECT_APPLIANCE_SCHEDULING = bool(retrieve_setting("HOME_CONNECT_APPLIANCE_SCHEDULING")) or False def ev_charge_controller(): EvCharger().main() @@ -75,7 +76,11 @@ def init(): def post_startup(): - time.sleep(2) + time.sleep(1) + + if HOME_CONNECT_APPLIANCE_SCHEDULING: + logging.info(f"HomeConnect Appliance Schedulling mode is enabled.") + logging.info(f"post_startup() actions executing...") # Re-apply state/configuration from previous run or sane defaults