Skip to content

Commit

Permalink
Merge pull request pipecat-ai#820 from pipecat-ai/aleix/more-parallel…
Browse files Browse the repository at this point in the history
…pipeline-fixes

parallel_pipeline: fix system frames again
  • Loading branch information
aconchillo authored Dec 11, 2024
2 parents ffe1e02 + 7b6bbc2 commit 4f9a4eb
Showing 1 changed file with 9 additions and 13 deletions.
22 changes: 9 additions & 13 deletions src/pipecat/pipeline/parallel_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):

match direction:
case FrameDirection.UPSTREAM:
# We don't want to queue system frames as they would be
# processed by a separate task.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
else:
# SystemFrames are pushed directly from ParallelPipeline.
if not isinstance(frame, SystemFrame):
await self._up_queue.put(frame)
case FrameDirection.DOWNSTREAM:
await self.push_frame(frame, direction)
Expand All @@ -49,11 +46,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
case FrameDirection.UPSTREAM:
await self.push_frame(frame, direction)
case FrameDirection.DOWNSTREAM:
# We don't want to queue system frames as they would be
# processed by a separate task.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)
else:
# SystemFrames are pushed directly from ParallelPipeline.
if not isinstance(frame, SystemFrame):
await self._down_queue.put(frame)


Expand Down Expand Up @@ -123,11 +117,13 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sinks])
elif direction == FrameDirection.DOWNSTREAM:
# If we get a downstream frame we process it in each source.
# TODO(aleix): We are creating task for each frame. For real-time
# video/audio this might be too slow. We should use an already
# created task instead.
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sources])

# If we have a SystemFrame we will push it from this task. Note that the
# connected sinks and sources ignore SystemFrames.
if isinstance(frame, SystemFrame):
await self.push_frame(frame, direction)

# If we get an EndFrame we stop our queue processing tasks and wait on
# all the pipelines to finish.
if isinstance(frame, (CancelFrame, EndFrame)):
Expand Down

0 comments on commit 4f9a4eb

Please sign in to comment.