diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index 4c1e9ab7..75b163f0 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -21,6 +21,9 @@ import ( "time" "github.com/frostbyte73/core" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "google.golang.org/grpc" "github.com/livekit/egress/pkg/config" @@ -120,6 +123,17 @@ func (h *Handler) Run() error { case res := <-result: // recording finished _, _ = h.ioClient.UpdateEgress(ctx, res) + + m, err := h.GenerateMetrics(ctx) + if err == nil { + h.ipcServiceClient.HandlerShuttingDown(ctx, &ipc.HandlerShuttingDownRequest{ + EgressId: h.conf.Info.EgressId, + Metrics: m, + }) + } else { + logger.Errorw("failed generating handler metrics", err) + } + h.rpcServer.Shutdown() h.ipcHandlerServer.Stop() return nil @@ -130,3 +144,35 @@ func (h *Handler) Run() error { func (h *Handler) Kill() { h.kill.Break() } + +func (h *Handler) GenerateMetrics(ctx context.Context) (string, error) { + metrics, err := prometheus.DefaultGatherer.Gather() + if err != nil { + return "", err + } + + metricsAsString, err := renderMetrics(metrics) + if err != nil { + return "", err + } + + return metricsAsString, nil +} + +func renderMetrics(metrics []*dto.MetricFamily) (string, error) { + // Create a StringWriter to render the metrics into text format + writer := &strings.Builder{} + totalCnt := 0 + for _, metric := range metrics { + // Write each metric family to text + cnt, err := expfmt.MetricFamilyToText(writer, metric) + if err != nil { + logger.Errorw("error writing metric family", err) + return "", err + } + totalCnt += cnt + } + + // Get the rendered metrics as a string from the StringWriter + return writer.String(), nil +} diff --git a/pkg/handler/handler_ipc.go b/pkg/handler/handler_ipc.go index f1e1dbb3..72648f92 100644 --- a/pkg/handler/handler_ipc.go +++ b/pkg/handler/handler_ipc.go @@ -16,18 +16,13 @@ package handler import ( "context" - "strings" "time" - "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/ipc" - "github.com/livekit/protocol/logger" "github.com/livekit/protocol/pprof" "github.com/livekit/protocol/tracer" ) @@ -79,12 +74,7 @@ func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc ctx, span := tracer.Start(ctx, "Handler.GetMetrics") defer span.End() - metrics, err := prometheus.DefaultGatherer.Gather() - if err != nil { - return nil, err - } - - metricsAsString, err := renderMetrics(metrics) + metricsAsString, err := h.GenerateMetrics(ctx) if err != nil { return nil, err } @@ -93,21 +83,3 @@ func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc Metrics: metricsAsString, }, nil } - -func renderMetrics(metrics []*dto.MetricFamily) (string, error) { - // Create a StringWriter to render the metrics into text format - writer := &strings.Builder{} - totalCnt := 0 - for _, metric := range metrics { - // Write each metric family to text - cnt, err := expfmt.MetricFamilyToText(writer, metric) - if err != nil { - logger.Errorw("error writing metric family", err) - return "", err - } - totalCnt += cnt - } - - // Get the rendered metrics as a string from the StringWriter - return writer.String(), nil -} diff --git a/pkg/ipc/ipc.pb.go b/pkg/ipc/ipc.pb.go index 62d46f4d..5704bdaa 100644 --- a/pkg/ipc/ipc.pb.go +++ b/pkg/ipc/ipc.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.31.0 -// protoc v4.25.1 +// protoc v3.21.12 // source: ipc.proto package ipc @@ -82,6 +82,61 @@ func (x *HandlerReadyRequest) GetEgressId() string { return "" } +type HandlerShuttingDownRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + EgressId string `protobuf:"bytes,1,opt,name=egress_id,json=egressId,proto3" json:"egress_id,omitempty"` + Metrics string `protobuf:"bytes,2,opt,name=metrics,proto3" json:"metrics,omitempty"` +} + +func (x *HandlerShuttingDownRequest) Reset() { + *x = HandlerShuttingDownRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_ipc_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HandlerShuttingDownRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HandlerShuttingDownRequest) ProtoMessage() {} + +func (x *HandlerShuttingDownRequest) ProtoReflect() protoreflect.Message { + mi := &file_ipc_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HandlerShuttingDownRequest.ProtoReflect.Descriptor instead. +func (*HandlerShuttingDownRequest) Descriptor() ([]byte, []int) { + return file_ipc_proto_rawDescGZIP(), []int{1} +} + +func (x *HandlerShuttingDownRequest) GetEgressId() string { + if x != nil { + return x.EgressId + } + return "" +} + +func (x *HandlerShuttingDownRequest) GetMetrics() string { + if x != nil { + return x.Metrics + } + return "" +} + type GstPipelineDebugDotRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -91,7 +146,7 @@ type GstPipelineDebugDotRequest struct { func (x *GstPipelineDebugDotRequest) Reset() { *x = GstPipelineDebugDotRequest{} if protoimpl.UnsafeEnabled { - mi := &file_ipc_proto_msgTypes[1] + mi := &file_ipc_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -104,7 +159,7 @@ func (x *GstPipelineDebugDotRequest) String() string { func (*GstPipelineDebugDotRequest) ProtoMessage() {} func (x *GstPipelineDebugDotRequest) ProtoReflect() protoreflect.Message { - mi := &file_ipc_proto_msgTypes[1] + mi := &file_ipc_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -117,7 +172,7 @@ func (x *GstPipelineDebugDotRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GstPipelineDebugDotRequest.ProtoReflect.Descriptor instead. func (*GstPipelineDebugDotRequest) Descriptor() ([]byte, []int) { - return file_ipc_proto_rawDescGZIP(), []int{1} + return file_ipc_proto_rawDescGZIP(), []int{2} } type GstPipelineDebugDotResponse struct { @@ -131,7 +186,7 @@ type GstPipelineDebugDotResponse struct { func (x *GstPipelineDebugDotResponse) Reset() { *x = GstPipelineDebugDotResponse{} if protoimpl.UnsafeEnabled { - mi := &file_ipc_proto_msgTypes[2] + mi := &file_ipc_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -144,7 +199,7 @@ func (x *GstPipelineDebugDotResponse) String() string { func (*GstPipelineDebugDotResponse) ProtoMessage() {} func (x *GstPipelineDebugDotResponse) ProtoReflect() protoreflect.Message { - mi := &file_ipc_proto_msgTypes[2] + mi := &file_ipc_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -157,7 +212,7 @@ func (x *GstPipelineDebugDotResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GstPipelineDebugDotResponse.ProtoReflect.Descriptor instead. func (*GstPipelineDebugDotResponse) Descriptor() ([]byte, []int) { - return file_ipc_proto_rawDescGZIP(), []int{2} + return file_ipc_proto_rawDescGZIP(), []int{3} } func (x *GstPipelineDebugDotResponse) GetDotFile() string { @@ -180,7 +235,7 @@ type PProfRequest struct { func (x *PProfRequest) Reset() { *x = PProfRequest{} if protoimpl.UnsafeEnabled { - mi := &file_ipc_proto_msgTypes[3] + mi := &file_ipc_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -193,7 +248,7 @@ func (x *PProfRequest) String() string { func (*PProfRequest) ProtoMessage() {} func (x *PProfRequest) ProtoReflect() protoreflect.Message { - mi := &file_ipc_proto_msgTypes[3] + mi := &file_ipc_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -206,7 +261,7 @@ func (x *PProfRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use PProfRequest.ProtoReflect.Descriptor instead. func (*PProfRequest) Descriptor() ([]byte, []int) { - return file_ipc_proto_rawDescGZIP(), []int{3} + return file_ipc_proto_rawDescGZIP(), []int{4} } func (x *PProfRequest) GetProfileName() string { @@ -241,7 +296,7 @@ type PProfResponse struct { func (x *PProfResponse) Reset() { *x = PProfResponse{} if protoimpl.UnsafeEnabled { - mi := &file_ipc_proto_msgTypes[4] + mi := &file_ipc_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -254,7 +309,7 @@ func (x *PProfResponse) String() string { func (*PProfResponse) ProtoMessage() {} func (x *PProfResponse) ProtoReflect() protoreflect.Message { - mi := &file_ipc_proto_msgTypes[4] + mi := &file_ipc_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -267,7 +322,7 @@ func (x *PProfResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PProfResponse.ProtoReflect.Descriptor instead. func (*PProfResponse) Descriptor() ([]byte, []int) { - return file_ipc_proto_rawDescGZIP(), []int{4} + return file_ipc_proto_rawDescGZIP(), []int{5} } func (x *PProfResponse) GetPprofFile() []byte { @@ -286,7 +341,7 @@ type MetricsRequest struct { func (x *MetricsRequest) Reset() { *x = MetricsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_ipc_proto_msgTypes[5] + mi := &file_ipc_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -299,7 +354,7 @@ func (x *MetricsRequest) String() string { func (*MetricsRequest) ProtoMessage() {} func (x *MetricsRequest) ProtoReflect() protoreflect.Message { - mi := &file_ipc_proto_msgTypes[5] + mi := &file_ipc_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -312,7 +367,7 @@ func (x *MetricsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use MetricsRequest.ProtoReflect.Descriptor instead. func (*MetricsRequest) Descriptor() ([]byte, []int) { - return file_ipc_proto_rawDescGZIP(), []int{5} + return file_ipc_proto_rawDescGZIP(), []int{6} } type MetricsResponse struct { @@ -326,7 +381,7 @@ type MetricsResponse struct { func (x *MetricsResponse) Reset() { *x = MetricsResponse{} if protoimpl.UnsafeEnabled { - mi := &file_ipc_proto_msgTypes[6] + mi := &file_ipc_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -339,7 +394,7 @@ func (x *MetricsResponse) String() string { func (*MetricsResponse) ProtoMessage() {} func (x *MetricsResponse) ProtoReflect() protoreflect.Message { - mi := &file_ipc_proto_msgTypes[6] + mi := &file_ipc_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -352,7 +407,7 @@ func (x *MetricsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MetricsResponse.ProtoReflect.Descriptor instead. func (*MetricsResponse) Descriptor() ([]byte, []int) { - return file_ipc_proto_rawDescGZIP(), []int{6} + return file_ipc_proto_rawDescGZIP(), []int{7} } func (x *MetricsResponse) GetMetrics() string { @@ -371,47 +426,57 @@ var file_ipc_proto_rawDesc = []byte{ 0x13, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, - 0x64, 0x22, 0x1c, 0x0a, 0x1a, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, - 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, - 0x38, 0x0a, 0x1b, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x65, - 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x19, - 0x0a, 0x08, 0x64, 0x6f, 0x74, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x07, 0x64, 0x6f, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x22, 0x61, 0x0a, 0x0c, 0x50, 0x50, 0x72, - 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x72, 0x6f, - 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0b, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, - 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x74, - 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x22, 0x2e, 0x0a, 0x0d, - 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, - 0x0a, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x46, 0x69, 0x6c, 0x65, 0x22, 0x10, 0x0a, 0x0e, - 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x2b, - 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x32, 0x53, 0x0a, 0x0d, 0x45, - 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x42, 0x0a, 0x0c, - 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x18, 0x2e, 0x69, - 0x70, 0x63, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, - 0x32, 0xd6, 0x01, 0x0a, 0x0d, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x48, 0x61, 0x6e, 0x64, 0x6c, - 0x65, 0x72, 0x12, 0x55, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, - 0x65, 0x44, 0x6f, 0x74, 0x12, 0x1f, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, - 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, - 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x08, 0x47, 0x65, 0x74, - 0x50, 0x50, 0x72, 0x6f, 0x66, 0x12, 0x11, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, - 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x50, - 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x39, - 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x13, 0x2e, 0x69, - 0x70, 0x63, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x14, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, - 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x69, 0x70, 0x63, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x64, 0x22, 0x53, 0x0a, 0x1a, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x53, 0x68, 0x75, 0x74, + 0x74, 0x69, 0x6e, 0x67, 0x44, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x1b, 0x0a, 0x09, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, + 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, 0x1c, 0x0a, 0x1a, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, + 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x22, 0x38, 0x0a, 0x1b, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, + 0x69, 0x6e, 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x6f, 0x74, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x64, 0x6f, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x22, 0x61, + 0x0a, 0x0c, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, + 0x0a, 0x0c, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x05, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x64, + 0x65, 0x62, 0x75, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x64, 0x65, 0x62, 0x75, + 0x67, 0x22, 0x2e, 0x0a, 0x0d, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x5f, 0x66, 0x69, 0x6c, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x46, 0x69, 0x6c, + 0x65, 0x22, 0x10, 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x22, 0x2b, 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, + 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, + 0x32, 0xa5, 0x01, 0x0a, 0x0d, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x12, 0x42, 0x0a, 0x0c, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x52, 0x65, 0x61, + 0x64, 0x79, 0x12, 0x18, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, + 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, + 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x50, 0x0a, 0x13, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, + 0x72, 0x53, 0x68, 0x75, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x44, 0x6f, 0x77, 0x6e, 0x12, 0x1f, 0x2e, + 0x69, 0x70, 0x63, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x53, 0x68, 0x75, 0x74, 0x74, + 0x69, 0x6e, 0x67, 0x44, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x32, 0xd6, 0x01, 0x0a, 0x0d, 0x45, 0x67, 0x72, + 0x65, 0x73, 0x73, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x55, 0x0a, 0x0e, 0x47, 0x65, + 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x6f, 0x74, 0x12, 0x1f, 0x2e, 0x69, + 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x65, + 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, + 0x69, 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, + 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x12, 0x33, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x12, 0x11, 0x2e, + 0x69, 0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x12, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x12, 0x13, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, + 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x69, 0x70, 0x63, 0x2e, + 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x69, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -426,28 +491,31 @@ func file_ipc_proto_rawDescGZIP() []byte { return file_ipc_proto_rawDescData } -var file_ipc_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_ipc_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_ipc_proto_goTypes = []interface{}{ (*HandlerReadyRequest)(nil), // 0: ipc.HandlerReadyRequest - (*GstPipelineDebugDotRequest)(nil), // 1: ipc.GstPipelineDebugDotRequest - (*GstPipelineDebugDotResponse)(nil), // 2: ipc.GstPipelineDebugDotResponse - (*PProfRequest)(nil), // 3: ipc.PProfRequest - (*PProfResponse)(nil), // 4: ipc.PProfResponse - (*MetricsRequest)(nil), // 5: ipc.MetricsRequest - (*MetricsResponse)(nil), // 6: ipc.MetricsResponse - (*emptypb.Empty)(nil), // 7: google.protobuf.Empty + (*HandlerShuttingDownRequest)(nil), // 1: ipc.HandlerShuttingDownRequest + (*GstPipelineDebugDotRequest)(nil), // 2: ipc.GstPipelineDebugDotRequest + (*GstPipelineDebugDotResponse)(nil), // 3: ipc.GstPipelineDebugDotResponse + (*PProfRequest)(nil), // 4: ipc.PProfRequest + (*PProfResponse)(nil), // 5: ipc.PProfResponse + (*MetricsRequest)(nil), // 6: ipc.MetricsRequest + (*MetricsResponse)(nil), // 7: ipc.MetricsResponse + (*emptypb.Empty)(nil), // 8: google.protobuf.Empty } var file_ipc_proto_depIdxs = []int32{ 0, // 0: ipc.EgressService.HandlerReady:input_type -> ipc.HandlerReadyRequest - 1, // 1: ipc.EgressHandler.GetPipelineDot:input_type -> ipc.GstPipelineDebugDotRequest - 3, // 2: ipc.EgressHandler.GetPProf:input_type -> ipc.PProfRequest - 5, // 3: ipc.EgressHandler.GetMetrics:input_type -> ipc.MetricsRequest - 7, // 4: ipc.EgressService.HandlerReady:output_type -> google.protobuf.Empty - 2, // 5: ipc.EgressHandler.GetPipelineDot:output_type -> ipc.GstPipelineDebugDotResponse - 4, // 6: ipc.EgressHandler.GetPProf:output_type -> ipc.PProfResponse - 6, // 7: ipc.EgressHandler.GetMetrics:output_type -> ipc.MetricsResponse - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type + 1, // 1: ipc.EgressService.HandlerShuttingDown:input_type -> ipc.HandlerShuttingDownRequest + 2, // 2: ipc.EgressHandler.GetPipelineDot:input_type -> ipc.GstPipelineDebugDotRequest + 4, // 3: ipc.EgressHandler.GetPProf:input_type -> ipc.PProfRequest + 6, // 4: ipc.EgressHandler.GetMetrics:input_type -> ipc.MetricsRequest + 8, // 5: ipc.EgressService.HandlerReady:output_type -> google.protobuf.Empty + 8, // 6: ipc.EgressService.HandlerShuttingDown:output_type -> google.protobuf.Empty + 3, // 7: ipc.EgressHandler.GetPipelineDot:output_type -> ipc.GstPipelineDebugDotResponse + 5, // 8: ipc.EgressHandler.GetPProf:output_type -> ipc.PProfResponse + 7, // 9: ipc.EgressHandler.GetMetrics:output_type -> ipc.MetricsResponse + 5, // [5:10] is the sub-list for method output_type + 0, // [0:5] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -472,7 +540,7 @@ func file_ipc_proto_init() { } } file_ipc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GstPipelineDebugDotRequest); i { + switch v := v.(*HandlerShuttingDownRequest); i { case 0: return &v.state case 1: @@ -484,7 +552,7 @@ func file_ipc_proto_init() { } } file_ipc_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GstPipelineDebugDotResponse); i { + switch v := v.(*GstPipelineDebugDotRequest); i { case 0: return &v.state case 1: @@ -496,7 +564,7 @@ func file_ipc_proto_init() { } } file_ipc_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PProfRequest); i { + switch v := v.(*GstPipelineDebugDotResponse); i { case 0: return &v.state case 1: @@ -508,7 +576,7 @@ func file_ipc_proto_init() { } } file_ipc_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PProfResponse); i { + switch v := v.(*PProfRequest); i { case 0: return &v.state case 1: @@ -520,7 +588,7 @@ func file_ipc_proto_init() { } } file_ipc_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MetricsRequest); i { + switch v := v.(*PProfResponse); i { case 0: return &v.state case 1: @@ -532,6 +600,18 @@ func file_ipc_proto_init() { } } file_ipc_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_ipc_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*MetricsResponse); i { case 0: return &v.state @@ -550,7 +630,7 @@ func file_ipc_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_ipc_proto_rawDesc, NumEnums: 0, - NumMessages: 7, + NumMessages: 8, NumExtensions: 0, NumServices: 2, }, diff --git a/pkg/ipc/ipc.proto b/pkg/ipc/ipc.proto index 9a13b336..ccda578c 100644 --- a/pkg/ipc/ipc.proto +++ b/pkg/ipc/ipc.proto @@ -21,12 +21,18 @@ import "google/protobuf/empty.proto"; service EgressService { rpc HandlerReady(HandlerReadyRequest) returns (google.protobuf.Empty) {}; + rpc HandlerShuttingDown(HandlerShuttingDownRequest) returns (google.protobuf.Empty) {}; } message HandlerReadyRequest { string egress_id = 1; } +message HandlerShuttingDownRequest { + string egress_id = 1; + string metrics = 2; +} + service EgressHandler { rpc GetPipelineDot(GstPipelineDebugDotRequest) returns (GstPipelineDebugDotResponse) {}; rpc GetPProf(PProfRequest) returns (PProfResponse) {}; diff --git a/pkg/ipc/ipc_grpc.pb.go b/pkg/ipc/ipc_grpc.pb.go index 4f310c0b..84802445 100644 --- a/pkg/ipc/ipc_grpc.pb.go +++ b/pkg/ipc/ipc_grpc.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.25.1 +// - protoc v3.21.12 // source: ipc.proto package ipc @@ -34,7 +34,8 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - EgressService_HandlerReady_FullMethodName = "/ipc.EgressService/HandlerReady" + EgressService_HandlerReady_FullMethodName = "/ipc.EgressService/HandlerReady" + EgressService_HandlerShuttingDown_FullMethodName = "/ipc.EgressService/HandlerShuttingDown" ) // EgressServiceClient is the client API for EgressService service. @@ -42,6 +43,7 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type EgressServiceClient interface { HandlerReady(ctx context.Context, in *HandlerReadyRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + HandlerShuttingDown(ctx context.Context, in *HandlerShuttingDownRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } type egressServiceClient struct { @@ -61,11 +63,21 @@ func (c *egressServiceClient) HandlerReady(ctx context.Context, in *HandlerReady return out, nil } +func (c *egressServiceClient) HandlerShuttingDown(ctx context.Context, in *HandlerShuttingDownRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, EgressService_HandlerShuttingDown_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // EgressServiceServer is the server API for EgressService service. // All implementations must embed UnimplementedEgressServiceServer // for forward compatibility type EgressServiceServer interface { HandlerReady(context.Context, *HandlerReadyRequest) (*emptypb.Empty, error) + HandlerShuttingDown(context.Context, *HandlerShuttingDownRequest) (*emptypb.Empty, error) mustEmbedUnimplementedEgressServiceServer() } @@ -76,6 +88,9 @@ type UnimplementedEgressServiceServer struct { func (UnimplementedEgressServiceServer) HandlerReady(context.Context, *HandlerReadyRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method HandlerReady not implemented") } +func (UnimplementedEgressServiceServer) HandlerShuttingDown(context.Context, *HandlerShuttingDownRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method HandlerShuttingDown not implemented") +} func (UnimplementedEgressServiceServer) mustEmbedUnimplementedEgressServiceServer() {} // UnsafeEgressServiceServer may be embedded to opt out of forward compatibility for this service. @@ -107,6 +122,24 @@ func _EgressService_HandlerReady_Handler(srv interface{}, ctx context.Context, d return interceptor(ctx, in, info, handler) } +func _EgressService_HandlerShuttingDown_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HandlerShuttingDownRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EgressServiceServer).HandlerShuttingDown(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: EgressService_HandlerShuttingDown_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EgressServiceServer).HandlerShuttingDown(ctx, req.(*HandlerShuttingDownRequest)) + } + return interceptor(ctx, in, info, handler) +} + // EgressService_ServiceDesc is the grpc.ServiceDesc for EgressService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -118,6 +151,10 @@ var EgressService_ServiceDesc = grpc.ServiceDesc{ MethodName: "HandlerReady", Handler: _EgressService_HandlerReady_Handler, }, + { + MethodName: "HandlerShuttingDown", + Handler: _EgressService_HandlerShuttingDown_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "ipc.proto", diff --git a/pkg/service/process.go b/pkg/service/process.go index b399266a..d13da311 100644 --- a/pkg/service/process.go +++ b/pkg/service/process.go @@ -17,12 +17,9 @@ package service import ( "context" "os/exec" - "strings" "github.com/frostbyte73/core" dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - "golang.org/x/exp/maps" "github.com/livekit/egress/pkg/ipc" "github.com/livekit/protocol/livekit" @@ -76,18 +73,10 @@ func (p *Process) Gather() ([]*dto.MetricFamily, error) { logger.Warnw("failed to obtain metrics from handler", err, "egress_id", p.req.EgressId) return make([]*dto.MetricFamily, 0), nil // don't return an error, just skip this handler } - // Parse the result to match the Gatherer interface - parser := &expfmt.TextParser{} - families, err := parser.TextToMetricFamilies(strings.NewReader(metricsResponse.Metrics)) - if err != nil { - logger.Warnw("failed to parse metrics from handler", err, "egress_id", p.req.EgressId) - return make([]*dto.MetricFamily, 0), nil // don't return an error, just skip this handler - } - // Add an egress_id label to every metric all the families, if it doesn't already have one - applyDefaultLabel(p.info.EgressId, families) + // Parse the result to match the Gatherer interface + return deserializeMetrics(p.info.EgressId, metricsResponse.Metrics) - return maps.Values(families), nil } func applyDefaultLabel(egressID string, families map[string]*dto.MetricFamily) { diff --git a/pkg/service/service.go b/pkg/service/service.go index b5417b23..59781393 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -27,6 +27,7 @@ import ( "time" "github.com/frostbyte73/core" + dto "github.com/prometheus/client_model/go" "google.golang.org/grpc" "github.com/livekit/egress/pkg/config" @@ -52,6 +53,7 @@ type Service struct { mu sync.RWMutex activeHandlers map[string]*Process + pendingMetrics []*dto.MetricFamily shutdown core.Fuse } diff --git a/pkg/service/service_ipc.go b/pkg/service/service_ipc.go index 9afe6a3b..0421430e 100644 --- a/pkg/service/service_ipc.go +++ b/pkg/service/service_ipc.go @@ -35,3 +35,12 @@ func (s *Service) HandlerReady(ctx context.Context, req *ipc.HandlerReadyRequest close(p.ready) return &emptypb.Empty{}, nil } + +func (s *Service) HandlerShuttingDown(ctx context.Context, req *ipc.HandlerShuttingDownRequest) (*emptypb.Empty, error) { + err := s.storeProcessEndedMetrics(req.EgressId, req.Metrics) + if err != nil { + return nil, err + } + + return &emptypb.Empty{}, nil +} diff --git a/pkg/service/service_prom.go b/pkg/service/service_prom.go index 46f35bfe..ae6212cf 100644 --- a/pkg/service/service_prom.go +++ b/pkg/service/service_prom.go @@ -17,12 +17,16 @@ package service import ( "context" "net/http" + "strings" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "golang.org/x/exp/maps" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/tracer" ) @@ -32,16 +36,28 @@ func (s *Service) CreateGatherer() prometheus.Gatherer { _, span := tracer.Start(context.Background(), "Service.GathererOfHandlerMetrics") defer span.End() - s.mu.RLock() - defer s.mu.RUnlock() - gatherers := prometheus.Gatherers{} // Include the default repo gatherers = append(gatherers, prometheus.DefaultGatherer) + // Include process ended metrics + gatherers = append(gatherers, prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { + s.mu.Lock() + + m := s.pendingMetrics + s.pendingMetrics = nil + + s.mu.Unlock() + + return m, nil + })) + + s.mu.RLock() // add all the active handlers as sources for _, v := range s.activeHandlers { gatherers = append(gatherers, v) } + s.mu.RUnlock() + return gatherers.Gather() }) } @@ -72,3 +88,31 @@ func (s *Service) promCanAcceptRequest() float64 { } return 0 } + +func (s *Service) storeProcessEndedMetrics(egressID string, metrics string) error { + m, err := deserializeMetrics(egressID, metrics) + if err != nil { + return err + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.pendingMetrics = append(s.pendingMetrics, m...) + + return nil +} + +func deserializeMetrics(egressID string, s string) ([]*dto.MetricFamily, error) { + parser := &expfmt.TextParser{} + families, err := parser.TextToMetricFamilies(strings.NewReader(s)) + if err != nil { + logger.Warnw("failed to parse metrics from handler", err, "egress_id", egressID) + return make([]*dto.MetricFamily, 0), nil // don't return an error, just skip this handler + } + + // Add an egress_id label to every metric all the families, if it doesn't already have one + applyDefaultLabel(egressID, families) + + return maps.Values(families), nil +}