Skip to content

Commit

Permalink
feat: verify packet is not an older packet (#112)
Browse files Browse the repository at this point in the history
* feat: verfiy packet is not an older packet

* feat: move to function and fix tests

* chore: revise discard comment

* chore: revise discard comment

* chore: apply suggestions from code review

Co-authored-by: Ernst Klamer <[email protected]>

---------

Co-authored-by: Ernst Klamer <[email protected]>
  • Loading branch information
thecode and Ernst79 authored Mar 2, 2024
1 parent 3bf6e69 commit 113e49d
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 38 deletions.
74 changes: 56 additions & 18 deletions src/bthome_ble/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from bluetooth_sensor_state_data import BluetoothData
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESCCM
from home_assistant_bluetooth import BluetoothServiceInfo
from home_assistant_bluetooth import BluetoothServiceInfoBleak
from sensor_state_data.description import (
BaseBinarySensorDescription,
BaseSensorDescription,
Expand Down Expand Up @@ -163,7 +163,7 @@ def __init__(self, bindkey: bytes | None = None) -> None:
# The last service_info we saw that had a payload
# We keep this to help in reauth flows where we want to reprocess and old
# value with a new bindkey.
self.last_service_info: BluetoothServiceInfo | None = None
self.last_service_info: BluetoothServiceInfoBleak | None = None

# If this is True, the device is not sending advertisements in a regular interval
self.sleepy_device = False
Expand All @@ -176,7 +176,7 @@ def set_bindkey(self, bindkey: bytes | None) -> None:
else:
self.cipher = None

def supported(self, data: BluetoothServiceInfo) -> bool:
def supported(self, data: BluetoothServiceInfoBleak) -> bool:
if not super().supported(data):
return False

Expand All @@ -194,7 +194,7 @@ def supported(self, data: BluetoothServiceInfo) -> bool:

return True

def _start_update(self, service_info: BluetoothServiceInfo) -> None:
def _start_update(self, service_info: BluetoothServiceInfoBleak) -> None:
"""Update from BLE advertisement data."""
_LOGGER.debug("Parsing BTHome BLE advertisement data: %s", service_info)
for uuid, service_data in service_info.service_data.items():
Expand All @@ -210,7 +210,7 @@ def _start_update(self, service_info: BluetoothServiceInfo) -> None:
return None

def _parse_bthome_v1(
self, service_info: BluetoothServiceInfo, service_data: bytes
self, service_info: BluetoothServiceInfoBleak, service_data: bytes
) -> bool:
"""Parser for BTHome sensors version V1"""
identifier = short_address(service_info.address)
Expand Down Expand Up @@ -269,10 +269,10 @@ def _parse_bthome_v1(
else:
return False

return self._parse_payload(payload, sw_version)
return self._parse_payload(payload, sw_version, service_info.time)

def _parse_bthome_v2(
self, service_info: BluetoothServiceInfo, service_data: bytes
self, service_info: BluetoothServiceInfoBleak, service_data: bytes
) -> bool:
"""Parser for BTHome sensors version V2"""
identifier = short_address(service_info.address)
Expand Down Expand Up @@ -370,9 +370,54 @@ def _parse_bthome_v2(
except (ValueError, TypeError):
return True

return self._parse_payload(payload, sw_version)
return self._parse_payload(payload, sw_version, service_info.time)

def _parse_payload(self, payload: bytes, sw_version: int) -> bool:
def _skip_old_or_duplicated_advertisement(
self, new_packet_id: float, adv_time: float
) -> bool:
"""
Detect duplicated or older packets
Devices may send duplicated advertisements or advertisements order can change
when passing through a proxy. If more than 4 seconds pass since the last
advertisement assume it is a new packet even if it has the same packet id.
Packet id rollover at 255 to 0, validate that the difference between last packet id
and new packet id is less than 64. This assumes device is not sending more than 16
advertisements per second.
"""
last_packet_id = self.packet_id

# no history, first packet, don't discard packet
if last_packet_id is None or self.last_service_info is None:
_LOGGER.debug("First packet, not filtering packet_id %i", new_packet_id)
return False

# more than 4 seconds since last packet, don't discard packet
if adv_time - self.last_service_info.time > 4:
_LOGGER.debug(
"Not filtering packet_id, more than 4 seconds since last packet. "
"New time: %i, Old time: %i",
adv_time,
self.last_service_info.time,
)
return False

# distance between new packet and old packet is less then 64
if (new_packet_id > last_packet_id and new_packet_id - last_packet_id < 64) or (
new_packet_id < last_packet_id and new_packet_id + 256 - last_packet_id < 64
):
return False

# discard packet (new_packet_id=last_packet_id or older packet)
_LOGGER.debug(
"New packet_id %i indicates an older packet (previous packet_id %i). "
"BLE advertisement will be skipped",
new_packet_id,
last_packet_id,
)
return True

def _parse_payload(self, payload: bytes, sw_version: int, adv_time: float) -> bool:
payload_length = len(payload)
next_obj_start = 0
prev_obj_meas_type = 0
Expand Down Expand Up @@ -433,15 +478,8 @@ def _parse_payload(self, payload: bytes, sw_version: int) -> bool:

# Filter BLE advertisements with packet_id that has already been parsed.
if obj_meas_type == 0:
last_packet_id = self.packet_id
new_packet_id = parse_uint(payload[obj_data_start:next_obj_start])
if new_packet_id == last_packet_id:
_LOGGER.debug(
"New counter_id %i is the same as the previous received counter_id %i. "
"BLE advertisement will be skipped",
new_packet_id,
last_packet_id,
)
if self._skip_old_or_duplicated_advertisement(new_packet_id, adv_time):
break
self.packet_id = new_packet_id

Expand Down Expand Up @@ -557,7 +595,7 @@ def _parse_payload(self, payload: bytes, sw_version: int) -> bool:

def _decrypt_bthome(
self,
service_info: BluetoothServiceInfo,
service_info: BluetoothServiceInfoBleak,
service_data: bytes,
bthome_mac: bytes,
sw_version: int,
Expand Down
27 changes: 21 additions & 6 deletions tests/test_parser_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from unittest.mock import patch

import pytest
from bluetooth_sensor_state_data import BluetoothServiceInfo, SensorUpdate
from bluetooth_sensor_state_data import SensorUpdate
from home_assistant_bluetooth import BluetoothServiceInfoBleak
from sensor_state_data import (
BinarySensorDescription,
BinarySensorDeviceClass,
Expand All @@ -20,6 +21,8 @@

from bthome_ble.parser import BTHomeBluetoothDeviceData, EncryptionScheme

ADVERTISEMENT_TIME = 1709331995.5181565

KEY_BATTERY = DeviceKey(key="battery", device_id=None)
KEY_BINARY_GENERIC = DeviceKey(key="generic", device_id=None)
KEY_BINARY_OPENING = DeviceKey(key="opening", device_id=None)
Expand Down Expand Up @@ -60,31 +63,39 @@ def mock_platform():

def bytes_to_service_info(
payload: bytes, local_name: str, address: str = "00:00:00:00:00:00"
) -> BluetoothServiceInfo:
) -> BluetoothServiceInfoBleak:
"""Convert bytes to service info"""
return BluetoothServiceInfo(
return BluetoothServiceInfoBleak(
name=local_name,
address=address,
rssi=-60,
manufacturer_data={},
service_data={"0000181c-0000-1000-8000-00805f9b34fb": payload},
service_uuids=["0000181c-0000-1000-8000-00805f9b34fb"],
source="",
device=None,
advertisement=None,
connectable=False,
time=ADVERTISEMENT_TIME,
)


def bytes_to_encrypted_service_info(
payload: bytes, local_name: str, address: str = "00:00:00:00:00:00"
) -> BluetoothServiceInfo:
) -> BluetoothServiceInfoBleak:
"""Convert bytes to service info"""
return BluetoothServiceInfo(
return BluetoothServiceInfoBleak(
name=local_name,
address=address,
rssi=-60,
manufacturer_data={},
service_data={"0000181e-0000-1000-8000-00805f9b34fb": payload},
service_uuids=["0000181e-0000-1000-8000-00805f9b34fb"],
source="",
device=None,
advertisement=None,
connectable=False,
time=ADVERTISEMENT_TIME,
)


Expand Down Expand Up @@ -1164,7 +1175,7 @@ def test_bthome_event_dimmer_rotate_left_3_steps(caplog):

def test_bthome_multiple_uuids(caplog):
"""Test BTHome parser for a device that broadcasts multiple uuids."""
advertisement = BluetoothServiceInfo(
advertisement = BluetoothServiceInfoBleak(
name="ATC_8D18B2",
address="A4:C1:38:8D:18:B2",
rssi=-60,
Expand All @@ -1180,6 +1191,10 @@ def test_bthome_multiple_uuids(caplog):
"0000181c-0000-1000-8000-00805f9b34fb",
],
source="",
device=None,
advertisement=None,
connectable=False,
time=ADVERTISEMENT_TIME,
)

device = BTHomeBluetoothDeviceData()
Expand Down
Loading

0 comments on commit 113e49d

Please sign in to comment.