diff --git a/avatar/cases/l2cap_test.py b/avatar/cases/l2cap_test.py index 917fc76..731f7b9 100644 --- a/avatar/cases/l2cap_test.py +++ b/avatar/cases/l2cap_test.py @@ -19,25 +19,14 @@ from avatar import BumblePandoraDevice from avatar import PandoraDevice from avatar import PandoraDevices -from avatar.common import make_bredr_connection -from avatar.common import make_le_connection +from avatar import pandora_snippet from mobly import base_test from mobly import test_runner from mobly.asserts import assert_equal # type: ignore from mobly.asserts import assert_is_not_none # type: ignore from pandora import host_pb2 -from pandora import l2cap_pb2 -from typing import Any, Awaitable, Callable, Dict, Literal, Optional, Tuple, Union - -CONNECTORS: Dict[ - str, - Callable[[avatar.PandoraDevice, avatar.PandoraDevice], Awaitable[Tuple[host_pb2.Connection, host_pb2.Connection]]], -] = { - 'Classic': make_bredr_connection, - 'LE': make_le_connection, -} - -FIXED_CHANNEL_CID = 0x3E +from typing import Any, Dict, Literal, Optional, Union + CLASSIC_PSM = 0xFEFF LE_SPSM = 0xF0 @@ -49,6 +38,10 @@ class L2capTest(base_test.BaseTestClass): # type: ignore[misc] dut: PandoraDevice ref: PandoraDevice + # BR/EDR & Low-Energy connections. + dut_ref: Dict[str, host_pb2.Connection] = {} + ref_dut: Dict[str, host_pb2.Connection] = {} + def setup_class(self) -> None: self.devices = PandoraDevices(self) self.dut, self.ref, *_ = self.devices @@ -66,160 +59,134 @@ def teardown_class(self) -> None: async def setup_test(self) -> None: # pytype: disable=wrong-arg-types await asyncio.gather(self.dut.reset(), self.ref.reset()) + # Connect REF to DUT in both BR/EDR and Low-Energy. + self.ref_dut['BR/EDR'], self.dut_ref['BR/EDR'] = await pandora_snippet.connect(self.ref, self.dut) + self.ref_dut['LE'], self.dut_ref['LE'] = await pandora_snippet.connect_le_dummy(self.ref, self.dut) + @avatar.parameterized( - ('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), - ('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), - ('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))), + ('BR/EDR', dict(basic=dict(psm=CLASSIC_PSM))), ( 'LE', - dict( - le_credit_based=l2cap_pb2.CreditBasedChannelRequest( - spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256 - ) - ), + dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)), ), ) # type: ignore[misc] @avatar.asynchronous async def test_connect( self, - transport: Union[Literal['Classic'], Literal['LE']], + transport: Union[Literal['BR/EDR'], Literal['LE']], request: Dict[str, Any], ) -> None: - dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref) - server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request) - ref_dut_res, dut_ref_res = await asyncio.gather( - anext(aiter(server)), - self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request), + ref_dut, dut_ref = await asyncio.gather( + self.ref.aio.l2cap.WaitConnection(connection=self.ref_dut[transport], **request), + self.dut.aio.l2cap.Connect(connection=self.dut_ref[transport], **request), ) - assert_is_not_none(ref_dut_res.channel) - assert_is_not_none(dut_ref_res.channel) + assert_is_not_none(ref_dut.channel) + assert_is_not_none(dut_ref.channel) @avatar.parameterized( - ('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), - ('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), - ('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))), + ('BR/EDR', dict(basic=dict(psm=CLASSIC_PSM))), ( 'LE', - dict( - le_credit_based=l2cap_pb2.CreditBasedChannelRequest( - spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256 - ) - ), + dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)), ), ) # type: ignore[misc] @avatar.asynchronous - async def test_on_connection( + async def test_wait_connection( self, - transport: Union[Literal['Classic'], Literal['LE']], + transport: Union[Literal['BR/EDR'], Literal['LE']], request: Dict[str, Any], ) -> None: - dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref) - server = self.dut.aio.l2cap.OnConnection(connection=dut_ref_acl, **request) - ref_dut_res, dut_ref_res = await asyncio.gather( - self.ref.aio.l2cap.Connect(connection=ref_dut_acl, **request), - anext(aiter(server)), + dut_ref, ref_dut = await asyncio.gather( + self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request), + self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request), ) - assert_is_not_none(ref_dut_res.channel) - assert_is_not_none(dut_ref_res.channel) + assert_is_not_none(ref_dut.channel) + assert_is_not_none(dut_ref.channel) @avatar.parameterized( - ('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))), + ('BR/EDR', dict(basic=dict(psm=CLASSIC_PSM))), ( 'LE', - dict( - le_credit_based=l2cap_pb2.CreditBasedChannelRequest( - spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256 - ) - ), + dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)), ), ) # type: ignore[misc] @avatar.asynchronous async def test_disconnect( self, - transport: Union[Literal['Classic'], Literal['LE']], + transport: Union[Literal['BR/EDR'], Literal['LE']], request: Dict[str, Any], ) -> None: - dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref) - server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request) - ref_dut_res, dut_ref_res = await asyncio.gather( - anext(aiter(server)), - self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request), + dut_ref, ref_dut = await asyncio.gather( + self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request), + self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request), ) - assert dut_ref_res.channel and ref_dut_res.channel + assert_is_not_none(ref_dut.channel) + assert_is_not_none(dut_ref.channel) + assert ref_dut.channel and dut_ref.channel - await asyncio.gather( - self.dut.aio.l2cap.Disconnect(channel=dut_ref_res.channel), - self.ref.aio.l2cap.WaitDisconnection(channel=ref_dut_res.channel), + _, dis = await asyncio.gather( + self.ref.aio.l2cap.WaitDisconnection(channel=ref_dut.channel), + self.dut.aio.l2cap.Disconnect(channel=dut_ref.channel), ) + assert_equal(dis.result_variant(), 'success') + @avatar.parameterized( - ('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))), + ('BR/EDR', dict(basic=dict(psm=CLASSIC_PSM))), ( 'LE', - dict( - le_credit_based=l2cap_pb2.CreditBasedChannelRequest( - spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256 - ) - ), + dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)), ), ) # type: ignore[misc] @avatar.asynchronous async def test_wait_disconnection( self, - transport: Union[Literal['Classic'], Literal['LE']], + transport: Union[Literal['BR/EDR'], Literal['LE']], request: Dict[str, Any], ) -> None: - dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref) - server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request) - ref_dut_res, dut_ref_res = await asyncio.gather( - anext(aiter(server)), - self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request), + dut_ref, ref_dut = await asyncio.gather( + self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request), + self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request), ) - assert dut_ref_res.channel and ref_dut_res.channel + assert_is_not_none(ref_dut.channel) + assert_is_not_none(dut_ref.channel) + assert ref_dut.channel and dut_ref.channel - await asyncio.gather( - self.ref.aio.l2cap.Disconnect(channel=ref_dut_res.channel), - self.dut.aio.l2cap.WaitDisconnection(channel=dut_ref_res.channel), + dis, _ = await asyncio.gather( + self.dut.aio.l2cap.WaitDisconnection(channel=dut_ref.channel), + self.ref.aio.l2cap.Disconnect(channel=ref_dut.channel), ) + assert_equal(dis.result_variant(), 'success') + @avatar.parameterized( - ('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), - ('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))), - ('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))), + ('BR/EDR', dict(basic=dict(psm=CLASSIC_PSM))), ( 'LE', - dict( - le_credit_based=l2cap_pb2.CreditBasedChannelRequest( - spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256 - ) - ), + dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)), ), ) # type: ignore[misc] @avatar.asynchronous async def test_send( self, - transport: Union[Literal['Classic'], Literal['LE']], + transport: Union[Literal['BR/EDR'], Literal['LE']], request: Dict[str, Any], ) -> None: - dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref) - server = self.dut.aio.l2cap.OnConnection(connection=dut_ref_acl, **request) - ref_dut_res, dut_ref_res = await asyncio.gather( - self.ref.aio.l2cap.Connect(connection=ref_dut_acl, **request), - anext(aiter(server)), + dut_ref, ref_dut = await asyncio.gather( + self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request), + self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request), ) - ref_dut_channel = ref_dut_res.channel - dut_ref_channel = dut_ref_res.channel - assert_is_not_none(ref_dut_res.channel) - assert_is_not_none(dut_ref_res.channel) - assert ref_dut_channel and dut_ref_channel - - dut_ref_stream = self.ref.aio.l2cap.Receive(channel=dut_ref_channel) - _send_res, recv_res = await asyncio.gather( - self.dut.aio.l2cap.Send(channel=ref_dut_channel, data=b"The quick brown fox jumps over the lazy dog"), - anext(aiter(dut_ref_stream)), + assert_is_not_none(ref_dut.channel) + assert_is_not_none(dut_ref.channel) + assert ref_dut.channel and dut_ref.channel + + ref_source = self.ref.aio.l2cap.Receive(channel=dut_ref.channel) + _, recv = await asyncio.gather( + self.dut.aio.l2cap.Send(channel=ref_dut.channel, data=b"The quick brown fox jumps over the lazy dog"), + anext(aiter(ref_source)), ) - assert recv_res.data - assert_equal(recv_res.data, b"The quick brown fox jumps over the lazy dog") + + assert_equal(recv.data, b"The quick brown fox jumps over the lazy dog") if __name__ == "__main__": diff --git a/avatar/cases/le_security_test.py b/avatar/cases/le_security_test.py index d8495e9..0f3c34a 100644 --- a/avatar/cases/le_security_test.py +++ b/avatar/cases/le_security_test.py @@ -203,38 +203,17 @@ async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: nonlocal ref_dut nonlocal dut_ref - # Make LE connection task. - async def connect_le( - initiator: PandoraDevice, - acceptor: PandoraDevice, - initiator_addr_type: OwnAddressType, - acceptor_addr_type: OwnAddressType, - ) -> Tuple[Connection, Connection]: - # Acceptor - Advertise - advertisement = acceptor.aio.host.Advertise( - legacy=True, - connectable=True, - own_address_type=acceptor_addr_type, - data=DataTypes(manufacturer_specific_data=b'pause cafe'), - ) - - # Initiator - Scan and fetch the address - scan = initiator.aio.host.Scan(own_address_type=initiator_addr_type) - acceptor_scan = await anext( - (x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data) - ) # pytype: disable=name-error - scan.cancel() - - # Initiator - LE connect - return await pandora_snippet.connect_le(initiator, advertisement, acceptor_scan, initiator_addr_type) - # Make LE connection. if connect == 'incoming_connection': # DUT is acceptor - ref_dut, dut_ref = await connect_le(self.ref, self.dut, ref_address_type, dut_address_type) + ref_dut, dut_ref = await pandora_snippet.connect_le_dummy( + self.ref, self.dut, ref_address_type, dut_address_type + ) else: # DUT is initiator - dut_ref, ref_dut = await connect_le(self.dut, self.ref, dut_address_type, ref_address_type) + dut_ref, ref_dut = await pandora_snippet.connect_le_dummy( + self.dut, self.ref, dut_address_type, ref_address_type + ) # Pairing. diff --git a/avatar/common.py b/avatar/common.py deleted file mode 100644 index bb3aa6d..0000000 --- a/avatar/common.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio - -from avatar import PandoraDevice -from mobly.asserts import assert_equal # type: ignore -from mobly.asserts import assert_is_not_none # type: ignore -from pandora.host_pb2 import RANDOM -from pandora.host_pb2 import Connection -from pandora.host_pb2 import DataTypes -from pandora.host_pb2 import OwnAddressType -from typing import Tuple - - -# Make classic connection task. -async def make_bredr_connection(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]: - init_res, wait_res = await asyncio.gather( - initiator.aio.host.Connect(address=acceptor.address), - acceptor.aio.host.WaitConnection(address=initiator.address), - ) - assert_equal(init_res.result_variant(), 'connection') - assert_equal(wait_res.result_variant(), 'connection') - assert init_res.connection is not None and wait_res.connection is not None - return init_res.connection, wait_res.connection - - -# Make LE connection task. -async def make_le_connection( - central: PandoraDevice, - peripheral: PandoraDevice, - central_address_type: OwnAddressType = RANDOM, - peripheral_address_type: OwnAddressType = RANDOM, -) -> Tuple[Connection, Connection]: - advertise = peripheral.aio.host.Advertise( - legacy=True, - connectable=True, - own_address_type=peripheral_address_type, - data=DataTypes(manufacturer_specific_data=b'pause cafe'), - ) - - scan = central.aio.host.Scan(own_address_type=central_address_type) - ref = await anext((x async for x in scan if x.data.manufacturer_specific_data == b'pause cafe')) - scan.cancel() - - adv_res, conn_res = await asyncio.gather( - anext(aiter(advertise)), - central.aio.host.ConnectLE(**ref.address_asdict(), own_address_type=central_address_type), - ) - assert_equal(conn_res.result_variant(), 'connection') - cen_per, per_cen = conn_res.connection, adv_res.connection - assert_is_not_none(cen_per) - assert_is_not_none(per_cen) - assert cen_per, per_cen - advertise.cancel() - return cen_per, per_cen diff --git a/avatar/pandora_snippet.py b/avatar/pandora_snippet.py index 31695ee..35207a6 100644 --- a/avatar/pandora_snippet.py +++ b/avatar/pandora_snippet.py @@ -20,8 +20,10 @@ from mobly.asserts import assert_equal # type: ignore from mobly.asserts import assert_is_not_none # type: ignore from pandora._utils import AioStream +from pandora.host_pb2 import RANDOM from pandora.host_pb2 import AdvertiseResponse from pandora.host_pb2 import Connection +from pandora.host_pb2 import DataTypes from pandora.host_pb2 import OwnAddressType from pandora.host_pb2 import ScanningResponse from typing import Optional, Tuple @@ -65,3 +67,29 @@ async def connect_le( assert_is_not_none(init_res.connection) assert init_res.connection return init_res.connection, wait_res.connection + + +# Make LE connection task. +async def connect_le_dummy( + initiator: PandoraDevice, + acceptor: PandoraDevice, + initiator_addr_type: OwnAddressType = RANDOM, + acceptor_addr_type: OwnAddressType = RANDOM, +) -> Tuple[Connection, Connection]: + # Acceptor - Advertise + advertisement = acceptor.aio.host.Advertise( + legacy=True, + connectable=True, + own_address_type=acceptor_addr_type, + data=DataTypes(manufacturer_specific_data=b'pause cafe'), + ) + + # Initiator - Scan and fetch the address + scan = initiator.aio.host.Scan(own_address_type=initiator_addr_type) + acceptor_scan = await anext( + (x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data) + ) # pytype: disable=name-error + scan.cancel() + + # Initiator - LE connect + return await connect_le(initiator, advertisement, acceptor_scan, initiator_addr_type) diff --git a/pyproject.toml b/pyproject.toml index 4688611..d76e6e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,8 +9,8 @@ classifiers = [ "License :: OSI Approved :: Apache Software License" ] dependencies = [ - "bt-test-interfaces>=0.0.4", - "bumble>=0.0.176", + "bumble@git+https://github.com/google/bumble@uael/l2cap_pandora", + "bt-test-interfaces@git+https://github.com/google/bt-test-interfaces@uael/l2cap-update#subdirectory=python", "protobuf==4.24.2", "grpcio==1.57", "mobly==1.12.2",