diff --git a/.changeset/occupancy_component-heavy-teachers-remain.md b/.changeset/occupancy_component-heavy-teachers-remain.md new file mode 100644 index 000000000..cd7d1e416 --- /dev/null +++ b/.changeset/occupancy_component-heavy-teachers-remain.md @@ -0,0 +1,5 @@ +--- +"occupancy_component": minor +--- + +feat: Implement area with timer logic diff --git a/WORKSPACE.bzlmod b/WORKSPACE.bzlmod index 1ceb1bfb5..989a29b04 100644 --- a/WORKSPACE.bzlmod +++ b/WORKSPACE.bzlmod @@ -8,22 +8,22 @@ load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository") http_archive( name = "tilt_linux_arm64", build_file = "//tools/tilt:BUILD.repositories.bazel.tpl", - sha256 = "7b9d859439e063a9fb2701c1dee41e3455590afb436edc2c9a7fc633bb9d374a", - url = "https://github.com/tilt-dev/tilt/releases/download/v0.33.10/tilt.0.33.10.linux.arm64.tar.gz", + sha256 = "49ce19981761b6102cf5fe3cb15d6cfdf6b56aa2be7a1178457955c7476bf21f", + url = "https://github.com/tilt-dev/tilt/releases/download/v0.33.11/tilt.0.33.11.linux.arm64.tar.gz", ) http_archive( name = "tilt_linux_amd64", build_file = "//tools/tilt:BUILD.repositories.bazel.tpl", - sha256 = "fdb62ab5f9e0cf2cb697f8b1e91f2909ee61ab14899f6b2c1a144c0d729d99a3", - url = "https://github.com/tilt-dev/tilt/releases/download/v0.33.10/tilt.0.33.10.linux.x86_64.tar.gz", + sha256 = "b14cc9d33b18c8bdf69b543914696b252e553c9361f65d2ed0cded302698708d", + url = "https://github.com/tilt-dev/tilt/releases/download/v0.33.11/tilt.0.33.11.linux.x86_64.tar.gz", ) http_archive( name = "tilt_darwin_arm64", build_file = "//tools/tilt:BUILD.repositories.bazel.tpl", - sha256 = "b4b460b02eb4e261b512a7c793e8381bef1a5adb864ef46d3fcae3f6a30fcab3", - url = "https://github.com/tilt-dev/tilt/releases/download/v0.33.10/tilt.0.33.10.mac.arm64.tar.gz", + sha256 = "dbb1716432a79a31b9c5f1842cfce02f2abbf9bcfa789d9afbc41d155e930855", + url = "https://github.com/tilt-dev/tilt/releases/download/v0.33.11/tilt.0.33.11.mac.arm64.tar.gz", ) # ------------------------------------ pulumi ------------------------------------ # diff --git a/devbox.lock b/devbox.lock index c0d99efec..ca80badfa 100644 --- a/devbox.lock +++ b/devbox.lock @@ -146,6 +146,7 @@ }, "python311Packages.pip@23.3.1": { "last_modified": "2024-01-27T14:55:31Z", + "plugin_version": "0.0.2", "resolved": "github:NixOS/nixpkgs/160b762eda6d139ac10ae081f8f78d640dd523eb#python311Packages.pip", "source": "devbox-search", "version": "23.3.1", diff --git a/occupancy_component/BUILD.bazel b/occupancy_component/BUILD.bazel index 929b7cb97..ca192d92c 100755 --- a/occupancy_component/BUILD.bazel +++ b/occupancy_component/BUILD.bazel @@ -12,6 +12,8 @@ py_library( "custom_components/occupancy/__init__.py", "custom_components/occupancy/binary_sensor.py", "custom_components/occupancy/const.py", + "custom_components/occupancy/helpers.py", + "custom_components/occupancy/internal_state.py", "custom_components/occupancy/select.py", ], data = [ @@ -26,6 +28,8 @@ py_pytest_test( srcs = [ "tests/conftest.py", "tests/helpers.py", + "tests/test_area.py", + "tests/test_door.py", "tests/test_init.py", ], args = ["--asyncio-mode=auto"], diff --git a/occupancy_component/Dockerfile b/occupancy_component/Dockerfile index 28fc9f133..c2e203398 100644 --- a/occupancy_component/Dockerfile +++ b/occupancy_component/Dockerfile @@ -3,6 +3,7 @@ FROM homeassistant/home-assistant:2024.2 RUN wget -O /usr/local/bin/wait-for https://github.com/eficode/wait-for/releases/download/v2.2.3/wait-for && chmod +x /usr/local/bin/wait-for ADD configuration.yaml /config/configuration.yaml +ADD automations.yaml /config/automations.yaml ADD custom_components /config/custom_components # NOTE: this is a hack to bypass onboarding copied from here diff --git a/occupancy_component/Tiltfile b/occupancy_component/Tiltfile index f78290448..15bc90b4a 100644 --- a/occupancy_component/Tiltfile +++ b/occupancy_component/Tiltfile @@ -7,6 +7,7 @@ docker_build( ".", live_update=[ sync('configuration.yaml', '/config/configuration.yaml'), + sync('automations.yaml', '/config/automations.yaml'), sync('custom_components', '/config/custom_components/'), restart_container(), ] diff --git a/occupancy_component/automations.yaml b/occupancy_component/automations.yaml new file mode 100644 index 000000000..15ae14996 --- /dev/null +++ b/occupancy_component/automations.yaml @@ -0,0 +1,62 @@ +- id: "1707938841757" + alias: Control front door contact + description: "" + trigger: + - platform: state + entity_id: + - input_boolean.front_door_contact_simulate + condition: [] + action: + - service: mqtt.publish + metadata: {} + data: + qos: "2" + retain: true + topic: home-assistant/front_door/contact + payload: "{{ trigger.to_state.state }}" + mode: single +- id: "1707939189264" + alias: Control front door motion occupancy + description: "" + trigger: + - platform: state + entity_id: + - input_button.front_door_motion_occupancy_simulate + condition: [] + action: + - service: mqtt.publish + metadata: {} + data: + qos: "2" + retain: true + topic: home-assistant/front_door/motion_occupancy + payload: "on" + - delay: + hours: 0 + minutes: 0 + seconds: 5 + milliseconds: 0 + - service: mqtt.publish + metadata: {} + data: + qos: "2" + retain: true + topic: home-assistant/front_door/motion_occupancy + payload: "off" + mode: single +- id: "1707942333572" + alias: Control hallway occupancy + description: "" + trigger: + - platform: state + entity_id: + - input_boolean.hallway_occupancy_simulate + condition: [] + action: + - service: mqtt.publish + metadata: {} + data: + qos: "2" + retain: true + topic: home-assistant/hallway/occupancy + payload: "{{ trigger.to_state.state }}" diff --git a/occupancy_component/configuration.yaml b/occupancy_component/configuration.yaml index 5473b0b01..1aaab594c 100644 --- a/occupancy_component/configuration.yaml +++ b/occupancy_component/configuration.yaml @@ -1,6 +1,8 @@ # Loads default set of integrations. Do not remove. default_config: +automation: !include automations.yaml + homeassistant: name: testing latitude: 0 @@ -14,18 +16,36 @@ logger: logs: custom_components.occupancy: debug +input_boolean: + front_door_contact_simulate: + name: Simulate front door contact + icon: mdi:door + hallway_occupancy_simulate: + name: Simulate hallway occupancy + icon: mdi:home + +input_button: + front_door_motion_occupancy_simulate: + name: Simulate front door motion occupancy + icon: mdi:motion-sensor + mqtt: binary_sensor: - name: front_door_contact state_topic: "home-assistant/front_door/contact" device_class: "door" - payload_on: "ON" - payload_off: "OFF" + payload_on: "on" + payload_off: "off" - name: front_door_motion_occupancy state_topic: "home-assistant/front_door/motion_occupancy" device_class: "motion" - payload_on: "ON" - payload_off: "OFF" + payload_on: "on" + payload_off: "off" + - name: hallway_occupancy + state_topic: "home-assistant/hallway/occupancy" + device_class: "occupancy" + payload_on: "on" + payload_off: "off" occupancy: doors: diff --git a/occupancy_component/custom_components/occupancy/__init__.py b/occupancy_component/custom_components/occupancy/__init__.py index 9ef9b433b..60246d341 100644 --- a/occupancy_component/custom_components/occupancy/__init__.py +++ b/occupancy_component/custom_components/occupancy/__init__.py @@ -15,13 +15,23 @@ ATTR_AREAS, ATTR_OCCUPANCY_SENSORS, OCCUPANCY_DATA, + STATUS_ENTERING, + STATUS_ENTERING_CONFIRM, + STATUS_LEAVING, + STATUS_LEAVING_CONFIRM, + ATTR_ENTERING_TIMER, + ATTR_ENTERING_CONFIRM_TIMER, + ATTR_LEAVING_TIMER, + ATTR_LEAVING_CONFIRM_TIMER, + ATTR_TIMER_ENTITIES, ) +from custom_components.occupancy.helpers import create_timer import homeassistant.helpers.config_validation as cv import voluptuous as vol from homeassistant.components.select import DOMAIN as SELECT_DOMAIN from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN -from homeassistant.components.timer import DOMAIN as TIMER_DOMAIN + CONFIG_SCHEMA = vol.Schema( { @@ -59,6 +69,12 @@ ) +async def _create_area_timer(hass, area_id, state): + timer_id = f"{area_id}_{state}" + timer = await create_timer(hass, timer_id) + return timer + + async def async_setup(hass: HomeAssistantType, config: dict) -> bool: _LOGGER.debug("async_setup start %s", config) @@ -78,14 +94,33 @@ async def async_setup(hass: HomeAssistantType, config: dict) -> bool: ) for area_id, area_config in config[DOMAIN][ATTR_AREAS].items(): + entering_timer = await _create_area_timer(hass, area_id, STATUS_ENTERING) + entering_confirm_timer = await _create_area_timer( + hass, area_id, STATUS_ENTERING_CONFIRM + ) + leaving_timer = await _create_area_timer(hass, area_id, STATUS_LEAVING) + leaving_confirm_timer = await _create_area_timer( + hass, area_id, STATUS_LEAVING_CONFIRM + ) + area_config = { - "occupancy_sensors": area_config[ATTR_OCCUPANCY_SENSORS], + ATTR_TIMER_ENTITIES: [ + entering_timer, + entering_confirm_timer, + leaving_timer, + leaving_confirm_timer, + ], + ATTR_OCCUPANCY_SENSORS: area_config[ATTR_OCCUPANCY_SENSORS], # Adding the binary sensor domain to the door references. Maybe this # also requires some validation for the user, so they know they shouldn't # provide an entity reference but a door name. - "doors": [ + ATTR_DOORS: [ f"{BINARY_SENSOR_DOMAIN}.{door}" for door in area_config[ATTR_DOORS] ], + ATTR_ENTERING_TIMER: entering_timer.entity_id, + ATTR_ENTERING_CONFIRM_TIMER: entering_confirm_timer.entity_id, + ATTR_LEAVING_TIMER: leaving_timer.entity_id, + ATTR_LEAVING_CONFIRM_TIMER: leaving_confirm_timer.entity_id, } data[ATTR_AREAS][area_id] = area_config @@ -96,12 +131,5 @@ async def async_setup(hass: HomeAssistantType, config: dict) -> bool: ) ) - # TODO: this does not work for uknown reason - hass.async_create_task( - hass.helpers.discovery.async_load_platform( - "timer", DOMAIN, {"area_id": area_id}, config - ) - ) - _LOGGER.debug("async_setup done") return True diff --git a/occupancy_component/custom_components/occupancy/binary_sensor.py b/occupancy_component/custom_components/occupancy/binary_sensor.py index af41c1043..7032127f3 100644 --- a/occupancy_component/custom_components/occupancy/binary_sensor.py +++ b/occupancy_component/custom_components/occupancy/binary_sensor.py @@ -13,6 +13,7 @@ from homeassistant.components.binary_sensor import ( BinarySensorEntity, ) +from custom_components.occupancy.internal_state import InternalState from homeassistant.const import ( STATE_OFF, @@ -57,18 +58,18 @@ async def async_setup_platform( contact_sensor = door_config.get(ATTR_CONTACT_SENSOR) motion_sensor = door_config.get(ATTR_MOTION_SENSOR) - async_add_entities( - [ - Door( - name=door_id, - unique_id=door_id, - entry=entry, - contact_sensor=contact_sensor, - motion_sensor=motion_sensor, - ) - ] + entity = Door( + name=door_id, + unique_id=door_id, + entry=entry, + contact_sensor=contact_sensor, + motion_sensor=motion_sensor, ) + door_config["entity"] = entity + + async_add_entities([entity]) + class Door(BinarySensorEntity, RestoreEntity): def __init__( @@ -83,12 +84,10 @@ def __init__( self._attr_unique_id = unique_id self._attr_is_on = False - self._door_is_open = False - self._door_has_motion = False - self._entry = entry self._contact_sensor = contact_sensor self._motion_sensor = motion_sensor + self._internal_state = InternalState() @property def icon(self): @@ -101,6 +100,7 @@ def icon(self): async def async_added_to_hass(self) -> None: await super().async_added_to_hass() + self._internal_state.register_entity(self._contact_sensor) self.async_on_remove( async_track_state_change_event( self.hass, @@ -109,6 +109,7 @@ async def async_added_to_hass(self) -> None: ) ) + self._internal_state.register_entity(self._motion_sensor) self.async_on_remove( async_track_state_change_event( self.hass, @@ -120,89 +121,97 @@ async def async_added_to_hass(self) -> None: self._reset_contact_presence_timer = IdleTimer( self.hass, 5, self._reset_contact_presence ) + # The starting state of the timer should be idle, so + # we're able to differentiate between an event just happened or not. + self._reset_contact_presence_timer.idle = True self.async_on_remove( self._reset_contact_presence_timer.clear, ) async def _contact_sensor_event(self, event: EventType): - _LOGGER.debug("Called '_contact_sensor_event' with data %s", event.data) + self._internal_state.set(event.data["entity_id"], event.data["new_state"]) + + _LOGGER.debug( + "Called '_contact_sensor_event' with data %s - new state %s", + event.data, + self._internal_state, + ) + # old_state is None happens when the entity is added to home assistant if event.data["old_state"] == None: from_state = None else: from_state = event.data["old_state"].state - to_state = event.data["new_state"].state + # new_state is None happens when the entity is removed from home assisstant + if event.data["new_state"] == None: + to_state = None + else: + to_state = event.data["new_state"].state - if (from_state == STATE_OFF or from_state == None) and to_state == STATE_ON: - self._door_is_open = True + if from_state == STATE_OFF and to_state == STATE_ON: self._reset_contact_presence_timer.awake() - self._calculate_presence() - self.async_write_ha_state() - elif (from_state == STATE_ON or from_state == None) and to_state == STATE_OFF: - self._door_is_open = False + elif from_state == STATE_ON and to_state == STATE_OFF: self._reset_contact_presence_timer.awake() - self._calculate_presence() - self.async_write_ha_state() - else: - pass + self._calculate_presence() async def _motion_sensor_event(self, event: EventType): - _LOGGER.debug("Called '_motion_sensor_event' with data %s", event.data) + self._internal_state.set(event.data["entity_id"], event.data["new_state"]) - if event.data["old_state"] == None: - from_state = None - else: - from_state = event.data["old_state"].state + _LOGGER.debug( + "Called '_motion_sensor_event' with data %s - new state %s", + event.data, + self._internal_state, + ) - to_state = event.data["new_state"].state + self._calculate_presence() - if (from_state == STATE_OFF or from_state == None) and to_state == STATE_ON: - self._door_has_motion = True - self._calculate_presence() - self.async_write_ha_state() + def _door_is_open(self): + return self._internal_state.get(self._contact_sensor) == STATE_ON - elif (from_state == STATE_ON or from_state == None) and to_state == STATE_OFF: - self._door_has_motion = False - self._calculate_presence() - self.async_write_ha_state() + def _door_is_closed(self): + return self._internal_state.get(self._contact_sensor) == STATE_OFF - else: - pass + def _door_has_motion(self): + return self._internal_state.get(self._motion_sensor) == STATE_ON - def _calculate_presence(self): - door_just_opened = ( - self._door_is_open == True - and self._reset_contact_presence_timer.idle == False - ) - door_just_closed = ( - self._door_is_open == False - and self._reset_contact_presence_timer.idle == False + def _door_just_opened(self): + return self._door_is_open() and self._reset_contact_presence_timer.idle == False + + def _door_just_closed(self): + return ( + self._door_is_closed() and self._reset_contact_presence_timer.idle == False ) + def _calculate_presence(self): + door_just_opened = self._door_just_opened() + door_just_closed = self._door_just_closed() + door_is_open = self._door_is_open() + door_has_motion = self._door_has_motion() + if door_just_closed: self._attr_is_on = True elif door_just_opened: self._attr_is_on = True - elif self._door_is_open == True and self._door_has_motion: + elif door_is_open and door_has_motion: self._attr_is_on = True else: self._attr_is_on = False _LOGGER.debug( - f"_calculate_presence with {door_just_closed} - {door_just_opened} - {self._door_is_open} - {self._door_has_motion} calculated: {self._attr_is_on}" + f"_calculate_presence with {door_just_closed} - {door_just_opened} - {door_is_open} - {door_has_motion} calculated: {self._attr_is_on}" ) + self.async_write_ha_state() async def _reset_contact_presence(self) -> None: self._calculate_presence() - self.async_write_ha_state() @property def extra_state_attributes(self): - door_state = "open" if self._door_is_open else "closed" - motion_state = "motion" if self._door_has_motion else "no motion" + door_state = "open" if self._door_is_open() else "closed" + motion_state = "motion" if self._door_has_motion() else "no motion" return {"door_state": door_state, "motion_state": motion_state} diff --git a/occupancy_component/custom_components/occupancy/const.py b/occupancy_component/custom_components/occupancy/const.py index a54c351b6..e179b7d76 100644 --- a/occupancy_component/custom_components/occupancy/const.py +++ b/occupancy_component/custom_components/occupancy/const.py @@ -10,3 +10,17 @@ ATTR_MOTION_SENSOR = "motion_sensor" ATTR_OCCUPANCY_SENSORS = "occupancy_sensors" + +STATUS_ABSENT = "absent" +STATUS_ENTERING = "entering" +STATUS_ENTERING_CONFIRM = "entering_confirm" +STATUS_PRESENT = "present" +STATUS_LEAVING = "leaving" +STATUS_LEAVING_CONFIRM = "leaving_confirm" + +ATTR_ENTERING_TIMER = "entering_timer" +ATTR_ENTERING_CONFIRM_TIMER = "entering_confirm_timer" +ATTR_LEAVING_TIMER = "leaving_timer" +ATTR_LEAVING_CONFIRM_TIMER = "leaving_confirm_timer" + +ATTR_TIMER_ENTITIES = "timer_entities" diff --git a/occupancy_component/custom_components/occupancy/helpers.py b/occupancy_component/custom_components/occupancy/helpers.py new file mode 100644 index 000000000..f753322c7 --- /dev/null +++ b/occupancy_component/custom_components/occupancy/helpers.py @@ -0,0 +1,51 @@ +from homeassistant.components.timer import ( + DOMAIN as TIMER_DOMAIN, + Timer, + CONF_ICON, + CONF_ID, + CONF_NAME, + CONF_DURATION, +) +from homeassistant.helpers.entity_platform import async_get_platforms +from homeassistant.setup import async_setup_component + + +def find_platform(hass, domain): + platform_list = async_get_platforms(hass, domain) + + for platform in platform_list: + if platform.domain == domain: + return platform + + return None + + +async def get_platform(hass, domain): + platform = find_platform(hass, domain) + + if platform is not None: + return platform + + # Setup the domain because it's not already set up + await async_setup_component(hass, domain, {}) + + platform = find_platform(hass, domain) + + if platform is None: + raise ValueError(f"Unable to load {domain} platform") + return platform + + +async def create_timer(hass, name): + timer_platform = await get_platform(hass, TIMER_DOMAIN) + + timer_config = { + CONF_ID: name, + CONF_DURATION: "00:00:30", + CONF_NAME: name, + CONF_ICON: "", + } + + timer = Timer.from_yaml(timer_config) + await timer_platform.async_add_entities([timer]) + return timer diff --git a/occupancy_component/custom_components/occupancy/internal_state.py b/occupancy_component/custom_components/occupancy/internal_state.py new file mode 100644 index 000000000..173d2f6f8 --- /dev/null +++ b/occupancy_component/custom_components/occupancy/internal_state.py @@ -0,0 +1,45 @@ +from homeassistant.core import State +import homeassistant.util.dt as dt_util + +import logging + +_LOGGER = logging.getLogger(__name__) + + +class InternalState: + def __init__(self): + self._state = {} + + def register_entity(self, entity_id, initial_state=None): + self._state[entity_id] = { + "state": initial_state, + "last_updated": dt_util.utcnow(), + } + + def get(self, entity_id): + if entity_id not in self._state: + raise Exception(f"Entity {entity_id} not found in internal state") + + return self._state[entity_id]["state"] + + def set(self, entity_id, state): + if entity_id not in self._state: + raise Exception(f"Entity {entity_id} not found in internal state") + + if isinstance(state, State): + new_last_updated = state.last_updated + current_last_updated = self._state[entity_id]["last_updated"] + + if new_last_updated > current_last_updated: + self._state[entity_id]["state"] = state.state + self._state[entity_id]["last_updated"] = state.last_updated + else: + _LOGGER.debug( + "State not updated because it's older than the current state" + ) + else: + self._state[entity_id]["state"] = state + self._state[entity_id]["last_updated"] = dt_util.utcnow() + + def __str__(self) -> str: + return self._state.__str__() diff --git a/occupancy_component/custom_components/occupancy/select.py b/occupancy_component/custom_components/occupancy/select.py index bad72a1c6..fb47095d8 100644 --- a/occupancy_component/custom_components/occupancy/select.py +++ b/occupancy_component/custom_components/occupancy/select.py @@ -8,19 +8,23 @@ from homeassistant.helpers.event import ( async_track_state_change_event, ) -from datetime import datetime - -from homeassistant.components.select import SelectEntity -from homeassistant.helpers.event import async_call_later +from homeassistant.components.select import ( + SelectEntity, + DOMAIN as SELECT_DOMAIN, + SERVICE_SELECT_OPTION, +) from homeassistant.helpers.restore_state import RestoreEntity -from homeassistant.components.binary_sensor import ( - BinarySensorDeviceClass, - BinarySensorEntityDescription, - BinarySensorEntity, +from homeassistant.components.timer import ( + DOMAIN as TIMER_DOMAIN, + SERVICE_START as TIMER_SERVICE_START, + SERVICE_CANCEL as TIMER_SERVICE_CANCEL, + SERVICE_PAUSE as TIMER_SERVICE_PAUSE, + STATUS_IDLE as TIMER_STATUS_IDLE, + STATUS_PAUSED as TIMER_STATUS_PAUSED, + STATUS_ACTIVE as TIMER_STATUS_ACTIVE, ) from homeassistant.const import ( - EVENT_HOMEASSISTANT_STARTED, STATE_OFF, STATE_ON, ) @@ -30,23 +34,22 @@ _LOGGER = logging.getLogger(__name__) from custom_components.occupancy.const import ( - DOMAIN, - ATTR_ENTRY, - ATTR_CONTACT_SENSOR, - ATTR_MOTION_SENSOR, ATTR_AREAS, OCCUPANCY_DATA, ATTR_OCCUPANCY_SENSORS, ATTR_DOORS, + STATUS_ABSENT, + STATUS_ENTERING, + STATUS_ENTERING_CONFIRM, + STATUS_PRESENT, + STATUS_LEAVING, + STATUS_LEAVING_CONFIRM, + ATTR_ENTERING_TIMER, + ATTR_ENTERING_CONFIRM_TIMER, + ATTR_LEAVING_TIMER, + ATTR_LEAVING_CONFIRM_TIMER, ) - -from homeassistant.components.timer import ( - Timer, - CONF_ICON, - CONF_ID, - CONF_NAME, - CONF_DURATION, -) +from custom_components.occupancy.internal_state import InternalState async def async_setup_platform( @@ -71,36 +74,37 @@ async def async_setup_platform( occupancy_sensors = area_config.get(ATTR_OCCUPANCY_SENSORS) doors = area_config.get(ATTR_DOORS) + entering_timer = area_config.get(ATTR_ENTERING_TIMER) + entering_confirm_timer = area_config.get(ATTR_ENTERING_CONFIRM_TIMER) + leaving_timer = area_config.get(ATTR_LEAVING_TIMER) + leaving_confirm_timer = area_config.get(ATTR_LEAVING_CONFIRM_TIMER) - async_add_entities( - [ - Area( - name=area_id, - unique_id=area_id, - occupancy_sensors=occupancy_sensors, - doors=doors, - ), - ] + entity = Area( + name=area_id, + unique_id=area_id, + occupancy_sensors=occupancy_sensors, + doors=doors, + entering_timer=entering_timer, + entering_confirm_timer=entering_confirm_timer, + leaving_timer=leaving_timer, + leaving_confirm_timer=leaving_confirm_timer, ) + area_config["entity"] = entity + + async_add_entities([entity]) + class Area(SelectEntity, RestoreEntity): """Representation of a Adaptive Lighting switch.""" - ABSENT = "absent" - ENTERING = "entering" - ENTERING_CONFIRM = "entering_confirm" - PRESENT = "present" - LEAVING = "leaving" - LEAVING_CONFIRM = "leaving_confirm" - - STATES = [ - ABSENT, - ENTERING, - ENTERING_CONFIRM, - PRESENT, - LEAVING, - LEAVING_CONFIRM, + STATUSES = [ + STATUS_ABSENT, + STATUS_ENTERING, + STATUS_ENTERING_CONFIRM, + STATUS_PRESENT, + STATUS_LEAVING, + STATUS_LEAVING_CONFIRM, ] def __init__( @@ -109,16 +113,31 @@ def __init__( unique_id, occupancy_sensors, doors, + entering_timer, + entering_confirm_timer, + leaving_timer, + leaving_confirm_timer, ): self._attr_name = name self._attr_unique_id = unique_id self._occupancy_sensors = occupancy_sensors self._doors = doors - self._current_state = self.ABSENT + self._current_state = STATUS_ABSENT + self._entering_timer = entering_timer + self._entering_confirm_timer = entering_confirm_timer + self._leaving_timer = leaving_timer + self._leaving_confirm_timer = leaving_confirm_timer + self._timer_mapping = { + STATUS_ENTERING: self._entering_timer, + STATUS_ENTERING_CONFIRM: self._entering_confirm_timer, + STATUS_LEAVING: self._leaving_timer, + STATUS_LEAVING_CONFIRM: self._leaving_confirm_timer, + } + self._internal_state = InternalState() @property def options(self) -> list[str]: - return self.STATES + return self.STATUSES @property def current_option(self) -> str | None: @@ -129,43 +148,235 @@ async def async_added_to_hass(self) -> None: """Run when entity about to be added.""" await super().async_added_to_hass() + self._internal_state.register_entity(self.entity_id, self._current_state) + for door in self._doors: + self._internal_state.register_entity(door) + self.async_on_remove( async_track_state_change_event(self.hass, door, self._door_event) ) - # TODO: - # we need to have a single function which runs whenever a door - # or an occupancy sensor changes state. - # - # Whenever the select changes state we should use a service call - # this way all the state changes, either through the UI or the automation - # go through the same code path. - # - # When the select changes state reset the unnecessary times - # and start the timers that are relevant for that particular state - # - # We need a timer component for each state and inject that timer - # into each area component. Each time a timer triggers when it's done - # we need to call the main calculate state function to determine what to do next. + for occupancy_sensor in self._occupancy_sensors: + self._internal_state.register_entity(occupancy_sensor) + + self.async_on_remove( + async_track_state_change_event( + self.hass, occupancy_sensor, self._occupancy_sensor_event + ) + ) + + for timer in self._timer_mapping.values(): + self._internal_state.register_entity(timer) + + self.async_on_remove( + async_track_state_change_event(self.hass, timer, self._timer_event) + ) + + async def _timer_event(self, event: EventType): + self._internal_state.set(event.data["entity_id"], event.data["new_state"]) + + _LOGGER.debug( + "Called '_timer_event' with data %s - new state %s", + event.data, + self._internal_state, + ) + + await self._calculate_state() + async def _door_event(self, event: EventType): - _LOGGER.debug("Called '_door_event' with data %s", event.data) + self._internal_state.set(event.data["entity_id"], event.data["new_state"]) + + _LOGGER.debug( + "Called '_door_event' with data %s - new state %s", + event.data, + self._internal_state, + ) - if event.data["old_state"] == None: - from_state = None - else: - from_state = event.data["old_state"].state + await self._calculate_state() - to_state = event.data["new_state"].state + async def _occupancy_sensor_event(self, event: EventType): + self._internal_state.set(event.data["entity_id"], event.data["new_state"]) - if self._current_state == self.ABSENT: - if (from_state == STATE_OFF or from_state == None) and to_state == STATE_ON: - self._current_state = self.ENTERING - self.async_write_ha_state() + _LOGGER.debug( + "Called '_occupancy_sensor_event' with data %s - new state %s", + event.data, + self._internal_state, + ) + await self._calculate_state() async def async_select_option(self, option: str) -> None: - """Select new tariff (option).""" - _LOGGER.debug("Called 'async_select_option' with data %s", option) + start_timer = self._timer_mapping.get(option) + cancel_timers = [ + timer for status, timer in self._timer_mapping.items() if status != option + ] + + # We are doing an optimisitc update of the internal state here + # to prevent any race condition inside the _calculate_state function. + # If we don't do this when cancelling/starting timers or updating the + # select state the _calculate_state function might be called while some of the + # other updates are still in progress resulting in the wrong state being calculated. + # Basically we want the update of the select and timers to be atomic or + # in a single transaction, but as those options don't exist we do this + # by tracking internal state and doing an optimistic update. + self._internal_state.set(self.entity_id, option) + + for cancel_timer in cancel_timers: + self._internal_state.set(cancel_timer, TIMER_STATUS_IDLE) + + # It's possible there is no start timer for the current state + if start_timer: + self._internal_state.set(start_timer, TIMER_STATUS_ACTIVE) + + _LOGGER.debug( + "Called 'async_select_option' with data %s - new state %s", + option, + self._internal_state, + ) self._current_state = option self.async_write_ha_state() + + for cancel_timer in cancel_timers: + await self._cancel_timer(cancel_timer) + + if start_timer: + await self._start_timer(start_timer) + + async def _start_timer(self, timer: str): + _LOGGER.debug(f"Starting timer {timer}") + data = { + "entity_id": timer, + "duration": "00:00:10", + } + + await self.hass.services.async_call(TIMER_DOMAIN, TIMER_SERVICE_START, data) + + async def _cancel_timer(self, timer: str): + _LOGGER.debug(f"Cancel timer {timer}") + data = { + "entity_id": timer, + } + + await self.hass.services.async_call(TIMER_DOMAIN, TIMER_SERVICE_CANCEL, data) + + async def _pause_timer(self, timer: str): + _LOGGER.debug(f"Pause timer {timer}") + data = { + "entity_id": timer, + } + + await self.hass.services.async_call(TIMER_DOMAIN, TIMER_SERVICE_PAUSE, data) + + async def _resume_timer(self, timer: str): + _LOGGER.debug(f"Resume timer {timer}") + data = { + "entity_id": timer, + } + + await self.hass.services.async_call(TIMER_DOMAIN, TIMER_SERVICE_START, data) + + def _doors_have_activity(self): + for door in self._doors: + state = self._internal_state.get(door) + + if state is not None and state == STATE_ON: + return True + + return False + + def _area_has_occupancy(self): + for occupancy_sensor in self._occupancy_sensors: + state = self._internal_state.get(occupancy_sensor) + + if state is not None and state == STATE_ON: + return True + + return False + + def _timer_is_idle(self, timer: str): + state = self._internal_state.get(timer) + + if state is None or state == TIMER_STATUS_IDLE: + return True + + return False + + def _timer_is_paused(self, timer: str): + state = self._internal_state.get(timer) + + _LOGGER.debug(f"Got timer state for {timer} {state}") + + if state is None or state == TIMER_STATUS_PAUSED: + return True + + return False + + async def _calculate_state(self): + doors_have_activity = self._doors_have_activity() + + if self._current_state == STATUS_ABSENT: + if doors_have_activity: + await self.async_select_option(STATUS_ENTERING) + + elif self._current_state == STATUS_ENTERING: + entering_timer_idle = self._timer_is_idle(self._entering_timer) + entering_timer_paused = self._timer_is_paused(self._entering_timer) + area_has_occupancy = self._area_has_occupancy() + + if area_has_occupancy: + await self.async_select_option(STATUS_ENTERING_CONFIRM) + + # If there is activity at the door then pause the timer + # until there is no longer activity in which case we resume the timer + elif doors_have_activity: + await self._pause_timer(self._entering_timer) + + elif entering_timer_paused: + await self._resume_timer(self._entering_timer) + + elif entering_timer_idle: + await self.async_select_option(STATUS_ABSENT) + + elif self._current_state == STATUS_ENTERING_CONFIRM: + area_has_occupancy = self._area_has_occupancy() + entering_confirm_timer_idle = self._timer_is_idle( + self._entering_confirm_timer + ) + + if not area_has_occupancy: + await self.async_select_option(STATUS_ENTERING) + elif entering_confirm_timer_idle: + await self.async_select_option(STATUS_PRESENT) + + elif self._current_state == STATUS_PRESENT: + if doors_have_activity: + await self.async_select_option(STATUS_LEAVING) + + elif self._current_state == STATUS_LEAVING: + leaving_timer_idle = self._timer_is_idle(self._leaving_timer) + leaving_timer_paused = self._timer_is_paused(self._leaving_timer) + area_has_occupancy = self._area_has_occupancy() + + if not area_has_occupancy: + await self.async_select_option(STATUS_LEAVING_CONFIRM) + + elif doors_have_activity: + await self._pause_timer(self._leaving_timer) + + elif leaving_timer_paused: + await self._resume_timer(self._leaving_timer) + + elif leaving_timer_idle: + await self.async_select_option(STATUS_PRESENT) + + elif self._current_state == STATUS_LEAVING_CONFIRM: + area_has_occupancy = self._area_has_occupancy() + leaving_confirm_timer_idle = self._timer_is_idle( + self._leaving_confirm_timer + ) + + if area_has_occupancy: + await self.async_select_option(STATUS_LEAVING) + elif leaving_confirm_timer_idle: + await self.async_select_option(STATUS_ABSENT) diff --git a/occupancy_component/custom_components/occupancy/timer.py b/occupancy_component/custom_components/occupancy/timer.py deleted file mode 100644 index d82bab14d..000000000 --- a/occupancy_component/custom_components/occupancy/timer.py +++ /dev/null @@ -1,65 +0,0 @@ -"""Platform for sensor integration.""" - -from __future__ import annotations - -from homeassistant.core import HomeAssistant -from homeassistant.helpers.entity_platform import AddEntitiesCallback -from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType -from homeassistant.helpers.event import ( - async_track_state_change_event, -) -from datetime import datetime - -from homeassistant.components.timer import ( - Timer, - CONF_ICON, - CONF_ID, - CONF_NAME, - CONF_DURATION, -) - - -import logging - -_LOGGER = logging.getLogger(__name__) - -from custom_components.occupancy.const import ( - DOMAIN, - ATTR_ENTRY, - ATTR_CONTACT_SENSOR, - ATTR_MOTION_SENSOR, - ATTR_AREAS, - OCCUPANCY_DATA, - ATTR_OCCUPANCY_SENSORS, - ATTR_DOORS, -) - - -async def async_setup_platform( - hass: HomeAssistant, - config: ConfigType, - async_add_entities: AddEntitiesCallback, - discovery_info: DiscoveryInfoType | None = None, -) -> None: - if discovery_info is None: - return - - _LOGGER.debug( - "timer async_setup_platform called with discovery_info: %s", discovery_info - ) - - area_id = discovery_info["area_id"] - entering_timer_id = f"{area_id}_entering_timer" - - timer_config = { - CONF_ID: entering_timer_id, - CONF_DURATION: "00:00:30", - CONF_NAME: entering_timer_id, - CONF_ICON: "", - } - - async_add_entities([AreaTimer.from_yaml(timer_config)]) - - -class AreaTimer(Timer): - pass diff --git a/occupancy_component/tests/conftest.py b/occupancy_component/tests/conftest.py index 91e8a4676..dc142c1eb 100644 --- a/occupancy_component/tests/conftest.py +++ b/occupancy_component/tests/conftest.py @@ -3,7 +3,13 @@ import pytest from homeassistant.setup import async_setup_component -from custom_components.occupancy.const import DOMAIN +from custom_components.occupancy.const import ( + DOMAIN, + OCCUPANCY_DATA, + ATTR_DOORS, + ATTR_AREAS, + ATTR_TIMER_ENTITIES, +) from homeassistant.const import STATE_ON, STATE_OFF, STATE_UNKNOWN from homeassistant.components.template.const import DOMAIN as TEMPLATE_DOMAIN @@ -11,6 +17,10 @@ from homeassistant.components import binary_sensor from pytest_homeassistant_custom_component.common import MockEntityPlatform +import logging + +_LOGGER = logging.getLogger(__name__) + @pytest.fixture(autouse=True) def auto_enable_custom_integrations(enable_custom_integrations): @@ -42,23 +52,41 @@ async def init_integration(hass) -> None: }, } await async_setup_component(hass, DOMAIN, config) is True + await hass.async_block_till_done() + yield - # TODO: can we also teardown the integration / entities instead of waiting? - # This waits for 24 hours to make sure all the timers are reset - await wait(hass, 86400) + for door in hass.data[OCCUPANCY_DATA][ATTR_DOORS].values(): + await door["entity"].async_remove() + + for area in hass.data[OCCUPANCY_DATA][ATTR_AREAS].values(): + await area["entity"].async_remove() + + for timer in area[ATTR_TIMER_ENTITIES]: + timer.async_cancel() + await timer.async_remove() @pytest.fixture() -def init_entities(hass): +async def init_entities(hass): + tracked_entities = None + async def _init_entities(*entities): + nonlocal tracked_entities entity_platform = MockEntityPlatform( hass, domain=binary_sensor.DOMAIN, platform_name="test", platform=None ) + tracked_entities = entities + await entity_platform.async_add_entities(entities) # We have to wait here, because adding entities to hass will trigger a state change await wait(hass) return entities - return _init_entities + yield _init_entities + + for entity in tracked_entities: + await entity.async_remove() + + await hass.async_block_till_done() diff --git a/occupancy_component/tests/helpers.py b/occupancy_component/tests/helpers.py index d1b791c21..399ea632f 100644 --- a/occupancy_component/tests/helpers.py +++ b/occupancy_component/tests/helpers.py @@ -6,10 +6,8 @@ BinarySensorEntity, ) from homeassistant.helpers.event import async_call_later - -import logging - -_LOGGER = logging.getLogger(__name__) +from homeassistant.components.stream.core import IdleTimer +from custom_components.occupancy.const import OCCUPANCY_DATA, ATTR_AREAS async def wait(hass, seconds=10): @@ -42,22 +40,33 @@ def __init__(self, entity_id, state, timeout): self._attr_device_class = BinarySensorDeviceClass.OCCUPANCY self._timeout = timeout + async def async_added_to_hass(self) -> None: + await super().async_added_to_hass() + + self._reset_motion_presence_timer = IdleTimer( + self.hass, self._timeout, self._reset_motion_presence + ) + # The starting state of the timer should be idle, so + # we're able to differentiate between an event just happened or not. + self._reset_motion_presence_timer.idle = True + + self.async_on_remove( + self._reset_motion_presence_timer.clear, + ) + async def motion(self): self._attr_is_on = True self.async_write_ha_state() - self._listener_reset_motion_presence = async_call_later( - self.hass, self._timeout, self._reset_motion_presence - ) + self._reset_motion_presence_timer.awake() await self.hass.async_block_till_done() async def away(self): self._attr_is_on = False self.async_write_ha_state() + self._reset_motion_presence_timer.clear() await self.hass.async_block_till_done() - async def _reset_motion_presence(self, now): - _LOGGER.debug("Called '_reset_motion_presence'") - self._listener_reset_motion_presence = None + async def _reset_motion_presence(self) -> None: await self.away() @@ -69,3 +78,18 @@ def contact_sensor(entity_id, state): def motion_sensor(entity_id, state, timeout=5): entity = MotionSensor(entity_id, state, timeout) return entity + + +def occupancy_sensor(entity_id, state, timeout=30): + entity = MotionSensor(entity_id, state, timeout) + return entity + + +def get_area(hass, area_id): + return hass.data[OCCUPANCY_DATA][ATTR_AREAS][area_id]["entity"] + + +async def update_area(hass, area_id, new_state): + area = get_area(hass, area_id) + await area.async_select_option(new_state) + await hass.async_block_till_done() diff --git a/occupancy_component/tests/test_area.py b/occupancy_component/tests/test_area.py new file mode 100644 index 000000000..41b986c2e --- /dev/null +++ b/occupancy_component/tests/test_area.py @@ -0,0 +1,543 @@ +"""Test component setup.""" + +from custom_components.occupancy.const import ( + STATUS_ABSENT, + STATUS_ENTERING, + STATUS_ENTERING_CONFIRM, + STATUS_PRESENT, + STATUS_LEAVING, + STATUS_LEAVING_CONFIRM, +) +from tests.helpers import ( + wait, + contact_sensor, + motion_sensor, + occupancy_sensor, + update_area, +) +from homeassistant.components.timer import ( + STATUS_IDLE as TIMER_STATUS_IDLE, + STATUS_PAUSED as TIMER_STATUS_PAUSED, + STATUS_ACTIVE as TIMER_STATUS_ACTIVE, +) + + +async def test_area_absent(hass, init_integration, init_entities): + await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + ) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_absent_door_closed_has_motion( + hass, init_integration, init_entities +): + [front_door_contact, front_door_motion] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + ) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await front_door_motion.motion() + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + # Wait for the motion sensor to become inactive + await wait(hass, 5) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_entering_door_opens(hass, init_integration, init_entities): + [front_door_contact] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + ) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await front_door_contact.open() + + assert hass.states.get("select.hallway").state == STATUS_ENTERING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_PAUSED + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + # Wait for the door to become inactive to trigger the timer to become active + await wait(hass, 5) + + assert hass.states.get("select.hallway").state == STATUS_ENTERING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await wait(hass, 10) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_entering_door_open_has_motion( + hass, init_integration, init_entities +): + [front_door_contact, front_door_motion] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", True), + motion_sensor("binary_sensor.front_door_motion", False), + ) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await front_door_motion.motion() + + assert hass.states.get("select.hallway").state == STATUS_ENTERING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_PAUSED + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + # Wait for the motion sensor to become inactive to trigger the timer to become active + await wait(hass, 5) + + assert hass.states.get("select.hallway").state == STATUS_ENTERING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_entering_has_occupancy(hass, init_integration, init_entities): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", False), + ) + + await update_area(hass, "hallway", STATUS_ENTERING) + + assert hass.states.get("select.hallway").state == STATUS_ENTERING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await hallway_occupancy.motion() + + assert hass.states.get("select.hallway").state == STATUS_ENTERING_CONFIRM + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert ( + hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_ACTIVE + ) + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await wait(hass, 10) + + assert hass.states.get("select.hallway").state == STATUS_PRESENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_entering_without_occupancy(hass, init_integration, init_entities): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", False), + ) + + await update_area(hass, "hallway", STATUS_ENTERING) + + assert hass.states.get("select.hallway").state == STATUS_ENTERING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await wait(hass, 10) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_entering_door_opens(hass, init_integration, init_entities): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", False), + ) + + await update_area(hass, "hallway", STATUS_ENTERING) + + assert hass.states.get("select.hallway").state == STATUS_ENTERING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await front_door_contact.open() + + assert hass.states.get("select.hallway").state == STATUS_ENTERING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_PAUSED + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + # Wait for the door to become inactive to trigger the timer to become active + await wait(hass, 5) + + assert hass.states.get("select.hallway").state == STATUS_ENTERING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await wait(hass, 15) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_entering_confirm_has_occupancy( + hass, init_integration, init_entities +): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", True), + ) + + await update_area(hass, "hallway", STATUS_ENTERING_CONFIRM) + + assert hass.states.get("select.hallway").state == STATUS_ENTERING_CONFIRM + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert ( + hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_ACTIVE + ) + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + # Wait for the confirm timer to expire + await wait(hass, 10) + + assert hass.states.get("select.hallway").state == STATUS_PRESENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_entering_confirm_no_occupancy( + hass, init_integration, init_entities +): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", True), + ) + + await update_area(hass, "hallway", STATUS_ENTERING_CONFIRM) + + assert hass.states.get("select.hallway").state == STATUS_ENTERING_CONFIRM + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert ( + hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_ACTIVE + ) + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await hallway_occupancy.away() + + assert hass.states.get("select.hallway").state == STATUS_ENTERING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_present_occupancy_goes_away(hass, init_integration, init_entities): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", True), + ) + + await update_area(hass, "hallway", STATUS_PRESENT) + + assert hass.states.get("select.hallway").state == STATUS_PRESENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await hallway_occupancy.away() + + assert hass.states.get("select.hallway").state == STATUS_PRESENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_present_occupancy_door_opens(hass, init_integration, init_entities): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", True), + ) + + await update_area(hass, "hallway", STATUS_PRESENT) + + assert hass.states.get("select.hallway").state == STATUS_PRESENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await front_door_contact.open() + + assert hass.states.get("select.hallway").state == STATUS_LEAVING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_PAUSED + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + # Wait for the door activity to go away + await wait(hass, 5) + + assert hass.states.get("select.hallway").state == STATUS_LEAVING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + # Wait for both the door activity and the leaving timer to expire + await wait(hass, 15) + + assert hass.states.get("select.hallway").state == STATUS_PRESENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_present_occupancy_away_door_opens( + hass, init_integration, init_entities +): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", False), + ) + + await update_area(hass, "hallway", STATUS_PRESENT) + + assert hass.states.get("select.hallway").state == STATUS_PRESENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await front_door_contact.open() + + assert hass.states.get("select.hallway").state == STATUS_LEAVING_CONFIRM + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_ACTIVE + + # Wait for the leaving confirm timer to expire + await wait(hass, 10) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_leaving_has_occupancy(hass, init_integration, init_entities): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", True), + ) + + await update_area(hass, "hallway", STATUS_LEAVING) + + assert hass.states.get("select.hallway").state == STATUS_LEAVING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + # Wait for the leaving timer to expire + await wait(hass, 10) + + assert hass.states.get("select.hallway").state == STATUS_PRESENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_leaving_occupancy_goes_away(hass, init_integration, init_entities): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", True), + ) + + await update_area(hass, "hallway", STATUS_LEAVING) + + assert hass.states.get("select.hallway").state == STATUS_LEAVING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await hallway_occupancy.away() + + assert hass.states.get("select.hallway").state == STATUS_LEAVING_CONFIRM + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_ACTIVE + + # Wait for the leaving confirm timer to expire + await wait(hass, 10) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_leaving_door_open_has_motion(hass, init_integration, init_entities): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", True), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", True), + ) + + await update_area(hass, "hallway", STATUS_LEAVING) + + assert hass.states.get("select.hallway").state == STATUS_LEAVING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + await front_door_motion.motion() + + assert hass.states.get("select.hallway").state == STATUS_LEAVING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_PAUSED + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + # Wait for the motion sensor to become inactive to trigger the timer to become active + await wait(hass, 5) + + assert hass.states.get("select.hallway").state == STATUS_LEAVING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_leaving_confirm_no_occupancy(hass, init_integration, init_entities): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", True), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", False), + ) + + await update_area(hass, "hallway", STATUS_LEAVING_CONFIRM) + + assert hass.states.get("select.hallway").state == STATUS_LEAVING_CONFIRM + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_ACTIVE + + # Wait for the leaving confirm timer to expire + await wait(hass, 10) + + assert hass.states.get("select.hallway").state == STATUS_ABSENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + +async def test_area_leaving_confirm_has_occupancy( + hass, init_integration, init_entities +): + [front_door_contact, front_door_motion, hallway_occupancy] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", True), + motion_sensor("binary_sensor.front_door_motion", False), + occupancy_sensor("binary_sensor.hallway_occupancy", False), + ) + + await update_area(hass, "hallway", STATUS_LEAVING_CONFIRM) + + assert hass.states.get("select.hallway").state == STATUS_LEAVING_CONFIRM + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_ACTIVE + + await hallway_occupancy.motion() + + assert hass.states.get("select.hallway").state == STATUS_LEAVING + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_ACTIVE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE + + # Wait for the leaving timer to expire + await wait(hass, 10) + + assert hass.states.get("select.hallway").state == STATUS_PRESENT + assert hass.states.get("timer.hallway_entering").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_entering_confirm").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving").state == TIMER_STATUS_IDLE + assert hass.states.get("timer.hallway_leaving_confirm").state == TIMER_STATUS_IDLE diff --git a/occupancy_component/tests/test_door.py b/occupancy_component/tests/test_door.py new file mode 100644 index 000000000..8a8b4f0b8 --- /dev/null +++ b/occupancy_component/tests/test_door.py @@ -0,0 +1,192 @@ +"""Test component setup.""" + +from homeassistant.const import STATE_ON, STATE_OFF, STATE_UNKNOWN +from custom_components.occupancy.const import DOMAIN +from tests.helpers import wait, contact_sensor, motion_sensor + + +async def test_door_opens(hass, init_integration, init_entities): + [front_door_contact] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + ) + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + assert ( + hass.states.get("binary_sensor.front_door").attributes["door_state"] == "closed" + ) + + await front_door_contact.open() + + assert hass.states.get("binary_sensor.front_door").state == STATE_ON + assert ( + hass.states.get("binary_sensor.front_door").attributes["door_state"] == "open" + ) + + await wait(hass) + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + assert ( + hass.states.get("binary_sensor.front_door").attributes["door_state"] == "open" + ) + + +async def test_door_closes(hass, init_integration, init_entities): + [front_door_contact] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", STATE_ON), + ) + + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + + await front_door_contact.close() + + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door").state == STATE_ON + + await wait(hass) + + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + + +async def test_door_unknown_to_open(hass, init_integration, init_entities): + [front_door_contact] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", None), + ) + + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_UNKNOWN + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + + await front_door_contact.open() + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + + +async def test_door_unknown_to_close(hass, init_integration, init_entities): + [front_door_contact] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", None), + ) + + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_UNKNOWN + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + + await front_door_contact.close() + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + + +async def test_door_open_with_motion(hass, init_integration, init_entities): + [front_door_contact, front_door_motion] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", True), + motion_sensor("binary_sensor.front_door_motion", False), + ) + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF + + await front_door_motion.motion() + + assert hass.states.get("binary_sensor.front_door").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON + + await wait(hass) + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF + + +async def test_door_closed_with_motion(hass, init_integration, init_entities): + [front_door_contact, front_door_motion] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + ) + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF + + await front_door_motion.motion() + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON + + await wait(hass) + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF + + +async def test_motion_with_door_open(hass, init_integration, init_entities): + [front_door_contact, front_door_motion] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False), + ) + + await front_door_motion.motion() + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON + + await front_door_contact.open() + + assert hass.states.get("binary_sensor.front_door").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON + + await wait(hass) + + assert hass.states.get("binary_sensor.front_door").state == STATE_OFF + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF + + +async def test_motion_away_does_not_remove_contact_presence( + hass, init_integration, init_entities +): + [front_door_contact, front_door_motion] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False, timeout=2), + ) + + await front_door_motion.motion() + await front_door_contact.open() + + assert hass.states.get("binary_sensor.front_door").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON + + # This makes sure the motion sensor is set to away + await wait(hass, 2) + + assert hass.states.get("binary_sensor.front_door").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF + + +async def test_contact_timeout_does_not_remove_motion_presence( + hass, init_integration, init_entities +): + [front_door_contact, front_door_motion] = await init_entities( + contact_sensor("binary_sensor.front_door_contact", False), + motion_sensor("binary_sensor.front_door_motion", False, timeout=10), + ) + + await front_door_motion.motion() + await front_door_contact.open() + + assert hass.states.get("binary_sensor.front_door").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON + + # This makes sure the contact sensor is set to away + await wait(hass, 5) + + assert hass.states.get("binary_sensor.front_door").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON + assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON diff --git a/occupancy_component/tests/test_init.py b/occupancy_component/tests/test_init.py index c4b40912b..c51750f11 100644 --- a/occupancy_component/tests/test_init.py +++ b/occupancy_component/tests/test_init.py @@ -1,9 +1,7 @@ """Test component setup.""" from homeassistant.setup import async_setup_component -from homeassistant.const import STATE_ON, STATE_OFF, STATE_UNKNOWN from custom_components.occupancy.const import DOMAIN -from tests.helpers import wait, contact_sensor, motion_sensor async def test_async_setup(hass): @@ -28,190 +26,3 @@ async def test_async_setup(hass): result = await async_setup_component(hass, DOMAIN, config) await hass.async_block_till_done() assert result is True - - -async def test_door_opens(hass, init_integration, init_entities): - [front_door_contact] = await init_entities( - contact_sensor("binary_sensor.front_door_contact", False), - ) - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - assert ( - hass.states.get("binary_sensor.front_door").attributes["door_state"] == "closed" - ) - - await front_door_contact.open() - - assert hass.states.get("binary_sensor.front_door").state == STATE_ON - assert ( - hass.states.get("binary_sensor.front_door").attributes["door_state"] == "open" - ) - - await wait(hass) - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - assert ( - hass.states.get("binary_sensor.front_door").attributes["door_state"] == "open" - ) - - -async def test_door_closes(hass, init_integration, init_entities): - [front_door_contact] = await init_entities( - contact_sensor("binary_sensor.front_door_contact", STATE_ON), - ) - - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - - await front_door_contact.close() - - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door").state == STATE_ON - - await wait(hass) - - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - - -async def test_door_unknown_to_open(hass, init_integration, init_entities): - [front_door_contact] = await init_entities( - contact_sensor("binary_sensor.front_door_contact", None), - ) - - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_UNKNOWN - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - - await front_door_contact.open() - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - - -async def test_door_unknown_to_close(hass, init_integration, init_entities): - [front_door_contact] = await init_entities( - contact_sensor("binary_sensor.front_door_contact", None), - ) - - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_UNKNOWN - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - - await front_door_contact.close() - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - - -async def test_door_open_with_motion(hass, init_integration, init_entities): - [front_door_contact, front_door_motion] = await init_entities( - contact_sensor("binary_sensor.front_door_contact", True), - motion_sensor("binary_sensor.front_door_motion", False), - ) - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF - - await front_door_motion.motion() - - assert hass.states.get("binary_sensor.front_door").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON - - await wait(hass) - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF - - -async def test_door_closed_with_motion(hass, init_integration, init_entities): - [front_door_contact, front_door_motion] = await init_entities( - contact_sensor("binary_sensor.front_door_contact", False), - motion_sensor("binary_sensor.front_door_motion", False), - ) - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF - - await front_door_motion.motion() - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON - - await wait(hass) - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF - - -async def test_motion_with_door_open(hass, init_integration, init_entities): - [front_door_contact, front_door_motion] = await init_entities( - contact_sensor("binary_sensor.front_door_contact", False), - motion_sensor("binary_sensor.front_door_motion", False), - ) - - await front_door_motion.motion() - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON - - await front_door_contact.open() - - assert hass.states.get("binary_sensor.front_door").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON - - await wait(hass) - - assert hass.states.get("binary_sensor.front_door").state == STATE_OFF - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF - - -async def test_motion_away_does_not_remove_contact_presence( - hass, init_integration, init_entities -): - [front_door_contact, front_door_motion] = await init_entities( - contact_sensor("binary_sensor.front_door_contact", False), - motion_sensor("binary_sensor.front_door_motion", False, timeout=2), - ) - - await front_door_motion.motion() - await front_door_contact.open() - - assert hass.states.get("binary_sensor.front_door").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON - - # This makes sure the motion sensor is set to away - await wait(hass, 2) - - assert hass.states.get("binary_sensor.front_door").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_OFF - - -async def test_contact_timeout_does_not_remove_motion_presence( - hass, init_integration, init_entities -): - [front_door_contact, front_door_motion] = await init_entities( - contact_sensor("binary_sensor.front_door_contact", False), - motion_sensor("binary_sensor.front_door_motion", False, timeout=10), - ) - - await front_door_motion.motion() - await front_door_contact.open() - - assert hass.states.get("binary_sensor.front_door").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON - - # This makes sure the contact sensor is set to away - await wait(hass, 5) - - assert hass.states.get("binary_sensor.front_door").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_contact").state == STATE_ON - assert hass.states.get("binary_sensor.front_door_motion").state == STATE_ON