Skip to content

Commit

Permalink
Requested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Karolk99 committed Oct 1, 2024
1 parent 7d84d78 commit 1134054
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 36 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ jobs:
run: poetry run lint
- name: Check format
run: poetry run format_check
- name: Type checker
run: poetry run pyright

test:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ jobs:
- name: Build and publish to pypi
uses: JRubics/[email protected]
with:
pypi_token: ${{ secrets.PYPI_TOKEN }}
pypi_token: ${{ secrets.PYPI_TOKEN }}
12 changes: 5 additions & 7 deletions examples/room-manager/room_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def __init__(self, args: Namespace, logger: Logger):
management_token=args.management_token,
)
self.websocket_url = args.fishjam_url.replace("http", "ws")
self.room_name_to_room_id = {}
self.peer_name_to_access = {}
self.room_name_to_room_id: dict[str, str] = {}
self.peer_name_to_access: dict[str, PeerAccess] = {}
self.logger = logger
self.config = args

Expand All @@ -47,7 +47,7 @@ def get_peer_access(self, room_name: str, username: str) -> PeerAccess:
"Got room: %s", {"name": room_name, "id": room.id, "peers": room.peers}
)

if not peer_in_room:
if not peer_access or not peer_in_room:
return self.__create_peer(room_name, username)

self.logger.info("Peer and room exist: %s, %s", username, room_name)
Expand All @@ -63,13 +63,11 @@ def handle_notification(self, notification: betterproto.Message):
case _:
pass

return

def __find_or_create_room(self, room_name: str) -> Room:
if room_name in self.room_name_to_room_id:
self.logger.info("Room %s, already exists in the Fishjam", room_name)

room_id = self.room_name_to_room_id.get(room_name)
room_id = self.room_name_to_room_id[room_name]
return self.fishjam_client.get_room(room_id=room_id)

options = RoomOptions(
Expand All @@ -86,7 +84,7 @@ def __find_or_create_room(self, room_name: str) -> Room:
return new_room

def __create_peer(self, room_name: str, peer_name: str) -> PeerAccess:
room_id = self.room_name_to_room_id.get(room_name)
room_id = self.room_name_to_room_id[room_name]

options = PeerOptions(
enable_simulcast=self.config.enable_simulcast,
Expand Down
5 changes: 4 additions & 1 deletion examples/room-manager/routes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import asdict

from flask import Flask, jsonify, request
from flask import Flask, abort, jsonify, request
from room_service import RoomService

from fishjam import receive_binary
Expand All @@ -24,6 +24,9 @@ def get_room_query():
room_name = request.args.get("roomName")
peer_name = request.args.get("peerName")

if not room_name or not peer_name:
return abort(400)

access_data = room_service.get_peer_access(room_name, peer_name)
response = asdict(access_data)

Expand Down
2 changes: 1 addition & 1 deletion fishjam/_webhook_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
def receive_binary(binary: bytes) -> betterproto.Message:
"""
Transform received protobuf notification to adequate notification instance.
The available notifications are listed in `fishjam.events` module.
"""
message = ServerMessage().parse(binary)
_which, message = betterproto.which_one_of(message, "content")

return message
35 changes: 23 additions & 12 deletions fishjam/_ws_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import asyncio
from typing import Any, Callable
from typing import Any, Callable, Literal, cast

import betterproto
from websockets import client
Expand Down Expand Up @@ -61,13 +61,13 @@ def __init__(self, fishjam_url: str, management_token: str):
self._fishjam_url = (
f"{fishjam_url.replace('http', 'ws')}/socket/server/websocket"
)
self._management_token = management_token
self._websocket = None
self._ready = False
self._management_token: str = management_token
self._websocket: client.WebSocketClientProtocol | None = None
self._ready: bool = False

self._ready_event: asyncio.Event = None
self._ready_event: asyncio.Event | None = None

self._notification_handler: Callable = None
self._notification_handler: Callable | None = None

def on_server_notification(self, handler: Callable[[Any], None]):
"""
Expand Down Expand Up @@ -107,7 +107,7 @@ async def connect(self):
finally:
self._websocket = None

async def wait_ready(self) -> True:
async def wait_ready(self) -> Literal[True]:
"""
Waits until the notifier is connected and authenticated to Fishjam.
Expand All @@ -119,19 +119,22 @@ async def wait_ready(self) -> True:
if self._ready_event is None:
self._ready_event = asyncio.Event()

await self._ready_event.wait()
return await self._ready_event.wait()

async def _authenticate(self):
if not self._websocket:
raise RuntimeError("Websocket is not connected")

msg = ServerMessage(
auth_request=ServerMessageAuthRequest(token=self._management_token)
)
await self._websocket.send(bytes(msg))

try:
message = await self._websocket.recv()
message = cast(bytes, await self._websocket.recv())
except ConnectionClosed as exception:
if "invalid token" in str(exception):
raise RuntimeError("Invalid server_api_token") from exception
raise RuntimeError("Invalid management token") from exception
raise

message = ServerMessage().parse(message)
Expand All @@ -140,19 +143,27 @@ async def _authenticate(self):
assert isinstance(message, ServerMessageAuthenticated)

async def _receive_loop(self):
if not self._websocket:
raise RuntimeError("Websocket is not connected")
if not self._notification_handler:
raise RuntimeError("Notification handler is not defined")

while True:
message = await self._websocket.recv()
message = cast(bytes, await self._websocket.recv())
message = ServerMessage().parse(message)
_which, message = betterproto.which_one_of(message, "content")

if isinstance(message, ALLOWED_NOTIFICATION):
self._notification_handler(message)

async def _subscribe_event(self, event: ServerMessageEventType):
if not self._websocket:
raise RuntimeError("Websocket is not connected")

request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event))

