Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Nov 15, 2023
1 parent 21bffe7 commit 0f65ddc
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 46 deletions.
56 changes: 27 additions & 29 deletions pkg/config/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,21 @@ type EgressS3Upload struct {

func (p *PipelineConfig) getUploadConfig(req uploadRequest) UploadConfig {
if s3 := req.GetS3(); s3 != nil {
s3StorageConfigFromReq := &EgressS3Upload{
s3Conf := &EgressS3Upload{
S3Upload: s3,
}
// merge in options from config (proxy, retry limit, delay and aws logging) if specified
if p.S3 != nil {
// parse config.yaml options and get defaults
S3StorageConfigFromConfigYaml := p.ToUploadConfig().(*EgressS3Upload)
// merge into pipeline config created from request options
s3StorageConfigFromReq.Proxy = S3StorageConfigFromConfigYaml.Proxy
s3StorageConfigFromReq.MaxRetries = S3StorageConfigFromConfigYaml.MaxRetries
s3StorageConfigFromReq.MaxRetryDelay = S3StorageConfigFromConfigYaml.MaxRetryDelay
s3StorageConfigFromReq.AwsLogLevel = S3StorageConfigFromConfigYaml.AwsLogLevel
if s3Base, ok := p.ToUploadConfig().(*EgressS3Upload); ok {
// merge into pipeline config created from request options
s3Conf.Proxy = s3Base.Proxy
s3Conf.MaxRetries = s3Base.MaxRetries
s3Conf.MaxRetryDelay = s3Base.MaxRetryDelay
s3Conf.AwsLogLevel = s3Base.AwsLogLevel
}
}
return s3StorageConfigFromReq
return s3Conf
}
if gcp := req.GetGcp(); gcp != nil {
return gcp
Expand All @@ -73,7 +74,7 @@ func (p *PipelineConfig) getUploadConfig(req uploadRequest) UploadConfig {

func (c StorageConfig) ToUploadConfig() UploadConfig {
if c.S3 != nil {
s3StorageConfig := &EgressS3Upload{
s3 := &EgressS3Upload{
S3Upload: &livekit.S3Upload{
AccessKey: c.S3.AccessKey,
Secret: c.S3.Secret,
Expand All @@ -82,41 +83,38 @@ func (c StorageConfig) ToUploadConfig() UploadConfig {
Bucket: c.S3.Bucket,
ForcePathStyle: c.S3.ForcePathStyle,
},
Proxy: c.S3.Proxy,
Proxy: c.S3.Proxy,
MaxRetries: 3,
MaxRetryDelay: time.Second * 5,
MinRetryDelay: time.Millisecond * 100,
}
// Handle max retries with default
if c.S3.MaxRetries > 0 {
s3StorageConfig.MaxRetries = c.S3.MaxRetries
} else {
s3StorageConfig.MaxRetries = 3
s3.MaxRetries = c.S3.MaxRetries
}
// Handle min/max delay (for backoff) with defaults
if c.S3.MaxRetryDelay > 0 {
s3StorageConfig.MaxRetryDelay = c.S3.MaxRetryDelay
} else {
s3StorageConfig.MaxRetryDelay = time.Second * 5
s3.MaxRetryDelay = c.S3.MaxRetryDelay
}
if c.S3.MinRetryDelay > 0 {
s3StorageConfig.MinRetryDelay = c.S3.MinRetryDelay
} else {
s3StorageConfig.MinRetryDelay = time.Millisecond * 100
s3.MinRetryDelay = c.S3.MinRetryDelay
}
// Handle AWS log level with default

// Handle AWS log level
switch c.S3.AwsLogLevel {
case "LogDebugWithRequestRetries":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithRequestRetries
s3.AwsLogLevel = aws.LogDebugWithRequestRetries
case "LogDebug":
s3StorageConfig.AwsLogLevel = aws.LogDebug
s3.AwsLogLevel = aws.LogDebug
case "LogDebugWithRequestErrors":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithRequestErrors
s3.AwsLogLevel = aws.LogDebugWithRequestErrors
case "LogDebugWithHTTPBody":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithHTTPBody
s3.AwsLogLevel = aws.LogDebugWithHTTPBody
case "LogDebugWithSigning":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithSigning
s3.AwsLogLevel = aws.LogDebugWithSigning
default:
s3StorageConfig.AwsLogLevel = aws.LogOff
s3.AwsLogLevel = aws.LogOff
}
return s3StorageConfig

return s3
}
if c.Azure != nil {
return &livekit.AzureBlobUpload{
Expand Down
5 changes: 2 additions & 3 deletions pkg/pipeline/sink/uploader/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) {
S3ForcePathStyle: aws.Bool(conf.ForcePathStyle),
LogLevel: aws.LogLevel(conf.AwsLogLevel),
}
logger.Debugw("setting AWS config", "maxRetries", conf.MaxRetries,
logger.Debugw("setting S3 config",
"maxRetries", conf.MaxRetries,
"maxDelay", conf.MaxRetryDelay,
"minDelay", conf.MinRetryDelay,
)
Expand Down Expand Up @@ -115,8 +116,6 @@ func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) {
}
u.awsConfig.HTTPClient = &http.Client{Transport: proxyTransport}
}
} else {
logger.Debugw("not configuring s3 with proxy since none was provided in config")
}

if len(conf.Metadata) > 0 {
Expand Down
9 changes: 6 additions & 3 deletions test/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,28 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/api/option"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)

func download(t *testing.T, uploadParams interface{}, localFilepath, storageFilepath string) {
logger.Debugw("download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
switch u := uploadParams.(type) {
case *livekit.S3Upload:
case *config.EgressS3Upload:
logger.Debugw("s3 download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadS3(t, u, localFilepath, storageFilepath)

case *livekit.GCPUpload:
logger.Debugw("gcp download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadGCP(t, u, localFilepath, storageFilepath)

case *livekit.AzureBlobUpload:
logger.Debugw("azure download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadAzure(t, u, localFilepath, storageFilepath)
}
}

func downloadS3(t *testing.T, conf *livekit.S3Upload, localFilepath, storageFilepath string) {
func downloadS3(t *testing.T, conf *config.EgressS3Upload, localFilepath, storageFilepath string) {
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(conf.AccessKey, conf.Secret, ""),
Endpoint: aws.String(conf.Endpoint),
Expand Down
2 changes: 1 addition & 1 deletion test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type testCase struct {

// used by segmented file tests
playlist string
live_playlist string
livePlaylist string
filenameSuffix livekit.SegmentedFileSuffix

// used by images tests
Expand Down
4 changes: 2 additions & 2 deletions test/room_composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) {
},
filename: "r_{room_name}_{time}",
playlist: "r_{room_name}_{time}.m3u8",
live_playlist: "r_live_{room_name}_{time}.m3u8",
livePlaylist: "r_live_{room_name}_{time}.m3u8",
filenameSuffix: livekit.SegmentedFileSuffix_TIMESTAMP,
expectVideoEncoding: true,
},
Expand All @@ -222,7 +222,7 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) {
segmentOutput := &livekit.SegmentedFileOutput{
FilenamePrefix: r.getFilePath(test.filename),
PlaylistName: test.playlist,
LivePlaylistName: test.live_playlist,
LivePlaylistName: test.livePlaylist,
FilenameSuffix: test.filenameSuffix,
}
if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.GCPUpload != nil {
Expand Down
2 changes: 1 addition & 1 deletion test/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (r *Runner) runSegmentsTest(t *testing.T, req *rpc.StartEgressRequest, test
p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req)
require.NoError(t, err)

r.verifySegments(t, p, test.filenameSuffix, res, test.live_playlist != "")
r.verifySegments(t, p, test.filenameSuffix, res, test.livePlaylist != "")
if !test.audioOnly {
require.Equal(t, test.expectVideoEncoding, p.VideoEncoding)
}
Expand Down
14 changes: 7 additions & 7 deletions test/track_composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) {
playlist: "tcs_{publisher_identity}_vp8_{time}.m3u8",
},
{
name: "H264",
audioCodec: types.MimeTypeOpus,
videoCodec: types.MimeTypeH264,
filename: "tcs_{room_name}_h264_{time}",
playlist: "tcs_{room_name}_h264_{time}.m3u8",
live_playlist: "tcs_live_{room_name}_h264_{time}.m3u8",
name: "H264",
audioCodec: types.MimeTypeOpus,
videoCodec: types.MimeTypeH264,
filename: "tcs_{room_name}_h264_{time}",
playlist: "tcs_{room_name}_h264_{time}.m3u8",
livePlaylist: "tcs_live_{room_name}_h264_{time}.m3u8",
},
{
name: "Audio Only",
Expand All @@ -189,7 +189,7 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) {
segmentOutput := &livekit.SegmentedFileOutput{
FilenamePrefix: r.getFilePath(test.filename),
PlaylistName: test.playlist,
LivePlaylistName: test.live_playlist,
LivePlaylistName: test.livePlaylist,
FilenameSuffix: test.filenameSuffix,
}
if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil {
Expand Down

0 comments on commit 0f65ddc

Please sign in to comment.