diff --git a/pkg/gstreamer/bin.go b/pkg/gstreamer/bin.go
index 02adc096..1c82910f 100644
--- a/pkg/gstreamer/bin.go
+++ b/pkg/gstreamer/bin.go
@@ -208,8 +208,11 @@ func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) {
 
 func (b *Bin) probeRemoveSource(src *Bin) {
 	src.mu.Lock()
-	srcGhostPad, sinkGhostPad := deleteGhostPadsLocked(src, b)
+	srcGhostPad, sinkGhostPad, ok := deleteGhostPadsLocked(src, b)
 	src.mu.Unlock()
+	if !ok {
+		return
+	}
 
 	srcGhostPad.AddProbe(gst.PadProbeTypeIdle, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
 		sinkPad := sinkGhostPad.GetTarget()
@@ -231,8 +234,11 @@ func (b *Bin) probeRemoveSource(src *Bin) {
 
 func (b *Bin) probeRemoveSink(sink *Bin) {
 	sink.mu.Lock()
-	srcGhostPad, sinkGhostPad := deleteGhostPadsLocked(b, sink)
+	srcGhostPad, sinkGhostPad, ok := deleteGhostPadsLocked(b, sink)
 	sink.mu.Unlock()
+	if !ok {
+		return
+	}
 
 	srcGhostPad.AddProbe(gst.PadProbeTypeBlockDownstream, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
 		srcGhostPad.Unlink(sinkGhostPad.Pad)
@@ -257,14 +263,20 @@ func (b *Bin) probeRemoveSink(sink *Bin) {
 	})
 }
 
-func deleteGhostPadsLocked(src, sink *Bin) (*gst.GhostPad, *gst.GhostPad) {
-	srcPad := src.pads[sink.bin.GetName()]
-	sinkPad := sink.pads[src.bin.GetName()]
-
+func deleteGhostPadsLocked(src, sink *Bin) (*gst.GhostPad, *gst.GhostPad, bool) {
+	srcPad, srcOK := src.pads[sink.bin.GetName()]
+	if !srcOK {
+		logger.Errorw("source pad missing", nil, "bin", src.bin.GetName())
+	}
 	delete(src.pads, sink.bin.GetName())
+
+	sinkPad, sinkOK := sink.pads[src.bin.GetName()]
+	if !sinkOK {
+		logger.Errorw("sink pad missing", nil, "bin", sink.bin.GetName())
+	}
 	delete(sink.pads, src.bin.GetName())
 
-	return srcPad, sinkPad
+	return srcPad, sinkPad, srcOK && sinkOK
 }
 
 func (b *Bin) SetState(state gst.State) error {
diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go
index 2510b1ce..c349c034 100644
--- a/pkg/stats/monitor.go
+++ b/pkg/stats/monitor.go
@@ -272,12 +272,11 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
 	accept := false
 	total := m.cpuStats.NumCPU()
 
-	var available float64
+	var available, used float64
 	if m.requests.Load() == 0 {
 		// if no requests, use total
 		available = total
 	} else {
-		var used float64
 		for _, ps := range m.pending {
 			if ps.pendingUsage > ps.lastUsage {
 				used += ps.pendingUsage
@@ -294,7 +293,7 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
 		}
 
 		// if already running requests, cap usage at MaxCpuUtilization
-		available = total - used - (total * (1 - m.cpuCostConfig.MaxCpuUtilization))
+		available = total*m.cpuCostConfig.MaxCpuUtilization - used
 	}
 
 	var required float64
@@ -313,9 +312,11 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
 	accept = available >= required
 
 	logger.Debugw("cpu check",
-		"available", available,
+		"total", total,
+		"used", used,
 		"required", required,
-		"activeRequests", m.requests,
+		"available", available,
+		"activeRequests", m.requests.Load(),
 		"canAccept", accept,
 	)
 
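Note on the bin.go change: the old deleteGhostPadsLocked used plain map indexing, which silently yields a nil *gst.GhostPad when the entry is missing, so the callers would go on to call AddProbe on nil. The comma-ok form makes the miss observable, and the callers now return early instead. A minimal standalone sketch of the idiom (using *int as a stand-in for *gst.GhostPad):

package main

import "fmt"

func main() {
	pads := map[string]*int{}

	// plain indexing returns the zero value (nil) for a missing key,
	// so the caller cannot tell "absent" from "present but nil"
	p := pads["missing"]
	fmt.Println(p) // <nil>

	// the comma-ok form reports whether the key was actually present
	p, ok := pads["missing"]
	fmt.Println(p, ok) // <nil> false
}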
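Note on the monitor.go change: the new available expression is an algebraic simplification, not a behavior change, since total - used - total*(1 - max) = total*max - used. A quick standalone check with hypothetical numbers (8 cores, 3.5 cores in use, an 80% utilization cap):

package main

import "fmt"

func main() {
	// hypothetical values: 8 cores, 3.5 cores in use, 80% utilization cap
	total, used, maxUtil := 8.0, 3.5, 0.8

	before := total - used - (total * (1 - maxUtil)) // original expression
	after := total*maxUtil - used                    // simplified expression

	fmt.Println(before, after) // both 2.9, up to float rounding
}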
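The logging change is also more than a field reorder: m.requests appears to be an atomic counter (the same hunk calls m.requests.Load()), and passing the struct itself to the logger records its internal representation rather than the count, while Load() logs the actual value. A small sketch of the difference, assuming sync/atomic.Int32:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var requests atomic.Int32
	requests.Add(2)

	// printing the struct shows its fields, not the count
	// (exact rendering depends on the Go version's internal layout)
	fmt.Printf("%v\n", &requests) // e.g. &{{} 2}

	// Load() reads the value atomically
	fmt.Println(requests.Load()) // 2
}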