Skip to content

Commit

Permalink
await controller on rpc (#579)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Jan 11, 2024
1 parent c96ede3 commit a00ffff
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 58 deletions.
46 changes: 7 additions & 39 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import (
"time"

"github.com/frostbyte73/core"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"google.golang.org/grpc"

"github.com/livekit/egress/pkg/config"
Expand All @@ -41,11 +38,12 @@ type Handler struct {
ipc.UnimplementedEgressHandlerServer

conf *config.PipelineConfig
pipeline *pipeline.Controller
controller *pipeline.Controller
rpcServer rpc.EgressHandlerServer
ipcHandlerServer *grpc.Server
ipcServiceClient ipc.EgressServiceClient
ioClient rpc.IOInfoClient
initialized core.Fuse
kill core.Fuse
}

Expand All @@ -60,6 +58,7 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.
ioClient: ioClient,
ipcHandlerServer: grpc.NewServer(),
ipcServiceClient: ipcClient,
initialized: core.NewFuse(),
kill: core.NewFuse(),
}

Expand All @@ -86,7 +85,8 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.
return nil, err
}

h.pipeline, err = pipeline.New(context.Background(), conf, h.ioClient)
h.controller, err = pipeline.New(context.Background(), conf, h.ioClient)
h.initialized.Break()
if err != nil {
if !errors.IsFatal(err) {
// user error, send update
Expand All @@ -110,15 +110,15 @@ func (h *Handler) Run() error {
// start egress
result := make(chan *livekit.EgressInfo, 1)
go func() {
result <- h.pipeline.Run(ctx)
result <- h.controller.Run(ctx)
}()

kill := h.kill.Watch()
for {
select {
case <-kill:
// kill signal received
h.pipeline.SendEOS(ctx)
h.controller.SendEOS(ctx)

case res := <-result:
// recording finished
Expand All @@ -144,35 +144,3 @@ func (h *Handler) Run() error {
func (h *Handler) Kill() {
h.kill.Break()
}

func (h *Handler) GenerateMetrics(ctx context.Context) (string, error) {
metrics, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return "", err
}

metricsAsString, err := renderMetrics(metrics)
if err != nil {
return "", err
}

return metricsAsString, nil
}

func renderMetrics(metrics []*dto.MetricFamily) (string, error) {
// Create a StringWriter to render the metrics into text format
writer := &strings.Builder{}
totalCnt := 0
for _, metric := range metrics {
// Write each metric family to text
cnt, err := expfmt.MetricFamilyToText(writer, metric)
if err != nil {
logger.Errorw("error writing metric family", err)
return "", err
}
totalCnt += cnt
}

// Get the rendered metrics as a string from the StringWriter
return writer.String(), nil
}
48 changes: 40 additions & 8 deletions pkg/handler/handler_ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ package handler

import (
"context"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/pprof"
"github.com/livekit/protocol/tracer"
)
Expand All @@ -31,13 +35,11 @@ func (h *Handler) GetPipelineDot(ctx context.Context, _ *ipc.GstPipelineDebugDot
ctx, span := tracer.Start(ctx, "Handler.GetPipelineDot")
defer span.End()

if h.pipeline == nil {
return nil, errors.ErrEgressNotFound
}
<-h.initialized.Watch()

res := make(chan string, 1)
go func() {
res <- h.pipeline.GetGstPipelineDebugDot()
res <- h.controller.GetGstPipelineDebugDot()
}()

select {
Expand All @@ -55,9 +57,7 @@ func (h *Handler) GetPProf(ctx context.Context, req *ipc.PProfRequest) (*ipc.PPr
ctx, span := tracer.Start(ctx, "Handler.GetPProf")
defer span.End()

if h.pipeline == nil {
return nil, errors.ErrEgressNotFound
}
<-h.initialized.Watch()

b, err := pprof.GetProfileData(ctx, req.ProfileName, int(req.Timeout), int(req.Debug))
if err != nil {
Expand All @@ -83,3 +83,35 @@ func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc
Metrics: metricsAsString,
}, nil
}

func (h *Handler) GenerateMetrics(ctx context.Context) (string, error) {
metrics, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return "", err
}

metricsAsString, err := renderMetrics(metrics)
if err != nil {
return "", err
}

return metricsAsString, nil
}

func renderMetrics(metrics []*dto.MetricFamily) (string, error) {
// Create a StringWriter to render the metrics into text format
writer := &strings.Builder{}
totalCnt := 0
for _, metric := range metrics {
// Write each metric family to text
cnt, err := expfmt.MetricFamilyToText(writer, metric)
if err != nil {
logger.Errorw("error writing metric family", err)
return "", err
}
totalCnt += cnt
}

// Get the rendered metrics as a string from the StringWriter
return writer.String(), nil
}
17 changes: 6 additions & 11 deletions pkg/handler/handler_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package handler
import (
"context"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/tracer"
)
Expand All @@ -26,25 +25,21 @@ func (h *Handler) UpdateStream(ctx context.Context, req *livekit.UpdateStreamReq
ctx, span := tracer.Start(ctx, "Handler.UpdateStream")
defer span.End()

if h.pipeline == nil {
return nil, errors.ErrEgressNotFound
}
<-h.initialized.Watch()

err := h.pipeline.UpdateStream(ctx, req)
err := h.controller.UpdateStream(ctx, req)
if err != nil {
return nil, err
}
return h.pipeline.Info, nil
return h.controller.Info, nil
}

func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) (*livekit.EgressInfo, error) {
ctx, span := tracer.Start(ctx, "Handler.StopEgress")
defer span.End()

if h.pipeline == nil {
return nil, errors.ErrEgressNotFound
}
<-h.initialized.Watch()

h.pipeline.SendEOS(ctx)
return h.pipeline.Info, nil
h.controller.SendEOS(ctx)
return h.controller.Info, nil
}

0 comments on commit a00ffff

Please sign in to comment.