diff --git a/fishjam/__init__.py b/fishjam/__init__.py index d9a0d55..13ad44b 100644 --- a/fishjam/__init__.py +++ b/fishjam/__init__.py @@ -14,7 +14,7 @@ # API from fishjam._webhook_notifier import receive_binary -from fishjam._ws_notifier import Notifier +from fishjam._ws_notifier import FishjamNotifier from fishjam.api._fishjam_client import ( FishjamClient, Peer, @@ -25,7 +25,7 @@ __all__ = [ "FishjamClient", - "Notifier", + "FishjamNotifier", "receive_binary", "PeerOptions", "RoomOptions", diff --git a/fishjam/_ws_notifier.py b/fishjam/_ws_notifier.py index a2da35b..e4c501f 100644 --- a/fishjam/_ws_notifier.py +++ b/fishjam/_ws_notifier.py @@ -9,7 +9,20 @@ from websockets import client from websockets.exceptions import ConnectionClosed -from fishjam.events import ServerMessageMetricsReport +from fishjam.events import ( + ServerMessagePeerAdded, + ServerMessagePeerConnected, + ServerMessagePeerCrashed, + ServerMessagePeerDeleted, + ServerMessagePeerDisconnected, + ServerMessagePeerMetadataUpdated, + ServerMessageRoomCrashed, + ServerMessageRoomCreated, + ServerMessageRoomDeleted, + ServerMessageTrackAdded, + ServerMessageTrackMetadataUpdated, + ServerMessageTrackRemoved, +) from fishjam.events._protos.fishjam import ( ServerMessage, ServerMessageAuthenticated, @@ -19,59 +32,64 @@ ServerMessageSubscribeResponse, ) +ALLOWED_NOTIFICATION = ( + ServerMessageRoomCreated, + ServerMessageRoomDeleted, + ServerMessageRoomCrashed, + ServerMessagePeerAdded, + ServerMessagePeerDeleted, + ServerMessagePeerConnected, + ServerMessagePeerDisconnected, + ServerMessagePeerMetadataUpdated, + ServerMessagePeerCrashed, + ServerMessageTrackAdded, + ServerMessageTrackRemoved, + ServerMessageTrackMetadataUpdated, +) + -class Notifier: +class FishjamNotifier: """ Allows for receiving WebSocket messages from Fishjam. """ def __init__(self, fishjam_url: str, management_token: str): """ - Create Notifier instance, providing the fishjam address and api token. - Set secure to `True` for `wss` and `False` for `ws` connection (default). + Create FishjamNotifier instance, providing the fishjam url and management token. """ - self._server_address = ( + self._fishjam_url = ( f"{fishjam_url.replace('http', 'ws')}/socket/server/websocket" ) - self._server_api_token = management_token + self._management_token = management_token self._websocket = None self._ready = False self._ready_event: asyncio.Event = None self._notification_handler: Callable = None - self._metrics_handler: Callable = None def on_server_notification(self, handler: Callable[[Any], None]): """ - Decorator used for defining handler for ServerNotifications - i.e. all messages other than `ServerMessageMetricsReport`. + Decorator used for defining handler for Fishjam Notifications """ self._notification_handler = handler return handler - def on_metrics(self, handler: Callable[[ServerMessageMetricsReport], None]): - """ - Decorator used for defining handler for `ServerMessageMetricsReport`. - """ - self._metrics_handler = handler - return handler - async def connect(self): """ - A coroutine which connects Notifier to Fishjam and listens for all incoming - messages from the Fishjam. + A coroutine which connects FishjamNotifier to Fishjam and listens for + all incoming messages from the Fishjam. It runs until the connection isn't closed. The incoming messages are handled by the functions defined using the - `on_server_notification` and `on_metrics` decorators. + `on_server_notification` decorator. - The handlers have to be defined before calling `connect`, + The handler have to be defined before calling `connect`, otherwise the messages won't be received. """ - async with client.connect(self._server_address) as websocket: + async with client.connect(self._fishjam_url) as websocket: try: self._websocket = websocket await self._authenticate() @@ -81,11 +99,6 @@ async def connect(self): event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION ) - if self._metrics_handler: - await self._subscribe_event( - event=ServerMessageEventType.EVENT_TYPE_METRICS - ) - self._ready = True if self._ready_event: self._ready_event.set() @@ -110,7 +123,7 @@ async def wait_ready(self) -> True: async def _authenticate(self): msg = ServerMessage( - auth_request=ServerMessageAuthRequest(token=self._server_api_token) + auth_request=ServerMessageAuthRequest(token=self._management_token) ) await self._websocket.send(bytes(msg)) @@ -132,9 +145,7 @@ async def _receive_loop(self): message = ServerMessage().parse(message) _which, message = betterproto.which_one_of(message, "content") - if isinstance(message, ServerMessageMetricsReport): - self._metrics_handler(message) - else: + if isinstance(message, ALLOWED_NOTIFICATION): self._notification_handler(message) async def _subscribe_event(self, event: ServerMessageEventType): diff --git a/fishjam/events/__init__.py b/fishjam/events/__init__.py index 2dee8ef..10e1029 100644 --- a/fishjam/events/__init__.py +++ b/fishjam/events/__init__.py @@ -4,14 +4,12 @@ # Exported messages from fishjam.events._protos.fishjam import ( - ServerMessageComponentCrashed, - ServerMessageHlsPlayable, - ServerMessageMetricsReport, ServerMessagePeerAdded, ServerMessagePeerConnected, ServerMessagePeerCrashed, ServerMessagePeerDeleted, ServerMessagePeerDisconnected, + ServerMessagePeerMetadataUpdated, ServerMessageRoomCrashed, ServerMessageRoomCreated, ServerMessageRoomDeleted, @@ -30,13 +28,11 @@ "ServerMessagePeerConnected", "ServerMessagePeerDeleted", "ServerMessagePeerDisconnected", + "ServerMessagePeerMetadataUpdated", "ServerMessagePeerCrashed", - "ServerMessageComponentCrashed", "ServerMessageTrack", "ServerMessageTrackType", "ServerMessageTrackAdded", "ServerMessageTrackMetadataUpdated", "ServerMessageTrackRemoved", - "ServerMessageHlsPlayable", - "ServerMessageMetricsReport", ] diff --git a/tests/support/asyncio_utils.py b/tests/support/asyncio_utils.py index 34540d3..e415766 100644 --- a/tests/support/asyncio_utils.py +++ b/tests/support/asyncio_utils.py @@ -2,19 +2,15 @@ import asyncio -from fishjam import Notifier +from fishjam import FishjamNotifier ASSERTION_TIMEOUT = 5.0 -async def assert_events(notifier: Notifier, event_checks: list): +async def assert_events(notifier: FishjamNotifier, event_checks: list): await _assert_messages(notifier.on_server_notification, event_checks) -async def assert_metrics(notifier: Notifier, metrics_checks: list): - await _assert_messages(notifier.on_metrics, metrics_checks) - - async def _assert_messages(notifier_callback, message_checks): success_event = asyncio.Event() diff --git a/tests/support/protos/fishjam/__init__.py b/tests/support/protos/fishjam/__init__.py index 4dfae38..822cc1b 100644 --- a/tests/support/protos/fishjam/__init__.py +++ b/tests/support/protos/fishjam/__init__.py @@ -19,6 +19,9 @@ class PeerMessage(betterproto.Message): 2, group="content" ) media_event: "PeerMessageMediaEvent" = betterproto.message_field(3, group="content") + rtc_stats_report: "PeerMessageRtcStatsReport" = betterproto.message_field( + 4, group="content" + ) @dataclass(eq=False, repr=False) @@ -40,3 +43,13 @@ class PeerMessageMediaEvent(betterproto.Message): """Any type of WebRTC messages passed betweend FJ and peer""" data: str = betterproto.string_field(1) + + +@dataclass(eq=False, repr=False) +class PeerMessageRtcStatsReport(betterproto.Message): + """ + PeerConnection stats sent by peer https://developer.mozilla.org/en- + US/docs/Web/API/RTCStatsReport#the_statistic_types + """ + + data: str = betterproto.string_field(1) diff --git a/tests/test_notifier.py b/tests/test_notifier.py index 47a1628..674219e 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -9,9 +9,8 @@ import pytest import requests -from fishjam import FishjamClient, Notifier, RoomOptions +from fishjam import FishjamClient, FishjamNotifier, RoomOptions from fishjam.events import ( - ServerMessageMetricsReport, ServerMessagePeerAdded, ServerMessagePeerConnected, ServerMessagePeerDeleted, @@ -19,7 +18,7 @@ ServerMessageRoomCreated, ServerMessageRoomDeleted, ) -from tests.support.asyncio_utils import assert_events, assert_metrics, cancel +from tests.support.asyncio_utils import assert_events, cancel from tests.support.peer_socket import PeerSocket from tests.support.webhook_notifier import run_server @@ -56,7 +55,9 @@ def start_server(): class TestConnectingToServer: @pytest.mark.asyncio async def test_valid_credentials(self): - notifier = Notifier(fishjam_url=FISHJAM_URL, management_token=SERVER_API_TOKEN) + notifier = FishjamNotifier( + fishjam_url=FISHJAM_URL, management_token=SERVER_API_TOKEN + ) notifier_task = asyncio.create_task(notifier.connect()) await notifier.wait_ready() @@ -67,7 +68,9 @@ async def test_valid_credentials(self): @pytest.mark.asyncio async def test_invalid_credentials(self): - notifier = Notifier(fishjam_url=FISHJAM_URL, management_token="wrong_token") + notifier = FishjamNotifier( + fishjam_url=FISHJAM_URL, management_token="wrong_token" + ) task = asyncio.create_task(notifier.connect()) @@ -82,7 +85,9 @@ def room_api(): @pytest.fixture def notifier(): - notifier = Notifier(fishjam_url=FISHJAM_URL, management_token=SERVER_API_TOKEN) + notifier = FishjamNotifier( + fishjam_url=FISHJAM_URL, management_token=SERVER_API_TOKEN + ) return notifier @@ -90,7 +95,7 @@ def notifier(): class TestReceivingNotifications: @pytest.mark.asyncio async def test_room_created_deleted( - self, room_api: FishjamClient, notifier: Notifier + self, room_api: FishjamClient, notifier: FishjamNotifier ): event_checks = [ServerMessageRoomCreated, ServerMessageRoomDeleted] assert_task = asyncio.create_task(assert_events(notifier, event_checks.copy())) @@ -111,7 +116,7 @@ async def test_room_created_deleted( @pytest.mark.asyncio async def test_peer_connected_disconnected( - self, room_api: FishjamClient, notifier: Notifier + self, room_api: FishjamClient, notifier: FishjamNotifier ): event_checks = [ ServerMessageRoomCreated, @@ -147,7 +152,7 @@ async def test_peer_connected_disconnected( @pytest.mark.asyncio async def test_peer_connected_disconnected_deleted( - self, room_api: FishjamClient, notifier: Notifier + self, room_api: FishjamClient, notifier: FishjamNotifier ): event_checks = [ ServerMessageRoomCreated, @@ -186,7 +191,7 @@ async def test_peer_connected_disconnected_deleted( @pytest.mark.asyncio async def test_peer_connected_room_deleted( - self, room_api: FishjamClient, notifier: Notifier + self, room_api: FishjamClient, notifier: FishjamNotifier ): event_checks = [ ServerMessageRoomCreated, @@ -221,26 +226,3 @@ async def test_peer_connected_room_deleted( def assert_event(self, event): data = queue.get(timeout=2.5) assert data == event or isinstance(data, event) - - -class TestReceivingMetrics: - @pytest.mark.asyncio - async def test_metrics_with_one_peer( - self, room_api: FishjamClient, notifier: Notifier - ): - room = room_api.create_room() - _peer, token = room_api.create_peer(room.id) - - peer_socket = PeerSocket(fishjam_url=FISHJAM_URL) - peer_task = asyncio.create_task(peer_socket.connect(token)) - - await peer_socket.wait_ready() - - assert_task = asyncio.create_task( - assert_metrics(notifier, [ServerMessageMetricsReport]) - ) - notifier_task = asyncio.create_task(notifier.connect()) - - await assert_task - await cancel(peer_task) - await cancel(notifier_task)