Skip to content

Commit

Permalink
L2CAP cases 2
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Oct 25, 2023
1 parent e64ebd5 commit d91b848
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 201 deletions.
177 changes: 72 additions & 105 deletions avatar/cases/l2cap_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,14 @@
from avatar import BumblePandoraDevice
from avatar import PandoraDevice
from avatar import PandoraDevices
from avatar.common import make_bredr_connection
from avatar.common import make_le_connection
from avatar import pandora_snippet
from mobly import base_test
from mobly import test_runner
from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_is_not_none # type: ignore
from pandora import host_pb2
from pandora import l2cap_pb2
from typing import Any, Awaitable, Callable, Dict, Literal, Optional, Tuple, Union

CONNECTORS: Dict[
str,
Callable[[avatar.PandoraDevice, avatar.PandoraDevice], Awaitable[Tuple[host_pb2.Connection, host_pb2.Connection]]],
] = {
'Classic': make_bredr_connection,
'LE': make_le_connection,
}

FIXED_CHANNEL_CID = 0x3E
from typing import Any, Dict, Literal, Optional, Union

CLASSIC_PSM = 0xFEFF
LE_SPSM = 0xF0

Expand All @@ -49,6 +38,10 @@ class L2capTest(base_test.BaseTestClass): # type: ignore[misc]
dut: PandoraDevice
ref: PandoraDevice

# BR/EDR & Low-Energy connections.
dut_ref: Dict[str, host_pb2.Connection] = {}
ref_dut: Dict[str, host_pb2.Connection] = {}

def setup_class(self) -> None:
self.devices = PandoraDevices(self)
self.dut, self.ref, *_ = self.devices
Expand All @@ -66,160 +59,134 @@ def teardown_class(self) -> None:
async def setup_test(self) -> None: # pytype: disable=wrong-arg-types
await asyncio.gather(self.dut.reset(), self.ref.reset())

# Connect REF to DUT in both BR/EDR and Low-Energy.
self.ref_dut['BR/EDR'], self.dut_ref['BR/EDR'] = await pandora_snippet.connect(self.ref, self.dut)
self.ref_dut['LE'], self.dut_ref['LE'] = await pandora_snippet.connect_le_dummy(self.ref, self.dut)

@avatar.parameterized(
('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))),
('BR/EDR', dict(basic=dict(psm=CLASSIC_PSM))),
(
'LE',
dict(
le_credit_based=l2cap_pb2.CreditBasedChannelRequest(
spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256
)
),
dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)),
),
) # type: ignore[misc]
@avatar.asynchronous
async def test_connect(
self,
transport: Union[Literal['Classic'], Literal['LE']],
transport: Union[Literal['BR/EDR'], Literal['LE']],
request: Dict[str, Any],
) -> None:
dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref)
server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request)
ref_dut_res, dut_ref_res = await asyncio.gather(
anext(aiter(server)),
self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request),
ref_dut, dut_ref = await asyncio.gather(
self.ref.aio.l2cap.WaitConnection(connection=self.ref_dut[transport], **request),
self.dut.aio.l2cap.Connect(connection=self.dut_ref[transport], **request),
)
assert_is_not_none(ref_dut_res.channel)
assert_is_not_none(dut_ref_res.channel)
assert_is_not_none(ref_dut.channel)
assert_is_not_none(dut_ref.channel)

@avatar.parameterized(
('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))),
('BR/EDR', dict(basic=dict(psm=CLASSIC_PSM))),
(
'LE',
dict(
le_credit_based=l2cap_pb2.CreditBasedChannelRequest(
spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256
)
),
dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)),
),
) # type: ignore[misc]
@avatar.asynchronous
async def test_on_connection(
async def test_wait_connection(
self,
transport: Union[Literal['Classic'], Literal['LE']],
transport: Union[Literal['BR/EDR'], Literal['LE']],
request: Dict[str, Any],
) -> None:
dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref)
server = self.dut.aio.l2cap.OnConnection(connection=dut_ref_acl, **request)
ref_dut_res, dut_ref_res = await asyncio.gather(
self.ref.aio.l2cap.Connect(connection=ref_dut_acl, **request),
anext(aiter(server)),
dut_ref, ref_dut = await asyncio.gather(
self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request),
self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request),
)
assert_is_not_none(ref_dut_res.channel)
assert_is_not_none(dut_ref_res.channel)
assert_is_not_none(ref_dut.channel)
assert_is_not_none(dut_ref.channel)

