diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index bd390d04..eb93574b 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -208,6 +208,13 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { return c.Info } + for _, s := range c.sinks { + if err := s.Close(); err != nil { + c.Info.Error = err.Error() + return c.Info + } + } + return c.Info } diff --git a/pkg/pipeline/sink/file.go b/pkg/pipeline/sink/file.go index 70d403ab..97acb050 100644 --- a/pkg/pipeline/sink/file.go +++ b/pkg/pipeline/sink/file.go @@ -43,7 +43,7 @@ func (s *FileSink) Start() error { return nil } -func (s *FileSink) OnStop() error { +func (s *FileSink) Close() error { location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false) if err != nil { return err diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index fd64f01b..d8e2f99b 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -207,7 +207,7 @@ func (s *SegmentSink) endSegment(filename string, endTime int64) error { return nil } -func (s *SegmentSink) OnStop() error { +func (s *SegmentSink) Close() error { // wait for all pending upload jobs to finish close(s.endedSegments) <-s.done.Watch() diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index b1f5b1a2..05e4a41b 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -23,7 +23,7 @@ import ( type Sink interface { Start() error - OnStop() error + Close() error Cleanup() } @@ -69,7 +69,6 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ } if s != nil { - callbacks.AddOnStop(s.OnStop) sinks[egressType] = s } } diff --git a/pkg/pipeline/sink/websocket.go b/pkg/pipeline/sink/websocket.go index 3e3d1d86..86ce787b 100644 --- a/pkg/pipeline/sink/websocket.go +++ b/pkg/pipeline/sink/websocket.go @@ -196,10 +196,6 @@ func (s *WebsocketSink) writeMutedMessage(muted bool) error { return s.conn.WriteMessage(websocket.TextMessage, data) } -func (s *WebsocketSink) OnStop() error { - return nil -} - func (s *WebsocketSink) Close() error { s.mu.Lock() defer s.mu.Unlock()