Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate device diagnostics and create a tool to regenerate them #284

Draft
wants to merge 18 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ To bootstrap a development environment for ZHA, follow these steps:
- `pre-commit install`: This command installs Git pre-commit hooks for the project. Pre-commit hooks are scripts that run before each commit is made in a Git repository, allowing you to enforce certain checks or actions before committing changes.
</details>

4. If creating new entities or modifying existing ones, unit tests that use device diagnostic files will fail. You can regenerate existing device diagnostic JSON files to incorporate the new entities:

```shell
python -m tools.regenerate_diagnostics
```

## License

ZHA is released under the [Apache 2.0 License](https://opensource.org/license/apache-2-0). Please refer to the LICENSE file for more details
218 changes: 112 additions & 106 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import asyncio
from collections.abc import Awaitable, Callable
from contextlib import suppress
from datetime import datetime
import itertools
import json
import logging
Expand Down Expand Up @@ -313,123 +313,118 @@ def zigpy_device_from_device_data(
quirk: Optional[Callable] = None,
) -> zigpy.device.Device:
"""Make a fake device using the specified cluster classes."""
ieee = zigpy.types.EUI64.convert(device_data["ieee"])
nwk = device_data["nwk"]
manufacturer = device_data["manufacturer"]
model = device_data["model"]
node_descriptor = device_data["signature"]["node_descriptor"]
endpoints = device_data["signature"]["endpoints"]
cluster_data = device_data["cluster_details"]

device = zigpy.device.Device(app, ieee, nwk)
device.manufacturer = manufacturer
device.model = model

node_desc = zdo_t.NodeDescriptor(
logical_type=node_descriptor["logical_type"],
complex_descriptor_available=node_descriptor["complex_descriptor_available"],
user_descriptor_available=node_descriptor["user_descriptor_available"],
reserved=node_descriptor["reserved"],
aps_flags=node_descriptor["aps_flags"],
frequency_band=node_descriptor["frequency_band"],
mac_capability_flags=node_descriptor["mac_capability_flags"],
manufacturer_code=node_descriptor["manufacturer_code"],
maximum_buffer_size=node_descriptor["maximum_buffer_size"],
maximum_incoming_transfer_size=node_descriptor[
"maximum_incoming_transfer_size"
],
server_mask=node_descriptor["server_mask"],
maximum_outgoing_transfer_size=node_descriptor[
"maximum_outgoing_transfer_size"
],
descriptor_capability_field=node_descriptor["descriptor_capability_field"],
device = zigpy.device.Device(
application=app,
ieee=zigpy.types.EUI64.convert(device_data["ieee"]),
nwk=zigpy.types.NWK.convert(device_data["nwk"][2:]),
)
device.node_desc = node_desc
device.last_seen = time.time()
device.manufacturer = device_data["manufacturer"]
device.model = device_data["model"]
device.last_seen = datetime.fromisoformat(device_data["last_seen"])
device.lqi = int(device_data["lqi"]) if device_data["lqi"] is not None else None
device.rssi = int(device_data["rssi"]) if device_data["rssi"] is not None else None
device.node_desc = zdo_t.NodeDescriptor(**device_data["node_descriptor"])

if device_data.get("original_signature", {}):
for epid, ep in device_data["original_signature"]["endpoints"].items():
endpoint = device.add_endpoint(int(epid))
profile_id = int(ep["profile_id"], 16)
device_type = int(ep["device_type"], 16)

if profile_id in zigpy.profiles.PROFILES:
profile = zigpy.profiles.PROFILES[profile_id]
endpoint.profile_id = profile.PROFILE_ID
endpoint.device_type = profile.DeviceType(device_type)
else:
endpoint.profile_id = profile_id
endpoint.device_type = device_type

orig_endpoints = (
device_data["original_signature"]["endpoints"]
if "original_signature" in device_data
else endpoints
)
for epid, ep in orig_endpoints.items():
endpoint = device.add_endpoint(int(epid))
profile = None
with suppress(Exception):
profile = zigpy.profiles.PROFILES[int(ep["profile_id"], 16)]

endpoint.device_type = (
profile.DeviceType(int(ep["device_type"], 16))
if profile
else int(ep["device_type"], 16)
)
endpoint.profile_id = (
profile.PROFILE_ID if profile else int(ep["profile_id"], 16)
)
endpoint.request = AsyncMock(return_value=[0])
for cluster_id in ep["input_clusters"]:
endpoint.add_input_cluster(int(cluster_id, 16))

for cluster_id in ep["output_clusters"]:
endpoint.add_output_cluster(int(cluster_id, 16))
else:
for epid, ep in device_data["endpoints"].items():
endpoint = device.add_endpoint(int(epid))

profile_id = ep["profile_id"]
device_type = ep["device_type"]["id"]

for cluster_id in ep["input_clusters"]:
endpoint.add_input_cluster(int(cluster_id, 16))
if profile_id in zigpy.profiles.PROFILES:
profile = zigpy.profiles.PROFILES[profile_id]
endpoint.profile_id = profile.PROFILE_ID
endpoint.device_type = profile.DeviceType(device_type)
else:
endpoint.profile_id = profile_id
endpoint.device_type = device_type

for cluster in ep["in_clusters"]:
endpoint.add_input_cluster(int(cluster["cluster_id"], 16))

for cluster_id in ep["output_clusters"]:
endpoint.add_output_cluster(int(cluster_id, 16))
for cluster in ep["out_clusters"]:
endpoint.add_output_cluster(int(cluster["cluster_id"], 16))

if quirk:
device = quirk(app, device.ieee, device.nwk, device)
else:
device = quirks_get_device(device)

for epid, ep in cluster_data.items():
for epid, ep in device_data["endpoints"].items():
endpoint = device.endpoints[int(epid)]
endpoint.request = AsyncMock(return_value=[0])
for cluster_id, cluster in ep["in_clusters"].items():
real_cluster = device.endpoints[int(epid)].in_clusters[int(cluster_id, 16)]
if patch_cluster:
patch_cluster_for_testing(real_cluster)
for attr_id, attr in cluster["attributes"].items():
if (
attr["value"] is None
or attr_id in cluster["unsupported_attributes"]
):
continue
real_cluster._attr_cache[int(attr_id, 16)] = attr["value"]
real_cluster.PLUGGED_ATTR_READS[int(attr_id, 16)] = attr["value"]
for unsupported_attr in cluster["unsupported_attributes"]:
if isinstance(unsupported_attr, str) and unsupported_attr.startswith(
"0x"
):
attrid = int(unsupported_attr, 16)
real_cluster.unsupported_attributes.add(attrid)
if attrid in real_cluster.attributes:
real_cluster.unsupported_attributes.add(
real_cluster.attributes[attrid].name
)
else:
real_cluster.unsupported_attributes.add(unsupported_attr)

for cluster_id, cluster in ep["out_clusters"].items():
real_cluster = device.endpoints[int(epid)].out_clusters[int(cluster_id, 16)]
if patch_cluster:
patch_cluster_for_testing(real_cluster)
for attr_id, attr in cluster["attributes"].items():
if (
attr["value"] is None
or attr_id in cluster["unsupported_attributes"]
):
continue
real_cluster._attr_cache[int(attr_id, 16)] = attr["value"]
real_cluster.PLUGGED_ATTR_READS[int(attr_id, 16)] = attr["value"]
for unsupported_attr in cluster["unsupported_attributes"]:
if isinstance(unsupported_attr, str) and unsupported_attr.startswith(
"0x"
):
attrid = int(unsupported_attr, 16)
real_cluster.unsupported_attributes.add(attrid)
if attrid in real_cluster.attributes:
real_cluster.unsupported_attributes.add(
real_cluster.attributes[attrid].name
)
else:
real_cluster.unsupported_attributes.add(unsupported_attr)

for cluster_type in ("in_clusters", "out_clusters"):
for cluster in ep[cluster_type]:
real_cluster = getattr(endpoint, cluster_type)[
int(cluster["cluster_id"], 16)
]

if patch_cluster:
patch_cluster_for_testing(real_cluster)

for attr in cluster["attributes"]:
attrid = int(attr["id"], 16)

if attr["value"] is not None:
real_cluster._attr_cache[attrid] = attr["value"]
real_cluster.PLUGGED_ATTR_READS[attrid] = attr["value"]

if attr["unsupported"]:
real_cluster.unsupported_attributes.add(attrid)

if attr["name"] is not None:
real_cluster.unsupported_attributes.add(attr["name"])

for obj in device_data["neighbors"]:
app.topology.neighbors[device.ieee].append(
zdo_t.Neighbor(
device_type=zdo_t.Neighbor.DeviceType[obj["device_type"]],
rx_on_when_idle=zdo_t.Neighbor.RxOnWhenIdle[obj["rx_on_when_idle"]],
relationship=zdo_t.Neighbor.Relationship[obj["relationship"]],
extended_pan_id=t.ExtendedPanId.convert(obj["extended_pan_id"]),
ieee=t.EUI64.convert(obj["ieee"]),
nwk=t.NWK.convert(obj["nwk"][2:]),
permit_joining=zdo_t.Neighbor.PermitJoins[obj["permit_joining"]],
reserved2=0,
depth=obj["depth"],
lqi=obj["lqi"],
)
)

for obj in device_data["routes"]:
app.topology.routes[device.ieee].append(
zdo_t.Route(
DstNWK=t.NWK.convert(obj["dest_nwk"][2:]),
RouteStatus=zdo_t.RouteStatus[obj["route_status"]],
MemoryConstrained=obj["memory_constrained"],
ManyToOne=obj["many_to_one"],
RouteRecordRequired=obj["route_record_required"],
Reserved=0,
NextHop=t.NWK.convert(obj["next_hop"][2:]),
)
)

return device

Expand Down Expand Up @@ -542,3 +537,14 @@ def create_mock_zigpy_device(
cluster._attr_cache[attr_id] = value

return device


class ZhaJsonEncoder(json.JSONEncoder):
"""JSON encoder to handle common Python data types, currently just `set`."""

def default(self, obj):
"""Convert non-JSON types."""
if isinstance(obj, set):
return sorted(obj, key=repr)

return super().default(obj)
67 changes: 41 additions & 26 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ async def request(
expect_reply: bool = True,
use_ieee: bool = False,
extended_timeout: bool = False,
ask_for_ack: bool | None = None,
priority: int = zigpy.types.PacketPriority.NORMAL,
):
pass

Expand Down Expand Up @@ -199,9 +201,9 @@ def verify_cleanup(
)


@pytest.fixture(name="zigpy_app_controller")
async def zigpy_app_controller_fixture():
"""Zigpy ApplicationController fixture."""
@contextmanager
def make_zigpy_app_controller():
"""Mock zigpy ApplicationController."""
app = _FakeApp(
{
zigpy.config.CONF_DATABASE: None,
Expand Down Expand Up @@ -243,17 +245,22 @@ async def zigpy_app_controller_fixture():
yield app


@pytest.fixture()
def zigpy_app_controller():
"""Zigpy ApplicationController fixture."""
with make_zigpy_app_controller() as app:
yield app


@pytest.fixture(name="caplog")
def caplog_fixture(caplog: pytest.LogCaptureFixture) -> pytest.LogCaptureFixture:
"""Set log level to debug for tests using the caplog fixture."""
caplog.set_level(logging.DEBUG)
return caplog


@pytest.fixture(name="zha_data")
def zha_data_fixture() -> ZHAData:
"""Fixture representing zha configuration data."""

def make_zha_data() -> ZHAData:
"""Create ZHA data."""
return ZHAData(
config=ZHAConfiguration(
coordinator_configuration=CoordinatorConfiguration(
Expand All @@ -273,21 +280,40 @@ def zha_data_fixture() -> ZHAData:
)


@pytest.fixture(name="zha_data")
def zha_data_fixture() -> ZHAData:
"""Fixture representing zha configuration data."""
return make_zha_data()


class TestGateway:
"""Test ZHA gateway context manager."""

def __init__(self, data: ZHAData):
def __init__(self, data: ZHAData, app: ControllerApplication):
"""Initialize the ZHA gateway."""
self.zha_data: ZHAData = data
self.zha_gateway: Gateway
self.app = app

async def __aenter__(self) -> Gateway:
"""Start the ZHA gateway."""
self.zha_gateway = await Gateway.async_from_config(self.zha_data)
await self.zha_gateway.async_initialize()
await self.zha_gateway.async_block_till_done()
await self.zha_gateway.async_initialize_devices_and_entities()
INSTANCES.append(self.zha_gateway)

with (
patch(
"bellows.zigbee.application.ControllerApplication.new",
return_value=self.app,
),
patch(
"bellows.zigbee.application.ControllerApplication",
return_value=self.app,
),
):
self.zha_gateway = await Gateway.async_from_config(self.zha_data)
await self.zha_gateway.async_initialize()
await self.zha_gateway.async_block_till_done()
await self.zha_gateway.async_initialize_devices_and_entities()
INSTANCES.append(self.zha_gateway)

return self.zha_gateway

async def __aexit__(
Expand All @@ -306,19 +332,8 @@ async def zha_gateway(
caplog, # pylint: disable=unused-argument
):
"""Set up ZHA component."""

with (
patch(
"bellows.zigbee.application.ControllerApplication.new",
return_value=zigpy_app_controller,
),
patch(
"bellows.zigbee.application.ControllerApplication",
return_value=zigpy_app_controller,
),
):
async with TestGateway(zha_data) as gateway:
yield gateway
async with TestGateway(zha_data, zigpy_app_controller) as gateway:
yield gateway


@pytest.fixture(scope="session", autouse=True)
Expand Down
Loading
Loading