Skip to content

Commit

Permalink
Add terminating metric (#697)
Browse files Browse the repository at this point in the history
* add terminating metric

* fix test

* actually add the metric
  • Loading branch information
frostbyte73 authored Jun 11, 2024
1 parent 7c2ce61 commit 04c688e
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 8 deletions.
4 changes: 2 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ func runService(c *cli.Context) error {
select {
case sig := <-stopChan:
logger.Infow("exit requested, finishing recording then shutting down", "signal", sig)
svc.Shutdown(false)
svc.Shutdown(true, false)
case sig := <-killChan:
logger.Infow("exit requested, stopping recording and shutting down", "signal", sig)
svc.Shutdown(true)
svc.Shutdown(true, true)
}
}()

Expand Down
10 changes: 9 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Server struct {
ioClient rpc.IOInfoClient

activeRequests atomic.Int32
terminating core.Fuse
shutdown core.Fuse
}

Expand Down Expand Up @@ -169,7 +170,14 @@ func (s *Server) IsDisabled() bool {
return s.shutdown.IsBroken()
}

func (s *Server) Shutdown(kill bool) {
// IsTerminating reports whether the server's terminating fuse has been
// broken. Shutdown breaks that fuse when it is called with terminating set
// to true.
func (s *Server) IsTerminating() bool {
return s.terminating.IsBroken()
}

func (s *Server) Shutdown(terminating, kill bool) {
if terminating {
s.terminating.Break()
}
s.shutdown.Once(func() {
s.psrpcServer.DeregisterStartEgressTopic(s.conf.ClusterID)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *Server) HandlerUpdate(ctx context.Context, info *livekit.EgressInfo) (*

if info.ErrorCode == int32(http.StatusInternalServerError) {
logger.Errorw("internal error, shutting down", errors.New(info.Error))
s.Shutdown(false)
s.Shutdown(false, false)
}

return &emptypb.Empty{}, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI
info.Error = "internal error"
info.ErrorCode = int32(http.StatusInternalServerError)
_, _ = s.ioClient.UpdateEgress(context.Background(), info)
s.Shutdown(false)

logger.Errorw("process failed, shutting down", err)
s.Shutdown(false, false)
}

avgCPU, maxCPU := s.monitor.EgressEnded(req)
Expand Down
1 change: 1 addition & 0 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (

// Service lists the methods the stats package requires from the service it
// observes.
// NOTE(review): presumably this is the type behind Monitor.svc — confirm.
type Service interface {
// IsDisabled reports whether the service is disabled.
IsDisabled() bool
// IsTerminating reports whether the service is terminating; its result is
// exposed through the is_terminating Prometheus gauge.
IsTerminating() bool
// KillProcess asks the service to kill a process.
// NOTE(review): the string looks like a process/egress identifier and the
// float64 like a resource-usage value — confirm against the implementation.
KillProcess(string, float64)
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/stats/monitor_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ func (m *Monitor) initPrometheus() {
ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID},
}, m.promIsDisabled)

promIsTerminating := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "is_terminating",
ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID},
}, m.promIsTerminating)

m.promCPULoad = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "livekit",
Subsystem: "node",
Expand All @@ -57,7 +64,7 @@ func (m *Monitor) initPrometheus() {
ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID},
}, []string{"type"})

prometheus.MustRegister(promNodeAvailable, promCanAcceptRequest, promIsDisabled, m.promCPULoad, m.requestGauge)
prometheus.MustRegister(promNodeAvailable, promCanAcceptRequest, promIsDisabled, promIsTerminating, m.promCPULoad, m.requestGauge)
}

func (m *Monitor) promIsIdle() float64 {
Expand All @@ -84,3 +91,10 @@ func (m *Monitor) promIsDisabled() float64 {
}
return 0
}

// promIsTerminating is the value callback for the gauge func registered with
// namespace "livekit", subsystem "egress" and name "is_terminating". It
// yields 1 while the monitored service reports that it is terminating and 0
// otherwise.
func (m *Monitor) promIsTerminating() float64 {
	var value float64 // zero value covers the "not terminating" case
	if m.svc.IsTerminating() {
		value = 1
	}
	return value
}
4 changes: 2 additions & 2 deletions test/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Server interface {
GetGstPipelineDotFile(string) (string, error)
IsIdle() bool
KillAll()
Shutdown(bool)
Shutdown(bool, bool)
Drain()
}

Expand Down Expand Up @@ -161,7 +161,7 @@ func (r *Runner) Run(t *testing.T, svc Server, bus psrpc.MessageBus, templateFs
if r.room != nil {
r.room.Disconnect()
}
r.svc.Shutdown(true)
r.svc.Shutdown(false, true)
r.svc.Drain()
})

Expand Down

0 comments on commit 04c688e

Please sign in to comment.