Skip to content

Commit

Permalink
Update FishjamNotifier
Browse files Browse the repository at this point in the history
  • Loading branch information
Karolk99 committed Oct 1, 2024
1 parent 7d7d4d0 commit 7d84d78
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 77 deletions.
4 changes: 2 additions & 2 deletions fishjam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# API
from fishjam._webhook_notifier import receive_binary
from fishjam._ws_notifier import Notifier
from fishjam._ws_notifier import FishjamNotifier
from fishjam.api._fishjam_client import (
FishjamClient,
Peer,
Expand All @@ -25,7 +25,7 @@

__all__ = [
"FishjamClient",
"Notifier",
"FishjamNotifier",
"receive_binary",
"PeerOptions",
"RoomOptions",
Expand Down
71 changes: 41 additions & 30 deletions fishjam/_ws_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,20 @@
from websockets import client
from websockets.exceptions import ConnectionClosed

from fishjam.events import ServerMessageMetricsReport
from fishjam.events import (
ServerMessagePeerAdded,
ServerMessagePeerConnected,
ServerMessagePeerCrashed,
ServerMessagePeerDeleted,
ServerMessagePeerDisconnected,
ServerMessagePeerMetadataUpdated,
ServerMessageRoomCrashed,
ServerMessageRoomCreated,
ServerMessageRoomDeleted,
ServerMessageTrackAdded,
ServerMessageTrackMetadataUpdated,
ServerMessageTrackRemoved,
)
from fishjam.events._protos.fishjam import (
ServerMessage,
ServerMessageAuthenticated,
Expand All @@ -19,59 +32,64 @@
ServerMessageSubscribeResponse,
)

ALLOWED_NOTIFICATION = (
ServerMessageRoomCreated,
ServerMessageRoomDeleted,
ServerMessageRoomCrashed,
ServerMessagePeerAdded,
ServerMessagePeerDeleted,
ServerMessagePeerConnected,
ServerMessagePeerDisconnected,
ServerMessagePeerMetadataUpdated,
ServerMessagePeerCrashed,
ServerMessageTrackAdded,
ServerMessageTrackRemoved,
ServerMessageTrackMetadataUpdated,
)


class Notifier:
class FishjamNotifier:
"""
Allows for receiving WebSocket messages from Fishjam.
"""

def __init__(self, fishjam_url: str, management_token: str):
"""
Create Notifier instance, providing the fishjam address and api token.
Set secure to `True` for `wss` and `False` for `ws` connection (default).
Create FishjamNotifier instance, providing the fishjam url and management token.
"""

self._server_address = (
self._fishjam_url = (
f"{fishjam_url.replace('http', 'ws')}/socket/server/websocket"
)
self._server_api_token = management_token
self._management_token = management_token
self._websocket = None
self._ready = False

self._ready_event: asyncio.Event = None

self._notification_handler: Callable = None
self._metrics_handler: Callable = None

def on_server_notification(self, handler: Callable[[Any], None]):
"""
Decorator used for defining handler for ServerNotifications
i.e. all messages other than `ServerMessageMetricsReport`.
Decorator used for defining handler for Fishjam Notifications
"""
self._notification_handler = handler
return handler

def on_metrics(self, handler: Callable[[ServerMessageMetricsReport], None]):
"""
Decorator used for defining handler for `ServerMessageMetricsReport`.
"""
self._metrics_handler = handler
return handler

async def connect(self):
"""
A coroutine which connects Notifier to Fishjam and listens for all incoming
messages from the Fishjam.
A coroutine which connects FishjamNotifier to Fishjam and listens for
all incoming messages from the Fishjam.
It runs until the connection isn't closed.
The incoming messages are handled by the functions defined using the
`on_server_notification` and `on_metrics` decorators.
`on_server_notification` decorator.
The handlers have to be defined before calling `connect`,
The handler have to be defined before calling `connect`,
otherwise the messages won't be received.
"""
async with client.connect(self._server_address) as websocket:
async with client.connect(self._fishjam_url) as websocket:
try:
self._websocket = websocket
await self._authenticate()
Expand All @@ -81,11 +99,6 @@ async def connect(self):
event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
)

if self._metrics_handler:
await self._subscribe_event(
event=ServerMessageEventType.EVENT_TYPE_METRICS
)

self._ready = True
if self._ready_event:
self._ready_event.set()
Expand All @@ -110,7 +123,7 @@ async def wait_ready(self) -> True:

async def _authenticate(self):
msg = ServerMessage(
auth_request=ServerMessageAuthRequest(token=self._server_api_token)
auth_request=ServerMessageAuthRequest(token=self._management_token)
)
await self._websocket.send(bytes(msg))

Expand All @@ -132,9 +145,7 @@ async def _receive_loop(self):
message = ServerMessage().parse(message)
_which, message = betterproto.which_one_of(message, "content")

if isinstance(message, ServerMessageMetricsReport):
self._metrics_handler(message)
else:
if isinstance(message, ALLOWED_NOTIFICATION):
self._notification_handler(message)

async def _subscribe_event(self, event: ServerMessageEventType):
Expand Down
8 changes: 2 additions & 6 deletions fishjam/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@

# Exported messages
from fishjam.events._protos.fishjam import (
ServerMessageComponentCrashed,
ServerMessageHlsPlayable,
ServerMessageMetricsReport,
ServerMessagePeerAdded,
ServerMessagePeerConnected,
ServerMessagePeerCrashed,
ServerMessagePeerDeleted,
ServerMessagePeerDisconnected,
ServerMessagePeerMetadataUpdated,
ServerMessageRoomCrashed,
ServerMessageRoomCreated,
ServerMessageRoomDeleted,
Expand All @@ -30,13 +28,11 @@
"ServerMessagePeerConnected",
"ServerMessagePeerDeleted",
"ServerMessagePeerDisconnected",
"ServerMessagePeerMetadataUpdated",
"ServerMessagePeerCrashed",
"ServerMessageComponentCrashed",
"ServerMessageTrack",
"ServerMessageTrackType",
"ServerMessageTrackAdded",
"ServerMessageTrackMetadataUpdated",
"ServerMessageTrackRemoved",
"ServerMessageHlsPlayable",
"ServerMessageMetricsReport",
]
8 changes: 2 additions & 6 deletions tests/support/asyncio_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@

import asyncio

from fishjam import Notifier
from fishjam import FishjamNotifier

ASSERTION_TIMEOUT = 5.0


async def assert_events(notifier: Notifier, event_checks: list):
async def assert_events(notifier: FishjamNotifier, event_checks: list):
await _assert_messages(notifier.on_server_notification, event_checks)


async def assert_metrics(notifier: Notifier, metrics_checks: list):
await _assert_messages(notifier.on_metrics, metrics_checks)


async def _assert_messages(notifier_callback, message_checks):
success_event = asyncio.Event()

Expand Down
13 changes: 13 additions & 0 deletions tests/support/protos/fishjam/__init__.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 15 additions & 33 deletions tests/test_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@
import pytest
import requests

from fishjam import FishjamClient, Notifier, RoomOptions
from fishjam import FishjamClient, FishjamNotifier, RoomOptions
from fishjam.events import (
ServerMessageMetricsReport,
ServerMessagePeerAdded,
ServerMessagePeerConnected,
ServerMessagePeerDeleted,
ServerMessagePeerDisconnected,
ServerMessageRoomCreated,
ServerMessageRoomDeleted,
)
from tests.support.asyncio_utils import assert_events, assert_metrics, cancel
from tests.support.asyncio_utils import assert_events, cancel
from tests.support.peer_socket import PeerSocket
from tests.support.webhook_notifier import run_server

Expand Down Expand Up @@ -56,7 +55,9 @@ def start_server():
class TestConnectingToServer:
@pytest.mark.asyncio
async def test_valid_credentials(self):
notifier = Notifier(fishjam_url=FISHJAM_URL, management_token=SERVER_API_TOKEN)
notifier = FishjamNotifier(
fishjam_url=FISHJAM_URL, management_token=SERVER_API_TOKEN
)

notifier_task = asyncio.create_task(notifier.connect())
await notifier.wait_ready()
Expand All @@ -67,7 +68,9 @@ async def test_valid_credentials(self):

@pytest.mark.asyncio
async def test_invalid_credentials(self):
notifier = Notifier(fishjam_url=FISHJAM_URL, management_token="wrong_token")
notifier = FishjamNotifier(
fishjam_url=FISHJAM_URL, management_token="wrong_token"
)

task = asyncio.create_task(notifier.connect())

Expand All @@ -82,15 +85,17 @@ def room_api():

@pytest.fixture
def notifier():
notifier = Notifier(fishjam_url=FISHJAM_URL, management_token=SERVER_API_TOKEN)
notifier = FishjamNotifier(
fishjam_url=FISHJAM_URL, management_token=SERVER_API_TOKEN
)

return notifier


class TestReceivingNotifications:
@pytest.mark.asyncio
async def test_room_created_deleted(
self, room_api: FishjamClient, notifier: Notifier
self, room_api: FishjamClient, notifier: FishjamNotifier
):
event_checks = [ServerMessageRoomCreated, ServerMessageRoomDeleted]
assert_task = asyncio.create_task(assert_events(notifier, event_checks.copy()))
Expand All @@ -111,7 +116,7 @@ async def test_room_created_deleted(

@pytest.mark.asyncio
async def test_peer_connected_disconnected(
self, room_api: FishjamClient, notifier: Notifier
self, room_api: FishjamClient, notifier: FishjamNotifier
):
event_checks = [
ServerMessageRoomCreated,
Expand Down Expand Up @@ -147,7 +152,7 @@ async def test_peer_connected_disconnected(

@pytest.mark.asyncio
async def test_peer_connected_disconnected_deleted(
self, room_api: FishjamClient, notifier: Notifier
self, room_api: FishjamClient, notifier: FishjamNotifier
):
event_checks = [
ServerMessageRoomCreated,
Expand Down Expand Up @@ -186,7 +191,7 @@ async def test_peer_connected_disconnected_deleted(

@pytest.mark.asyncio
async def test_peer_connected_room_deleted(
self, room_api: FishjamClient, notifier: Notifier
self, room_api: FishjamClient, notifier: FishjamNotifier
):
event_checks = [
ServerMessageRoomCreated,
Expand Down Expand Up @@ -221,26 +226,3 @@ async def test_peer_connected_room_deleted(
def assert_event(self, event):
data = queue.get(timeout=2.5)
assert data == event or isinstance(data, event)


class TestReceivingMetrics:
@pytest.mark.asyncio
async def test_metrics_with_one_peer(
self, room_api: FishjamClient, notifier: Notifier
):
room = room_api.create_room()
_peer, token = room_api.create_peer(room.id)

peer_socket = PeerSocket(fishjam_url=FISHJAM_URL)
peer_task = asyncio.create_task(peer_socket.connect(token))

await peer_socket.wait_ready()

assert_task = asyncio.create_task(
assert_metrics(notifier, [ServerMessageMetricsReport])
)
notifier_task = asyncio.create_task(notifier.connect())

await assert_task
await cancel(peer_task)
await cancel(notifier_task)

0 comments on commit 7d84d78

Please sign in to comment.