Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add terminating metric #697

Merged
merged 3 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ func runService(c *cli.Context) error {
select {
case sig := <-stopChan:
logger.Infow("exit requested, finishing recording then shutting down", "signal", sig)
svc.Shutdown(false)
svc.Shutdown(true, false)
case sig := <-killChan:
logger.Infow("exit requested, stopping recording and shutting down", "signal", sig)
svc.Shutdown(true)
svc.Shutdown(true, true)
}
}()

Expand Down
10 changes: 9 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Server struct {
ioClient rpc.IOInfoClient

activeRequests atomic.Int32
terminating core.Fuse
shutdown core.Fuse
}

Expand Down Expand Up @@ -169,7 +170,14 @@ func (s *Server) IsDisabled() bool {
return s.shutdown.IsBroken()
}

func (s *Server) Shutdown(kill bool) {
func (s *Server) IsTerminating() bool {
return s.terminating.IsBroken()
}

func (s *Server) Shutdown(terminating, kill bool) {
if terminating {
s.terminating.Break()
}
s.shutdown.Once(func() {
s.psrpcServer.DeregisterStartEgressTopic(s.conf.ClusterID)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *Server) HandlerUpdate(ctx context.Context, info *livekit.EgressInfo) (*

if info.ErrorCode == int32(http.StatusInternalServerError) {
logger.Errorw("internal error, shutting down", errors.New(info.Error))
s.Shutdown(false)
s.Shutdown(false, false)
}

return &emptypb.Empty{}, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI
info.Error = "internal error"
info.ErrorCode = int32(http.StatusInternalServerError)
_, _ = s.ioClient.UpdateEgress(context.Background(), info)
s.Shutdown(false)

logger.Errorw("process failed, shutting down", err)
s.Shutdown(false, false)
}

avgCPU, maxCPU := s.monitor.EgressEnded(req)
Expand Down
1 change: 1 addition & 0 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (

type Service interface {
IsDisabled() bool
IsTerminating() bool
KillProcess(string, float64)
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/stats/monitor_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ func (m *Monitor) initPrometheus() {
ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID},
}, m.promIsDisabled)

promIsTerminating := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "is_terminating",
ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID},
}, m.promIsTerminating)

m.promCPULoad = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "livekit",
Subsystem: "node",
Expand All @@ -57,7 +64,7 @@ func (m *Monitor) initPrometheus() {
ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID},
}, []string{"type"})

prometheus.MustRegister(promNodeAvailable, promCanAcceptRequest, promIsDisabled, m.promCPULoad, m.requestGauge)
prometheus.MustRegister(promNodeAvailable, promCanAcceptRequest, promIsDisabled, promIsTerminating, m.promCPULoad, m.requestGauge)
}

func (m *Monitor) promIsIdle() float64 {
Expand All @@ -84,3 +91,10 @@ func (m *Monitor) promIsDisabled() float64 {
}
return 0
}

func (m *Monitor) promIsTerminating() float64 {
if m.svc.IsTerminating() {
return 1
}
return 0
}
4 changes: 2 additions & 2 deletions test/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Server interface {
GetGstPipelineDotFile(string) (string, error)
IsIdle() bool
KillAll()
Shutdown(bool)
Shutdown(bool, bool)
Drain()
}

Expand Down Expand Up @@ -161,7 +161,7 @@ func (r *Runner) Run(t *testing.T, svc Server, bus psrpc.MessageBus, templateFs
if r.room != nil {
r.room.Disconnect()
}
r.svc.Shutdown(true)
r.svc.Shutdown(false, true)
r.svc.Drain()
})

Expand Down
Loading