Skip to content

Commit

Permalink
Uploads cleanup (#539)
Browse files Browse the repository at this point in the history
* cleanup

* fix panic

* more fixes

* shorten video delay on participant multi
  • Loading branch information
frostbyte73 authored Nov 15, 2023
1 parent d5fffdb commit f186c66
Show file tree
Hide file tree
Showing 18 changed files with 107 additions and 93 deletions.
57 changes: 28 additions & 29 deletions pkg/config/uploads.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
)
Expand All @@ -42,20 +43,21 @@ type EgressS3Upload struct {

func (p *PipelineConfig) getUploadConfig(req uploadRequest) UploadConfig {
if s3 := req.GetS3(); s3 != nil {
s3StorageConfigFromReq := &EgressS3Upload{
s3Conf := &EgressS3Upload{
S3Upload: s3,
}
// merge in options from config (proxy, retry limit, delay and aws logging) if specified
if p.S3 != nil {
// parse config.yaml options and get defaults
S3StorageConfigFromConfigYaml := p.ToUploadConfig().(*EgressS3Upload)
// merge into pipeline config created from request options
s3StorageConfigFromReq.Proxy = S3StorageConfigFromConfigYaml.Proxy
s3StorageConfigFromReq.MaxRetries = S3StorageConfigFromConfigYaml.MaxRetries
s3StorageConfigFromReq.MaxRetryDelay = S3StorageConfigFromConfigYaml.MaxRetryDelay
s3StorageConfigFromReq.AwsLogLevel = S3StorageConfigFromConfigYaml.AwsLogLevel
if s3Base, ok := p.ToUploadConfig().(*EgressS3Upload); ok {
// merge into pipeline config created from request options
s3Conf.Proxy = s3Base.Proxy
s3Conf.MaxRetries = s3Base.MaxRetries
s3Conf.MaxRetryDelay = s3Base.MaxRetryDelay
s3Conf.AwsLogLevel = s3Base.AwsLogLevel
}
}
return s3StorageConfigFromReq
return s3Conf
}
if gcp := req.GetGcp(); gcp != nil {
return gcp
Expand All @@ -72,7 +74,7 @@ func (p *PipelineConfig) getUploadConfig(req uploadRequest) UploadConfig {

func (c StorageConfig) ToUploadConfig() UploadConfig {
if c.S3 != nil {
s3StorageConfig := &EgressS3Upload{
s3 := &EgressS3Upload{
S3Upload: &livekit.S3Upload{
AccessKey: c.S3.AccessKey,
Secret: c.S3.Secret,
Expand All @@ -81,41 +83,38 @@ func (c StorageConfig) ToUploadConfig() UploadConfig {
Bucket: c.S3.Bucket,
ForcePathStyle: c.S3.ForcePathStyle,
},
Proxy: c.S3.Proxy,
Proxy: c.S3.Proxy,
MaxRetries: 3,
MaxRetryDelay: time.Second * 5,
MinRetryDelay: time.Millisecond * 100,
}
// Handle max retries with default
if c.S3.MaxRetries > 0 {
s3StorageConfig.MaxRetries = c.S3.MaxRetries
} else {
s3StorageConfig.MaxRetries = 3
s3.MaxRetries = c.S3.MaxRetries
}
// Handle min/max delay (for backoff) with defaults
if c.S3.MaxRetryDelay > 0 {
s3StorageConfig.MaxRetryDelay = c.S3.MaxRetryDelay
} else {
s3StorageConfig.MaxRetryDelay = time.Second * 5
s3.MaxRetryDelay = c.S3.MaxRetryDelay
}
if c.S3.MinRetryDelay > 0 {
s3StorageConfig.MinRetryDelay = c.S3.MinRetryDelay
} else {
s3StorageConfig.MinRetryDelay = time.Millisecond * 100
s3.MinRetryDelay = c.S3.MinRetryDelay
}
// Handle AWS log level with default

// Handle AWS log level
switch c.S3.AwsLogLevel {
case "LogDebugWithRequestRetries":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithRequestRetries
s3.AwsLogLevel = aws.LogDebugWithRequestRetries
case "LogDebug":
s3StorageConfig.AwsLogLevel = aws.LogDebug
s3.AwsLogLevel = aws.LogDebug
case "LogDebugWithRequestErrors":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithRequestErrors
s3.AwsLogLevel = aws.LogDebugWithRequestErrors
case "LogDebugWithHTTPBody":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithHTTPBody
s3.AwsLogLevel = aws.LogDebugWithHTTPBody
case "LogDebugWithSigning":
s3StorageConfig.AwsLogLevel = aws.LogDebugWithSigning
s3.AwsLogLevel = aws.LogDebugWithSigning
default:
s3StorageConfig.AwsLogLevel = aws.LogOff
s3.AwsLogLevel = aws.LogOff
}
return s3StorageConfig

return s3
}
if c.Azure != nil {
return &livekit.AzureBlobUpload{
Expand Down
5 changes: 2 additions & 3 deletions pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import (
"sync"
"time"

"github.com/livekit/egress/pkg/stats"

"github.com/go-gst/go-gst/gst"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/pipeline/sink/uploader"
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/pprof"
Expand All @@ -39,7 +38,7 @@ func (c *Controller) GetGstPipelineDebugDot() string {
}

func (c *Controller) uploadDebugFiles() {
monitor := *stats.NewHandlerMonitor(c.NodeID, c.ClusterID, c.Info.EgressId)
monitor := stats.NewHandlerMonitor(c.NodeID, c.ClusterID, c.Info.EgressId)
u, err := uploader.New(c.Debug.ToUploadConfig(), "", monitor)
if err != nil {
logger.Errorw("failed to create uploader", err)
Expand Down
5 changes: 2 additions & 3 deletions pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (
"sync"
"time"

"github.com/livekit/egress/pkg/stats"

"github.com/frostbyte73/core"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/gstreamer"
"github.com/livekit/egress/pkg/pipeline/sink/m3u8"
"github.com/livekit/egress/pkg/pipeline/sink/uploader"
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/logger"
)
Expand Down Expand Up @@ -72,7 +71,7 @@ type SegmentUpdate struct {
uploadComplete chan struct{}
}

func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks, monitor stats.HandlerMonitor) (*SegmentSink, error) {
func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (*SegmentSink, error) {
playlistName := path.Join(o.LocalDir, o.PlaylistFilename)
playlist, err := m3u8.NewEventPlaylistWriter(playlistName, o.SegmentDuration)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ type Sink interface {

func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[types.EgressType][]Sink, error) {
sinks := make(map[types.EgressType][]Sink)
monitor := stats.NewHandlerMonitor(p.NodeID, p.ClusterID, p.Info.EgressId)
for egressType, c := range p.Outputs {
if len(c) == 0 {
continue
}

var s Sink
var err error
monitor := *stats.NewHandlerMonitor(p.NodeID, p.ClusterID, p.Info.EgressId)
switch egressType {
case types.EgressTypeFile:
o := c[0].(*config.FileConfig)
Expand Down
7 changes: 3 additions & 4 deletions pkg/pipeline/sink/uploader/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/livekit/egress/pkg/config"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/logger"
"github.com/livekit/psrpc"
Expand Down Expand Up @@ -74,7 +74,8 @@ func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) {
S3ForcePathStyle: aws.Bool(conf.ForcePathStyle),
LogLevel: aws.LogLevel(conf.AwsLogLevel),
}
logger.Debugw("setting AWS config", "maxRetries", conf.MaxRetries,
logger.Debugw("setting S3 config",
"maxRetries", conf.MaxRetries,
"maxDelay", conf.MaxRetryDelay,
"minDelay", conf.MinRetryDelay,
)
Expand Down Expand Up @@ -115,8 +116,6 @@ func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) {
}
u.awsConfig.HTTPClient = &http.Client{Transport: proxyTransport}
}
} else {
logger.Debugw("not configuring s3 with proxy since none was provided in config")
}

if len(conf.Metadata) > 0 {
Expand Down
18 changes: 11 additions & 7 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ import (
"path"
"time"

"github.com/livekit/egress/pkg/stats"

"github.com/pkg/errors"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
)
Expand All @@ -43,13 +42,15 @@ type uploader interface {
upload(string, string, types.OutputType) (string, int64, error)
}

func New(conf config.UploadConfig, backup string, monitor stats.HandlerMonitor) (Uploader, error) {
func New(conf config.UploadConfig, backup string, monitor *stats.HandlerMonitor) (Uploader, error) {
var u uploader
var err error

switch c := conf.(type) {
case *config.EgressS3Upload:
u, err = newS3Uploader(c)
case *livekit.S3Upload:
u, err = newS3Uploader(&config.EgressS3Upload{S3Upload: c})
case *livekit.GCPUpload:
u, err = newGCPUploader(c)
case *livekit.AzureBlobUpload:
Expand All @@ -63,26 +64,28 @@ func New(conf config.UploadConfig, backup string, monitor stats.HandlerMonitor)
return nil, err
}

remoteUploader := &remoteUploader{
remote := &remoteUploader{
uploader: u,
backup: backup,
monitor: monitor,
}

return remoteUploader, nil
return remote, nil
}

type remoteUploader struct {
uploader

backup string
monitor stats.HandlerMonitor
monitor *stats.HandlerMonitor
}

func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) {
start := time.Now()
location, size, err := u.upload(localFilepath, storageFilepath, outputType)
elapsed := time.Since(start).Milliseconds()

// success
if err == nil {
u.monitor.IncUploadCountSuccess(fileType, float64(elapsed))
if deleteAfterUpload {
Expand All @@ -91,8 +94,9 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp

return location, size, nil
}
u.monitor.IncUploadCountFailure(fileType, float64(elapsed))

// failure
u.monitor.IncUploadCountFailure(fileType, float64(elapsed))
if u.backup != "" {
stat, err := os.Stat(localFilepath)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions pkg/service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"strings"
"time"

"github.com/frostbyte73/core"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"

"github.com/frostbyte73/core"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -38,8 +38,6 @@ import (
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
"github.com/livekit/psrpc"

"github.com/prometheus/client_golang/prometheus"
)

const network = "unix"
Expand Down
7 changes: 3 additions & 4 deletions pkg/service/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package service

import (
"context"
"github.com/prometheus/common/expfmt"
"golang.org/x/exp/maps"
"net"
"os"
"os/exec"
Expand All @@ -27,6 +25,9 @@ import (
"time"

"github.com/frostbyte73/core"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"golang.org/x/exp/maps"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson"
Expand All @@ -40,8 +41,6 @@ import (
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
"github.com/livekit/protocol/utils"

dto "github.com/prometheus/client_model/go"
)

type Process struct {
Expand Down
5 changes: 2 additions & 3 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"

"github.com/frostbyte73/core"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/stats"
Expand Down
14 changes: 14 additions & 0 deletions pkg/stats/handler_monitor.go → pkg/stats/handler.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stats

import (
Expand Down
9 changes: 6 additions & 3 deletions test/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,28 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/api/option"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)

func download(t *testing.T, uploadParams interface{}, localFilepath, storageFilepath string) {
logger.Debugw("download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
switch u := uploadParams.(type) {
case *livekit.S3Upload:
case *config.EgressS3Upload:
logger.Debugw("s3 download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadS3(t, u, localFilepath, storageFilepath)

case *livekit.GCPUpload:
logger.Debugw("gcp download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadGCP(t, u, localFilepath, storageFilepath)

case *livekit.AzureBlobUpload:
logger.Debugw("azure download", "localFilepath", localFilepath, "storageFilepath", storageFilepath)
downloadAzure(t, u, localFilepath, storageFilepath)
}
}

func downloadS3(t *testing.T, conf *livekit.S3Upload, localFilepath, storageFilepath string) {
func downloadS3(t *testing.T, conf *config.EgressS3Upload, localFilepath, storageFilepath string) {
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(conf.AccessKey, conf.Secret, ""),
Endpoint: aws.String(conf.Endpoint),
Expand Down
2 changes: 1 addition & 1 deletion test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type testCase struct {

// used by segmented file tests
playlist string
live_playlist string
livePlaylist string
filenameSuffix livekit.SegmentedFileSuffix

// used by images tests
Expand Down
Loading

0 comments on commit f186c66

Please sign in to comment.