diff --git a/examples/video-stream/README.md b/examples/video-stream/README.md
new file mode 100644
index 00000000..44d58ae7
--- /dev/null
+++ b/examples/video-stream/README.md
@@ -0,0 +1,40 @@
+# Video and Audio Synchronization Examples
+
+These examples demonstrate how to synchronize video and audio streams using the `AVSynchronizer` utility.
+
+## AVSynchronizer Usage
+
+The `AVSynchronizer` keeps video and audio frames in sync. The key principle is to push the first pair of synchronized video and audio frames together; after that, subsequent frames are paced automatically according to the configured video FPS and audio sample rate.
+
+```python
+av_sync = AVSynchronizer(
+    audio_source=audio_source,
+    video_source=video_source,
+    video_fps=30.0,
+    video_queue_size_ms=100
+)
+
+# Push frames to the synchronizer
+await av_sync.push(video_frame)
+await av_sync.push(audio_frame)
+```
+
+## Examples
+
+### 1. Video File Playback (`video_play.py`)
+Shows how to stream video and audio from separate sources while maintaining sync:
+
+- Reads video and audio streams separately from a media file
+- Uses separate tasks to push video and audio frames to the synchronizer
+- Since the streams are continuous, a larger `queue_size_ms` can be used, though this will increase memory usage
+
+### 2. Audio Visualization (`audio_wave.py`)
+Demonstrates generating video based on audio input:
+
+- Generates audio frames with alternating sine waves and silence
+- Creates video frames visualizing the audio waveform
+- Shows how to handle cases with and without audio:
+  - When audio is present: Push synchronized video and audio frames
+  - During silence: Push only video frames
+- Since video and audio frames are pushed in the same loop, each audio frame must be shorter than the audio source's `queue_size_ms`, otherwise pushing it will block the loop and stall video generation
+- Uses a small `queue_size_ms` (e.g. 
50ms) to control frame generation speed during silence periods diff --git a/examples/video-stream/audio_wave.py b/examples/video-stream/audio_wave.py new file mode 100644 index 00000000..3f663c6b --- /dev/null +++ b/examples/video-stream/audio_wave.py @@ -0,0 +1,320 @@ +import asyncio +import logging +import os +import signal +import time +from collections import deque +from dataclasses import dataclass +from typing import AsyncIterable, Optional, Union + +import numpy as np +from livekit import rtc, api + +try: + import cv2 +except ImportError: + raise RuntimeError( + "cv2 is required to run this example, " + "install with `pip install opencv-python`" + ) + +# ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set + +logger = logging.getLogger(__name__) + + +@dataclass +class MediaInfo: + video_width: int + video_height: int + video_fps: float + audio_sample_rate: int + audio_channels: int + + +class _AudioEndSentinel: + pass + + +async def audio_generator( + media_info: MediaInfo, + output_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]], +): + """Generates audio frames with alternating sine wave and silence periods""" + frequency = 480 # Hz + amplitude = 0.5 + period = 7.0 + sine_duration = 5.0 # Duration of sine wave in each period + chunk_size = 1024 + + while True: + current_time = 0.0 + + # Generate audio for sine_duration seconds + while current_time < sine_duration: + t = np.linspace( + current_time, + current_time + chunk_size / media_info.audio_sample_rate, + num=chunk_size, + endpoint=False, + ) + # Create volume envelope using sine wave + volume = np.abs(np.sin(2 * np.pi * current_time / sine_duration)) + samples = amplitude * volume * np.sin(2 * np.pi * frequency * t) + + # Convert to int16, (samples, channels) + samples = (samples[:, np.newaxis] * 32767).astype(np.int16) + if media_info.audio_channels > 1: + samples = np.repeat(samples, media_info.audio_channels, axis=1) + + # Create audio frame + audio_frame = rtc.AudioFrame( + data=samples.tobytes(), + sample_rate=media_info.audio_sample_rate, + num_channels=samples.shape[1], + samples_per_channel=samples.shape[0], + ) + await output_audio.put(audio_frame) + current_time += chunk_size / media_info.audio_sample_rate + await asyncio.sleep(0) + await output_audio.put(_AudioEndSentinel()) + + # Simulate silence + silence_duration = period - sine_duration + await asyncio.sleep(silence_duration) + + +class WaveformVisualizer: + def __init__(self, history_length: int = 1000): + self.history_length = history_length + self.volume_history: deque[float] = deque(maxlen=history_length) + self.start_time = time.time() + + def draw_timestamp(self, canvas: np.ndarray, fps: float): + height, width = canvas.shape[:2] + text = f"{time.time() - self.start_time:.1f}s @ {fps:.1f}fps" + font_face = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 2.0 + thickness = 2 + + (text_width, text_height), baseline = cv2.getTextSize( + text, font_face, font_scale, thickness + ) + x = (width - text_width) // 2 + y = int((height - text_height) * 0.4 + baseline) + cv2.putText(canvas, text, (x, y), font_face, font_scale, (0, 0, 0), thickness) + + def draw_current_wave( + self, canvas: np.ndarray, audio_samples: np.ndarray + ) -> np.ndarray: + """Draw the current waveform and return the current values""" + height, width = canvas.shape[:2] + center_y = height // 2 + 100 + + normalized_samples = audio_samples.astype(np.float32) / 32767.0 + normalized_samples = normalized_samples.mean(axis=1) # (samples,) + num_points = min(width, 
len(normalized_samples)) + + if len(normalized_samples) > num_points: + indices = np.linspace(0, len(normalized_samples) - 1, num_points, dtype=int) + plot_data = normalized_samples[indices] + else: + plot_data = normalized_samples + + x_coords = np.linspace(0, width, num_points, dtype=int) + y_coords = (plot_data * 200) + center_y + + cv2.line(canvas, (0, center_y), (width, center_y), (200, 200, 200), 1) + points = np.column_stack((x_coords, y_coords.astype(int))) + for i in range(len(points) - 1): + cv2.line(canvas, tuple(points[i]), tuple(points[i + 1]), (0, 255, 0), 2) + + return plot_data + + def draw_volume_history(self, canvas: np.ndarray, current_volume: float): + height, width = canvas.shape[:2] + center_y = height // 2 + + self.volume_history.append(current_volume) + cv2.line( + canvas, (0, center_y - 250), (width, center_y - 250), (200, 200, 200), 1 + ) + + volume_x = np.linspace(0, width, len(self.volume_history), dtype=int) + volume_y = center_y - 250 + (np.array(self.volume_history) * 200) + points = np.column_stack((volume_x, volume_y.astype(int))) + for i in range(len(points) - 1): + cv2.line(canvas, tuple(points[i]), tuple(points[i + 1]), (255, 0, 0), 2) + + def draw(self, canvas: np.ndarray, audio_samples: np.ndarray, fps: float): + self.draw_timestamp(canvas, fps) + plot_data = self.draw_current_wave(canvas, audio_samples) + current_volume = np.abs(plot_data).mean() + self.draw_volume_history(canvas, current_volume) + + +async def video_generator( + media_info: MediaInfo, + input_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]], + av_sync: rtc.AVSynchronizer, # only used for drawing the actual fps on the video +) -> AsyncIterable[tuple[rtc.VideoFrame, Optional[rtc.AudioFrame]]]: + canvas = np.zeros( + (media_info.video_height, media_info.video_width, 4), dtype=np.uint8 + ) + canvas.fill(255) + + def _np_to_video_frame(image: np.ndarray) -> rtc.VideoFrame: + return rtc.VideoFrame( + width=image.shape[1], + height=image.shape[0], + type=rtc.VideoBufferType.RGBA, + data=image.tobytes(), + ) + + audio_samples_per_frame = int(media_info.audio_sample_rate / media_info.video_fps) + audio_buffer = np.zeros((0, media_info.audio_channels), dtype=np.int16) + wave_visualizer = WaveformVisualizer() + while True: + try: + # timeout has to be shorter than the frame interval to avoid starvation + audio_frame = await asyncio.wait_for( + input_audio.get(), timeout=0.5 / media_info.video_fps + ) + except asyncio.TimeoutError: + # generate frame without audio (e.g. 
silence state) + new_frame = canvas.copy() + wave_visualizer.draw(new_frame, np.zeros((1, 2)), av_sync.actual_fps) + video_frame = _np_to_video_frame(new_frame) + yield video_frame, None + + # speed is controlled by the video fps in av_sync + await asyncio.sleep(0) + continue + + if isinstance(audio_frame, _AudioEndSentinel): + # drop the audio buffer when the audio finished + audio_buffer = np.zeros((0, media_info.audio_channels), dtype=np.int16) + continue + + audio_samples = np.frombuffer(audio_frame.data, dtype=np.int16).reshape( + -1, audio_frame.num_channels + ) # (samples, channels) + # accumulate audio samples to the buffer + audio_buffer = np.concatenate([audio_buffer, audio_samples], axis=0) + + while audio_buffer.shape[0] >= audio_samples_per_frame: + sub_samples = audio_buffer[:audio_samples_per_frame, :] + audio_buffer = audio_buffer[audio_samples_per_frame:, :] + + new_frame = canvas.copy() + wave_visualizer.draw(new_frame, sub_samples, av_sync.actual_fps) + video_frame = _np_to_video_frame(new_frame) + sub_audio_frame = rtc.AudioFrame( + data=sub_samples.tobytes(), + sample_rate=audio_frame.sample_rate, + num_channels=sub_samples.shape[1], + samples_per_channel=sub_samples.shape[0], + ) + yield video_frame, sub_audio_frame + + +async def main(room: rtc.Room): + token = ( + api.AccessToken() + .with_identity("python-publisher") + .with_name("Python Publisher") + .with_grants( + api.VideoGrants( + room_join=True, + room="room-ysBA-Q0hM", + agent=True, + ) + ) + .to_jwt() + ) + url = os.getenv("LIVEKIT_URL") + logging.info("connecting to %s", url) + + try: + await room.connect(url, token) + logging.info("connected to room %s", room.name) + except rtc.ConnectError as e: + logging.error("failed to connect to the room: %s", e) + return + + # Create media info + media_info = MediaInfo( + video_width=1280, + video_height=720, + video_fps=30.0, + audio_sample_rate=48000, + audio_channels=2, + ) + + # Create video and audio sources/tracks + queue_size_ms = 50 + video_source = rtc.VideoSource( + width=media_info.video_width, + height=media_info.video_height, + ) + audio_source = rtc.AudioSource( + sample_rate=media_info.audio_sample_rate, + num_channels=media_info.audio_channels, + queue_size_ms=queue_size_ms, + ) + + video_track = rtc.LocalVideoTrack.create_video_track("video", video_source) + audio_track = rtc.LocalAudioTrack.create_audio_track("audio", audio_source) + + # Publish tracks + video_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA) + audio_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE) + + await room.local_participant.publish_track(video_track, video_options) + await room.local_participant.publish_track(audio_track, audio_options) + + # Create AV synchronizer + av_sync = rtc.AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=media_info.video_fps, + video_queue_size_ms=queue_size_ms, + ) + + # Start audio generator + audio_queue = asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]](maxsize=1) + audio_task = asyncio.create_task(audio_generator(media_info, audio_queue)) + + try: + async for video_frame, audio_frame in video_generator( + media_info, audio_queue, av_sync=av_sync + ): + await av_sync.push(video_frame) + if audio_frame: + await av_sync.push(audio_frame) + finally: + audio_task.cancel() + await av_sync.aclose() + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + handlers=[logging.FileHandler("audio_wave.log"), logging.StreamHandler()], + ) + + 
loop = asyncio.get_event_loop() + room = rtc.Room(loop=loop) + + async def cleanup(): + await room.disconnect() + loop.stop() + + asyncio.ensure_future(main(room)) + for signal in [signal.SIGINT, signal.SIGTERM]: + loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup())) + + try: + loop.run_forever() + finally: + loop.close() diff --git a/examples/video-stream/video_play.py b/examples/video-stream/video_play.py new file mode 100644 index 00000000..98a71dd7 --- /dev/null +++ b/examples/video-stream/video_play.py @@ -0,0 +1,200 @@ +import asyncio +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import AsyncIterable, Union + +import numpy as np +import os +import signal +from livekit import api +from livekit import rtc + +try: + import av +except ImportError: + raise RuntimeError( + "av is required to run this example, install with `pip install av`" + ) + +# ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set + +logger = logging.getLogger(__name__) + + +@dataclass +class MediaInfo: + video_width: int + video_height: int + video_fps: float + audio_sample_rate: int + audio_channels: int + + +class MediaFileStreamer: + """Streams video and audio frames from a media file in an endless loop.""" + + def __init__(self, media_file: Union[str, Path]) -> None: + self._media_file = str(media_file) + # Create separate containers for each stream + self._video_container = av.open(self._media_file) + self._audio_container = av.open(self._media_file) + + # Cache media info + video_stream = self._video_container.streams.video[0] + audio_stream = self._audio_container.streams.audio[0] + self._info = MediaInfo( + video_width=video_stream.width, + video_height=video_stream.height, + video_fps=float(video_stream.average_rate), # type: ignore + audio_sample_rate=audio_stream.sample_rate, + audio_channels=audio_stream.channels, + ) + + @property + def info(self) -> MediaInfo: + return self._info + + async def stream_video(self) -> AsyncIterable[rtc.VideoFrame]: + """Streams video frames from the media file in an endless loop.""" + for av_frame in self._video_container.decode(video=0): + # Convert video frame to RGBA + frame = av_frame.to_rgb().to_ndarray() + frame_rgba = np.ones((frame.shape[0], frame.shape[1], 4), dtype=np.uint8) + frame_rgba[:, :, :3] = frame + yield rtc.VideoFrame( + width=frame.shape[1], + height=frame.shape[0], + type=rtc.VideoBufferType.RGBA, + data=frame_rgba.tobytes(), + ) + + async def stream_audio(self) -> AsyncIterable[rtc.AudioFrame]: + """Streams audio frames from the media file in an endless loop.""" + for av_frame in self._audio_container.decode(audio=0): + # Convert audio frame to raw int16 samples + frame = av_frame.to_ndarray().T # Transpose to (samples, channels) + frame = (frame * 32768).astype(np.int16) + yield rtc.AudioFrame( + data=frame.tobytes(), + sample_rate=self.info.audio_sample_rate, + num_channels=frame.shape[1], + samples_per_channel=frame.shape[0], + ) + + def reset(self): + self._video_container.seek(0) + self._audio_container.seek(0) + + async def aclose(self) -> None: + """Closes the media container and stops streaming.""" + self._video_container.close() + self._audio_container.close() + + +async def main(room: rtc.Room): + token = ( + api.AccessToken() + .with_identity("python-publisher") + .with_name("Python Publisher") + .with_grants( + api.VideoGrants( + room_join=True, + room="my-room", + ) + ) + .to_jwt() + ) + url = os.getenv("LIVEKIT_URL") + logging.info("connecting to %s", url) + + 
try: + await room.connect(url, token) + logging.info("connected to room %s", room.name) + except rtc.ConnectError as e: + logging.error("failed to connect to the room: %s", e) + return + + # Create media streamer + media_path = "/path/to/video.mp4" + streamer = MediaFileStreamer(media_path) + media_info = streamer.info + + # Create video and audio sources/tracks + queue_size_ms = 1000 # 1 second + video_source = rtc.VideoSource( + width=media_info.video_width, + height=media_info.video_height, + ) + logger.info(media_info) + audio_source = rtc.AudioSource( + sample_rate=media_info.audio_sample_rate, + num_channels=media_info.audio_channels, + queue_size_ms=queue_size_ms, + ) + + video_track = rtc.LocalVideoTrack.create_video_track("video", video_source) + audio_track = rtc.LocalAudioTrack.create_audio_track("audio", audio_source) + + # Publish tracks + video_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA) + audio_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE) + + await room.local_participant.publish_track(video_track, video_options) + await room.local_participant.publish_track(audio_track, audio_options) + + av_sync = rtc.AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=media_info.video_fps, + video_queue_size_ms=queue_size_ms, + ) + + async def _push_frames( + stream: AsyncIterable[rtc.VideoFrame | rtc.AudioFrame], + av_sync: rtc.AVSynchronizer, + ): + async for frame in stream: + await av_sync.push(frame) + await asyncio.sleep(0) + + try: + while True: + streamer.reset() + video_task = asyncio.create_task( + _push_frames(streamer.stream_video(), av_sync) + ) + audio_task = asyncio.create_task( + _push_frames(streamer.stream_audio(), av_sync) + ) + + # wait for both tasks to complete + await asyncio.gather(video_task, audio_task) + await av_sync.wait_for_playout() + logger.info("playout finished") + finally: + await streamer.aclose() + await av_sync.aclose() + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + handlers=[logging.FileHandler("video_play.log"), logging.StreamHandler()], + ) + + loop = asyncio.get_event_loop() + room = rtc.Room(loop=loop) + + async def cleanup(): + await room.disconnect() + loop.stop() + + asyncio.ensure_future(main(room)) + for signal in [signal.SIGINT, signal.SIGTERM]: + loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup())) + + try: + loop.run_forever() + finally: + loop.close() diff --git a/livekit-rtc/livekit/rtc/__init__.py b/livekit-rtc/livekit/rtc/__init__.py index 61c2ae51..bfecba2b 100644 --- a/livekit-rtc/livekit/rtc/__init__.py +++ b/livekit-rtc/livekit/rtc/__init__.py @@ -73,6 +73,7 @@ from .audio_resampler import AudioResampler, AudioResamplerQuality from .utils import combine_audio_frames from .rpc import RpcError, RpcInvocationData +from .synchronizer import AVSynchronizer __all__ = [ "ConnectionQuality", @@ -137,5 +138,6 @@ "RpcInvocationData", "EventEmitter", "combine_audio_frames", + "AVSynchronizer", "__version__", ] diff --git a/livekit-rtc/livekit/rtc/synchronizer.py b/livekit-rtc/livekit/rtc/synchronizer.py new file mode 100644 index 00000000..09d442cd --- /dev/null +++ b/livekit-rtc/livekit/rtc/synchronizer.py @@ -0,0 +1,174 @@ +import asyncio +import logging +import time +from collections import deque +from typing import Optional, Union + +from .video_frame import VideoFrame +from .audio_frame import AudioFrame +from .audio_source import AudioSource +from .video_source import VideoSource + +logger = 
logging.getLogger(__name__) + + +class AVSynchronizer: + """Synchronize audio and video capture. + + Usage: + av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=video_fps, + ) + + async for video_frame, audio_frame in video_generator: + await av_sync.push(video_frame) + await av_sync.push(audio_frame) + """ + + def __init__( + self, + *, + audio_source: AudioSource, + video_source: VideoSource, + video_fps: float, + video_queue_size_ms: float = 100, + _max_delay_tolerance_ms: float = 300, + ): + self._audio_source = audio_source + self._video_source = video_source + self._video_fps = video_fps + self._video_queue_size_ms = video_queue_size_ms + self._max_delay_tolerance_ms = _max_delay_tolerance_ms + + self._stopped = False + + self._video_queue_max_size = int( + self._video_fps * self._video_queue_size_ms / 1000 + ) + if self._video_queue_size_ms > 0: + # ensure queue is bounded if queue size is specified + self._video_queue_max_size = max(1, self._video_queue_max_size) + + self._video_queue = asyncio.Queue[VideoFrame]( + maxsize=self._video_queue_max_size + ) + self._fps_controller = _FPSController( + expected_fps=self._video_fps, + max_delay_tolerance_ms=self._max_delay_tolerance_ms, + ) + self._capture_video_task = asyncio.create_task(self._capture_video()) + + async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None: + if isinstance(frame, AudioFrame): + await self._audio_source.capture_frame(frame) + return + + await self._video_queue.put(frame) + + async def clear_queue(self) -> None: + self._audio_source.clear_queue() + while not self._video_queue.empty(): + await self._video_queue.get() + + async def wait_for_playout(self) -> None: + """Wait until all video and audio frames are played out.""" + await self._audio_source.wait_for_playout() + await self._video_queue.join() + + async def _capture_video(self) -> None: + while not self._stopped: + frame = await self._video_queue.get() + async with self._fps_controller: + self._video_source.capture_frame(frame) + self._video_queue.task_done() + + async def aclose(self) -> None: + self._stopped = True + if self._capture_video_task: + self._capture_video_task.cancel() + + @property + def actual_fps(self) -> float: + return self._fps_controller.actual_fps + + +class _FPSController: + def __init__( + self, *, expected_fps: float, max_delay_tolerance_ms: float = 300 + ) -> None: + """Controls frame rate by adjusting sleep time based on actual FPS. + + Usage: + async with _FPSController(expected_fps=30): + # process frame + pass + + Args: + expected_fps: Target frames per second + max_delay_tolerance_ms: Maximum delay tolerance in milliseconds + """ + self._expected_fps = expected_fps + self._frame_interval = 1.0 / expected_fps + self._max_delay_tolerance_secs = max_delay_tolerance_ms / 1000 + + self._next_frame_time: Optional[float] = None + self._fps_calc_winsize = max(2, int(1.0 * expected_fps)) + self._send_timestamps: deque[float] = deque(maxlen=self._fps_calc_winsize) + + async def __aenter__(self) -> None: + await self.wait_next_process() + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + self.after_process() + + async def wait_next_process(self) -> None: + """Wait until it's time for the next frame. + + Adjusts sleep time based on actual FPS to maintain target rate. 
+ """ + current_time = time.perf_counter() + + # initialize the next frame time + if self._next_frame_time is None: + self._next_frame_time = current_time + + # calculate sleep time + sleep_time = self._next_frame_time - current_time + if sleep_time > 0: + await asyncio.sleep(sleep_time) + else: + # check if significantly behind schedule + if -sleep_time > self._max_delay_tolerance_secs: + logger.warning( + f"Frame capture was behind schedule for " + f"{-sleep_time * 1000:.2f} ms" + ) + self._next_frame_time = time.perf_counter() + + def after_process(self) -> None: + """Update timing information after processing a frame.""" + assert ( + self._next_frame_time is not None + ), "wait_next_process must be called first" + + # update timing information + self._send_timestamps.append(time.perf_counter()) + + # calculate next frame time + self._next_frame_time += self._frame_interval + + @property + def expected_fps(self) -> float: + return self._expected_fps + + @property + def actual_fps(self) -> float: + """Get current average FPS.""" + if len(self._send_timestamps) < 2: + return 0 + + return (len(self._send_timestamps) - 1) / ( + self._send_timestamps[-1] - self._send_timestamps[0] + )
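
For context on how the pieces above fit together, here is a minimal usage sketch of the new `AVSynchronizer`. It assumes an already-connected `rtc.Room`; the helper name `publish_synchronized`, the 1280x720 / 48 kHz media settings, and the `frames` iterable of `(video_frame, audio_frame)` pairs are illustrative placeholders in the spirit of `audio_wave.py`, not part of the API:

```python
from typing import AsyncIterable, Optional, Tuple

from livekit import rtc


async def publish_synchronized(
    room: rtc.Room,
    frames: AsyncIterable[Tuple[rtc.VideoFrame, Optional[rtc.AudioFrame]]],
    fps: float = 30.0,
) -> None:
    # Sources buffer captured media; queue_size_ms bounds how far ahead
    # frames can be pushed before push() starts to block.
    video_source = rtc.VideoSource(width=1280, height=720)
    audio_source = rtc.AudioSource(
        sample_rate=48000, num_channels=2, queue_size_ms=100
    )

    video_track = rtc.LocalVideoTrack.create_video_track("video", video_source)
    audio_track = rtc.LocalAudioTrack.create_audio_track("audio", audio_source)
    await room.local_participant.publish_track(
        video_track, rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA)
    )
    await room.local_participant.publish_track(
        audio_track, rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    )

    av_sync = rtc.AVSynchronizer(
        audio_source=audio_source,
        video_source=video_source,
        video_fps=fps,
        video_queue_size_ms=100,
    )
    try:
        async for video_frame, audio_frame in frames:
            # Push each matching video/audio pair together; the synchronizer
            # paces video capture to `video_fps` while audio plays out.
            await av_sync.push(video_frame)
            if audio_frame is not None:
                await av_sync.push(audio_frame)
        await av_sync.wait_for_playout()
    finally:
        await av_sync.aclose()
```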