diff --git a/cmd/server/main.go b/cmd/server/main.go index b4b9e4f2..08806525 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -136,10 +136,10 @@ func runService(c *cli.Context) error { select { case sig := <-stopChan: logger.Infow("exit requested, finishing recording then shutting down", "signal", sig) - svc.Shutdown(false) + svc.Shutdown(true, false) case sig := <-killChan: logger.Infow("exit requested, stopping recording and shutting down", "signal", sig) - svc.Shutdown(true) + svc.Shutdown(true, true) } }() diff --git a/pkg/server/server.go b/pkg/server/server.go index 43bbaaac..10d2cc0f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -54,6 +54,7 @@ type Server struct { ioClient rpc.IOInfoClient activeRequests atomic.Int32 + terminating core.Fuse shutdown core.Fuse } @@ -169,7 +170,14 @@ func (s *Server) IsDisabled() bool { return s.shutdown.IsBroken() } -func (s *Server) Shutdown(kill bool) { +func (s *Server) IsTerminating() bool { + return s.terminating.IsBroken() +} + +func (s *Server) Shutdown(terminating, kill bool) { + if terminating { + s.terminating.Break() + } s.shutdown.Once(func() { s.psrpcServer.DeregisterStartEgressTopic(s.conf.ClusterID) }) diff --git a/pkg/server/server_ipc.go b/pkg/server/server_ipc.go index eec99e24..9ef88737 100644 --- a/pkg/server/server_ipc.go +++ b/pkg/server/server_ipc.go @@ -41,7 +41,7 @@ func (s *Server) HandlerUpdate(ctx context.Context, info *livekit.EgressInfo) (* if info.ErrorCode == int32(http.StatusInternalServerError) { logger.Errorw("internal error, shutting down", errors.New(info.Error)) - s.Shutdown(false) + s.Shutdown(false, false) } return &emptypb.Empty{}, nil diff --git a/pkg/server/server_rpc.go b/pkg/server/server_rpc.go index 56b09c8f..8a7b6cc6 100644 --- a/pkg/server/server_rpc.go +++ b/pkg/server/server_rpc.go @@ -147,7 +147,9 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI info.Error = "internal error" info.ErrorCode = int32(http.StatusInternalServerError) _, _ = s.ioClient.UpdateEgress(context.Background(), info) - s.Shutdown(false) + + logger.Errorw("process failed, shutting down", err) + s.Shutdown(false, false) } avgCPU, maxCPU := s.monitor.EgressEnded(req) diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index d468bc35..e190a391 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -39,6 +39,7 @@ const ( type Service interface { IsDisabled() bool + IsTerminating() bool KillProcess(string, float64) } diff --git a/pkg/stats/monitor_prom.go b/pkg/stats/monitor_prom.go index 70f522a9..1410a160 100644 --- a/pkg/stats/monitor_prom.go +++ b/pkg/stats/monitor_prom.go @@ -43,6 +43,13 @@ func (m *Monitor) initPrometheus() { ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID}, }, m.promIsDisabled) + promIsTerminating := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "is_terminating", + ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID}, + }, m.promIsTerminating) + m.promCPULoad = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "livekit", Subsystem: "node", @@ -57,7 +64,7 @@ func (m *Monitor) initPrometheus() { ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID}, }, []string{"type"}) - prometheus.MustRegister(promNodeAvailable, promCanAcceptRequest, promIsDisabled, m.promCPULoad, m.requestGauge) + prometheus.MustRegister(promNodeAvailable, promCanAcceptRequest, promIsDisabled, promIsTerminating, m.promCPULoad, m.requestGauge) } func (m *Monitor) promIsIdle() float64 { @@ -84,3 +91,10 @@ func (m *Monitor) promIsDisabled() float64 { } return 0 } + +func (m *Monitor) promIsTerminating() float64 { + if m.svc.IsTerminating() { + return 1 + } + return 0 +} diff --git a/test/runner.go b/test/runner.go index 0a6e3032..9b6a1452 100644 --- a/test/runner.go +++ b/test/runner.go @@ -75,7 +75,7 @@ type Server interface { GetGstPipelineDotFile(string) (string, error) IsIdle() bool KillAll() - Shutdown(bool) + Shutdown(bool, bool) Drain() } @@ -161,7 +161,7 @@ func (r *Runner) Run(t *testing.T, svc Server, bus psrpc.MessageBus, templateFs if r.room != nil { r.room.Disconnect() } - r.svc.Shutdown(true) + r.svc.Shutdown(false, true) r.svc.Drain() })