From d0629cec1f3ac3022ab5d5dca5fcb74888e85caa Mon Sep 17 00:00:00 2001 From: David Colburn <xero73@gmail.com> Date: Wed, 16 Oct 2024 11:37:55 -0700 Subject: [PATCH] backup storage options (#792) * backup storage options * fix paths * move tmpDir to pipeline config creation * tmpDir overloaded * move ipc svc back to os tmp * final answer * remove extra slashes --- cmd/server/main.go | 1 + pkg/config/base.go | 65 ++--------- pkg/config/output_file.go | 29 +---- pkg/config/output_image.go | 26 ++--- pkg/config/output_segment.go | 49 ++++----- pkg/config/pipeline.go | 2 + pkg/config/service.go | 2 - pkg/config/storage.go | 142 +++++++++++++++++++++++++ pkg/config/uploads.go | 136 ----------------------- pkg/handler/handler.go | 5 +- pkg/pipeline/controller.go | 6 -- pkg/pipeline/debug.go | 34 ++---- pkg/pipeline/sink/file.go | 23 +--- pkg/pipeline/sink/image.go | 22 +--- pkg/pipeline/sink/image_manifest.go | 4 +- pkg/pipeline/sink/manifest.go | 4 +- pkg/pipeline/sink/segments.go | 56 ++++------ pkg/pipeline/sink/sink.go | 7 +- pkg/pipeline/sink/uploader/alioss.go | 21 ++-- pkg/pipeline/sink/uploader/azure.go | 11 +- pkg/pipeline/sink/uploader/gcp.go | 25 +++-- pkg/pipeline/sink/uploader/local.go | 64 +++++++++++ pkg/pipeline/sink/uploader/s3.go | 18 ++-- pkg/pipeline/sink/uploader/uploader.go | 122 +++++++++------------ pkg/pipeline/sink/websocket.go | 2 - pkg/pipeline/source/sdk.go | 10 +- pkg/server/server.go | 6 +- pkg/server/server_rpc.go | 4 +- pkg/service/process.go | 11 +- test/download.go | 36 +++---- test/ffprobe.go | 2 +- test/file.go | 8 +- test/images.go | 10 +- test/segments.go | 26 ++--- 34 files changed, 450 insertions(+), 539 deletions(-) create mode 100644 pkg/config/storage.go delete mode 100644 pkg/config/uploads.go create mode 100644 pkg/pipeline/sink/uploader/local.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 447175af..4c10da8e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -180,6 +180,7 @@ func runHandler(c *cli.Context) error { if err != nil { return err } + defer os.RemoveAll(conf.TmpDir) _ = os.Setenv("TMPDIR", conf.TmpDir) rc, err := lkredis.GetRedisClient(conf.Redis) diff --git a/pkg/config/base.go b/pkg/config/base.go index 6da0f6c4..df7718be 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -24,6 +24,8 @@ import ( lksdk "github.com/livekit/server-sdk-go/v2" ) +const TmpDir = "/home/egress/tmp" + type BaseConfig struct { NodeID string // do not supply - will be overwritten @@ -34,66 +36,26 @@ type BaseConfig struct { WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL) // optional - Logging *logger.Config `yaml:"logging"` // logging config - TemplateBase string `yaml:"template_base"` // custom template base url - BackupStorage string `yaml:"backup_storage"` // backup file location for failed uploads - ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to - EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration - MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes - StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS) - SessionLimits `yaml:"session_limits"` // session duration limits + Logging *logger.Config `yaml:"logging"` // logging config + TemplateBase string `yaml:"template_base"` // custom template base url + ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to + EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration + MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes + + SessionLimits `yaml:"session_limits"` // session duration limits + StorageConfig *StorageConfig `yaml:"storage,omitempty"` // storage config + BackupConfig *StorageConfig `yaml:"backup,omitempty"` // backup config, for storage failures // dev/debugging Insecure bool `yaml:"insecure"` // allow chrome to connect to an insecure websocket Debug DebugConfig `yaml:"debug"` // create dot file on internal error - - // deprecated - LogLevel string `yaml:"log_level"` // Use Logging instead } type DebugConfig struct { EnableProfiling bool `yaml:"enable_profiling"` // create dot file and pprof on internal error - PathPrefix string `yaml:"path_prefix"` // filepath prefix for uploads StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS) } -type StorageConfig struct { - S3 *S3Config `yaml:"s3"` - Azure *AzureConfig `yaml:"azure"` - GCP *GCPConfig `yaml:"gcp"` - AliOSS *S3Config `yaml:"alioss"` -} - -type S3Config struct { - AccessKey string `yaml:"access_key"` // (env AWS_ACCESS_KEY_ID) - Secret string `yaml:"secret"` // (env AWS_SECRET_ACCESS_KEY) - SessionToken string `yaml:"session_token"` // (env AWS_SESSION_TOKEN) - Region string `yaml:"region"` // (env AWS_DEFAULT_REGION) - Endpoint string `yaml:"endpoint"` - Bucket string `yaml:"bucket"` - ForcePathStyle bool `yaml:"force_path_style"` - ProxyConfig *ProxyConfig `yaml:"proxy_config"` - MaxRetries int `yaml:"max_retries"` - MaxRetryDelay time.Duration `yaml:"max_retry_delay"` - MinRetryDelay time.Duration `yaml:"min_retry_delay"` - AwsLogLevel string `yaml:"aws_log_level"` - - // deprecated - Proxy string `yaml:"proxy"` // use ProxyConfig instead -} - -type AzureConfig struct { - AccountName string `yaml:"account_name"` // (env AZURE_STORAGE_ACCOUNT) - AccountKey string `yaml:"account_key"` // (env AZURE_STORAGE_KEY) - ContainerName string `yaml:"container_name"` -} - -type GCPConfig struct { - CredentialsJSON string `yaml:"credentials_json"` // (env GOOGLE_APPLICATION_CREDENTIALS) - Bucket string `yaml:"bucket"` - ProxyConfig *ProxyConfig `yaml:"proxy_config"` -} - type ProxyConfig struct { Url string `yaml:"url"` Username string `yaml:"username"` @@ -108,11 +70,6 @@ type SessionLimits struct { } func (c *BaseConfig) initLogger(values ...interface{}) error { - if c.LogLevel != "" { - logger.Warnw("log_level deprecated. use logging instead", nil) - c.Logging.Level = c.LogLevel - } - var gstDebug []string switch c.Logging.Level { case "debug": diff --git a/pkg/config/output_file.go b/pkg/config/output_file.go index 4aba0fbb..d09bc0c0 100644 --- a/pkg/config/output_file.go +++ b/pkg/config/output_file.go @@ -16,7 +16,6 @@ package config import ( "fmt" - "os" "path" "strings" "time" @@ -34,7 +33,7 @@ type FileConfig struct { StorageFilepath string DisableManifest bool - UploadConfig UploadConfig + StorageConfig *StorageConfig } func (p *PipelineConfig) GetFileConfig() *FileConfig { @@ -76,7 +75,7 @@ func (p *PipelineConfig) getFileConfig(outputType types.OutputType, req fileRequ FileInfo: &livekit.FileInfo{}, StorageFilepath: clean(req.GetFilepath()), DisableManifest: req.GetDisableManifest(), - UploadConfig: p.getUploadConfig(req), + StorageConfig: p.getStorageConfig(req), } // filename @@ -137,28 +136,10 @@ func (o *FileConfig) updateFilepath(p *PipelineConfig, identifier string, replac o.FileInfo.Filename = o.StorageFilepath // get local filepath - dir, filename := path.Split(o.StorageFilepath) - if o.UploadConfig == nil { - if dir != "" { - // create local directory - if err := os.MkdirAll(dir, 0755); err != nil { - return err - } - } - // write directly to requested location - o.LocalFilepath = o.StorageFilepath - } else { - // prepend the configuration base directory and the egress Id - tempDir := path.Join(TmpDir, p.Info.EgressId) + _, filename := path.Split(o.StorageFilepath) - // create temporary directory - if err := os.MkdirAll(tempDir, 0755); err != nil { - return err - } - - // write to tmp dir - o.LocalFilepath = path.Join(tempDir, filename) - } + // write to tmp dir + o.LocalFilepath = path.Join(p.TmpDir, filename) return nil } diff --git a/pkg/config/output_image.go b/pkg/config/output_image.go index d49ddd83..3a82b9ff 100644 --- a/pkg/config/output_image.go +++ b/pkg/config/output_image.go @@ -39,7 +39,7 @@ type ImageConfig struct { ImageExtension types.FileExtension DisableManifest bool - UploadConfig UploadConfig + StorageConfig *StorageConfig CaptureInterval uint32 Width int32 @@ -77,7 +77,7 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf ImagePrefix: filenamePrefix, ImageSuffix: images.FilenameSuffix, DisableManifest: images.DisableManifest, - UploadConfig: p.getUploadConfig(images), + StorageConfig: p.getStorageConfig(images), CaptureInterval: images.CaptureInterval, Width: images.Width, Height: images.Height, @@ -128,25 +128,17 @@ func (o *ImageConfig) updatePrefix(p *PipelineConfig) error { // update config o.ImagePrefix = imagesPrefix - if o.UploadConfig == nil { - o.LocalDir = imagesDir - } else { - // Prepend the configuration base directory and the egress Id, and slug to prevent conflict if - // there is more than one image output - // os.ModeDir creates a directory with mode 000 when mapping the directory outside the container - // Append a "/" to the path for consistency with the "UploadConfig == nil" case - o.LocalDir = path.Join(TmpDir, p.Info.EgressId, o.Id) + "/" - } - - // create local directories - if o.LocalDir != "" { - if err := os.MkdirAll(o.LocalDir, 0755); err != nil { - return err - } + // Prepend the configuration base directory and the egress Id, and slug to prevent conflict if + // there is more than one image output + // os.ModeDir creates a directory with mode 000 when mapping the directory outside the container + o.LocalDir = path.Join(p.TmpDir, o.Id) + if err := os.MkdirAll(o.LocalDir, 0755); err != nil { + return err } return nil } + func getMimeTypes(imageCodec livekit.ImageCodec) (types.MimeType, types.OutputType, error) { switch imageCodec { case livekit.ImageCodec_IC_DEFAULT, livekit.ImageCodec_IC_JPEG: diff --git a/pkg/config/output_segment.go b/pkg/config/output_segment.go index ef173470..f2f79a5a 100644 --- a/pkg/config/output_segment.go +++ b/pkg/config/output_segment.go @@ -39,7 +39,7 @@ type SegmentConfig struct { SegmentDuration int DisableManifest bool - UploadConfig UploadConfig + StorageConfig *StorageConfig } func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig { @@ -60,7 +60,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) LivePlaylistFilename: clean(segments.LivePlaylistName), SegmentDuration: int(segments.SegmentDuration), DisableManifest: segments.DisableManifest, - UploadConfig: p.getUploadConfig(segments), + StorageConfig: p.getStorageConfig(segments), } if conf.SegmentDuration == 0 { @@ -105,7 +105,7 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { playlistDir, playlistName := path.Split(o.PlaylistFilename) livePlaylistDir, livePlaylistName := path.Split(o.LivePlaylistFilename) - fileDir, filePrefix := path.Split(o.SegmentPrefix) + segmentDir, segmentPrefix := path.Split(o.SegmentPrefix) // force live playlist to be in the same directory as the main playlist if livePlaylistDir != "" && livePlaylistDir != playlistDir { @@ -116,21 +116,21 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { playlistName = removeKnownExtension(playlistName) livePlaylistName = removeKnownExtension(livePlaylistName) - // only keep fileDir if it is a subdirectory of playlistDir - if fileDir != "" { - if playlistDir == fileDir { - fileDir = "" + // only keep segmentDir if it is a subdirectory of playlistDir + if segmentDir != "" { + if playlistDir == segmentDir { + segmentDir = "" } else if playlistDir == "" { - playlistDir = fileDir - fileDir = "" + playlistDir = segmentDir + segmentDir = "" } } o.StorageDir = playlistDir // ensure playlistName if playlistName == "" { - if filePrefix != "" { - playlistName = filePrefix + if segmentPrefix != "" { + playlistName = segmentPrefix } else { playlistName = fmt.Sprintf("%s-%s", identifier, time.Now().Format("2006-01-02T150405")) } @@ -138,8 +138,8 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { // live playlist disabled by default // ensure filePrefix - if filePrefix == "" { - filePrefix = playlistName + if segmentPrefix == "" { + segmentPrefix = playlistName } // update config @@ -148,28 +148,17 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { if livePlaylistName != "" { o.LivePlaylistFilename = fmt.Sprintf("%s%s", livePlaylistName, ext) } - o.SegmentPrefix = fmt.Sprintf("%s%s", fileDir, filePrefix) + o.SegmentPrefix = fmt.Sprintf("%s%s", segmentDir, segmentPrefix) if o.PlaylistFilename == o.LivePlaylistFilename { return errors.ErrInvalidInput("live_playlist_name cannot be identical to playlist_name") } - if o.UploadConfig == nil { - o.LocalDir = playlistDir - } else { - // Prepend the configuration base directory and the egress Id - // os.ModeDir creates a directory with mode 000 when mapping the directory outside the container - // Append a "/" to the path for consistency with the "UploadConfig == nil" case - o.LocalDir = path.Join(TmpDir, p.Info.EgressId) + "/" - } - - // create local directories - if fileDir != "" { - if err := os.MkdirAll(path.Join(o.LocalDir, fileDir), 0755); err != nil { - return err - } - } else if o.LocalDir != "" { - if err := os.MkdirAll(o.LocalDir, 0755); err != nil { + // Prepend the configuration base directory and the egress Id + // os.ModeDir creates a directory with mode 000 when mapping the directory outside the container + o.LocalDir = p.TmpDir + if segmentDir != "" { + if err := os.MkdirAll(path.Join(o.LocalDir, segmentDir), 0755); err != nil { return err } } diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index b0cd7ffa..1c8ae580 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -17,6 +17,7 @@ package config import ( "context" "net/url" + "path" "strings" "time" @@ -148,6 +149,7 @@ func GetValidatedPipelineConfig(conf *ServiceConfig, req *rpc.StartEgressRequest p := &PipelineConfig{ BaseConfig: conf.BaseConfig, + TmpDir: path.Join(TmpDir, req.EgressId), Outputs: make(map[types.EgressType][]OutputConfig), } diff --git a/pkg/config/service.go b/pkg/config/service.go index 98fcff65..90539bcf 100644 --- a/pkg/config/service.go +++ b/pkg/config/service.go @@ -26,8 +26,6 @@ import ( ) const ( - TmpDir = "/home/egress/tmp" - roomCompositeCpuCost = 4 audioRoomCompositeCpuCost = 1 webCpuCost = 4 diff --git a/pkg/config/storage.go b/pkg/config/storage.go new file mode 100644 index 00000000..942f1a0f --- /dev/null +++ b/pkg/config/storage.go @@ -0,0 +1,142 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "time" + + "github.com/livekit/protocol/egress" +) + +type StorageConfig struct { + PathPrefix string `yaml:"prefix"` // prefix applied to all filenames + + S3 *S3Config `yaml:"s3"` // upload to s3 + Azure *AzureConfig `yaml:"azure"` // upload to azure + GCP *GCPConfig `yaml:"gcp"` // upload to gcp + AliOSS *S3Config `yaml:"alioss"` // upload to aliyun +} + +type S3Config struct { + AccessKey string `yaml:"access_key"` // (env AWS_ACCESS_KEY_ID) + Secret string `yaml:"secret"` // (env AWS_SECRET_ACCESS_KEY) + SessionToken string `yaml:"session_token"` // (env AWS_SESSION_TOKEN) + Region string `yaml:"region"` // (env AWS_DEFAULT_REGION) + Endpoint string `yaml:"endpoint"` + Bucket string `yaml:"bucket"` + ForcePathStyle bool `yaml:"force_path_style"` + ProxyConfig *ProxyConfig `yaml:"proxy_config"` + + MaxRetries int `yaml:"max_retries"` + MaxRetryDelay time.Duration `yaml:"max_retry_delay"` + MinRetryDelay time.Duration `yaml:"min_retry_delay"` + + Metadata map[string]string `yaml:"metadata"` + Tagging string `yaml:"tagging"` + ContentDisposition string `yaml:"content_disposition"` +} + +type AzureConfig struct { + AccountName string `yaml:"account_name"` // (env AZURE_STORAGE_ACCOUNT) + AccountKey string `yaml:"account_key"` // (env AZURE_STORAGE_KEY) + ContainerName string `yaml:"container_name"` +} + +type GCPConfig struct { + CredentialsJSON string `yaml:"credentials_json"` // (env GOOGLE_APPLICATION_CREDENTIALS) + Bucket string `yaml:"bucket"` + ProxyConfig *ProxyConfig `yaml:"proxy_config"` +} + +func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConfig { + sc := &StorageConfig{} + if p.StorageConfig != nil { + sc.PathPrefix = p.StorageConfig.PathPrefix + } + + if s3 := req.GetS3(); s3 != nil { + sc.S3 = &S3Config{ + AccessKey: s3.AccessKey, + Secret: s3.Secret, + SessionToken: s3.SessionToken, + Region: s3.Region, + Endpoint: s3.Endpoint, + Bucket: s3.Bucket, + ForcePathStyle: s3.ForcePathStyle, + Metadata: s3.Metadata, + Tagging: s3.Tagging, + ContentDisposition: s3.ContentDisposition, + } + if p.StorageConfig != nil && p.StorageConfig.S3 != nil { + sc.S3.MaxRetries = p.StorageConfig.S3.MaxRetries + sc.S3.MaxRetryDelay = p.StorageConfig.S3.MaxRetryDelay + sc.S3.MinRetryDelay = p.StorageConfig.S3.MinRetryDelay + } + if s3.Proxy != nil { + sc.S3.ProxyConfig = &ProxyConfig{ + Url: s3.Proxy.Url, + Username: s3.Proxy.Username, + Password: s3.Proxy.Password, + } + } + if sc.S3.MaxRetries == 0 { + sc.S3.MaxRetries = 5 + } + if sc.S3.MaxRetryDelay == 0 { + sc.S3.MaxRetryDelay = time.Second * 5 + } + if sc.S3.MinRetryDelay == 0 { + sc.S3.MinRetryDelay = time.Millisecond * 100 + } + return sc + } + + if gcp := req.GetGcp(); gcp != nil { + sc.GCP = &GCPConfig{ + CredentialsJSON: gcp.Credentials, + Bucket: gcp.Bucket, + } + if gcp.Proxy != nil { + sc.GCP.ProxyConfig = &ProxyConfig{ + Url: gcp.Proxy.Url, + Username: gcp.Proxy.Username, + Password: gcp.Proxy.Password, + } + } + return sc + } + + if azure := req.GetAzure(); azure != nil { + sc.Azure = &AzureConfig{ + AccountName: azure.AccountName, + AccountKey: azure.AccountKey, + ContainerName: azure.ContainerName, + } + return sc + } + + if ali := req.GetAliOSS(); ali != nil { + sc.AliOSS = &S3Config{ + AccessKey: ali.AccessKey, + Secret: ali.Secret, + Region: ali.Region, + Endpoint: ali.Endpoint, + Bucket: ali.Bucket, + } + return sc + } + + return p.StorageConfig +} diff --git a/pkg/config/uploads.go b/pkg/config/uploads.go deleted file mode 100644 index 582f57b8..00000000 --- a/pkg/config/uploads.go +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "time" - - "github.com/livekit/protocol/egress" - "github.com/livekit/protocol/livekit" -) - -type UploadConfig interface{} - -type EgressS3Upload struct { - *livekit.S3Upload - MaxRetries int - MaxRetryDelay time.Duration - MinRetryDelay time.Duration -} - -func (p *PipelineConfig) getUploadConfig(req egress.UploadRequest) UploadConfig { - if s3 := req.GetS3(); s3 != nil { - s3Conf := &EgressS3Upload{ - S3Upload: s3, - MaxRetries: 3, - MaxRetryDelay: time.Second * 5, - MinRetryDelay: time.Millisecond * 100, - } - // merge in options from config (retry limit, delay and aws logging) if specified - if p.S3 != nil { - // parse config.yaml options and get defaults - if s3Base, ok := p.ToUploadConfig().(*EgressS3Upload); ok { - // merge into pipeline config created from request options - s3Conf.MaxRetries = s3Base.MaxRetries - s3Conf.MaxRetryDelay = s3Base.MaxRetryDelay - s3Conf.MinRetryDelay = s3Base.MinRetryDelay - } - } - return s3Conf - } - if gcp := req.GetGcp(); gcp != nil { - return gcp - } - if azure := req.GetAzure(); azure != nil { - return azure - } - if ali := req.GetAliOSS(); ali != nil { - return ali - } - - return p.ToUploadConfig() -} - -func (c StorageConfig) ToUploadConfig() UploadConfig { - if c.S3 != nil { - s3 := &EgressS3Upload{ - S3Upload: &livekit.S3Upload{ - AccessKey: c.S3.AccessKey, - Secret: c.S3.Secret, - SessionToken: c.S3.SessionToken, - Region: c.S3.Region, - Endpoint: c.S3.Endpoint, - Bucket: c.S3.Bucket, - ForcePathStyle: c.S3.ForcePathStyle, - }, - MaxRetries: 3, - MaxRetryDelay: time.Second * 5, - MinRetryDelay: time.Millisecond * 100, - } - if c.S3.ProxyConfig != nil { - s3.Proxy = &livekit.ProxyConfig{ - Url: c.S3.ProxyConfig.Url, - Username: c.S3.ProxyConfig.Username, - Password: c.S3.ProxyConfig.Password, - } - } else if c.S3.Proxy != "" { - s3.Proxy = &livekit.ProxyConfig{ - Url: c.S3.Proxy, - } - } - if c.S3.MaxRetries > 0 { - s3.MaxRetries = c.S3.MaxRetries - } - if c.S3.MaxRetryDelay > 0 { - s3.MaxRetryDelay = c.S3.MaxRetryDelay - } - if c.S3.MinRetryDelay > 0 { - s3.MinRetryDelay = c.S3.MinRetryDelay - } - - return s3 - } - if c.Azure != nil { - return &livekit.AzureBlobUpload{ - AccountName: c.Azure.AccountName, - AccountKey: c.Azure.AccountKey, - ContainerName: c.Azure.ContainerName, - } - } - if c.GCP != nil { - gcp := &livekit.GCPUpload{ - Credentials: c.GCP.CredentialsJSON, - Bucket: c.GCP.Bucket, - } - if c.GCP.ProxyConfig != nil { - gcp.Proxy = &livekit.ProxyConfig{ - Url: c.GCP.ProxyConfig.Url, - Username: c.GCP.ProxyConfig.Username, - Password: c.GCP.ProxyConfig.Password, - } - } - return gcp - } - if c.AliOSS != nil { - return &livekit.AliOSSUpload{ - AccessKey: c.AliOSS.AccessKey, - Secret: c.AliOSS.Secret, - Region: c.AliOSS.Region, - Endpoint: c.AliOSS.Endpoint, - Bucket: c.AliOSS.Bucket, - } - } - return nil -} diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index d3ed5fa9..2109e8b0 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -17,7 +17,6 @@ package handler import ( "context" "path" - "strings" "github.com/frostbyte73/core" "google.golang.org/grpc" @@ -45,7 +44,7 @@ type Handler struct { } func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus) (*Handler, error) { - ipcClient, err := ipc.NewServiceClient(path.Join(conf.TmpDir[:strings.LastIndex(conf.TmpDir, "/")], conf.NodeID)) + ipcClient, err := ipc.NewServiceClient(path.Join(config.TmpDir, conf.NodeID)) if err != nil { return nil, err } @@ -57,7 +56,7 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus) (*Handler, er } ipc.RegisterEgressHandlerServer(h.ipcHandlerServer, h) - if err = ipc.StartHandlerListener(h.ipcHandlerServer, conf.TmpDir); err != nil { + if err = ipc.StartHandlerListener(h.ipcHandlerServer, path.Join(config.TmpDir, conf.HandlerID)); err != nil { return nil, err } diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 9c280b0d..05b8890b 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -457,12 +457,6 @@ func (c *Controller) Close() { livekit.EgressStatus_EGRESS_ENDING: c.Info.SetComplete() } - - for _, si := range c.sinks { - for _, s := range si { - s.Cleanup() - } - } } func (c *Controller) startSessionLimitTimer(ctx context.Context) { diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index b91c237e..4ccb9d4a 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -36,7 +36,7 @@ func (c *Controller) GetGstPipelineDebugDot() string { } func (c *Controller) uploadDebugFiles() { - u, err := uploader.New(c.Debug.ToUploadConfig(), "", c.monitor) + u, err := uploader.New(&c.Debug.StorageConfig, nil, c.monitor) if err != nil { logger.Errorw("failed to create uploader", err) return @@ -80,11 +80,7 @@ func (c *Controller) uploadDebugFiles() { } } -func (c *Controller) uploadTrackFiles(u uploader.Uploader) { - if c.Debug.ToUploadConfig() == nil { - return - } - +func (c *Controller) uploadTrackFiles(u *uploader.Uploader) { files, err := os.ReadDir(c.TmpDir) if err != nil { return @@ -93,8 +89,8 @@ func (c *Controller) uploadTrackFiles(u uploader.Uploader) { for _, f := range files { if strings.HasSuffix(f.Name(), ".csv") { local := path.Join(c.TmpDir, f.Name()) - storage := path.Join(c.Debug.PathPrefix, c.Info.EgressId, f.Name()) - _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false, "track") + storage := path.Join(c.Info.EgressId, f.Name()) + _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false) if err != nil { logger.Errorw("failed to upload debug file", err) return @@ -103,12 +99,12 @@ func (c *Controller) uploadTrackFiles(u uploader.Uploader) { } } -func (c *Controller) uploadDotFile(u uploader.Uploader) { +func (c *Controller) uploadDotFile(u *uploader.Uploader) { dot := c.GetGstPipelineDebugDot() c.uploadDebugFile(u, []byte(dot), ".dot") } -func (c *Controller) uploadPProf(u uploader.Uploader) { +func (c *Controller) uploadPProf(u *uploader.Uploader) { b, err := pprof.GetProfileData(context.Background(), "goroutine", 0, 0) if err != nil { logger.Errorw("failed to get profile data", err) @@ -117,17 +113,11 @@ func (c *Controller) uploadPProf(u uploader.Uploader) { c.uploadDebugFile(u, b, ".prof") } -func (c *Controller) uploadDebugFile(u uploader.Uploader, data []byte, fileExtension string) { - storageDir := path.Join(c.Debug.PathPrefix, c.Info.EgressId) - var dir string - if c.Debug.ToUploadConfig() == nil { - dir = storageDir - } else { - dir = c.TmpDir - } +func (c *Controller) uploadDebugFile(u *uploader.Uploader, data []byte, fileExtension string) { + storageDir := c.Info.EgressId filename := fmt.Sprintf("%s%s", c.Info.EgressId, fileExtension) - local := path.Join(dir, filename) + local := path.Join(c.TmpDir, filename) f, err := os.Create(local) if err != nil { logger.Errorw("failed to create debug file", err) @@ -141,11 +131,7 @@ func (c *Controller) uploadDebugFile(u uploader.Uploader, data []byte, fileExten return } - if c.Debug.ToUploadConfig() == nil { - return - } - - _, _, err = u.Upload(local, path.Join(storageDir, filename), types.OutputTypeBlob, false, "debug") + _, _, err = u.Upload(local, path.Join(storageDir, filename), types.OutputTypeBlob, false) if err != nil { logger.Errorw("failed to upload debug file", err) return diff --git a/pkg/pipeline/sink/file.go b/pkg/pipeline/sink/file.go index 3fb216fa..a1aa73e0 100644 --- a/pkg/pipeline/sink/file.go +++ b/pkg/pipeline/sink/file.go @@ -16,22 +16,19 @@ package sink import ( "fmt" - "os" - "path" "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/pipeline/sink/uploader" - "github.com/livekit/protocol/logger" ) type FileSink struct { - uploader.Uploader + *uploader.Uploader conf *config.PipelineConfig *config.FileConfig } -func newFileSink(u uploader.Uploader, conf *config.PipelineConfig, o *config.FileConfig) *FileSink { +func newFileSink(u *uploader.Uploader, conf *config.PipelineConfig, o *config.FileConfig) *FileSink { return &FileSink{ Uploader: u, conf: conf, @@ -44,7 +41,7 @@ func (s *FileSink) Start() error { } func (s *FileSink) Close() error { - location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false, "file") + location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false) if err != nil { return err } @@ -62,17 +59,3 @@ func (s *FileSink) Close() error { return nil } - -func (s *FileSink) Cleanup() { - if s.LocalFilepath == s.StorageFilepath { - return - } - - dir, _ := path.Split(s.LocalFilepath) - if dir != "" { - logger.Debugw("removing temporary directory", "path", dir) - if err := os.RemoveAll(dir); err != nil { - logger.Errorw("could not delete temp dir", err) - } - } -} diff --git a/pkg/pipeline/sink/image.go b/pkg/pipeline/sink/image.go index 8bfe2e7d..6d114dde 100644 --- a/pkg/pipeline/sink/image.go +++ b/pkg/pipeline/sink/image.go @@ -32,7 +32,7 @@ import ( ) type ImageSink struct { - uploader.Uploader + *uploader.Uploader *config.ImageConfig @@ -53,7 +53,7 @@ type imageUpdate struct { filename string } -func newImageSink(u uploader.Uploader, p *config.PipelineConfig, o *config.ImageConfig, callbacks *gstreamer.Callbacks) (*ImageSink, error) { +func newImageSink(u *uploader.Uploader, p *config.PipelineConfig, o *config.ImageConfig, callbacks *gstreamer.Callbacks) (*ImageSink, error) { maxPendingUploads := (p.MaxUploadQueue * 60) / int(o.CaptureInterval) return &ImageSink{ Uploader: u, @@ -108,7 +108,7 @@ func (s *ImageSink) handleNewImage(update *imageUpdate) error { imageStoragePath := path.Join(s.StorageDir, filename) - _, size, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true, "image") + _, size, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true) if err != nil { return err } @@ -146,7 +146,7 @@ func (s *ImageSink) NewImage(filepath string, ts uint64) error { return fmt.Errorf("invalid filepath") } - filename := filepath[len(s.LocalDir):] + filename := filepath[len(s.LocalDir)+1:] s.createdImages <- &imageUpdate{ filename: filename, @@ -162,17 +162,3 @@ func (s *ImageSink) Close() error { return nil } - -func (s *ImageSink) Cleanup() { - if s.LocalDir == s.StorageDir { - return - } - - if s.LocalDir != "" { - logger.Debugw("removing temporary directory", "path", s.LocalDir) - if err := os.RemoveAll(s.LocalDir); err != nil { - logger.Errorw("could not delete temp dir", err) - } - } - -} diff --git a/pkg/pipeline/sink/image_manifest.go b/pkg/pipeline/sink/image_manifest.go index 36a6090c..95cb88d8 100644 --- a/pkg/pipeline/sink/image_manifest.go +++ b/pkg/pipeline/sink/image_manifest.go @@ -50,7 +50,7 @@ func (m *ImageManifest) imageCreated(filename string, ts time.Time, size int64) }) } -func (m *ImageManifest) updateManifest(u uploader.Uploader, localFilepath, storageFilepath string) error { +func (m *ImageManifest) updateManifest(u *uploader.Uploader, localFilepath, storageFilepath string) error { manifest, err := os.Create(localFilepath) if err != nil { return err @@ -71,7 +71,7 @@ func (m *ImageManifest) updateManifest(u uploader.Uploader, localFilepath, stora return err } - _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false, "image_manifest") + _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) return err } diff --git a/pkg/pipeline/sink/manifest.go b/pkg/pipeline/sink/manifest.go index 50764a0b..6594d107 100644 --- a/pkg/pipeline/sink/manifest.go +++ b/pkg/pipeline/sink/manifest.go @@ -39,7 +39,7 @@ type Manifest struct { SegmentCount int64 `json:"segment_count,omitempty"` } -func uploadManifest(p *config.PipelineConfig, u uploader.Uploader, localFilepath, storageFilepath string) error { +func uploadManifest(p *config.PipelineConfig, u *uploader.Uploader, localFilepath, storageFilepath string) error { manifest, err := os.Create(localFilepath) if err != nil { return err @@ -55,7 +55,7 @@ func uploadManifest(p *config.PipelineConfig, u uploader.Uploader, localFilepath return err } - _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false, "manifest") + _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) return err } diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index d000f7d4..c28dfb82 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -16,7 +16,6 @@ package sink import ( "fmt" - "os" "path" "strings" "sync" @@ -39,7 +38,7 @@ const ( ) type SegmentSink struct { - uploader.Uploader + *uploader.Uploader *config.SegmentConfig conf *config.PipelineConfig @@ -70,7 +69,7 @@ type SegmentUpdate struct { uploadComplete chan struct{} } -func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (*SegmentSink, error) { +func newSegmentSink(u *uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (*SegmentSink, error) { playlistName := path.Join(o.LocalDir, o.PlaylistFilename) playlist, err := m3u8.NewEventPlaylistWriter(playlistName, o.SegmentDuration) if err != nil { @@ -150,7 +149,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) { go func() { defer close(update.uploadComplete) - _, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true, "segment") + _, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true) if err != nil { s.callbacks.OnError(err) return @@ -214,6 +213,22 @@ func (s *SegmentSink) shouldUploadPlaylist() bool { return s.segmentCount < segmentsPerHour || s.segmentCount%frequency == 0 } +func (s *SegmentSink) uploadPlaylist() error { + var err error + playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) + playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) + s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + return err +} + +func (s *SegmentSink) uploadLivePlaylist() error { + var err error + liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) + liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) + s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) + return err +} + func (s *SegmentSink) UpdateStartDate(t time.Time) { s.segmentLock.Lock() defer s.segmentLock.Unlock() @@ -226,7 +241,7 @@ func (s *SegmentSink) FragmentOpened(filepath string, startTime uint64) error { return fmt.Errorf("invalid filepath") } - filename := filepath[len(s.LocalDir):] + filename := filepath[len(s.LocalDir)+1:] s.segmentLock.Lock() defer s.segmentLock.Unlock() @@ -249,7 +264,7 @@ func (s *SegmentSink) FragmentClosed(filepath string, endTime uint64) error { return fmt.Errorf("invalid filepath") } - filename := filepath[len(s.LocalDir):] + filename := filepath[len(s.LocalDir)+1:] select { case s.closedSegments <- SegmentUpdate{ @@ -302,32 +317,3 @@ func (s *SegmentSink) Close() error { return nil } - -func (s *SegmentSink) Cleanup() { - if s.LocalDir == s.StorageDir { - return - } - - if s.LocalDir != "" { - logger.Debugw("removing temporary directory", "path", s.LocalDir) - if err := os.RemoveAll(s.LocalDir); err != nil { - logger.Errorw("could not delete temp dir", err) - } - } -} - -func (s *SegmentSink) uploadPlaylist() error { - var err error - playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) - playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false, "playlist") - return err -} - -func (s *SegmentSink) uploadLivePlaylist() error { - var err error - liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) - liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) - s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false, "live_playlist") - return err -} diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index aaf5f9eb..cc675da2 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -25,7 +25,6 @@ import ( type Sink interface { Start() error Close() error - Cleanup() } func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (map[types.EgressType][]Sink, error) { @@ -41,7 +40,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit case types.EgressTypeFile: o := c[0].(*config.FileConfig) - u, err := uploader.New(o.UploadConfig, p.BackupStorage, monitor) + u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor) if err != nil { return nil, err } @@ -51,7 +50,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit case types.EgressTypeSegments: o := c[0].(*config.SegmentConfig) - u, err := uploader.New(o.UploadConfig, p.BackupStorage, monitor) + u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor) if err != nil { return nil, err } @@ -75,7 +74,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit for _, ci := range c { o := ci.(*config.ImageConfig) - u, err := uploader.New(o.UploadConfig, p.BackupStorage, monitor) + u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor) if err != nil { return nil, err } diff --git a/pkg/pipeline/sink/uploader/alioss.go b/pkg/pipeline/sink/uploader/alioss.go index e2f75a67..5139a2c5 100644 --- a/pkg/pipeline/sink/uploader/alioss.go +++ b/pkg/pipeline/sink/uploader/alioss.go @@ -17,26 +17,31 @@ package uploader import ( "fmt" "os" + "path" "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" ) type AliOSSUploader struct { - conf *livekit.AliOSSUpload + conf *config.S3Config + prefix string } -func newAliOSSUploader(conf *livekit.AliOSSUpload) (uploader, error) { +func newAliOSSUploader(conf *config.S3Config, prefix string) (uploader, error) { return &AliOSSUploader{ - conf: conf, + conf: conf, + prefix: prefix, }, nil } -func (u *AliOSSUploader) upload(localFilePath, requestedPath string, _ types.OutputType) (string, int64, error) { - stat, err := os.Stat(localFilePath) +func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { + storageFilepath = path.Join(u.prefix, storageFilepath) + + stat, err := os.Stat(localFilepath) if err != nil { return "", 0, errors.ErrUploadFailed("AliOSS", err) } @@ -51,10 +56,10 @@ func (u *AliOSSUploader) upload(localFilePath, requestedPath string, _ types.Out return "", 0, errors.ErrUploadFailed("AliOSS", err) } - err = bucket.PutObjectFromFile(requestedPath, localFilePath) + err = bucket.PutObjectFromFile(storageFilepath, localFilepath) if err != nil { return "", 0, errors.ErrUploadFailed("AliOSS", err) } - return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, requestedPath), stat.Size(), nil + return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), nil } diff --git a/pkg/pipeline/sink/uploader/azure.go b/pkg/pipeline/sink/uploader/azure.go index 130abd3a..5c3d9f95 100644 --- a/pkg/pipeline/sink/uploader/azure.go +++ b/pkg/pipeline/sink/uploader/azure.go @@ -19,27 +19,32 @@ import ( "fmt" "net/url" "os" + "path" "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" ) type AzureUploader struct { - conf *livekit.AzureBlobUpload + conf *config.AzureConfig + prefix string container string } -func newAzureUploader(conf *livekit.AzureBlobUpload) (uploader, error) { +func newAzureUploader(conf *config.AzureConfig, prefix string) (uploader, error) { return &AzureUploader{ conf: conf, + prefix: prefix, container: fmt.Sprintf("https://%s.blob.core.windows.net/%s", conf.AccountName, conf.ContainerName), }, nil } func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { + storageFilepath = path.Join(u.prefix, storageFilepath) + credential, err := azblob.NewSharedKeyCredential( u.conf.AccountName, u.conf.AccountKey, diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index 36cae501..64acc7fc 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -22,32 +22,35 @@ import ( "net/http" "net/url" "os" + "path" "cloud.google.com/go/storage" "github.com/googleapis/gax-go/v2" "golang.org/x/oauth2/google" "google.golang.org/api/option" + "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" ) const storageScope = "https://www.googleapis.com/auth/devstorage.read_write" type GCPUploader struct { - conf *livekit.GCPUpload + conf *config.GCPConfig + prefix string client *storage.Client } -func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) { +func newGCPUploader(conf *config.GCPConfig, prefix string) (uploader, error) { u := &GCPUploader{ - conf: conf, + conf: conf, + prefix: prefix, } var opts []option.ClientOption - if conf.Credentials != "" { - jwtConfig, err := google.JWTConfigFromJSON([]byte(conf.Credentials), storageScope) + if conf.CredentialsJSON != "" { + jwtConfig, err := google.JWTConfigFromJSON([]byte(conf.CredentialsJSON), storageScope) if err != nil { return nil, err } @@ -57,14 +60,14 @@ func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) { defaultTransport := http.DefaultTransport.(*http.Transport) transportClone := defaultTransport.Clone() - if conf.Proxy != nil { - proxyUrl, err := url.Parse(conf.Proxy.Url) + if conf.ProxyConfig != nil { + proxyUrl, err := url.Parse(conf.ProxyConfig.Url) if err != nil { return nil, err } defaultTransport.Proxy = http.ProxyURL(proxyUrl) - if conf.Proxy.Username != "" && conf.Proxy.Password != "" { - auth := fmt.Sprintf("%s:%s", conf.Proxy.Username, conf.Proxy.Password) + if conf.ProxyConfig.Username != "" && conf.ProxyConfig.Password != "" { + auth := fmt.Sprintf("%s:%s", conf.ProxyConfig.Username, conf.ProxyConfig.Password) basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth)) defaultTransport.ProxyConnectHeader = http.Header{} defaultTransport.ProxyConnectHeader.Add("Proxy-Authorization", basicAuth) @@ -83,6 +86,8 @@ func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) { } func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { + storageFilepath = path.Join(u.prefix, storageFilepath) + file, err := os.Open(localFilepath) if err != nil { return "", 0, errors.ErrUploadFailed("GCP", err) diff --git a/pkg/pipeline/sink/uploader/local.go b/pkg/pipeline/sink/uploader/local.go new file mode 100644 index 00000000..dc7d6750 --- /dev/null +++ b/pkg/pipeline/sink/uploader/local.go @@ -0,0 +1,64 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package uploader + +import ( + "io" + "os" + "path" + + "github.com/livekit/egress/pkg/types" +) + +type localUploader struct { + prefix string +} + +func newLocalUploader(prefix string) (*localUploader, error) { + return &localUploader{prefix: prefix}, nil +} + +func (u *localUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { + storageFilepath = path.Join(u.prefix, storageFilepath) + + stat, err := os.Stat(localFilepath) + if err != nil { + return "", 0, err + } + + dir, _ := path.Split(storageFilepath) + if err = os.MkdirAll(dir, 0755); err != nil { + return "", 0, err + } + + local, err := os.Open(localFilepath) + if err != nil { + return "", 0, err + } + defer local.Close() + + storage, err := os.Create(storageFilepath) + if err != nil { + return "", 0, err + } + defer storage.Close() + + _, err = io.Copy(storage, local) + if err != nil { + return "", 0, err + } + + return storageFilepath, stat.Size(), nil +} diff --git a/pkg/pipeline/sink/uploader/s3.go b/pkg/pipeline/sink/uploader/s3.go index afbacc08..0f4490c3 100644 --- a/pkg/pipeline/sink/uploader/s3.go +++ b/pkg/pipeline/sink/uploader/s3.go @@ -21,6 +21,7 @@ import ( "net/http" "net/url" "os" + "path" "strings" "sync" @@ -44,12 +45,12 @@ const ( ) type S3Uploader struct { - mu sync.Mutex - conf *config.EgressS3Upload + conf *config.S3Config + prefix string awsConf *aws.Config } -func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) { +func newS3Uploader(conf *config.S3Config, prefix string) (uploader, error) { opts := func(o *awsConfig.LoadOptions) error { if conf.Region != "" { o.Region = conf.Region @@ -75,15 +76,15 @@ func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) { }) } - if conf.Proxy != nil { - proxyUrl, err := url.Parse(conf.Proxy.Url) + if conf.ProxyConfig != nil { + proxyUrl, err := url.Parse(conf.ProxyConfig.Url) if err != nil { return err } s3Transport := http.DefaultTransport.(*http.Transport).Clone() s3Transport.Proxy = http.ProxyURL(proxyUrl) - if conf.Proxy.Username != "" && conf.Proxy.Password != "" { - auth := fmt.Sprintf("%s:%s", conf.Proxy.Username, conf.Proxy.Password) + if conf.ProxyConfig.Username != "" && conf.ProxyConfig.Password != "" { + auth := fmt.Sprintf("%s:%s", conf.ProxyConfig.Username, conf.ProxyConfig.Password) basicAuth := "Basic " + base64.StdEncoding.EncodeToString([]byte(auth)) s3Transport.ProxyConnectHeader = http.Header{} s3Transport.ProxyConnectHeader.Add("Proxy-Authorization", basicAuth) @@ -109,6 +110,7 @@ func newS3Uploader(conf *config.EgressS3Upload) (uploader, error) { return &S3Uploader{ conf: conf, + prefix: prefix, awsConf: &awsConf, }, nil } @@ -131,6 +133,8 @@ func updateRegion(awsConf *aws.Config, bucket string) error { } func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { + storageFilepath = path.Join(u.prefix, storageFilepath) + file, err := os.Open(localFilepath) if err != nil { return "", 0, errors.ErrUploadFailed("S3", err) diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index 7510ee94..d106a54e 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -16,13 +16,13 @@ package uploader import ( "os" - "path" "time" "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "github.com/livekit/psrpc" ) const ( @@ -31,99 +31,81 @@ const ( maxDelay = time.Second * 5 ) -type Uploader interface { - Upload(string, string, types.OutputType, bool, string) (string, int64, error) -} - type uploader interface { upload(string, string, types.OutputType) (string, int64, error) } -func New(conf config.UploadConfig, backup string, monitor *stats.HandlerMonitor) (Uploader, error) { - var u uploader - var err error - - switch c := conf.(type) { - case *config.EgressS3Upload: - u, err = newS3Uploader(c) - case *livekit.S3Upload: - u, err = newS3Uploader(&config.EgressS3Upload{S3Upload: c}) - case *livekit.GCPUpload: - u, err = newGCPUploader(c) - case *livekit.AzureBlobUpload: - u, err = newAzureUploader(c) - case *livekit.AliOSSUpload: - u, err = newAliOSSUploader(c) - default: - return &localUploader{}, nil - } +type Uploader struct { + primary uploader + backup uploader + monitor *stats.HandlerMonitor +} + +func New(conf, backup *config.StorageConfig, monitor *stats.HandlerMonitor) (*Uploader, error) { + p, err := getUploader(conf) if err != nil { return nil, err } - remote := &remoteUploader{ - uploader: u, - backup: backup, - monitor: monitor, + u := &Uploader{ + primary: p, + monitor: monitor, } - return remote, nil -} + if backup != nil { + b, err := getUploader(backup) + if err != nil { + logger.Errorw("failed to create backup uploader", err) + } else { + u.backup = b + } + } -type remoteUploader struct { - uploader + return u, nil +} - backup string - monitor *stats.HandlerMonitor +func getUploader(conf *config.StorageConfig) (uploader, error) { + switch { + case conf == nil: + return newLocalUploader("") + case conf.S3 != nil: + return newS3Uploader(conf.S3, conf.PathPrefix) + case conf.GCP != nil: + return newGCPUploader(conf.GCP, conf.PathPrefix) + case conf.Azure != nil: + return newAzureUploader(conf.Azure, conf.PathPrefix) + case conf.AliOSS != nil: + return newAliOSSUploader(conf.AliOSS, conf.PathPrefix) + default: + return newLocalUploader(conf.PathPrefix) + } } -func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) { +func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) { start := time.Now() - location, size, uploadErr := u.upload(localFilepath, storageFilepath, outputType) + location, size, primaryErr := u.primary.upload(localFilepath, storageFilepath, outputType) elapsed := time.Since(start) - // success - if uploadErr == nil { - u.monitor.IncUploadCountSuccess(fileType, float64(elapsed.Milliseconds())) + if primaryErr == nil { + // success + u.monitor.IncUploadCountSuccess(string(outputType), float64(elapsed.Milliseconds())) if deleteAfterUpload { _ = os.Remove(localFilepath) } - return location, size, nil } - // failure - u.monitor.IncUploadCountFailure(fileType, float64(elapsed.Milliseconds())) - if u.backup != "" { - stat, err := os.Stat(localFilepath) - if err != nil { - return "", 0, err - } - - backupDir := path.Join(u.backup, path.Dir(storageFilepath)) - backupFileName := path.Base(storageFilepath) - if err = os.MkdirAll(backupDir, 0755); err != nil { - return "", 0, err - } - backupFilepath := path.Join(backupDir, backupFileName) - if err = os.Rename(localFilepath, backupFilepath); err != nil { - return "", 0, err + u.monitor.IncUploadCountFailure(string(outputType), float64(elapsed.Milliseconds())) + if u.backup != nil { + location, size, backupErr := u.backup.upload(localFilepath, storageFilepath, outputType) + if backupErr == nil { + u.monitor.IncBackupStorageWrites(string(outputType)) + return location, size, nil } - u.monitor.IncBackupStorageWrites(string(outputType)) - return backupFilepath, stat.Size(), nil - } - - return "", 0, uploadErr -} - -type localUploader struct{} - -func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bool, _ string) (string, int64, error) { - stat, err := os.Stat(localFilepath) - if err != nil { - return "", 0, err + return "", 0, psrpc.NewErrorf(psrpc.InvalidArgument, + "primary and backup uploads failed: %s\n%s", primaryErr.Error(), backupErr.Error()) } - return localFilepath, stat.Size(), nil + return "", 0, primaryErr } diff --git a/pkg/pipeline/sink/websocket.go b/pkg/pipeline/sink/websocket.go index 078344be..8f3d95c5 100644 --- a/pkg/pipeline/sink/websocket.go +++ b/pkg/pipeline/sink/websocket.go @@ -217,5 +217,3 @@ func (s *WebsocketSink) Close() error { return nil } - -func (s *WebsocketSink) Cleanup() {} diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index fb40f8a6..2d3b1213 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -17,7 +17,6 @@ package source import ( "context" "fmt" - "os" "path" "strings" "sync" @@ -493,14 +492,7 @@ func (s *SDKSource) createWriter( ) (*sdk.AppWriter, error) { var logFilename string if s.Debug.EnableProfiling { - if s.Debug.ToUploadConfig() == nil { - if err := os.MkdirAll(path.Join(s.Debug.PathPrefix, s.Info.EgressId), 0755); err != nil { - return nil, err - } - logFilename = path.Join(s.Debug.PathPrefix, s.Info.EgressId, fmt.Sprintf("%s.csv", track.ID())) - } else { - logFilename = path.Join(s.TmpDir, fmt.Sprintf("%s.csv", track.ID())) - } + logFilename = path.Join(s.TmpDir, fmt.Sprintf("%s.csv", track.ID())) } src, err := gst.NewElementWithName("appsrc", fmt.Sprintf("app_%s", track.ID())) diff --git a/pkg/server/server.go b/pkg/server/server.go index 9f9035a1..5ea333aa 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -96,13 +96,13 @@ func NewServer(conf *config.ServiceConfig, bus psrpc.MessageBus, ioClient info.I }() } - tmpDir := path.Join(os.TempDir(), s.conf.NodeID) - if err := os.MkdirAll(tmpDir, 0755); err != nil { + ipcSvcDir := path.Join(config.TmpDir, s.conf.NodeID) + if err = os.MkdirAll(ipcSvcDir, 0755); err != nil { return nil, err } ipc.RegisterEgressServiceServer(s.ipcServiceServer, s) - if err := ipc.StartServiceListener(s.ipcServiceServer, tmpDir); err != nil { + if err := ipc.StartServiceListener(s.ipcServiceServer, ipcSvcDir); err != nil { return nil, err } diff --git a/pkg/server/server_rpc.go b/pkg/server/server_rpc.go index 6c91009d..f068bd6d 100644 --- a/pkg/server/server_rpc.go +++ b/pkg/server/server_rpc.go @@ -95,7 +95,7 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress p := &config.PipelineConfig{ BaseConfig: s.conf.BaseConfig, HandlerID: handlerID, - TmpDir: path.Join(os.TempDir(), handlerID), + TmpDir: path.Join(config.TmpDir, req.EgressId), } confString, err := yaml.Marshal(p) @@ -124,7 +124,7 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress cmd.Stderr = os.Stderr cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} - if err = s.Launch(context.Background(), handlerID, req, info, cmd, p.TmpDir); err != nil { + if err = s.Launch(context.Background(), handlerID, req, info, cmd); err != nil { s.processEnded(req, info, err) } else { s.monitor.UpdatePID(info.EgressId, cmd.Process.Pid) diff --git a/pkg/service/process.go b/pkg/service/process.go index 25c4acf7..5f0e4476 100644 --- a/pkg/service/process.go +++ b/pkg/service/process.go @@ -17,7 +17,9 @@ package service import ( "context" "net/http" + "os" "os/exec" + "path" "sync" "syscall" "time" @@ -26,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/ipc" "github.com/livekit/protocol/livekit" @@ -52,9 +55,13 @@ func (pm *ProcessManager) Launch( req *rpc.StartEgressRequest, info *livekit.EgressInfo, cmd *exec.Cmd, - tmpDir string, ) error { - ipcClient, err := ipc.NewHandlerClient(tmpDir) + ipcHandlerDir := path.Join(config.TmpDir, handlerID) + if err := os.MkdirAll(ipcHandlerDir, 0755); err != nil { + return err + } + + ipcClient, err := ipc.NewHandlerClient(ipcHandlerDir) if err != nil { return err } diff --git a/test/download.go b/test/download.go index d20f22c0..0d82b927 100644 --- a/test/download.go +++ b/test/download.go @@ -36,27 +36,25 @@ import ( "google.golang.org/api/option" "github.com/livekit/egress/pkg/config" - "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" ) -func download(t *testing.T, uploadParams interface{}, localFilepath, storageFilepath string) { - switch u := uploadParams.(type) { - case *config.EgressS3Upload: - logger.Debugw("s3 download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) - downloadS3(t, u, localFilepath, storageFilepath) - - case *livekit.GCPUpload: - logger.Debugw("gcp download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) - downloadGCP(t, u, localFilepath, storageFilepath) - - case *livekit.AzureBlobUpload: - logger.Debugw("azure download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) - downloadAzure(t, u, localFilepath, storageFilepath) +func download(t *testing.T, c *config.StorageConfig, localFilepath, storageFilepath string) { + if c != nil { + if c.S3 != nil { + logger.Debugw("s3 download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) + downloadS3(t, c.S3, localFilepath, storageFilepath) + } else if c.GCP != nil { + logger.Debugw("gcp download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) + downloadGCP(t, c.GCP, localFilepath, storageFilepath) + } else if c.Azure != nil { + logger.Debugw("azure download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) + downloadAzure(t, c.Azure, localFilepath, storageFilepath) + } } } -func downloadS3(t *testing.T, conf *config.EgressS3Upload, localFilepath, storageFilepath string) { +func downloadS3(t *testing.T, conf *config.S3Config, localFilepath, storageFilepath string) { file, err := os.Create(localFilepath) require.NoError(t, err) defer file.Close() @@ -93,7 +91,7 @@ func downloadS3(t *testing.T, conf *config.EgressS3Upload, localFilepath, storag require.NoError(t, err) } -func downloadAzure(t *testing.T, conf *livekit.AzureBlobUpload, localFilepath, storageFilepath string) { +func downloadAzure(t *testing.T, conf *config.AzureConfig, localFilepath, storageFilepath string) { credential, err := azblob.NewSharedKeyCredential( conf.AccountName, conf.AccountKey, @@ -131,13 +129,13 @@ func downloadAzure(t *testing.T, conf *livekit.AzureBlobUpload, localFilepath, s require.NoError(t, err) } -func downloadGCP(t *testing.T, conf *livekit.GCPUpload, localFilepath, storageFilepath string) { +func downloadGCP(t *testing.T, conf *config.GCPConfig, localFilepath, storageFilepath string) { ctx := context.Background() var client *storage.Client var err error - if conf.Credentials != "" { - client, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(conf.Credentials))) + if conf.CredentialsJSON != "" { + client, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(conf.CredentialsJSON))) } else { client, err = storage.NewClient(ctx) } diff --git a/test/ffprobe.go b/test/ffprobe.go index f5541242..7f5a3577 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -118,7 +118,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre case <-time.After(time.Second * 15): t.Fatal("no response from ffprobe") case <-done: - require.NoError(t, err, "input %s does not exist", in) + require.NoError(t, err, "ffprobe failed for input %s", in) } switch egressType { diff --git a/test/file.go b/test/file.go index 04f1cb66..9e350daf 100644 --- a/test/file.go +++ b/test/file.go @@ -264,11 +264,9 @@ func (r *Runner) verifyFile(t *testing.T, p *config.PipelineConfig, res *livekit storageFilename := path.Base(storagePath) // download from cloud storage - if uploadConfig := p.GetFileConfig().UploadConfig; uploadConfig != nil { - localPath = path.Join(r.FilePrefix, storageFilename) - download(t, uploadConfig, localPath, storagePath) - download(t, uploadConfig, localPath+".json", storagePath+".json") - } + localPath = path.Join(r.FilePrefix, storageFilename) + download(t, p.GetFileConfig().StorageConfig, localPath, storagePath) + download(t, p.GetFileConfig().StorageConfig, localPath+".json", storagePath+".json") // verify verify(t, localPath, p, res, types.EgressTypeFile, r.Muting, r.sourceFramerate, false) diff --git a/test/images.go b/test/images.go index 960cbaaa..9c4202e9 100644 --- a/test/images.go +++ b/test/images.go @@ -111,11 +111,9 @@ func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, res *livek require.Greater(t, images.ImageCount, int64(0)) imageConfig := p.GetImageConfigs()[0] - if uploadConfig := imageConfig.UploadConfig; uploadConfig != nil { - for i := range images.ImageCount { - storagePath := fmt.Sprintf("%s_%05d%s", images.FilenamePrefix, i, imageConfig.ImageExtension) - localPath := path.Join(r.FilePrefix, path.Base(storagePath)) - download(t, uploadConfig, localPath, storagePath) - } + for i := range images.ImageCount { + storagePath := fmt.Sprintf("%s_%05d%s", images.FilenamePrefix, i, imageConfig.ImageExtension) + localPath := path.Join(r.FilePrefix, path.Base(storagePath)) + download(t, imageConfig.StorageConfig, localPath, storagePath) } } diff --git a/test/segments.go b/test/segments.go index 81092113..f9f5a587 100644 --- a/test/segments.go +++ b/test/segments.go @@ -204,9 +204,7 @@ func (r *Runner) verifySegments(t *testing.T, p *config.PipelineConfig, filename func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) { localPlaylistPath := path.Join(r.FilePrefix, path.Base(plName)) - if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { - download(t, uploadConfig, localPlaylistPath+".json", plName+".json") - } + download(t, p.GetSegmentConfig().StorageConfig, localPlaylistPath+".json", plName+".json") } func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, plName string, plLocation string, segmentCount int, res *livekit.EgressInfo, plType m3u8.PlaylistType) { @@ -217,17 +215,15 @@ func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, fil localPlaylistPath := plName // download from cloud storage - if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { - localPlaylistPath = path.Join(r.FilePrefix, path.Base(storedPlaylistPath)) - download(t, uploadConfig, localPlaylistPath, storedPlaylistPath) - if plType == m3u8.PlaylistTypeEvent { - // Only download segments once - base := storedPlaylistPath[:len(storedPlaylistPath)-5] - for i := 0; i < segmentCount; i++ { - cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) - localPath := path.Join(r.FilePrefix, path.Base(cloudPath)) - download(t, uploadConfig, localPath, cloudPath) - } + localPlaylistPath = path.Join(r.FilePrefix, path.Base(storedPlaylistPath)) + download(t, p.GetSegmentConfig().StorageConfig, localPlaylistPath, storedPlaylistPath) + if plType == m3u8.PlaylistTypeEvent { + // Only download segments once + base := storedPlaylistPath[:len(storedPlaylistPath)-5] + for i := 0; i < segmentCount; i++ { + cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) + localPath := path.Join(r.FilePrefix, path.Base(cloudPath)) + download(t, p.GetSegmentConfig().StorageConfig, localPath, cloudPath) } } @@ -317,7 +313,7 @@ func readPlaylist(filename string) (*Playlist, error) { Segments: make([]*Segment, 0), } - for i := segmentLineStart; i < len(lines)-3; i += 3 { + for i = segmentLineStart; i < len(lines)-3; i += 3 { startTime, _ := time.Parse("2006-01-02T15:04:05.999Z07:00", strings.SplitN(lines[i], ":", 2)[1]) durStr := strings.Split(lines[i+1], ":")[1] durStr = durStr[:len(durStr)-1] // remove trailing comma