Skip to content

Commit

Permalink
backup storage options (#792)
Browse files Browse the repository at this point in the history
* backup storage options

* fix paths

* move tmpDir to pipeline config creation

* tmpDir overloaded

* move ipc svc back to os tmp

* final answer

* remove extra slashes
  • Loading branch information
frostbyte73 authored Oct 16, 2024
1 parent cedb6d8 commit d0629ce
Show file tree
Hide file tree
Showing 34 changed files with 450 additions and 539 deletions.
1 change: 1 addition & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func runHandler(c *cli.Context) error {
if err != nil {
return err
}
defer os.RemoveAll(conf.TmpDir)
_ = os.Setenv("TMPDIR", conf.TmpDir)

rc, err := lkredis.GetRedisClient(conf.Redis)
Expand Down
65 changes: 11 additions & 54 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
lksdk "github.com/livekit/server-sdk-go/v2"
)

const TmpDir = "/home/egress/tmp"

type BaseConfig struct {
NodeID string // do not supply - will be overwritten

Expand All @@ -34,66 +36,26 @@ type BaseConfig struct {
WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL)

// optional
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
BackupStorage string `yaml:"backup_storage"` // backup file location for failed uploads
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes
StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS)
SessionLimits `yaml:"session_limits"` // session duration limits
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes

SessionLimits `yaml:"session_limits"` // session duration limits
StorageConfig *StorageConfig `yaml:"storage,omitempty"` // storage config
BackupConfig *StorageConfig `yaml:"backup,omitempty"` // backup config, for storage failures

// dev/debugging
Insecure bool `yaml:"insecure"` // allow chrome to connect to an insecure websocket
Debug DebugConfig `yaml:"debug"` // create dot file on internal error

// deprecated
LogLevel string `yaml:"log_level"` // Use Logging instead
}

type DebugConfig struct {
EnableProfiling bool `yaml:"enable_profiling"` // create dot file and pprof on internal error
PathPrefix string `yaml:"path_prefix"` // filepath prefix for uploads
StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS)
}

type StorageConfig struct {
S3 *S3Config `yaml:"s3"`
Azure *AzureConfig `yaml:"azure"`
GCP *GCPConfig `yaml:"gcp"`
AliOSS *S3Config `yaml:"alioss"`
}

type S3Config struct {
AccessKey string `yaml:"access_key"` // (env AWS_ACCESS_KEY_ID)
Secret string `yaml:"secret"` // (env AWS_SECRET_ACCESS_KEY)
SessionToken string `yaml:"session_token"` // (env AWS_SESSION_TOKEN)
Region string `yaml:"region"` // (env AWS_DEFAULT_REGION)
Endpoint string `yaml:"endpoint"`
Bucket string `yaml:"bucket"`
ForcePathStyle bool `yaml:"force_path_style"`
ProxyConfig *ProxyConfig `yaml:"proxy_config"`
MaxRetries int `yaml:"max_retries"`
MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
MinRetryDelay time.Duration `yaml:"min_retry_delay"`
AwsLogLevel string `yaml:"aws_log_level"`

// deprecated
Proxy string `yaml:"proxy"` // use ProxyConfig instead
}

type AzureConfig struct {
AccountName string `yaml:"account_name"` // (env AZURE_STORAGE_ACCOUNT)
AccountKey string `yaml:"account_key"` // (env AZURE_STORAGE_KEY)
ContainerName string `yaml:"container_name"`
}

type GCPConfig struct {
CredentialsJSON string `yaml:"credentials_json"` // (env GOOGLE_APPLICATION_CREDENTIALS)
Bucket string `yaml:"bucket"`
ProxyConfig *ProxyConfig `yaml:"proxy_config"`
}

