Skip to content

Commit

Permalink
Add more options to the S3 section of config.yaml for egress
Browse files Browse the repository at this point in the history
- HTTP proxy
- max retries
- max/min retry delay
- AWS logging option
  • Loading branch information
dandoug committed Oct 24, 2023
1 parent c4f2e12 commit cd51bf7
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 20 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ s3:
region: AWS_DEFAULT_REGION env or IAM role can be used instead
endpoint: (optional) custom endpoint
bucket: bucket to upload files to
# the following s3 options can only be set in config, *not* per request; they are applied on top of any per-request s3 options
proxy: (optional, no default) proxy url
max_retries: (optional, default=3) number of retries to attempt
max_retry_delay: (optional, default=5s) max delay between retries (e.g. 5s, 100ms, 1m...)
min_retry_delay: (optional, default=100ms) min delay between retries (e.g. 100ms, 1s...)
aws_log_level: (optional, default=LogOff) log level for aws sdk (LogDebugWithRequestRetries, LogDebug, ...)
azure:
account_name: AZURE_STORAGE_ACCOUNT env can be used instead
account_key: AZURE_STORAGE_KEY env can be used instead
Expand Down
17 changes: 11 additions & 6 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,17 @@ type StorageConfig struct {
}

type S3Config struct {
AccessKey string `yaml:"access_key"` // (env AWS_ACCESS_KEY_ID)
Secret string `yaml:"secret"` // (env AWS_SECRET_ACCESS_KEY)
Region string `yaml:"region"` // (env AWS_DEFAULT_REGION)
Endpoint string `yaml:"endpoint"`
Bucket string `yaml:"bucket"`
ForcePathStyle bool `yaml:"force_path_style"`
AccessKey string `yaml:"access_key"` // (env AWS_ACCESS_KEY_ID)
Secret string `yaml:"secret"` // (env AWS_SECRET_ACCESS_KEY)
Region string `yaml:"region"` // (env AWS_DEFAULT_REGION)
Endpoint string `yaml:"endpoint"`
Bucket string `yaml:"bucket"`
ForcePathStyle bool `yaml:"force_path_style"`
Proxy string `yaml:"proxy"`
MaxRetries int `yaml:"max_retries"`
MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
MinRetryDelay time.Duration `yaml:"min_retry_delay"`
AwsLogLevel string `yaml:"aws_log_level"`
}

type AzureConfig struct {
Expand Down
76 changes: 68 additions & 8 deletions pkg/config/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package config

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
"time"
)

type UploadConfig interface{}
Expand All @@ -28,9 +30,31 @@ type uploadRequest interface {
GetAliOSS() *livekit.AliOSSUpload
}

// EgressS3Upload combines the S3 upload settings of livekit.S3Upload with
// egress-side client options: proxy, retry policy and AWS SDK logging.
type EgressS3Upload struct {
// The embedded upload settings; their fields are promoted onto EgressS3Upload.
*livekit.S3Upload
// Proxy is the proxy URL for S3 traffic; an empty string means no proxy.
Proxy string
// MaxRetries is the maximum number of retries to attempt.
MaxRetries int
// MaxRetryDelay is the upper bound on the delay between retries.
MaxRetryDelay time.Duration
// MinRetryDelay is the lower bound on the delay between retries.
MinRetryDelay time.Duration
// AwsLogLevel is the AWS SDK log level, already converted from its config string.
AwsLogLevel aws.LogLevelType
}

