diff --git a/README.md b/README.md index 43bb114a..a1dab27d 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,12 @@ s3: region: AWS_DEFAULT_REGION env or IAM role can be used instead endpoint: (optional) custom endpoint bucket: bucket to upload files to + # the following s3 options can only be set in config, *not* per request, they will be added to any per-request options + proxy: (optional, no default) proxy url + max_retries: (optional, default=3) number or retries to attempt + max_retry_delay: (optional, default=5s) max delay between retries (e.g. 5s, 100ms, 1m...) + min_retry_delay: (optional, default=500ms) min delay between retries (e.g. 100ms, 1s...) + aws_log_level: (optional, default=LogOff) log level for aws sdk (LogDebugWithRequestRetries, LogDebug, ...) azure: account_name: AZURE_STORAGE_ACCOUNT env can be used instead account_key: AZURE_STORAGE_KEY env can be used instead diff --git a/pkg/config/base.go b/pkg/config/base.go index a8e1f232..6ee8b405 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -63,12 +63,17 @@ type StorageConfig struct { } type S3Config struct { - AccessKey string `yaml:"access_key"` // (env AWS_ACCESS_KEY_ID) - Secret string `yaml:"secret"` // (env AWS_SECRET_ACCESS_KEY) - Region string `yaml:"region"` // (env AWS_DEFAULT_REGION) - Endpoint string `yaml:"endpoint"` - Bucket string `yaml:"bucket"` - ForcePathStyle bool `yaml:"force_path_style"` + AccessKey string `yaml:"access_key"` // (env AWS_ACCESS_KEY_ID) + Secret string `yaml:"secret"` // (env AWS_SECRET_ACCESS_KEY) + Region string `yaml:"region"` // (env AWS_DEFAULT_REGION) + Endpoint string `yaml:"endpoint"` + Bucket string `yaml:"bucket"` + ForcePathStyle bool `yaml:"force_path_style"` + Proxy string `yaml:"proxy"` + MaxRetries int `yaml:"max_retries"` + MaxRetryDelay time.Duration `yaml:"max_retry_delay"` + MinRetryDelay time.Duration `yaml:"min_retry_delay"` + AwsLogLevel string `yaml:"aws_log_level"` } type AzureConfig struct { diff --git a/pkg/config/uploads.go b/pkg/config/uploads.go index 0dc24bf2..2d8e96c2 100644 --- a/pkg/config/uploads.go +++ b/pkg/config/uploads.go @@ -15,8 +15,10 @@ package config import ( + "github.com/aws/aws-sdk-go/aws" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" + "time" ) type UploadConfig interface{} @@ -28,9 +30,31 @@ type uploadRequest interface { GetAliOSS() *livekit.AliOSSUpload } +type EgressS3Upload struct { + *livekit.S3Upload + Proxy string + MaxRetries int + MaxRetryDelay time.Duration + MinRetryDelay time.Duration + AwsLogLevel aws.LogLevelType +} + func (p *PipelineConfig) getUploadConfig(req uploadRequest) UploadConfig { if s3 := req.GetS3(); s3 != nil { - return s3 + s3StorageConfigFromReq := &EgressS3Upload{ + S3Upload: s3, + } + // merge in options from config (proxy, retry limit, delay and aws logging) if specified + if p.S3 != nil { + // parse config.yaml options and get defaults + S3StorageConfigFromConfigYaml := p.ToUploadConfig().(*EgressS3Upload) + // merge into pipeline config created from request options + s3StorageConfigFromReq.Proxy = S3StorageConfigFromConfigYaml.Proxy + s3StorageConfigFromReq.MaxRetries = S3StorageConfigFromConfigYaml.MaxRetries + s3StorageConfigFromReq.MaxRetryDelay = S3StorageConfigFromConfigYaml.MaxRetryDelay + s3StorageConfigFromReq.AwsLogLevel = S3StorageConfigFromConfigYaml.AwsLogLevel + } + return s3StorageConfigFromReq } if gcp := req.GetGcp(); gcp != nil { return gcp @@ -47,14 +71,50 @@ func (p *PipelineConfig) getUploadConfig(req uploadRequest) UploadConfig { func (c StorageConfig) ToUploadConfig() UploadConfig { if c.S3 != nil { - return &livekit.S3Upload{ - AccessKey: c.S3.AccessKey, - Secret: c.S3.Secret, - Region: c.S3.Region, - Endpoint: c.S3.Endpoint, - Bucket: c.S3.Bucket, - ForcePathStyle: c.S3.ForcePathStyle, + s3StorageConfig := &EgressS3Upload{ + S3Upload: &livekit.S3Upload{ + AccessKey: c.S3.AccessKey, + Secret: c.S3.Secret, + Region: c.S3.Region, + Endpoint: c.S3.Endpoint, + Bucket: c.S3.Bucket, + ForcePathStyle: c.S3.ForcePathStyle, + }, + Proxy: c.S3.Proxy, + } + // Handle max retries with default + if c.S3.MaxRetries > 0 { + s3StorageConfig.MaxRetries = c.S3.MaxRetries + } else { + s3StorageConfig.MaxRetries = 3 + } + // Handle min/max delay (for backoff) with defaults + if c.S3.MaxRetryDelay > 0 { + s3StorageConfig.MaxRetryDelay = c.S3.MaxRetryDelay + } else { + s3StorageConfig.MaxRetryDelay = time.Second * 5 + } + if c.S3.MinRetryDelay > 0 { + s3StorageConfig.MinRetryDelay = c.S3.MinRetryDelay + } else { + s3StorageConfig.MinRetryDelay = time.Millisecond * 100 + } + // Handle AWS log level with default + switch c.S3.AwsLogLevel { + case "LogDebugWithRequestRetries": + s3StorageConfig.AwsLogLevel = aws.LogDebugWithRequestRetries + case "LogDebug": + s3StorageConfig.AwsLogLevel = aws.LogDebug + case "LogDebugWithRequestErrors": + s3StorageConfig.AwsLogLevel = aws.LogDebugWithRequestErrors + case "LogDebugWithHTTPBody": + s3StorageConfig.AwsLogLevel = aws.LogDebugWithHTTPBody + case "LogDebugWithSigning": + s3StorageConfig.AwsLogLevel = aws.LogDebugWithSigning + default: + s3StorageConfig.AwsLogLevel = aws.LogOff } + return s3StorageConfig } if c.Azure != nil { return &livekit.AzureBlobUpload{ diff --git a/pkg/pipeline/sink/uploader/s3.go b/pkg/pipeline/sink/uploader/s3.go index 5c6526b6..3154bcdd 100644 --- a/pkg/pipeline/sink/uploader/s3.go +++ b/pkg/pipeline/sink/uploader/s3.go @@ -16,16 +16,19 @@ package uploader import ( "fmt" - "os" - "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/client" "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/livekit/egress/pkg/config" + "net/http" + "net/url" + "os" "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/psrpc" ) @@ -34,6 +37,17 @@ const ( getBucketLocationRegion = "us-east-1" ) +// CustomRetryer wraps the SDK's built in DefaultRetryer adding additional +// custom features. Namely, to always retry. +type CustomRetryer struct { + client.DefaultRetryer +} + +// ShouldRetry overrides the SDK's built in DefaultRetryer because the PUTs for segments/playlists are always idempotent +func (r CustomRetryer) ShouldRetry(req *request.Request) bool { + return true +} + type S3Uploader struct { awsConfig *aws.Config bucket *string @@ -41,11 +55,24 @@ type S3Uploader struct { tagging *string } -func newS3Uploader(conf *livekit.S3Upload) (uploader, error) { +func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) { awsConfig := &aws.Config{ - MaxRetries: aws.Int(maxRetries), // Switching to v2 of the aws Go SDK would allow to set a maxDelay as well. + Retryer: &CustomRetryer{ + DefaultRetryer: client.DefaultRetryer{ + NumMaxRetries: conf.MaxRetries, + MaxRetryDelay: conf.MaxRetryDelay, + MaxThrottleDelay: conf.MaxRetryDelay, + MinRetryDelay: conf.MinRetryDelay, + MinThrottleDelay: conf.MinRetryDelay, + }, + }, S3ForcePathStyle: aws.Bool(conf.ForcePathStyle), + LogLevel: aws.LogLevel(conf.AwsLogLevel), } + logger.Infow("setting AWS config", "maxRetries", conf.MaxRetries, + "maxDelay", conf.MaxRetryDelay, + "minDelay", conf.MinRetryDelay, + ) if conf.AccessKey != "" && conf.Secret != "" { awsConfig.Credentials = credentials.NewStaticCredentials(conf.AccessKey, conf.Secret, "") } @@ -71,6 +98,22 @@ func newS3Uploader(conf *livekit.S3Upload) (uploader, error) { u.awsConfig.Region = aws.String(region) } + if conf.Proxy != "" { + logger.Infow("configuring s3 with proxy", "proxyEndpoint", conf.Proxy) + // Proxy configuration + proxyURL, err := url.Parse(conf.Proxy) + if err != nil { + logger.Errorw("failed to parse proxy URL -- proxy not set", err, "proxy", conf.Proxy) + } else { + proxyTransport := &http.Transport{ + Proxy: http.ProxyURL(proxyURL), + } + u.awsConfig.HTTPClient = &http.Client{Transport: proxyTransport} + } + } else { + logger.Infow("not configuring s3 with proxy since none was provided in config") + } + if len(conf.Metadata) > 0 { u.metadata = make(map[string]*string, len(conf.Metadata)) for k, v := range conf.Metadata { diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index 7d96c900..06ba1a8c 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -46,7 +46,7 @@ func New(conf config.UploadConfig, backup string) (Uploader, error) { var err error switch c := conf.(type) { - case *livekit.S3Upload: + case *config.EgressS3Upload: u, err = newS3Uploader(c) case *livekit.GCPUpload: u, err = newGCPUploader(c)