type ProxyConfig struct {
Url string `yaml:"url"`
Username string `yaml:"username"`
Expand All @@ -108,11 +70,6 @@ type SessionLimits struct {
}

func (c *BaseConfig) initLogger(values ...interface{}) error {
if c.LogLevel != "" {
logger.Warnw("log_level deprecated. use logging instead", nil)
c.Logging.Level = c.LogLevel
}

var gstDebug []string
switch c.Logging.Level {
case "debug":
Expand Down
29 changes: 5 additions & 24 deletions pkg/config/output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package config

import (
"fmt"
"os"
"path"
"strings"
"time"
Expand All @@ -34,7 +33,7 @@ type FileConfig struct {
StorageFilepath string

DisableManifest bool
UploadConfig UploadConfig
StorageConfig *StorageConfig
}

func (p *PipelineConfig) GetFileConfig() *FileConfig {
Expand Down Expand Up @@ -76,7 +75,7 @@ func (p *PipelineConfig) getFileConfig(outputType types.OutputType, req fileRequ
FileInfo: &livekit.FileInfo{},
StorageFilepath: clean(req.GetFilepath()),
DisableManifest: req.GetDisableManifest(),
UploadConfig: p.getUploadConfig(req),
StorageConfig: p.getStorageConfig(req),
}

// filename
Expand Down Expand Up @@ -137,28 +136,10 @@ func (o *FileConfig) updateFilepath(p *PipelineConfig, identifier string, replac
o.FileInfo.Filename = o.StorageFilepath

// get local filepath
dir, filename := path.Split(o.StorageFilepath)
if o.UploadConfig == nil {
if dir != "" {
// create local directory
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
}
// write directly to requested location
o.LocalFilepath = o.StorageFilepath
} else {
// prepend the configuration base directory and the egress Id
tempDir := path.Join(TmpDir, p.Info.EgressId)
_, filename := path.Split(o.StorageFilepath)

// create temporary directory
if err := os.MkdirAll(tempDir, 0755); err != nil {
return err
}

// write to tmp dir
o.LocalFilepath = path.Join(tempDir, filename)
}
// write to tmp dir
o.LocalFilepath = path.Join(p.TmpDir, filename)

return nil
}
Expand Down
26 changes: 9 additions & 17 deletions pkg/config/output_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type ImageConfig struct {
ImageExtension types.FileExtension

DisableManifest bool
UploadConfig UploadConfig
StorageConfig *StorageConfig

CaptureInterval uint32
Width int32
Expand Down Expand Up @@ -77,7 +77,7 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf
ImagePrefix: filenamePrefix,
ImageSuffix: images.FilenameSuffix,
DisableManifest: images.DisableManifest,
UploadConfig: p.getUploadConfig(images),
StorageConfig: p.getStorageConfig(images),
CaptureInterval: images.CaptureInterval,
Width: images.Width,
Height: images.Height,
Expand Down Expand Up @@ -128,25 +128,17 @@ func (o *ImageConfig) updatePrefix(p *PipelineConfig) error {
// update config
o.ImagePrefix = imagesPrefix

if o.UploadConfig == nil {
o.LocalDir = imagesDir
} else {
// Prepend the configuration base directory and the egress Id, and slug to prevent conflict if
// there is more than one image output
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
// Append a "/" to the path for consistency with the "UploadConfig == nil" case
o.LocalDir = path.Join(TmpDir, p.Info.EgressId, o.Id) + "/"
}

// create local directories
if o.LocalDir != "" {
if err := os.MkdirAll(o.LocalDir, 0755); err != nil {
return err
}
// Prepend the configuration base directory and the egress Id, and slug to prevent conflict if
// there is more than one image output
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
o.LocalDir = path.Join(p.TmpDir, o.Id)
if err := os.MkdirAll(o.LocalDir, 0755); err != nil {
return err
}

return nil
}

func getMimeTypes(imageCodec livekit.ImageCodec) (types.MimeType, types.OutputType, error) {
switch imageCodec {
case livekit.ImageCodec_IC_DEFAULT, livekit.ImageCodec_IC_JPEG:
Expand Down
49 changes: 19 additions & 30 deletions pkg/config/output_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type SegmentConfig struct {
SegmentDuration int

DisableManifest bool
UploadConfig UploadConfig
StorageConfig *StorageConfig
}

func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig {
Expand All @@ -60,7 +60,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput)
LivePlaylistFilename: clean(segments.LivePlaylistName),
SegmentDuration: int(segments.SegmentDuration),
DisableManifest: segments.DisableManifest,
UploadConfig: p.getUploadConfig(segments),
StorageConfig: p.getStorageConfig(segments),
}

if conf.SegmentDuration == 0 {
Expand Down Expand Up @@ -105,7 +105,7 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error {

playlistDir, playlistName := path.Split(o.PlaylistFilename)
livePlaylistDir, livePlaylistName := path.Split(o.LivePlaylistFilename)
fileDir, filePrefix := path.Split(o.SegmentPrefix)
segmentDir, segmentPrefix := path.Split(o.SegmentPrefix)

// force live playlist to be in the same directory as the main playlist
if livePlaylistDir != "" && livePlaylistDir != playlistDir {
Expand All @@ -116,30 +116,30 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error {
playlistName = removeKnownExtension(playlistName)
livePlaylistName = removeKnownExtension(livePlaylistName)

// only keep fileDir if it is a subdirectory of playlistDir
if fileDir != "" {
if playlistDir == fileDir {
fileDir = ""
// only keep segmentDir if it is a subdirectory of playlistDir
if segmentDir != "" {
if playlistDir == segmentDir {
segmentDir = ""
} else if playlistDir == "" {
playlistDir = fileDir
fileDir = ""
playlistDir = segmentDir
segmentDir = ""
}
}
o.StorageDir = playlistDir

// ensure playlistName
if playlistName == "" {
if filePrefix != "" {
playlistName = filePrefix
if segmentPrefix != "" {
playlistName = segmentPrefix
} else {
playlistName = fmt.Sprintf("%s-%s", identifier, time.Now().Format("2006-01-02T150405"))
}
}
// live playlist disabled by default

// ensure filePrefix
if filePrefix == "" {
filePrefix = playlistName
if segmentPrefix == "" {
segmentPrefix = playlistName
}

// update config
Expand All @@ -148,28 +148,17 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error {
if livePlaylistName != "" {
o.LivePlaylistFilename = fmt.Sprintf("%s%s", livePlaylistName, ext)
}
o.SegmentPrefix = fmt.Sprintf("%s%s", fileDir, filePrefix)
o.SegmentPrefix = fmt.Sprintf("%s%s", segmentDir, segmentPrefix)

if o.PlaylistFilename == o.LivePlaylistFilename {
return errors.ErrInvalidInput("live_playlist_name cannot be identical to playlist_name")
}

if o.UploadConfig == nil {
o.LocalDir = playlistDir
} else {
// Prepend the configuration base directory and the egress Id
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
// Append a "/" to the path for consistency with the "UploadConfig == nil" case
o.LocalDir = path.Join(TmpDir, p.Info.EgressId) + "/"
}

// create local directories
if fileDir != "" {
if err := os.MkdirAll(path.Join(o.LocalDir, fileDir), 0755); err != nil {
return err
}
} else if o.LocalDir != "" {
if err := os.MkdirAll(o.LocalDir, 0755); err != nil {
// Prepend the configuration base directory and the egress Id
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
o.LocalDir = p.TmpDir
if segmentDir != "" {
if err := os.MkdirAll(path.Join(o.LocalDir, segmentDir), 0755); err != nil {
return err
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package config
import (
"context"
"net/url"
"path"
"strings"
"time"

Expand Down Expand Up @@ -148,6 +149,7 @@ func GetValidatedPipelineConfig(conf *ServiceConfig, req *rpc.StartEgressRequest

p := &PipelineConfig{
BaseConfig: conf.BaseConfig,
TmpDir: path.Join(TmpDir, req.EgressId),
Outputs: make(map[types.EgressType][]OutputConfig),
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
)

const (
TmpDir = "/home/egress/tmp"

roomCompositeCpuCost = 4
audioRoomCompositeCpuCost = 1
webCpuCost = 4
Expand Down
Loading

0 comments on commit d0629ce

Please sign in to comment.