await self._websocket.send(bytes(request))
message = await self._websocket.recv()
message = cast(bytes, await self._websocket.recv())
message = ServerMessage().parse(message)
_which, message = betterproto.which_one_of(message, "content")
assert isinstance(message, ServerMessageSubscribeResponse)
3 changes: 1 addition & 2 deletions fishjam/api/_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from fishjam._openapi_client.client import AuthenticatedClient
from fishjam._openapi_client.models import Error
from fishjam._openapi_client.types import Response
from fishjam.errors import HTTPError


Expand All @@ -9,7 +8,7 @@ def __init__(self, fishjam_url: str, management_token: str):
self.client = AuthenticatedClient(f"{fishjam_url}", token=management_token)

def _request(self, method, **kwargs):
response: Response = method.sync_detailed(client=self.client, **kwargs)
response = method.sync_detailed(client=self.client, **kwargs)

if isinstance(response.parsed, Error):
raise HTTPError.from_response(response)
Expand Down
24 changes: 13 additions & 11 deletions fishjam/api/_fishjam_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from dataclasses import dataclass
from typing import List, Literal, NewType, Tuple
from typing import Any, List, Literal, NewType, Tuple

from fishjam._openapi_client.api.room import add_peer as room_add_peer
from fishjam._openapi_client.api.room import create_room as room_create_room
Expand Down Expand Up @@ -42,26 +42,26 @@ class Room:
class RoomOptions:
"""Description of a room options"""

max_peers: int = None
max_peers: int | None = None
"""Maximum amount of peers allowed into the room"""
peer_disconnected_timeout: int = None
peer_disconnected_timeout: int | None = None
"""
Duration (in seconds) after which the peer will be removed if it is disconnected.
If not provided, this feature is disabled.
"""
peerless_purge_timeout: int = None
peerless_purge_timeout: int | None = None
"""
Duration (in seconds) after which the room will be removed
if no peers are connected. If not provided, this feature is disabled.
"""
room_id: str = None
room_id: str | None = None
"""
Custom id used for identifying room within Fishjam.
Must be unique across all rooms. If not provided, random UUID is generated.
"""
video_codec: Literal["h264", "vp8"] = None
video_codec: Literal["h264", "vp8"] | None = None
"""Enforces video codec for each peer in the room"""
webhook_url: str = None
webhook_url: str | None = None
"""URL where Fishjam notifications will be sent"""


Expand All @@ -71,7 +71,7 @@ class PeerOptions:

enable_simulcast: bool = True
"""Enables the peer to use simulcast"""
metadata: dict = None
metadata: dict[str, Any] | None = None
"""Peer metadata"""


Expand All @@ -85,7 +85,7 @@ def __init__(self, fishjam_url: str, management_token: str):
super().__init__(fishjam_url=fishjam_url, management_token=management_token)

def create_peer(
self, room_id: str, options: PeerOptions = PeerOptions()
self, room_id: str, options: PeerOptions | None = None
) -> Tuple[Peer, PeerToken]:
"""
Creates peer in the room
Expand All @@ -95,6 +95,7 @@ def create_peer(
The possible options to pass for peer are `PeerOptions`.
"""
options = options or PeerOptions()

peer_type = "webrtc"
peer_metadata = self.__parse_peer_metadata(options.metadata)
Expand All @@ -107,11 +108,12 @@ def create_peer(

return (resp.data.peer, resp.data.token)

def create_room(self, options: RoomOptions = RoomOptions()) -> Room:
def create_room(self, options: RoomOptions | None = None) -> Room:
"""
Creates a new room
Returns the created `Room`
"""
options = options or RoomOptions()

codec = None
if options.video_codec:
Expand Down Expand Up @@ -155,7 +157,7 @@ def delete_room(self, room_id: str) -> None:

return self._request(room_delete_room, room_id=room_id)

def __parse_peer_metadata(self, metadata: dict) -> PeerOptionsWebRTCMetadata:
def __parse_peer_metadata(self, metadata: dict | None) -> PeerOptionsWebRTCMetadata:
peer_metadata = PeerOptionsWebRTCMetadata()

if not metadata:
Expand Down
33 changes: 32 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions poetry_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ def check_exit_code(command):
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)

if not process.stdout:
raise RuntimeError("Process has no STDOUT")

while True:
output = process.stdout.readline()
if output == b"" and process.poll() is not None:
Expand All @@ -29,6 +32,7 @@ def run_tests():
)
check_exit_code("docker compose -f docker-compose-test.yaml down")


def run_local_test():
check_exit_code('poetry run pytest -m "not file_component_sources"')

Expand Down
7 changes: 7 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ betterproto = "2.0.0b6"
httpx = ">=0.20.0,<0.26.0"
attrs = ">=21.3.0"
flask = "^3.0.3"
pyright = "^1.1.382.post1"

[tool.poetry.group.dev.dependencies]
betterproto = { version = "= 2.0.0b6", extras = ["compiler"] }
Expand Down Expand Up @@ -62,3 +63,9 @@ ignore = []
markers = [
"file_component_sources: Tests requiring files uploaded for File Component"
]

[tool.pyright]
exclude = ["**/venv", "**/__pycache__", ".pytest_cache", ".ruff_cache", "lib", "fishjam/_openapi_client", "tests"]
typeCheckingMode = "basic"
venv = "venv"
venvPath = "."
4 changes: 4 additions & 0 deletions tests/test_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ async def test_valid_credentials(self):
fishjam_url=FISHJAM_URL, management_token=SERVER_API_TOKEN
)

@notifier.on_server_notification
def handle_notitifcation(_notification):
pass

notifier_task = asyncio.create_task(notifier.connect())
await notifier.wait_ready()
# pylint: disable=protected-access
Expand Down

0 comments on commit 1134054

Please sign in to comment.