Skip to content

Commit

Permalink
Implement zigpy network scanning
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Aug 19, 2024
1 parent 0035116 commit dfcba7d
Showing 1 changed file with 46 additions and 0 deletions.
46 changes: 46 additions & 0 deletions zigpy_znp/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import asyncio
import logging
from typing import AsyncGenerator

import zigpy.zcl
import zigpy.zdo
Expand Down Expand Up @@ -248,6 +249,51 @@ async def set_tx_power(self, dbm: int) -> None:
"Requested TX power %d was adjusted to %d", dbm, rsp.StatusOrPower
)

async def network_scan(
self, channels: t.Channels, duration_exp: int
) -> AsyncGenerator[zigpy.types.NetworkBeacon, None]:
# XXX: Network scanning only works if the stack is not running!
await self._znp.reset()

async with self._znp.capture_responses(
[
c.ZDO.BeaconNotifyInd.Callback(partial=True),
c.ZDO.NwkDiscoveryCnf.Callback(partial=True),
]
) as updates:
await self._znp.request(
c.ZDO.NetworkDiscoveryReq.Req(
Channels=channels,
ScanDuration=duration_exp,
),
RspStatus=t.Status.SUCCESS,
)

while True:
update = await updates.get()

if isinstance(update, c.ZDO.NwkDiscoveryCnf.Callback):
break

for beacon in update.Beacons:
yield zigpy.types.NetworkBeacon(
pan_id=beacon.PanId,
extended_pan_id=beacon.ExtendedPanId,
channel=beacon.Channel,
permit_joining=beacon.PermitJoining,
stack_profile=beacon.StackProfile,
nwk_update_id=beacon.UpdateId,
lqi=beacon.LQI,
src=beacon.Src,
depth=beacon.Depth,
router_capacity=beacon.RouterCapacity,
device_capacity=beacon.DeviceCapacity,
protocol_version=beacon.ProtocolVersion,
# Only thing not supported is `rssi`
)

await self.start_network()

def get_dst_address(self, cluster: zigpy.zcl.Cluster) -> zdo_t.MultiAddress:
"""
Helper to get a dst address for bind/unbind operations.
Expand Down

0 comments on commit dfcba7d

Please sign in to comment.