Skip to content

Commit

Permalink
always close source
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Sep 13, 2023
1 parent 0545c89 commit 84b3334
Showing 1 changed file with 95 additions and 100 deletions.
195 changes: 95 additions & 100 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,44 +172,7 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
defer span.End()

c.Info.StartedAt = time.Now().UnixNano()
defer func() {
now := time.Now().UnixNano()

if c.SourceType == types.SourceTypeSDK {
c.updateDuration(c.src.GetEndedAt())
}

c.Info.UpdatedAt = now
c.Info.EndedAt = now
if c.SourceType == types.SourceTypeSDK {
c.updateDuration(c.src.GetEndedAt())
}

// update status
if c.Info.Error != "" {
c.Info.Status = livekit.EgressStatus_EGRESS_FAILED

if o := c.GetStreamConfig(); o != nil {
for _, streamInfo := range o.StreamInfo {
streamInfo.Status = livekit.StreamInfo_FAILED
}
}
}

// ensure egress ends with a final state
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED

case livekit.EgressStatus_EGRESS_ACTIVE,
livekit.EgressStatus_EGRESS_ENDING:
c.Info.Status = livekit.EgressStatus_EGRESS_COMPLETE
}

for _, s := range c.sinks {
s.Cleanup()
}
}()
defer c.Close()

// session limit timer
c.startSessionLimitTimer(ctx)
Expand All @@ -226,7 +189,6 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
logger.Debugw("waiting for start signal")
select {
case <-c.stopped.Watch():
c.src.Close()
c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED
return c.Info
case <-start:
Expand All @@ -236,80 +198,19 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {

for _, s := range c.sinks {
if err := s.Start(); err != nil {
c.src.Close()
c.Info.Error = err.Error()
return c.Info
}
}

if err := c.p.Run(); err != nil {
c.src.Close()
c.Info.Error = err.Error()
return c.Info
}

return c.Info
}

func (c *Controller) startSessionLimitTimer(ctx context.Context) {
var timeout time.Duration
for egressType := range c.Outputs {
var t time.Duration
switch egressType {
case types.EgressTypeFile:
t = c.FileOutputMaxDuration
case types.EgressTypeStream, types.EgressTypeWebsocket:
t = c.StreamOutputMaxDuration
case types.EgressTypeSegments:
t = c.SegmentOutputMaxDuration
}
if t > 0 && (timeout == 0 || t < timeout) {
timeout = t
}
}

if timeout > 0 {
c.limitTimer = time.AfterFunc(timeout, func() {
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING,
livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED
}
if c.playing.IsBroken() {
c.SendEOS(ctx)
} else {
c.p.Stop()
}
})
}
}

func (c *Controller) updateStartTime(startedAt int64) {
for egressType, o := range c.Outputs {
switch egressType {
case types.EgressTypeStream, types.EgressTypeWebsocket:
c.mu.Lock()
for _, streamInfo := range o.(*config.StreamConfig).StreamInfo {
streamInfo.Status = livekit.StreamInfo_ACTIVE
streamInfo.StartedAt = startedAt
}
c.mu.Unlock()

case types.EgressTypeFile:
o.(*config.FileConfig).FileInfo.StartedAt = startedAt

case types.EgressTypeSegments:
o.(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt
}
}

if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING {
c.Info.Status = livekit.EgressStatus_EGRESS_ACTIVE
c.Info.UpdatedAt = time.Now().UnixNano()
c.OnUpdate(context.Background(), c.Info)
}
}

func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStreamRequest) error {
ctx, span := tracer.Start(ctx, "Pipeline.UpdateStream")
defer span.End()
Expand Down Expand Up @@ -496,6 +397,100 @@ func (c *Controller) OnError(err error) {
go c.p.Stop()
}

func (c *Controller) Close() {
c.src.Close()

now := time.Now().UnixNano()
c.Info.UpdatedAt = now
c.Info.EndedAt = now
if c.SourceType == types.SourceTypeSDK {
c.updateDuration(c.src.GetEndedAt())
}

// update status
if c.Info.Error != "" {
c.Info.Status = livekit.EgressStatus_EGRESS_FAILED
if o := c.GetStreamConfig(); o != nil {
for _, streamInfo := range o.StreamInfo {
streamInfo.Status = livekit.StreamInfo_FAILED
}
}
}

// ensure egress ends with a final state
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED

case livekit.EgressStatus_EGRESS_ACTIVE,
livekit.EgressStatus_EGRESS_ENDING:
c.Info.Status = livekit.EgressStatus_EGRESS_COMPLETE
}

for _, s := range c.sinks {
s.Cleanup()
}
}

func (c *Controller) startSessionLimitTimer(ctx context.Context) {
var timeout time.Duration
for egressType := range c.Outputs {
var t time.Duration
switch egressType {
case types.EgressTypeFile:
t = c.FileOutputMaxDuration
case types.EgressTypeStream, types.EgressTypeWebsocket:
t = c.StreamOutputMaxDuration
case types.EgressTypeSegments:
t = c.SegmentOutputMaxDuration
}
if t > 0 && (timeout == 0 || t < timeout) {
timeout = t
}
}

if timeout > 0 {
c.limitTimer = time.AfterFunc(timeout, func() {
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING,
livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED
}
if c.playing.IsBroken() {
c.SendEOS(ctx)
} else {
c.p.Stop()
}
})
}
}

func (c *Controller) updateStartTime(startedAt int64) {
for egressType, o := range c.Outputs {
switch egressType {
case types.EgressTypeStream, types.EgressTypeWebsocket:
c.mu.Lock()
for _, streamInfo := range o.(*config.StreamConfig).StreamInfo {
streamInfo.Status = livekit.StreamInfo_ACTIVE
streamInfo.StartedAt = startedAt
}
c.mu.Unlock()

case types.EgressTypeFile:
o.(*config.FileConfig).FileInfo.StartedAt = startedAt

case types.EgressTypeSegments:
o.(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt
}
}

if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING {
c.Info.Status = livekit.EgressStatus_EGRESS_ACTIVE
c.Info.UpdatedAt = time.Now().UnixNano()
c.OnUpdate(context.Background(), c.Info)
}
}

func (c *Controller) updateDuration(endedAt int64) {
for egressType, o := range c.Outputs {
switch egressType {
Expand Down

0 comments on commit 84b3334

Please sign in to comment.