From 7b6bbc29edcdf0cdcacb8392ba43803953f2c0a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 10 Dec 2024 18:12:33 -0800 Subject: [PATCH] parallel_pipeline: fix system frames again --- src/pipecat/pipeline/parallel_pipeline.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/pipecat/pipeline/parallel_pipeline.py b/src/pipecat/pipeline/parallel_pipeline.py index 1ccd14a98..48ec1a172 100644 --- a/src/pipecat/pipeline/parallel_pipeline.py +++ b/src/pipecat/pipeline/parallel_pipeline.py @@ -27,11 +27,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): match direction: case FrameDirection.UPSTREAM: - # We don't want to queue system frames as they would be - # processed by a separate task. - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - else: + # SystemFrames are pushed directly from ParallelPipeline. + if not isinstance(frame, SystemFrame): await self._up_queue.put(frame) case FrameDirection.DOWNSTREAM: await self.push_frame(frame, direction) @@ -49,11 +46,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): case FrameDirection.UPSTREAM: await self.push_frame(frame, direction) case FrameDirection.DOWNSTREAM: - # We don't want to queue system frames as they would be - # processed by a separate task. - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - else: + # SystemFrames are pushed directly from ParallelPipeline. + if not isinstance(frame, SystemFrame): await self._down_queue.put(frame) @@ -123,11 +117,13 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sinks]) elif direction == FrameDirection.DOWNSTREAM: # If we get a downstream frame we process it in each source. - # TODO(aleix): We are creating task for each frame. For real-time - # video/audio this might be too slow. We should use an already - # created task instead. await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sources]) + # If we have a SystemFrame we will push it from this task. Note that the + # connected sinks and sources ignore SystemFrames. + if isinstance(frame, SystemFrame): + await self.push_frame(frame, direction) + # If we get an EndFrame we stop our queue processing tasks and wait on # all the pipelines to finish. if isinstance(frame, (CancelFrame, EndFrame)):