Skip to content

Commit

Permalink
only close sinks if started
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Sep 14, 2023
1 parent 403cb9a commit c80e8e0
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 8 deletions.
7 changes: 7 additions & 0 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
return c.Info
}

for _, s := range c.sinks {
if err := s.Close(); err != nil {
c.Info.Error = err.Error()
return c.Info
}
}

return c.Info
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *FileSink) Start() error {
return nil
}

func (s *FileSink) OnStop() error {
func (s *FileSink) Close() error {
location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (s *SegmentSink) endSegment(filename string, endTime int64) error {
return nil
}

func (s *SegmentSink) OnStop() error {
func (s *SegmentSink) Close() error {
// wait for all pending upload jobs to finish
close(s.endedSegments)
<-s.done.Watch()
Expand Down
3 changes: 1 addition & 2 deletions pkg/pipeline/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

type Sink interface {
Start() error
OnStop() error
Close() error
Cleanup()
}

Expand Down Expand Up @@ -69,7 +69,6 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[
}

if s != nil {
callbacks.AddOnStop(s.OnStop)
sinks[egressType] = s
}
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/pipeline/sink/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,6 @@ func (s *WebsocketSink) writeMutedMessage(muted bool) error {
return s.conn.WriteMessage(websocket.TextMessage, data)
}

func (s *WebsocketSink) OnStop() error {
return nil
}

func (s *WebsocketSink) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down

0 comments on commit c80e8e0

Please sign in to comment.