@avatar.parameterized(
('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))),
('BR/EDR', dict(basic=dict(psm=CLASSIC_PSM))),
(
'LE',
dict(
le_credit_based=l2cap_pb2.CreditBasedChannelRequest(
spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256
)
),
dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)),
),
) # type: ignore[misc]
@avatar.asynchronous
async def test_disconnect(
self,
transport: Union[Literal['Classic'], Literal['LE']],
transport: Union[Literal['BR/EDR'], Literal['LE']],
request: Dict[str, Any],
) -> None:
dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref)
server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request)
ref_dut_res, dut_ref_res = await asyncio.gather(
anext(aiter(server)),
self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request),
dut_ref, ref_dut = await asyncio.gather(
self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request),
self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request),
)
assert dut_ref_res.channel and ref_dut_res.channel
assert_is_not_none(ref_dut.channel)
assert_is_not_none(dut_ref.channel)
assert ref_dut.channel and dut_ref.channel

await asyncio.gather(
self.dut.aio.l2cap.Disconnect(channel=dut_ref_res.channel),
self.ref.aio.l2cap.WaitDisconnection(channel=ref_dut_res.channel),
_, dis = await asyncio.gather(
self.ref.aio.l2cap.WaitDisconnection(channel=ref_dut.channel),
self.dut.aio.l2cap.Disconnect(channel=dut_ref.channel),
)

assert_equal(dis.result_variant(), 'success')

@avatar.parameterized(
('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))),
('BR/EDR', dict(basic=dict(psm=CLASSIC_PSM))),
(
'LE',
dict(
le_credit_based=l2cap_pb2.CreditBasedChannelRequest(
spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256
)
),
dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)),
),
) # type: ignore[misc]
@avatar.asynchronous
async def test_wait_disconnection(
self,
transport: Union[Literal['Classic'], Literal['LE']],
transport: Union[Literal['BR/EDR'], Literal['LE']],
request: Dict[str, Any],
) -> None:
dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref)
server = self.ref.aio.l2cap.OnConnection(connection=ref_dut_acl, **request)
ref_dut_res, dut_ref_res = await asyncio.gather(
anext(aiter(server)),
self.dut.aio.l2cap.Connect(connection=dut_ref_acl, **request),
dut_ref, ref_dut = await asyncio.gather(
self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request),
self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request),
)
assert dut_ref_res.channel and ref_dut_res.channel
assert_is_not_none(ref_dut.channel)
assert_is_not_none(dut_ref.channel)
assert ref_dut.channel and dut_ref.channel