func (p *PipelineConfig) getUploadConfig(req uploadRequest) UploadConfig {
if s3 := req.GetS3(); s3 != nil {
return s3
s3StorageConfigFromReq := &EgressS3Upload{
	S3Upload: s3,
}
// Proxy, retry and AWS logging options cannot be set per request, so they
// are copied from config.yaml onto the request-derived upload config.
if p.S3 != nil {
	// ToUploadConfig parses the config.yaml options and applies defaults;
	// with p.S3 set it returns an *EgressS3Upload.
	s3StorageConfigFromConfigYaml := p.ToUploadConfig().(*EgressS3Upload)
	s3StorageConfigFromReq.Proxy = s3StorageConfigFromConfigYaml.Proxy
	s3StorageConfigFromReq.MaxRetries = s3StorageConfigFromConfigYaml.MaxRetries
	s3StorageConfigFromReq.MaxRetryDelay = s3StorageConfigFromConfigYaml.MaxRetryDelay
	// MinRetryDelay has to be merged as well; leaving it out drops the
	// configured minimum delay for every per-request S3 upload.
	s3StorageConfigFromReq.MinRetryDelay = s3StorageConfigFromConfigYaml.MinRetryDelay
	s3StorageConfigFromReq.AwsLogLevel = s3StorageConfigFromConfigYaml.AwsLogLevel
}
return s3StorageConfigFromReq
}
if gcp := req.GetGcp(); gcp != nil {
return gcp
Expand All @@ -47,14 +71,50 @@ func (p *PipelineConfig) getUploadConfig(req uploadRequest) UploadConfig {

func (c StorageConfig) ToUploadConfig() UploadConfig {
if c.S3 != nil {
return &livekit.S3Upload{
AccessKey: c.S3.AccessKey,
Secret: c.S3.Secret,
Region: c.S3.Region,
Endpoint: c.S3.Endpoint,
Bucket: c.S3.Bucket,
ForcePathStyle: c.S3.ForcePathStyle,
s3StorageConfig := &EgressS3Upload{
S3Upload: &livekit.S3Upload{
AccessKey: c.S3.AccessKey,
Secret: c.S3.Secret,
Region: c.S3.Region,
Endpoint: c.S3.Endpoint,
Bucket: c.S3.Bucket,
ForcePathStyle: c.S3.ForcePathStyle,
},
Proxy: c.S3.Proxy,
}
// Handle max retries with default
if c.S3.MaxRetries > 0 {
s3StorageConfig.MaxRetries = c.S3.MaxRetries
} else {
s3StorageConfig.MaxRetries = 3
}
// Handle min/max delay (for backoff) with defaults
if c.S3.MaxRetryDelay > 0 {
s3StorageConfig.MaxRetryDelay = c.S3.MaxRetryDelay
} else {
s3StorageConfig.MaxRetryDelay = time.Second * 5
}
if c.S3.MinRetryDelay > 0 {
s3StorageConfig.MinRetryDelay = c.S3.MinRetryDelay
} else {
s3StorageConfig.MinRetryDelay = time.Millisecond * 100
}
// Handle AWS log level with default
switch c.S3.AwsLogLevel {
case "LogDebugWithRequestRetries":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithRequestRetries
case "LogDebug":
s3StorageConfig.AwsLogLevel = aws.LogDebug
case "LogDebugWithRequestErrors":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithRequestErrors
case "LogDebugWithHTTPBody":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithHTTPBody
case "LogDebugWithSigning":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithSigning
default:
s3StorageConfig.AwsLogLevel = aws.LogOff
}
return s3StorageConfig
}
if c.Azure != nil {
return &livekit.AzureBlobUpload{
Expand Down
53 changes: 48 additions & 5 deletions pkg/pipeline/sink/uploader/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ package uploader

import (
"fmt"
"os"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/livekit/egress/pkg/config"
"net/http"
"net/url"
"os"

"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/psrpc"
)
Expand All @@ -34,18 +37,42 @@ const (
getBucketLocationRegion = "us-east-1"
)

// CustomRetryer embeds the AWS SDK's client.DefaultRetryer and overrides its
// ShouldRetry method so that every request is reported as retryable. All
// other retryer behavior comes from the embedded DefaultRetryer.
type CustomRetryer struct {
client.DefaultRetryer
}

// ShouldRetry unconditionally reports the request as retryable, taking the
// place of the embedded DefaultRetryer's decision. This is considered safe
// because the PUTs for segments/playlists are always idempotent.
func (CustomRetryer) ShouldRetry(*request.Request) bool {
	return true
}

// S3Uploader carries the AWS SDK configuration and object settings used for
// uploads; presumably it is the value built by newS3Uploader — confirm there.
type S3Uploader struct {
// AWS SDK configuration (retryer, credentials, region, optional proxy client).
awsConfig *aws.Config
// NOTE(review): presumably the destination bucket name — confirm against the constructor.
bucket *string
// Object metadata; newS3Uploader fills it from conf.Metadata when that is non-empty.
metadata map[string]*string
// NOTE(review): presumably the object tagging string sent with uploads — TODO confirm.
tagging *string
}

func newS3Uploader(conf *livekit.S3Upload) (uploader, error) {
func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) {
awsConfig := &aws.Config{
MaxRetries: aws.Int(maxRetries), // Switching to v2 of the aws Go SDK would allow to set a maxDelay as well.
Retryer: &CustomRetryer{
DefaultRetryer: client.DefaultRetryer{
NumMaxRetries: conf.MaxRetries,
MaxRetryDelay: conf.MaxRetryDelay,
MaxThrottleDelay: conf.MaxRetryDelay,
MinRetryDelay: conf.MinRetryDelay,
MinThrottleDelay: conf.MinRetryDelay,
},
},
S3ForcePathStyle: aws.Bool(conf.ForcePathStyle),
LogLevel: aws.LogLevel(conf.AwsLogLevel),
}
logger.Infow("setting AWS config", "maxRetries", conf.MaxRetries,
"maxDelay", conf.MaxRetryDelay,
"minDelay", conf.MinRetryDelay,
)
if conf.AccessKey != "" && conf.Secret != "" {
awsConfig.Credentials = credentials.NewStaticCredentials(conf.AccessKey, conf.Secret, "")
}
Expand All @@ -71,6 +98,22 @@ func newS3Uploader(conf *livekit.S3Upload) (uploader, error) {
u.awsConfig.Region = aws.String(region)
}

if conf.Proxy != "" {
	logger.Infow("configuring s3 with proxy", "proxyEndpoint", conf.Proxy)
	// Proxy configuration
	proxyURL, err := url.Parse(conf.Proxy)
	if err != nil {
		// Best effort: an unparsable proxy URL is logged and the uploader
		// continues without a proxy.
		logger.Errorw("failed to parse proxy URL -- proxy not set", err, "proxy", conf.Proxy)
	} else {
		// Start from a copy of the default transport so the proxied client
		// keeps its dial/TLS-handshake timeouts and idle-connection limits;
		// a zero-value http.Transport has none of them.
		proxyTransport := &http.Transport{}
		if base, ok := http.DefaultTransport.(*http.Transport); ok {
			proxyTransport = base.Clone()
		}
		proxyTransport.Proxy = http.ProxyURL(proxyURL)
		u.awsConfig.HTTPClient = &http.Client{Transport: proxyTransport}
	}
} else {
	logger.Infow("not configuring s3 with proxy since none was provided in config")
}

if len(conf.Metadata) > 0 {
u.metadata = make(map[string]*string, len(conf.Metadata))
for k, v := range conf.Metadata {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func New(conf config.UploadConfig, backup string) (Uploader, error) {
var err error

switch c := conf.(type) {
case *livekit.S3Upload:
case *config.EgressS3Upload:
u, err = newS3Uploader(c)
case *livekit.GCPUpload:
u, err = newGCPUploader(c)
Expand Down

0 comments on commit cd51bf7

Please sign in to comment.