Skip to content

Commit

Permalink
Check ghost pads, fix available cpu (#594)
Browse files Browse the repository at this point in the history
* check ghost pads, fix available cpu

* update cpu check log
  • Loading branch information
frostbyte73 authored Jan 23, 2024
1 parent 129f79c commit df7be7d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
26 changes: 19 additions & 7 deletions pkg/gstreamer/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,11 @@ func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) {

func (b *Bin) probeRemoveSource(src *Bin) {
src.mu.Lock()
srcGhostPad, sinkGhostPad := deleteGhostPadsLocked(src, b)
srcGhostPad, sinkGhostPad, ok := deleteGhostPadsLocked(src, b)
src.mu.Unlock()
if !ok {
return
}

srcGhostPad.AddProbe(gst.PadProbeTypeIdle, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
sinkPad := sinkGhostPad.GetTarget()
Expand All @@ -231,8 +234,11 @@ func (b *Bin) probeRemoveSource(src *Bin) {

func (b *Bin) probeRemoveSink(sink *Bin) {
sink.mu.Lock()
srcGhostPad, sinkGhostPad := deleteGhostPadsLocked(b, sink)
srcGhostPad, sinkGhostPad, ok := deleteGhostPadsLocked(b, sink)
sink.mu.Unlock()
if !ok {
return
}

srcGhostPad.AddProbe(gst.PadProbeTypeBlockDownstream, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
srcGhostPad.Unlink(sinkGhostPad.Pad)
Expand All @@ -257,14 +263,20 @@ func (b *Bin) probeRemoveSink(sink *Bin) {
})
}

func deleteGhostPadsLocked(src, sink *Bin) (*gst.GhostPad, *gst.GhostPad) {
srcPad := src.pads[sink.bin.GetName()]
sinkPad := sink.pads[src.bin.GetName()]

func deleteGhostPadsLocked(src, sink *Bin) (*gst.GhostPad, *gst.GhostPad, bool) {
srcPad, srcOK := src.pads[sink.bin.GetName()]
if !srcOK {
logger.Errorw("source pad missing", nil, "bin", src.bin.GetName())
}
delete(src.pads, sink.bin.GetName())

sinkPad, sinkOK := sink.pads[src.bin.GetName()]
if !sinkOK {
logger.Errorw("sink pad missing", nil, "bin", sink.bin.GetName())
}
delete(sink.pads, src.bin.GetName())

return srcPad, sinkPad
return srcPad, sinkPad, srcOK && sinkOK
}

func (b *Bin) SetState(state gst.State) error {
Expand Down
11 changes: 6 additions & 5 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,11 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
accept := false
total := m.cpuStats.NumCPU()

var available float64
var available, used float64
if m.requests.Load() == 0 {
// if no requests, use total
available = total
} else {
var used float64
for _, ps := range m.pending {
if ps.pendingUsage > ps.lastUsage {
used += ps.pendingUsage
Expand All @@ -294,7 +293,7 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
}

// if already running requests, cap usage at MaxCpuUtilization
available = total - used - (total * (1 - m.cpuCostConfig.MaxCpuUtilization))
available = total*m.cpuCostConfig.MaxCpuUtilization - used
}

var required float64
Expand All @@ -313,9 +312,11 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
accept = available >= required

logger.Debugw("cpu check",
"available", available,
"total", total,
"used", used,
"required", required,
"activeRequests", m.requests,
"available", available,
"activeRequests", m.requests.Load(),
"canAccept", accept,
)

Expand Down

0 comments on commit df7be7d

Please sign in to comment.