await asyncio.gather(
self.ref.aio.l2cap.Disconnect(channel=ref_dut_res.channel),
self.dut.aio.l2cap.WaitDisconnection(channel=dut_ref_res.channel),
dis, _ = await asyncio.gather(
self.dut.aio.l2cap.WaitDisconnection(channel=dut_ref.channel),
self.ref.aio.l2cap.Disconnect(channel=ref_dut.channel),
)

assert_equal(dis.result_variant(), 'success')

@avatar.parameterized(
('Classic', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('LE', dict(fixed=l2cap_pb2.FixedChannelRequest(cid=FIXED_CHANNEL_CID))),
('Classic', dict(basic=l2cap_pb2.ConnectionOrientedChannelRequest(psm=CLASSIC_PSM))),
('BR/EDR', dict(basic=dict(psm=CLASSIC_PSM))),
(
'LE',
dict(
le_credit_based=l2cap_pb2.CreditBasedChannelRequest(
spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256
)
),
dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)),
),
) # type: ignore[misc]
@avatar.asynchronous
async def test_send(
self,
transport: Union[Literal['Classic'], Literal['LE']],
transport: Union[Literal['BR/EDR'], Literal['LE']],
request: Dict[str, Any],
) -> None:
dut_ref_acl, ref_dut_acl = await CONNECTORS[transport](self.dut, self.ref)
server = self.dut.aio.l2cap.OnConnection(connection=dut_ref_acl, **request)
ref_dut_res, dut_ref_res = await asyncio.gather(
self.ref.aio.l2cap.Connect(connection=ref_dut_acl, **request),
anext(aiter(server)),
dut_ref, ref_dut = await asyncio.gather(
self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request),
self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request),
)
ref_dut_channel = ref_dut_res.channel
dut_ref_channel = dut_ref_res.channel
assert_is_not_none(ref_dut_res.channel)
assert_is_not_none(dut_ref_res.channel)
assert ref_dut_channel and dut_ref_channel

dut_ref_stream = self.ref.aio.l2cap.Receive(channel=dut_ref_channel)
_send_res, recv_res = await asyncio.gather(
self.dut.aio.l2cap.Send(channel=ref_dut_channel, data=b"The quick brown fox jumps over the lazy dog"),
anext(aiter(dut_ref_stream)),
assert_is_not_none(ref_dut.channel)
assert_is_not_none(dut_ref.channel)
assert ref_dut.channel and dut_ref.channel

ref_source = self.ref.aio.l2cap.Receive(channel=dut_ref.channel)
_, recv = await asyncio.gather(
self.dut.aio.l2cap.Send(channel=ref_dut.channel, data=b"The quick brown fox jumps over the lazy dog"),
anext(aiter(ref_source)),
)
assert recv_res.data
assert_equal(recv_res.data, b"The quick brown fox jumps over the lazy dog")

assert_equal(recv.data, b"The quick brown fox jumps over the lazy dog")


if __name__ == "__main__":
Expand Down
33 changes: 6 additions & 27 deletions avatar/cases/le_security_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,38 +203,17 @@ async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
nonlocal ref_dut
nonlocal dut_ref

# Make LE connection task.
async def connect_le(
initiator: PandoraDevice,
acceptor: PandoraDevice,
initiator_addr_type: OwnAddressType,
acceptor_addr_type: OwnAddressType,
) -> Tuple[Connection, Connection]:
# Acceptor - Advertise
advertisement = acceptor.aio.host.Advertise(
legacy=True,
connectable=True,
own_address_type=acceptor_addr_type,
data=DataTypes(manufacturer_specific_data=b'pause cafe'),
)

# Initiator - Scan and fetch the address
scan = initiator.aio.host.Scan(own_address_type=initiator_addr_type)
acceptor_scan = await anext(
(x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)
) # pytype: disable=name-error
scan.cancel()

# Initiator - LE connect
return await pandora_snippet.connect_le(initiator, advertisement, acceptor_scan, initiator_addr_type)

# Make LE connection.
if connect == 'incoming_connection':
# DUT is acceptor
ref_dut, dut_ref = await connect_le(self.ref, self.dut, ref_address_type, dut_address_type)
ref_dut, dut_ref = await pandora_snippet.connect_le_dummy(
self.ref, self.dut, ref_address_type, dut_address_type
)
else:
# DUT is initiator
dut_ref, ref_dut = await connect_le(self.dut, self.ref, dut_address_type, ref_address_type)
dut_ref, ref_dut = await pandora_snippet.connect_le_dummy(
self.dut, self.ref, dut_address_type, ref_address_type
)

# Pairing.

Expand Down
67 changes: 0 additions & 67 deletions avatar/common.py

This file was deleted.

Loading

0 comments on commit d91b848

Please sign in to comment.