diff --git a/README.md b/README.md index 43bb114a..a1dab27d 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,12 @@ s3: region: AWS_DEFAULT_REGION env or IAM role can be used instead endpoint: (optional) custom endpoint bucket: bucket to upload files to + # the following s3 options can only be set in config, *not* per request, they will be added to any per-request options + proxy: (optional, no default) proxy url + max_retries: (optional, default=3) number or retries to attempt + max_retry_delay: (optional, default=5s) max delay between retries (e.g. 5s, 100ms, 1m...) + min_retry_delay: (optional, default=500ms) min delay between retries (e.g. 100ms, 1s...) + aws_log_level: (optional, default=LogOff) log level for aws sdk (LogDebugWithRequestRetries, LogDebug, ...) azure: account_name: AZURE_STORAGE_ACCOUNT env can be used instead account_key: AZURE_STORAGE_KEY env can be used instead diff --git a/build/egress/Dockerfile b/build/egress/Dockerfile index 70aeb09d..130767d3 100644 --- a/build/egress/Dockerfile +++ b/build/egress/Dockerfile @@ -20,9 +20,9 @@ WORKDIR /workspace # install go RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \ - wget https://go.dev/dl/go1.20.7.linux-${GOARCH}.tar.gz && \ + wget https://go.dev/dl/go1.21.4.linux-${GOARCH}.tar.gz && \ rm -rf /usr/local/go && \ - tar -C /usr/local -xzf go1.20.7.linux-${GOARCH}.tar.gz + tar -C /usr/local -xzf go1.21.4.linux-${GOARCH}.tar.gz ENV PATH="/usr/local/go/bin:${PATH}" diff --git a/build/test/Dockerfile b/build/test/Dockerfile index 1cdcd460..4feaaed6 100644 --- a/build/test/Dockerfile +++ b/build/test/Dockerfile @@ -20,9 +20,9 @@ ARG TARGETPLATFORM # install go RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then GOARCH=arm64; else GOARCH=amd64; fi && \ - wget https://go.dev/dl/go1.20.7.linux-${GOARCH}.tar.gz && \ + wget https://go.dev/dl/go1.21.4.linux-${GOARCH}.tar.gz && \ rm -rf /usr/local/go && \ - tar -C /usr/local -xzf go1.20.7.linux-${GOARCH}.tar.gz + tar 
-C /usr/local -xzf go1.21.4.linux-${GOARCH}.tar.gz ENV PATH="/usr/local/go/bin:${PATH}" # download go modules diff --git a/go.mod b/go.mod index 8125f277..8d6ac7ec 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/livekit/egress -go 1.20 +go 1.21 + +toolchain go1.21.1 require ( cloud.google.com/go/storage v1.31.0 diff --git a/pkg/config/base.go b/pkg/config/base.go index 6c3e8870..85692c19 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -63,12 +63,17 @@ type StorageConfig struct { } type S3Config struct { - AccessKey string `yaml:"access_key"` // (env AWS_ACCESS_KEY_ID) - Secret string `yaml:"secret"` // (env AWS_SECRET_ACCESS_KEY) - Region string `yaml:"region"` // (env AWS_DEFAULT_REGION) - Endpoint string `yaml:"endpoint"` - Bucket string `yaml:"bucket"` - ForcePathStyle bool `yaml:"force_path_style"` + AccessKey string `yaml:"access_key"` // (env AWS_ACCESS_KEY_ID) + Secret string `yaml:"secret"` // (env AWS_SECRET_ACCESS_KEY) + Region string `yaml:"region"` // (env AWS_DEFAULT_REGION) + Endpoint string `yaml:"endpoint"` + Bucket string `yaml:"bucket"` + ForcePathStyle bool `yaml:"force_path_style"` + Proxy string `yaml:"proxy"` + MaxRetries int `yaml:"max_retries"` + MaxRetryDelay time.Duration `yaml:"max_retry_delay"` + MinRetryDelay time.Duration `yaml:"min_retry_delay"` + AwsLogLevel string `yaml:"aws_log_level"` } type AzureConfig struct { diff --git a/pkg/config/uploads.go b/pkg/config/uploads.go index 0dc24bf2..2d8e96c2 100644 --- a/pkg/config/uploads.go +++ b/pkg/config/uploads.go @@ -15,8 +15,10 @@ package config import ( + "github.com/aws/aws-sdk-go/aws" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" + "time" ) type UploadConfig interface{} @@ -28,9 +30,31 @@ type uploadRequest interface { GetAliOSS() *livekit.AliOSSUpload } +type EgressS3Upload struct { + *livekit.S3Upload + Proxy string + MaxRetries int + MaxRetryDelay time.Duration + MinRetryDelay time.Duration + AwsLogLevel aws.LogLevelType 
+} + func (p *PipelineConfig) getUploadConfig(req uploadRequest) UploadConfig { if s3 := req.GetS3(); s3 != nil { - return s3 + s3StorageConfigFromReq := &EgressS3Upload{ + S3Upload: s3, + } + // merge in options from config (proxy, retry limit, delay and aws logging) if specified + if p.S3 != nil { + // parse config.yaml options and get defaults + S3StorageConfigFromConfigYaml := p.ToUploadConfig().(*EgressS3Upload) + // merge into pipeline config created from request options + s3StorageConfigFromReq.Proxy = S3StorageConfigFromConfigYaml.Proxy + s3StorageConfigFromReq.MaxRetries = S3StorageConfigFromConfigYaml.MaxRetries + s3StorageConfigFromReq.MaxRetryDelay = S3StorageConfigFromConfigYaml.MaxRetryDelay + s3StorageConfigFromReq.AwsLogLevel = S3StorageConfigFromConfigYaml.AwsLogLevel + } + return s3StorageConfigFromReq } if gcp := req.GetGcp(); gcp != nil { return gcp @@ -47,14 +71,50 @@ func (p *PipelineConfig) getUploadConfig(req uploadRequest) UploadConfig { func (c StorageConfig) ToUploadConfig() UploadConfig { if c.S3 != nil { - return &livekit.S3Upload{ - AccessKey: c.S3.AccessKey, - Secret: c.S3.Secret, - Region: c.S3.Region, - Endpoint: c.S3.Endpoint, - Bucket: c.S3.Bucket, - ForcePathStyle: c.S3.ForcePathStyle, + s3StorageConfig := &EgressS3Upload{ + S3Upload: &livekit.S3Upload{ + AccessKey: c.S3.AccessKey, + Secret: c.S3.Secret, + Region: c.S3.Region, + Endpoint: c.S3.Endpoint, + Bucket: c.S3.Bucket, + ForcePathStyle: c.S3.ForcePathStyle, + }, + Proxy: c.S3.Proxy, + } + // Handle max retries with default + if c.S3.MaxRetries > 0 { + s3StorageConfig.MaxRetries = c.S3.MaxRetries + } else { + s3StorageConfig.MaxRetries = 3 + } + // Handle min/max delay (for backoff) with defaults + if c.S3.MaxRetryDelay > 0 { + s3StorageConfig.MaxRetryDelay = c.S3.MaxRetryDelay + } else { + s3StorageConfig.MaxRetryDelay = time.Second * 5 + } + if c.S3.MinRetryDelay > 0 { + s3StorageConfig.MinRetryDelay = c.S3.MinRetryDelay + } else { + s3StorageConfig.MinRetryDelay = 
time.Millisecond * 100 + } + // Handle AWS log level with default + switch c.S3.AwsLogLevel { + case "LogDebugWithRequestRetries": + s3StorageConfig.AwsLogLevel = aws.LogDebugWithRequestRetries + case "LogDebug": + s3StorageConfig.AwsLogLevel = aws.LogDebug + case "LogDebugWithRequestErrors": + s3StorageConfig.AwsLogLevel = aws.LogDebugWithRequestErrors + case "LogDebugWithHTTPBody": + s3StorageConfig.AwsLogLevel = aws.LogDebugWithHTTPBody + case "LogDebugWithSigning": + s3StorageConfig.AwsLogLevel = aws.LogDebugWithSigning + default: + s3StorageConfig.AwsLogLevel = aws.LogOff } + return s3StorageConfig } if c.Azure != nil { return &livekit.AzureBlobUpload{ diff --git a/pkg/ipc/ipc.pb.go b/pkg/ipc/ipc.pb.go index cbc28fc2..ed7dbecd 100644 --- a/pkg/ipc/ipc.pb.go +++ b/pkg/ipc/ipc.pb.go @@ -1,7 +1,21 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: -// protoc-gen-go v1.27.1 -// protoc v3.21.9 +// protoc-gen-go v1.28.1 +// protoc v4.24.4 // source: ipc.proto package ipc @@ -215,6 +229,91 @@ func (x *PProfResponse) GetPprofFile() []byte { return nil } +type MetricsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *MetricsRequest) Reset() { + *x = MetricsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_ipc_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsRequest) ProtoMessage() {} + +func (x *MetricsRequest) ProtoReflect() protoreflect.Message { + mi := &file_ipc_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsRequest.ProtoReflect.Descriptor instead. 
+func (*MetricsRequest) Descriptor() ([]byte, []int) { + return file_ipc_proto_rawDescGZIP(), []int{4} +} + +type MetricsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metrics string `protobuf:"bytes,1,opt,name=metrics,proto3" json:"metrics,omitempty"` +} + +func (x *MetricsResponse) Reset() { + *x = MetricsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_ipc_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsResponse) ProtoMessage() {} + +func (x *MetricsResponse) ProtoReflect() protoreflect.Message { + mi := &file_ipc_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsResponse.ProtoReflect.Descriptor instead. 
+func (*MetricsResponse) Descriptor() ([]byte, []int) { + return file_ipc_proto_rawDescGZIP(), []int{5} +} + +func (x *MetricsResponse) GetMetrics() string { + if x != nil { + return x.Metrics + } + return "" +} + var File_ipc_proto protoreflect.FileDescriptor var file_ipc_proto_rawDesc = []byte{ @@ -233,20 +332,27 @@ var file_ipc_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x22, 0x2e, 0x0a, 0x0d, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x46, 0x69, 0x6c, 0x65, 0x32, 0x9b, 0x01, 0x0a, 0x0d, - 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x55, 0x0a, - 0x0e, 0x47, 0x65, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x6f, 0x74, 0x12, - 0x1f, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, - 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x20, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, - 0x6e, 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x50, 0x50, 0x72, 0x6f, 0x66, - 0x12, 0x11, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, - 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x69, 0x70, 0x63, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x09, 0x70, 0x70, 0x72, 0x6f, 
0x66, 0x46, 0x69, 0x6c, 0x65, 0x22, 0x10, 0x0a, 0x0e, 0x4d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x2b, 0x0a, + 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x32, 0xd6, 0x01, 0x0a, 0x0d, 0x45, + 0x67, 0x72, 0x65, 0x73, 0x73, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x55, 0x0a, 0x0e, + 0x47, 0x65, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x6f, 0x74, 0x12, 0x1f, + 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, + 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x20, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, + 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x12, + 0x11, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x4d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x13, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x4d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x69, 0x70, + 0x63, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, + 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x69, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 
0x6f, 0x74, 0x6f, 0x33, } var ( @@ -261,20 +367,24 @@ func file_ipc_proto_rawDescGZIP() []byte { return file_ipc_proto_rawDescData } -var file_ipc_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_ipc_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_ipc_proto_goTypes = []interface{}{ (*GstPipelineDebugDotRequest)(nil), // 0: ipc.GstPipelineDebugDotRequest (*GstPipelineDebugDotResponse)(nil), // 1: ipc.GstPipelineDebugDotResponse (*PProfRequest)(nil), // 2: ipc.PProfRequest (*PProfResponse)(nil), // 3: ipc.PProfResponse + (*MetricsRequest)(nil), // 4: ipc.MetricsRequest + (*MetricsResponse)(nil), // 5: ipc.MetricsResponse } var file_ipc_proto_depIdxs = []int32{ 0, // 0: ipc.EgressHandler.GetPipelineDot:input_type -> ipc.GstPipelineDebugDotRequest 2, // 1: ipc.EgressHandler.GetPProf:input_type -> ipc.PProfRequest - 1, // 2: ipc.EgressHandler.GetPipelineDot:output_type -> ipc.GstPipelineDebugDotResponse - 3, // 3: ipc.EgressHandler.GetPProf:output_type -> ipc.PProfResponse - 2, // [2:4] is the sub-list for method output_type - 0, // [0:2] is the sub-list for method input_type + 4, // 2: ipc.EgressHandler.GetMetrics:input_type -> ipc.MetricsRequest + 1, // 3: ipc.EgressHandler.GetPipelineDot:output_type -> ipc.GstPipelineDebugDotResponse + 3, // 4: ipc.EgressHandler.GetPProf:output_type -> ipc.PProfResponse + 5, // 5: ipc.EgressHandler.GetMetrics:output_type -> ipc.MetricsResponse + 3, // [3:6] is the sub-list for method output_type + 0, // [0:3] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -334,6 +444,30 @@ func file_ipc_proto_init() { return nil } } + file_ipc_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + 
file_ipc_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -341,7 +475,7 @@ func file_ipc_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_ipc_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 6, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/ipc/ipc.proto b/pkg/ipc/ipc.proto index be647f51..ecb38e4b 100644 --- a/pkg/ipc/ipc.proto +++ b/pkg/ipc/ipc.proto @@ -20,6 +20,7 @@ option go_package = "github.com/livekit/egress/pkg/ipc"; service EgressHandler { rpc GetPipelineDot(GstPipelineDebugDotRequest) returns (GstPipelineDebugDotResponse) {}; rpc GetPProf(PProfRequest) returns (PProfResponse) {}; + rpc GetMetrics(MetricsRequest) returns (MetricsResponse) {}; } message GstPipelineDebugDotRequest {} @@ -37,3 +38,9 @@ message PProfRequest { message PProfResponse { bytes pprof_file = 1; } + +message MetricsRequest {} + +message MetricsResponse { + string metrics = 1; +} diff --git a/pkg/ipc/ipc_grpc.pb.go b/pkg/ipc/ipc_grpc.pb.go index 7c0a41c9..a87f4953 100644 --- a/pkg/ipc/ipc_grpc.pb.go +++ b/pkg/ipc/ipc_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
// versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.9 +// - protoc v4.24.4 // source: ipc.proto package ipc @@ -24,6 +24,7 @@ const _ = grpc.SupportPackageIsVersion7 type EgressHandlerClient interface { GetPipelineDot(ctx context.Context, in *GstPipelineDebugDotRequest, opts ...grpc.CallOption) (*GstPipelineDebugDotResponse, error) GetPProf(ctx context.Context, in *PProfRequest, opts ...grpc.CallOption) (*PProfResponse, error) + GetMetrics(ctx context.Context, in *MetricsRequest, opts ...grpc.CallOption) (*MetricsResponse, error) } type egressHandlerClient struct { @@ -52,12 +53,22 @@ func (c *egressHandlerClient) GetPProf(ctx context.Context, in *PProfRequest, op return out, nil } +func (c *egressHandlerClient) GetMetrics(ctx context.Context, in *MetricsRequest, opts ...grpc.CallOption) (*MetricsResponse, error) { + out := new(MetricsResponse) + err := c.cc.Invoke(ctx, "/ipc.EgressHandler/GetMetrics", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // EgressHandlerServer is the server API for EgressHandler service. 
// All implementations must embed UnimplementedEgressHandlerServer // for forward compatibility type EgressHandlerServer interface { GetPipelineDot(context.Context, *GstPipelineDebugDotRequest) (*GstPipelineDebugDotResponse, error) GetPProf(context.Context, *PProfRequest) (*PProfResponse, error) + GetMetrics(context.Context, *MetricsRequest) (*MetricsResponse, error) mustEmbedUnimplementedEgressHandlerServer() } @@ -71,6 +82,9 @@ func (UnimplementedEgressHandlerServer) GetPipelineDot(context.Context, *GstPipe func (UnimplementedEgressHandlerServer) GetPProf(context.Context, *PProfRequest) (*PProfResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetPProf not implemented") } +func (UnimplementedEgressHandlerServer) GetMetrics(context.Context, *MetricsRequest) (*MetricsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetMetrics not implemented") +} func (UnimplementedEgressHandlerServer) mustEmbedUnimplementedEgressHandlerServer() {} // UnsafeEgressHandlerServer may be embedded to opt out of forward compatibility for this service. @@ -120,6 +134,24 @@ func _EgressHandler_GetPProf_Handler(srv interface{}, ctx context.Context, dec f return interceptor(ctx, in, info, handler) } +func _EgressHandler_GetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MetricsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EgressHandlerServer).GetMetrics(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ipc.EgressHandler/GetMetrics", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EgressHandlerServer).GetMetrics(ctx, req.(*MetricsRequest)) + } + return interceptor(ctx, in, info, handler) +} + // EgressHandler_ServiceDesc is the grpc.ServiceDesc for EgressHandler service. 
// It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -135,6 +167,10 @@ var EgressHandler_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetPProf", Handler: _EgressHandler_GetPProf_Handler, }, + { + MethodName: "GetMetrics", + Handler: _EgressHandler_GetMetrics_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "ipc.proto", diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index 0f2ca1c1..39be92ea 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -17,6 +17,7 @@ package pipeline import ( "context" "fmt" + "github.com/livekit/egress/pkg/stats" "os" "path" "strings" @@ -37,7 +38,8 @@ func (c *Controller) GetGstPipelineDebugDot() string { } func (c *Controller) uploadDebugFiles() { - u, err := uploader.New(c.Debug.ToUploadConfig(), "") + monitor := *stats.NewHandlerMonitor(c.NodeID, c.ClusterID, c.Info.EgressId) + u, err := uploader.New(c.Debug.ToUploadConfig(), "", monitor) if err != nil { logger.Errorw("failed to create uploader", err) return @@ -89,7 +91,7 @@ func (c *Controller) uploadTrackFiles(u uploader.Uploader) { if strings.HasSuffix(f.Name(), ".csv") { local := path.Join(dir, f.Name()) storage := path.Join(c.Debug.PathPrefix, f.Name()) - _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false) + _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false, "track") if err != nil { logger.Errorw("failed to upload debug file", err) return @@ -137,7 +139,7 @@ func (c *Controller) uploadDebugFile(u uploader.Uploader, data []byte, fileExten return } - _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false) + _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false, "debug") if err != nil { logger.Errorw("failed to upload debug file", err) return diff --git a/pkg/pipeline/sink/file.go b/pkg/pipeline/sink/file.go index 97acb050..0eb9c2ac 100644 --- a/pkg/pipeline/sink/file.go +++ b/pkg/pipeline/sink/file.go @@ -16,12 +16,11 @@ package 
sink import ( "fmt" - "os" - "path" - "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/pipeline/sink/uploader" "github.com/livekit/protocol/logger" + "os" + "path" ) type FileSink struct { @@ -44,10 +43,8 @@ func (s *FileSink) Start() error { } func (s *FileSink) Close() error { - location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false) - if err != nil { - return err - } + location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false, "file") + s.FileInfo.Location = location s.FileInfo.Size = size diff --git a/pkg/pipeline/sink/image.go b/pkg/pipeline/sink/image.go index e9721ea9..33a62ba6 100644 --- a/pkg/pipeline/sink/image.go +++ b/pkg/pipeline/sink/image.go @@ -108,7 +108,7 @@ func (s *ImageSink) handleNewImage(update *imageUpdate) error { imageStoragePath := path.Join(s.StorageDir, filename) - _, size, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true) + _, size, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true, "image") if err != nil { return err } diff --git a/pkg/pipeline/sink/image_manifest.go b/pkg/pipeline/sink/image_manifest.go index 8cc466a3..36a6090c 100644 --- a/pkg/pipeline/sink/image_manifest.go +++ b/pkg/pipeline/sink/image_manifest.go @@ -71,7 +71,7 @@ func (m *ImageManifest) updateManifest(u uploader.Uploader, localFilepath, stora return err } - _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) + _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false, "image_manifest") return err } diff --git a/pkg/pipeline/sink/manifest.go b/pkg/pipeline/sink/manifest.go index 6d967971..68556b8f 100644 --- a/pkg/pipeline/sink/manifest.go +++ b/pkg/pipeline/sink/manifest.go @@ -16,11 +16,10 @@ package sink import ( "encoding/json" - "os" - "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/pipeline/sink/uploader" "github.com/livekit/egress/pkg/types" + "os" ) type 
Manifest struct { @@ -55,7 +54,8 @@ func uploadManifest(p *config.PipelineConfig, u uploader.Uploader, localFilepath return err } - _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) + _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false, "manifest") + return err } diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 57413929..9dbb1ff8 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -16,6 +16,7 @@ package sink import ( "fmt" + "github.com/livekit/egress/pkg/stats" "os" "path" "strings" @@ -70,7 +71,7 @@ type SegmentUpdate struct { uploadComplete chan struct{} } -func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks) (*SegmentSink, error) { +func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks, monitor stats.HandlerMonitor) (*SegmentSink, error) { playlistName := path.Join(o.LocalDir, o.PlaylistFilename) playlist, err := m3u8.NewEventPlaylistWriter(playlistName, o.SegmentDuration) if err != nil { @@ -91,7 +92,7 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg outputType = types.OutputTypeTS } - return &SegmentSink{ + s := &SegmentSink{ Uploader: u, SegmentConfig: o, conf: p, @@ -104,7 +105,19 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg playlistUpdates: make(chan SegmentUpdate, maxPendingUploads), throttle: core.NewThrottle(time.Second * 2), done: core.NewFuse(), - }, nil + } + + // Register gauges that track the number of segments and playlist updates pending upload + monitor.RegisterPlaylistChannelSizeGauge(s.conf.NodeID, s.conf.ClusterID, s.conf.Info.EgressId, + func() float64 { + return float64(len(s.playlistUpdates)) + }) + monitor.RegisterSegmentsChannelSizeGauge(s.conf.NodeID, s.conf.ClusterID, s.conf.Info.EgressId, + func() 
float64 { + return float64(len(s.closedSegments)) + }) + + return s, nil } func (s *SegmentSink) Start() error { @@ -139,11 +152,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) { go func() { defer close(update.uploadComplete) - _, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true) - if err != nil { - s.callbacks.OnError(err) - return - } + _, size, _ := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true, "segment") // lock segment info updates s.infoLock.Lock() @@ -306,7 +315,7 @@ func (s *SegmentSink) uploadPlaylist() error { var err error playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false, "playlist") return err } @@ -314,6 +323,6 @@ func (s *SegmentSink) uploadLivePlaylist() error { var err error liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) - s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) + s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false, "live_playlist") return err } diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index e3951117..edf5805b 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -18,6 +18,7 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/gstreamer" "github.com/livekit/egress/pkg/pipeline/sink/uploader" + "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/pkg/types" ) @@ -35,11 +36,12 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ } var s Sink var err error + 
monitor := *stats.NewHandlerMonitor(p.NodeID, p.ClusterID, p.Info.EgressId) switch egressType { case types.EgressTypeFile: o := c[0].(*config.FileConfig) - u, err := uploader.New(o.UploadConfig, p.BackupStorage) + u, err := uploader.New(o.UploadConfig, p.BackupStorage, monitor) if err != nil { return nil, err } @@ -49,12 +51,12 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ case types.EgressTypeSegments: o := c[0].(*config.SegmentConfig) - u, err := uploader.New(o.UploadConfig, p.BackupStorage) + u, err := uploader.New(o.UploadConfig, p.BackupStorage, monitor) if err != nil { return nil, err } - s, err = newSegmentSink(u, p, o, callbacks) + s, err = newSegmentSink(u, p, o, callbacks, monitor) if err != nil { return nil, err } @@ -73,7 +75,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ for _, ci := range c { o := ci.(*config.ImageConfig) - u, err := uploader.New(o.UploadConfig, p.BackupStorage) + u, err := uploader.New(o.UploadConfig, p.BackupStorage, monitor) if err != nil { return nil, err } diff --git a/pkg/pipeline/sink/uploader/s3.go b/pkg/pipeline/sink/uploader/s3.go index 6641f83b..f30b8299 100644 --- a/pkg/pipeline/sink/uploader/s3.go +++ b/pkg/pipeline/sink/uploader/s3.go @@ -16,16 +16,19 @@ package uploader import ( "fmt" - "os" - "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/client" "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/livekit/egress/pkg/config" + "net/http" + "net/url" + "os" "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/psrpc" ) @@ -34,6 +37,17 @@ const ( getBucketLocationRegion = "us-east-1" ) +// CustomRetryer wraps the SDK's built in DefaultRetryer adding additional +// custom 
features. Namely, to always retry. +type CustomRetryer struct { + client.DefaultRetryer +} + +// ShouldRetry overrides the SDK's built in DefaultRetryer because the PUTs for segments/playlists are always idempotent +func (r CustomRetryer) ShouldRetry(req *request.Request) bool { + return true +} + type S3Uploader struct { awsConfig *aws.Config bucket *string @@ -41,11 +55,24 @@ type S3Uploader struct { tagging *string } -func newS3Uploader(conf *livekit.S3Upload) (uploader, error) { +func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) { awsConfig := &aws.Config{ - MaxRetries: aws.Int(maxRetries), // Switching to v2 of the aws Go SDK would allow to set a maxDelay as well. + Retryer: &CustomRetryer{ + DefaultRetryer: client.DefaultRetryer{ + NumMaxRetries: conf.MaxRetries, + MaxRetryDelay: conf.MaxRetryDelay, + MaxThrottleDelay: conf.MaxRetryDelay, + MinRetryDelay: conf.MinRetryDelay, + MinThrottleDelay: conf.MinRetryDelay, + }, + }, S3ForcePathStyle: aws.Bool(conf.ForcePathStyle), + LogLevel: aws.LogLevel(conf.AwsLogLevel), } + logger.Infow("setting AWS config", "maxRetries", conf.MaxRetries, + "maxDelay", conf.MaxRetryDelay, + "minDelay", conf.MinRetryDelay, + ) if conf.AccessKey != "" && conf.Secret != "" { awsConfig.Credentials = credentials.NewStaticCredentials(conf.AccessKey, conf.Secret, "") } @@ -71,6 +98,22 @@ func newS3Uploader(conf *livekit.S3Upload) (uploader, error) { u.awsConfig.Region = aws.String(region) } + if conf.Proxy != "" { + logger.Infow("configuring s3 with proxy", "proxyEndpoint", conf.Proxy) + // Proxy configuration + proxyURL, err := url.Parse(conf.Proxy) + if err != nil { + logger.Errorw("failed to parse proxy URL -- proxy not set", err, "proxy", conf.Proxy) + } else { + proxyTransport := &http.Transport{ + Proxy: http.ProxyURL(proxyURL), + } + u.awsConfig.HTTPClient = &http.Client{Transport: proxyTransport} + } + } else { + logger.Infow("not configuring s3 with proxy since none was provided in config") + } + if 
len(conf.Metadata) > 0 { u.metadata = make(map[string]*string, len(conf.Metadata)) for k, v := range conf.Metadata { diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index 7d96c900..c5b05644 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -16,6 +16,7 @@ package uploader import ( "fmt" + "github.com/livekit/egress/pkg/stats" "os" "path" "time" @@ -34,19 +35,19 @@ const ( ) type Uploader interface { - Upload(string, string, types.OutputType, bool) (string, int64, error) + Upload(string, string, types.OutputType, bool, string) (string, int64, error) } type uploader interface { upload(string, string, types.OutputType) (string, int64, error) } -func New(conf config.UploadConfig, backup string) (Uploader, error) { +func New(conf config.UploadConfig, backup string, monitor stats.HandlerMonitor) (Uploader, error) { var u uploader var err error switch c := conf.(type) { - case *livekit.S3Upload: + case *config.EgressS3Upload: u, err = newS3Uploader(c) case *livekit.GCPUpload: u, err = newGCPUploader(c) @@ -61,27 +62,35 @@ func New(conf config.UploadConfig, backup string) (Uploader, error) { return nil, err } - return &remoteUploader{ + remoteUploader := &remoteUploader{ uploader: u, backup: backup, - }, nil + monitor: monitor, + } + + return remoteUploader, nil } type remoteUploader struct { uploader - backup string + backup string + monitor stats.HandlerMonitor } -func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) { +func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) { + start := time.Now() location, size, err := u.upload(localFilepath, storageFilepath, outputType) + elapsed := time.Since(start).Milliseconds() if err == nil { + u.monitor.IncUploadCountSuccess(fileType, 
float64(elapsed)) if deleteAfterUpload { _ = os.Remove(localFilepath) } return location, size, nil } + u.monitor.IncUploadCountFailure(fileType, float64(elapsed)) if u.backup != "" { stat, err := os.Stat(localFilepath) @@ -93,6 +102,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp if err = os.Rename(localFilepath, backupFilepath); err != nil { return "", 0, err } + u.monitor.IncBackupStorageWrites(string(outputType)) return backupFilepath, stat.Size(), nil } @@ -102,7 +112,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp type localUploader struct{} -func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bool) (string, int64, error) { +func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bool, _ string) (string, int64, error) { stat, err := os.Stat(localFilepath) if err != nil { return "", 0, err diff --git a/pkg/service/handler.go b/pkg/service/handler.go index a57f024b..a1806183 100644 --- a/pkg/service/handler.go +++ b/pkg/service/handler.go @@ -16,7 +16,10 @@ package service import ( "context" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "net" + "strings" "time" "github.com/frostbyte73/core" @@ -34,6 +37,8 @@ import ( "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/tracer" "github.com/livekit/psrpc" + + "github.com/prometheus/client_golang/prometheus" ) const network = "unix" @@ -196,6 +201,47 @@ func (h *Handler) GetPProf(ctx context.Context, req *ipc.PProfRequest) (*ipc.PPr }, nil } +// GetMetrics implement the handler-side gathering of metrics to return over IPC +func (h *Handler) GetMetrics(ctx context.Context, req *ipc.MetricsRequest) (*ipc.MetricsResponse, error) { + ctx, span := tracer.Start(ctx, "Handler.GetMetrics") + defer span.End() + + metrics, err := prometheus.DefaultGatherer.Gather() + if err != nil { + return nil, err + } + + logger.Debugw("returning metrics from handler 
process", "sizeOfFamilies", len(metrics)) + metricsAsString, cnt, err := renderMetrics(metrics) + if err != nil { + return &ipc.MetricsResponse{ + Metrics: "", + }, err + } + logger.Debugw("Metrics returned from handler process", "cnt", cnt, "metrics", metricsAsString) + return &ipc.MetricsResponse{ + Metrics: metricsAsString, + }, nil +} + +func renderMetrics(metrics []*dto.MetricFamily) (string, int, error) { + // Create a StringWriter to render the metrics into text format + writer := &strings.Builder{} + totalCnt := 0 + for _, metric := range metrics { + // Write each metric family to text + cnt, err := expfmt.MetricFamilyToText(writer, metric) + if err != nil { + logger.Errorw("Error writing metric family", err) + return "", 0, err + } + totalCnt += cnt + } + + // Get the rendered metrics as a string from the StringWriter + return writer.String(), totalCnt, nil +} + func (h *Handler) Kill() { h.kill.Break() } diff --git a/pkg/service/process.go b/pkg/service/process.go index 6809a3a7..38c6a03f 100644 --- a/pkg/service/process.go +++ b/pkg/service/process.go @@ -16,10 +16,13 @@ package service import ( "context" + "github.com/prometheus/common/expfmt" + "golang.org/x/exp/maps" "net" "os" "os/exec" "path" + "strings" "syscall" "time" @@ -37,6 +40,8 @@ import ( "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/tracer" "github.com/livekit/protocol/utils" + + dto "github.com/prometheus/client_model/go" ) type Process struct { @@ -195,3 +200,52 @@ func (s *Service) KillAll() { func getSocketAddress(handlerTmpDir string) string { return path.Join(handlerTmpDir, "service_rpc.sock") } + +// Gather implements the prometheus.Gatherer interface on server-side to allow aggregation of handler metrics +func (p *Process) Gather() ([]*dto.MetricFamily, error) { + // Get the metrics from the handler via IPC + logger.Debugw("gathering metrics from handler process", "egress_id", p.req.EgressId) + metricsResponse, err := p.grpcClient.GetMetrics(context.Background(), 
&ipc.MetricsRequest{}) + if err != nil { + logger.Warnw("Error obtaining metrics from handler, skipping", err, "egress_id", p.req.EgressId) + return make([]*dto.MetricFamily, 0), nil // don't return an error, just skip this handler + } + // Parse the result to match the Gatherer interface + parser := &expfmt.TextParser{} + families, err := parser.TextToMetricFamilies(strings.NewReader(metricsResponse.Metrics)) + if err != nil { + logger.Warnw("Error parsing metrics from handler, skipping", err, "egress_id", p.req.EgressId) + return make([]*dto.MetricFamily, 0), nil // don't return an error, just skip this handler + } + + // Add an egress_id label to every metric all the families, if it doesn't already have one + applyDefaultLabel(p, families) + + return maps.Values(families), nil +} + +func applyDefaultLabel(p *Process, families map[string]*dto.MetricFamily) { + egressLabelPair := &dto.LabelPair{ + Name: StringPtr("egress_id"), + Value: &p.req.EgressId, + } + for _, family := range families { + for _, metric := range family.Metric { + if metric.Label == nil { + metric.Label = make([]*dto.LabelPair, 0) + } + found := false + for _, label := range metric.Label { + if label.GetName() == "egress_id" { + found = true + break + } + } + if !found { + metric.Label = append(metric.Label, egressLabelPair) + } + } + } +} + +func StringPtr(v string) *string { return &v } diff --git a/pkg/service/service.go b/pkg/service/service.go index 22625e32..612b9487 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -18,6 +18,8 @@ import ( "context" "encoding/json" "fmt" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "net" "net/http" "sync" @@ -65,7 +67,7 @@ func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service if conf.PrometheusPort > 0 { s.promServer = &http.Server{ Addr: fmt.Sprintf(":%d", conf.PrometheusPort), - Handler: promhttp.Handler(), + Handler: s.PromHandler(), } } @@ -224,3 +226,30 
@@ func (s *Service) Close() { logger.Infow("closing server") s.psrpcServer.Shutdown() } + +func (s *Service) CreateGatherer() prometheus.Gatherer { + return prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { + _, span := tracer.Start(context.Background(), "Service.GathererOfHandlerMetrics") + defer span.End() + + s.mu.RLock() + defer s.mu.RUnlock() + + logger.Debugw("gathering metrics", "numHandlers", len(s.activeHandlers)) + + gatherers := prometheus.Gatherers{} + // Include the default repo + gatherers = append(gatherers, prometheus.DefaultGatherer) + // add all the active handlers as sources + for _, v := range s.activeHandlers { + gatherers = append(gatherers, v) + } + return gatherers.Gather() + }) +} + +func (s *Service) PromHandler() http.Handler { + return promhttp.InstrumentMetricHandler( + prometheus.DefaultRegisterer, promhttp.HandlerFor(s.CreateGatherer(), promhttp.HandlerOpts{}), + ) +} diff --git a/pkg/stats/handler_monitor.go b/pkg/stats/handler_monitor.go new file mode 100644 index 00000000..b6e8fe93 --- /dev/null +++ b/pkg/stats/handler_monitor.go @@ -0,0 +1,86 @@ +package stats + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +type HandlerMonitor struct { + uploadsCounter *prometheus.CounterVec + uploadsResponseTime *prometheus.HistogramVec + backupCounter *prometheus.CounterVec +} + +func NewHandlerMonitor(nodeId string, clusterId string, egressId string) *HandlerMonitor { + m := &HandlerMonitor{} + + constantLabels := prometheus.Labels{"node_id": nodeId, "cluster_id": clusterId, "egress_id": egressId} + + m.uploadsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "pipeline_uploads", + Help: "Number of uploads per pipeline with type and status labels", + ConstLabels: constantLabels, + }, []string{"type", "status"}) // type: file, manifest, segment, liveplaylist, playlist; status: success,failure + + m.uploadsResponseTime = 
prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "pipeline_upload_response_time_ms", + Help: "A histogram of latencies for upload requests in milliseconds.", + Buckets: []float64{10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 15000, 20000, 30000}, + ConstLabels: constantLabels, + }, []string{"type", "status"}) + + m.backupCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "backup_storage_writes", + Help: "number of writes to backup storage location by output type", + ConstLabels: constantLabels, + }, []string{"output_type"}) + + prometheus.MustRegister(m.uploadsCounter, m.uploadsResponseTime, m.backupCounter) + + return m +} + +func (m *HandlerMonitor) IncUploadCountSuccess(uploadType string, elapsed float64) { + labels := prometheus.Labels{"type": uploadType, "status": "success"} + m.uploadsCounter.With(labels).Add(1) + m.uploadsResponseTime.With(labels).Observe(elapsed) +} + +func (m *HandlerMonitor) IncUploadCountFailure(uploadType string, elapsed float64) { + labels := prometheus.Labels{"type": uploadType, "status": "failure"} + m.uploadsCounter.With(labels).Add(1) + m.uploadsResponseTime.With(labels).Observe(elapsed) +} + +func (m *HandlerMonitor) IncBackupStorageWrites(outputType string) { + m.backupCounter.With(prometheus.Labels{"output_type": outputType}).Add(1) +} + +func (m *HandlerMonitor) RegisterSegmentsChannelSizeGauge(nodeId string, clusterId string, egressId string, channelSizeFunction func() float64) { + segmentsUploadsGauge := prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "segments_uploads_channel_size", + Help: "number of segment uploads pending in channel", + ConstLabels: prometheus.Labels{"node_id": nodeId, "cluster_id": clusterId, "egress_id": egressId}, + }, channelSizeFunction) + prometheus.MustRegister(segmentsUploadsGauge) +} + +func (m *HandlerMonitor)
RegisterPlaylistChannelSizeGauge(nodeId string, clusterId string, egressId string, channelSizeFunction func() float64) { + playlistUploadsGauge := prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "playlist_uploads_channel_size", + Help: "number of playlist updates pending in channel", + ConstLabels: prometheus.Labels{"node_id": nodeId, "cluster_id": clusterId, "egress_id": egressId}, + }, channelSizeFunction) + prometheus.MustRegister(playlistUploadsGauge) +}