From a00ffff1a8b6a568ba380db477dd2d2398475b3b Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 11 Jan 2024 12:18:45 -0800 Subject: [PATCH] await controller on rpc (#579) --- pkg/handler/handler.go | 46 ++++++------------------------------ pkg/handler/handler_ipc.go | 48 +++++++++++++++++++++++++++++++------- pkg/handler/handler_rpc.go | 17 +++++--------- 3 files changed, 53 insertions(+), 58 deletions(-) diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index 75b163f0..5565c618 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -21,9 +21,6 @@ import ( "time" "github.com/frostbyte73/core" - "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" "google.golang.org/grpc" "github.com/livekit/egress/pkg/config" @@ -41,11 +38,12 @@ type Handler struct { ipc.UnimplementedEgressHandlerServer conf *config.PipelineConfig - pipeline *pipeline.Controller + controller *pipeline.Controller rpcServer rpc.EgressHandlerServer ipcHandlerServer *grpc.Server ipcServiceClient ipc.EgressServiceClient ioClient rpc.IOInfoClient + initialized core.Fuse kill core.Fuse } @@ -60,6 +58,7 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc. ioClient: ioClient, ipcHandlerServer: grpc.NewServer(), ipcServiceClient: ipcClient, + initialized: core.NewFuse(), kill: core.NewFuse(), } @@ -86,7 +85,8 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc. return nil, err } - h.pipeline, err = pipeline.New(context.Background(), conf, h.ioClient) + h.controller, err = pipeline.New(context.Background(), conf, h.ioClient) + h.initialized.Break() if err != nil { if !errors.IsFatal(err) { // user error, send update @@ -110,7 +110,7 @@ func (h *Handler) Run() error { // start egress result := make(chan *livekit.EgressInfo, 1) go func() { - result <- h.pipeline.Run(ctx) + result <- h.controller.Run(ctx) }() kill := h.kill.Watch() @@ -118,7 +118,7 @@ func (h *Handler) Run() error { select { case <-kill: // kill signal received - h.pipeline.SendEOS(ctx) + h.controller.SendEOS(ctx) case res := <-result: // recording finished @@ -144,35 +144,3 @@ func (h *Handler) Run() error { func (h *Handler) Kill() { h.kill.Break() } - -func (h *Handler) GenerateMetrics(ctx context.Context) (string, error) { - metrics, err := prometheus.DefaultGatherer.Gather() - if err != nil { - return "", err - } - - metricsAsString, err := renderMetrics(metrics) - if err != nil { - return "", err - } - - return metricsAsString, nil -} - -func renderMetrics(metrics []*dto.MetricFamily) (string, error) { - // Create a StringWriter to render the metrics into text format - writer := &strings.Builder{} - totalCnt := 0 - for _, metric := range metrics { - // Write each metric family to text - cnt, err := expfmt.MetricFamilyToText(writer, metric) - if err != nil { - logger.Errorw("error writing metric family", err) - return "", err - } - totalCnt += cnt - } - - // Get the rendered metrics as a string from the StringWriter - return writer.String(), nil -} diff --git a/pkg/handler/handler_ipc.go b/pkg/handler/handler_ipc.go index 72648f92..d13b0b76 100644 --- a/pkg/handler/handler_ipc.go +++ b/pkg/handler/handler_ipc.go @@ -16,13 +16,17 @@ package handler import ( "context" + "strings" "time" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/ipc" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/pprof" "github.com/livekit/protocol/tracer" ) @@ -31,13 +35,11 @@ func (h *Handler) GetPipelineDot(ctx context.Context, _ *ipc.GstPipelineDebugDot ctx, span := tracer.Start(ctx, "Handler.GetPipelineDot") defer span.End() - if h.pipeline == nil { - return nil, errors.ErrEgressNotFound - } + <-h.initialized.Watch() res := make(chan string, 1) go func() { - res <- h.pipeline.GetGstPipelineDebugDot() + res <- h.controller.GetGstPipelineDebugDot() }() select { @@ -55,9 +57,7 @@ func (h *Handler) GetPProf(ctx context.Context, req *ipc.PProfRequest) (*ipc.PPr ctx, span := tracer.Start(ctx, "Handler.GetPProf") defer span.End() - if h.pipeline == nil { - return nil, errors.ErrEgressNotFound - } + <-h.initialized.Watch() b, err := pprof.GetProfileData(ctx, req.ProfileName, int(req.Timeout), int(req.Debug)) if err != nil { @@ -83,3 +83,35 @@ func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc Metrics: metricsAsString, }, nil } + +func (h *Handler) GenerateMetrics(ctx context.Context) (string, error) { + metrics, err := prometheus.DefaultGatherer.Gather() + if err != nil { + return "", err + } + + metricsAsString, err := renderMetrics(metrics) + if err != nil { + return "", err + } + + return metricsAsString, nil +} + +func renderMetrics(metrics []*dto.MetricFamily) (string, error) { + // Create a StringWriter to render the metrics into text format + writer := &strings.Builder{} + totalCnt := 0 + for _, metric := range metrics { + // Write each metric family to text + cnt, err := expfmt.MetricFamilyToText(writer, metric) + if err != nil { + logger.Errorw("error writing metric family", err) + return "", err + } + totalCnt += cnt + } + + // Get the rendered metrics as a string from the StringWriter + return writer.String(), nil +} diff --git a/pkg/handler/handler_rpc.go b/pkg/handler/handler_rpc.go index bb539350..a7b6f7b6 100644 --- a/pkg/handler/handler_rpc.go +++ b/pkg/handler/handler_rpc.go @@ -17,7 +17,6 @@ package handler import ( "context" - "github.com/livekit/egress/pkg/errors" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/tracer" ) @@ -26,25 +25,21 @@ func (h *Handler) UpdateStream(ctx context.Context, req *livekit.UpdateStreamReq ctx, span := tracer.Start(ctx, "Handler.UpdateStream") defer span.End() - if h.pipeline == nil { - return nil, errors.ErrEgressNotFound - } + <-h.initialized.Watch() - err := h.pipeline.UpdateStream(ctx, req) + err := h.controller.UpdateStream(ctx, req) if err != nil { return nil, err } - return h.pipeline.Info, nil + return h.controller.Info, nil } func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) (*livekit.EgressInfo, error) { ctx, span := tracer.Start(ctx, "Handler.StopEgress") defer span.End() - if h.pipeline == nil { - return nil, errors.ErrEgressNotFound - } + <-h.initialized.Watch() - h.pipeline.SendEOS(ctx) - return h.pipeline.Info, nil + h.controller.SendEOS(ctx) + return h.controller.Info, nil }