diff --git a/pkg/pipeline/source/sdk/appwriter.go b/pkg/pipeline/source/sdk/appwriter.go index 2165010d..03adac9d 100644 --- a/pkg/pipeline/source/sdk/appwriter.go +++ b/pkg/pipeline/source/sdk/appwriter.go @@ -74,15 +74,16 @@ type AppWriter struct { *synchronizer.TrackSynchronizer // state - state state - initialized bool - ticker *time.Ticker - muted atomic.Bool - disconnected atomic.Bool - playing core.Fuse - draining core.Fuse - endStream core.Fuse - finished core.Fuse + state state + initialized bool + ticker *time.Ticker + muted atomic.Bool + maybeDisconnected bool + disconnected atomic.Bool + playing core.Fuse + draining core.Fuse + endStream core.Fuse + finished core.Fuse } func NewAppWriter( @@ -265,6 +266,8 @@ func (w *AppWriter) handlePlaying() { if err != nil { w.handleReadError(err) return + } else { + w.maybeDisconnected = false } // initialize track synchronizer @@ -313,9 +316,13 @@ func (w *AppWriter) handleReadError(err error) { _ = w.pushSamples() w.state = stateReconnecting case errors.As(err, &netErr) && netErr.Timeout(): - w.SetTrackDisconnected(true) - _ = w.pushSamples() - w.state = stateReconnecting + if w.maybeDisconnected { + w.SetTrackDisconnected(true) + _ = w.pushSamples() + w.state = stateReconnecting + } else { + w.maybeDisconnected = true + } default: // log non-EOF errors if !errors.Is(err, io.EOF) { diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 9f791e0c..7ab2e062 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -58,10 +58,11 @@ const ( msgWrongThread = "Called from wrong thread" // common gst warnings - msgKeyframe = "Could not request a keyframe. Files may not split at the exact location they should" - msgLatencyQuery = "Latency query failed" - msgTaps = "can't find exact taps" - msgInputDisappeared = "Can't copy metadata because input buffer disappeared" + msgKeyframe = "Could not request a keyframe. Files may not split at the exact location they should" + msgLatencyQuery = "Latency query failed" + msgTaps = "can't find exact taps" + msgInputDisappeared = "Can't copy metadata because input buffer disappeared" + fnGstAudioResampleCheckDiscont = "gst_audio_resample_check_discont" // common gst fixmes msgStreamStart = "stream-start event without group-id. Consider implementing group-id handling in the upstream elements" @@ -83,6 +84,9 @@ func (c *Controller) gstLog(level gst.DebugLevel, file, function string, line in lvl = "error" } case gst.LevelWarning: + if function == fnGstAudioResampleCheckDiscont { + return + } switch message { case msgKeyframe, msgLatencyQuery, msgTaps, msgInputDisappeared: // ignore diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 5c90b287..aa451063 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -140,30 +140,35 @@ func (m *Monitor) validateCPUConfig() error { } func (m *Monitor) CanAcceptWebRequest() bool { - m.mu.Lock() - defer m.mu.Unlock() - - return m.canAcceptRequestLocked(&rpc.StartEgressRequest{ - Request: &rpc.StartEgressRequest_Web{}, - }) + return m.webRequests.Load() < m.cpuCostConfig.MaxConcurrentWeb } func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool { m.mu.Lock() - defer m.mu.Unlock() + fields, canAccept := m.canAcceptRequestLocked(req) + m.mu.Unlock() - return m.canAcceptRequestLocked(req) + logger.Debugw("cpu check", fields...) + return canAccept } -func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool { +func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) ([]interface{}, bool) { total, available, pending, used := m.getCPUUsageLocked() + fields := []interface{}{ + "total", total, + "available", available, + "pending", pending, + "used", used, + "activeRequests", m.requests.Load(), + "activeWeb", m.webRequests.Load(), + } var accept bool var required float64 switch r := req.Request.(type) { case *rpc.StartEgressRequest_RoomComposite: if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb { - return false + return fields, false } if r.RoomComposite.AudioOnly { required = m.cpuCostConfig.AudioRoomCompositeCpuCost @@ -172,7 +177,7 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool { } case *rpc.StartEgressRequest_Web: if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb { - return false + return fields, false } if r.Web.AudioOnly { required = m.cpuCostConfig.AudioWebCpuCost @@ -188,17 +193,12 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool { } accept = available >= required - logger.Debugw("cpu check", - "total", total, - "pending", pending, - "used", used, + fields = append(fields, "required", required, - "available", available, - "activeRequests", m.requests.Load(), "canAccept", accept, ) - return accept + return fields, accept } func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { @@ -208,7 +208,8 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { if m.pending[req.EgressId] != nil { return errors.ErrEgressAlreadyExists } - if !m.canAcceptRequestLocked(req) { + if _, ok := m.canAcceptRequestLocked(req); !ok { + logger.Warnw("can not accept request", nil) return errors.ErrNotEnoughCPU } diff --git a/pkg/stats/monitor_prom.go b/pkg/stats/monitor_prom.go index bd407839..24a6eee3 100644 --- a/pkg/stats/monitor_prom.go +++ b/pkg/stats/monitor_prom.go @@ -16,6 +16,8 @@ package stats import ( "github.com/prometheus/client_golang/prometheus" + + "github.com/livekit/protocol/rpc" ) func (m *Monitor) initPrometheus() { @@ -72,7 +74,11 @@ func (m *Monitor) promIsIdle() float64 { } func (m *Monitor) promCanAcceptRequest() float64 { - if !m.svc.IsDisabled() && m.CanAcceptWebRequest() { + m.mu.Lock() + _, canAccept := m.canAcceptRequestLocked(&rpc.StartEgressRequest{Request: &rpc.StartEgressRequest_Web{}}) + m.mu.Unlock() + + if !m.svc.IsDisabled() && canAccept { return 1 } return 0