Skip to content

Commit

Permalink
Reduce spam (#715)
Browse files Browse the repository at this point in the history
* reduce cpu check logs

* reduce common warnings, track disconnection buffer
  • Loading branch information
frostbyte73 authored Jul 2, 2024
1 parent ac3bea0 commit d1def63
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 36 deletions.
31 changes: 19 additions & 12 deletions pkg/pipeline/source/sdk/appwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,16 @@ type AppWriter struct {
*synchronizer.TrackSynchronizer

// state
state state
initialized bool
ticker *time.Ticker
muted atomic.Bool
disconnected atomic.Bool
playing core.Fuse
draining core.Fuse
endStream core.Fuse
finished core.Fuse
state state
initialized bool
ticker *time.Ticker
muted atomic.Bool
maybeDisconnected bool
disconnected atomic.Bool
playing core.Fuse
draining core.Fuse
endStream core.Fuse
finished core.Fuse
}

func NewAppWriter(
Expand Down Expand Up @@ -265,6 +266,8 @@ func (w *AppWriter) handlePlaying() {
if err != nil {
w.handleReadError(err)
return
} else {
w.maybeDisconnected = false
}

// initialize track synchronizer
Expand Down Expand Up @@ -313,9 +316,13 @@ func (w *AppWriter) handleReadError(err error) {
_ = w.pushSamples()
w.state = stateReconnecting
case errors.As(err, &netErr) && netErr.Timeout():
w.SetTrackDisconnected(true)
_ = w.pushSamples()
w.state = stateReconnecting
if w.maybeDisconnected {
w.SetTrackDisconnected(true)
_ = w.pushSamples()
w.state = stateReconnecting
} else {
w.maybeDisconnected = true
}
default:
// log non-EOF errors
if !errors.Is(err, io.EOF) {
Expand Down
12 changes: 8 additions & 4 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ const (
msgWrongThread = "Called from wrong thread"

// common gst warnings
msgKeyframe = "Could not request a keyframe. Files may not split at the exact location they should"
msgLatencyQuery = "Latency query failed"
msgTaps = "can't find exact taps"
msgInputDisappeared = "Can't copy metadata because input buffer disappeared"
msgKeyframe = "Could not request a keyframe. Files may not split at the exact location they should"
msgLatencyQuery = "Latency query failed"
msgTaps = "can't find exact taps"
msgInputDisappeared = "Can't copy metadata because input buffer disappeared"
fnGstAudioResampleCheckDiscont = "gst_audio_resample_check_discont"

// common gst fixmes
msgStreamStart = "stream-start event without group-id. Consider implementing group-id handling in the upstream elements"
Expand All @@ -83,6 +84,9 @@ func (c *Controller) gstLog(level gst.DebugLevel, file, function string, line in
lvl = "error"
}
case gst.LevelWarning:
if function == fnGstAudioResampleCheckDiscont {
return
}
switch message {
case msgKeyframe, msgLatencyQuery, msgTaps, msgInputDisappeared:
// ignore
Expand Down
39 changes: 20 additions & 19 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,30 +140,35 @@ func (m *Monitor) validateCPUConfig() error {
}

func (m *Monitor) CanAcceptWebRequest() bool {
m.mu.Lock()
defer m.mu.Unlock()

return m.canAcceptRequestLocked(&rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Web{},
})
return m.webRequests.Load() < m.cpuCostConfig.MaxConcurrentWeb
}

func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
m.mu.Lock()
defer m.mu.Unlock()
fields, canAccept := m.canAcceptRequestLocked(req)
m.mu.Unlock()

return m.canAcceptRequestLocked(req)
logger.Debugw("cpu check", fields...)
return canAccept
}

func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) ([]interface{}, bool) {
total, available, pending, used := m.getCPUUsageLocked()
fields := []interface{}{
"total", total,
"available", available,
"pending", pending,
"used", used,
"activeRequests", m.requests.Load(),
"activeWeb", m.webRequests.Load(),
}

var accept bool
var required float64
switch r := req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return false
return fields, false
}
if r.RoomComposite.AudioOnly {
required = m.cpuCostConfig.AudioRoomCompositeCpuCost
Expand All @@ -172,7 +177,7 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
}
case *rpc.StartEgressRequest_Web:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return false
return fields, false
}
if r.Web.AudioOnly {
required = m.cpuCostConfig.AudioWebCpuCost
Expand All @@ -188,17 +193,12 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
}
accept = available >= required

logger.Debugw("cpu check",
"total", total,
"pending", pending,
"used", used,
fields = append(fields,
"required", required,
"available", available,
"activeRequests", m.requests.Load(),
"canAccept", accept,
)

return accept
return fields, accept
}

func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
Expand All @@ -208,7 +208,8 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
if m.pending[req.EgressId] != nil {
return errors.ErrEgressAlreadyExists
}
if !m.canAcceptRequestLocked(req) {
if _, ok := m.canAcceptRequestLocked(req); !ok {
logger.Warnw("can not accept request", nil)
return errors.ErrNotEnoughCPU
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/stats/monitor_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package stats

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/livekit/protocol/rpc"
)

func (m *Monitor) initPrometheus() {
Expand Down Expand Up @@ -72,7 +74,11 @@ func (m *Monitor) promIsIdle() float64 {
}

func (m *Monitor) promCanAcceptRequest() float64 {
if !m.svc.IsDisabled() && m.CanAcceptWebRequest() {
m.mu.Lock()
_, canAccept := m.canAcceptRequestLocked(&rpc.StartEgressRequest{Request: &rpc.StartEgressRequest_Web{}})
m.mu.Unlock()

if !m.svc.IsDisabled() && canAccept {
return 1
}
return 0
Expand Down

0 comments on commit d1def63

Please sign in to comment.