diff --git a/pkg/config/base.go b/pkg/config/base.go index df7718be..b602c132 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -36,11 +36,12 @@ type BaseConfig struct { WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL) // optional - Logging *logger.Config `yaml:"logging"` // logging config - TemplateBase string `yaml:"template_base"` // custom template base url - ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to - EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration - MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes + Logging *logger.Config `yaml:"logging"` // logging config + TemplateBase string `yaml:"template_base"` // custom template base url + ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to + EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration + MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes + DisallowLocalStorage bool `yaml:"disallow_local_storage"` // require an upload config for all requests SessionLimits `yaml:"session_limits"` // session duration limits StorageConfig *StorageConfig `yaml:"storage,omitempty"` // storage config diff --git a/pkg/config/output_file.go b/pkg/config/output_file.go index d09bc0c0..d515b795 100644 --- a/pkg/config/output_file.go +++ b/pkg/config/output_file.go @@ -70,12 +70,17 @@ type fileRequest interface { } func (p *PipelineConfig) getFileConfig(outputType types.OutputType, req fileRequest) (*FileConfig, error) { + sc, err := p.getStorageConfig(req) + if err != nil { + return nil, err + } + conf := &FileConfig{ outputConfig: outputConfig{OutputType: outputType}, FileInfo: &livekit.FileInfo{}, StorageFilepath: clean(req.GetFilepath()), DisableManifest: req.GetDisableManifest(), - StorageConfig: p.getStorageConfig(req), + StorageConfig: sc, } // filename diff --git a/pkg/config/output_image.go b/pkg/config/output_image.go index 3a82b9ff..28649b25 100644 --- a/pkg/config/output_image.go +++ b/pkg/config/output_image.go @@ -64,6 +64,11 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf return nil, err } + sc, err := p.getStorageConfig(images) + if err != nil { + return nil, err + } + filenamePrefix := clean(images.FilenamePrefix) conf := &ImageConfig{ outputConfig: outputConfig{ @@ -77,7 +82,7 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf ImagePrefix: filenamePrefix, ImageSuffix: images.FilenameSuffix, DisableManifest: images.DisableManifest, - StorageConfig: p.getStorageConfig(images), + StorageConfig: sc, CaptureInterval: images.CaptureInterval, Width: images.Width, Height: images.Height, diff --git a/pkg/config/output_segment.go b/pkg/config/output_segment.go index f2f79a5a..27d11620 100644 --- a/pkg/config/output_segment.go +++ b/pkg/config/output_segment.go @@ -52,6 +52,11 @@ func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig { // segments should always be added last, so we can check keyframe interval from file/stream func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) (*SegmentConfig, error) { + sc, err := p.getStorageConfig(segments) + if err != nil { + return nil, err + } + conf := &SegmentConfig{ SegmentsInfo: &livekit.SegmentsInfo{}, SegmentPrefix: clean(segments.FilenamePrefix), @@ -60,7 +65,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) LivePlaylistFilename: clean(segments.LivePlaylistName), SegmentDuration: int(segments.SegmentDuration), DisableManifest: segments.DisableManifest, - StorageConfig: p.getStorageConfig(segments), + StorageConfig: sc, } if conf.SegmentDuration == 0 { @@ -74,8 +79,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) } // filename - err := conf.updatePrefixAndPlaylist(p) - if err != nil { + if err = conf.updatePrefixAndPlaylist(p); err != nil { return nil, err } diff --git a/pkg/config/storage.go b/pkg/config/storage.go index d96590ab..1541b291 100644 --- a/pkg/config/storage.go +++ b/pkg/config/storage.go @@ -17,6 +17,7 @@ package config import ( "time" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/protocol/egress" ) @@ -61,7 +62,7 @@ type GCPConfig struct { ProxyConfig *ProxyConfig `yaml:"proxy_config"` } -func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConfig { +func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) (*StorageConfig, error) { sc := &StorageConfig{} if p.StorageConfig != nil { sc.PathPrefix = p.StorageConfig.PathPrefix @@ -102,7 +103,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf if sc.S3.MinRetryDelay == 0 { sc.S3.MinRetryDelay = time.Millisecond * 100 } - return sc + return sc, nil } if gcp := req.GetGcp(); gcp != nil { @@ -117,7 +118,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf Password: gcp.Proxy.Password, } } - return sc + return sc, nil } if azure := req.GetAzure(); azure != nil { @@ -126,7 +127,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf AccountKey: azure.AccountKey, ContainerName: azure.ContainerName, } - return sc + return sc, nil } if ali := req.GetAliOSS(); ali != nil { @@ -137,8 +138,17 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf Endpoint: ali.Endpoint, Bucket: ali.Bucket, } - return sc + return sc, nil } - return p.StorageConfig + sc = p.StorageConfig + if p.DisallowLocalStorage && (sc == nil || sc.IsLocal()) { + return nil, errors.ErrInvalidInput("output") + } + + return sc, nil +} + +func (c *StorageConfig) IsLocal() bool { + return c.S3 == nil && c.GCP == nil && c.Azure == nil && c.AliOSS == nil } diff --git a/pkg/server/server_rpc.go b/pkg/server/server_rpc.go index f068bd6d..9c737f78 100644 --- a/pkg/server/server_rpc.go +++ b/pkg/server/server_rpc.go @@ -73,19 +73,30 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) ( "request", p.Info.Request, ) - errChan := s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info)) - s.launchProcess(req, (*livekit.EgressInfo)(p.Info)) - if err = <-errChan; err != nil { - s.AbortProcess(req.EgressId, err) - s.monitor.EgressAborted(req) - s.activeRequests.Dec() - return nil, err + info := (*livekit.EgressInfo)(p.Info) + + errChan := s.ioClient.CreateEgress(ctx, info) + launchErr := s.launchProcess(req, info) + createErr := <-errChan + + if launchErr != nil { + if createErr == nil { + // send failed update if it was saved to db + s.processEnded(req, info, launchErr) + } + return nil, launchErr + } else if createErr != nil { + // launched but failed to save - abort and return error + info.Error = createErr.Error() + info.ErrorCode = int32(http.StatusInternalServerError) + s.AbortProcess(req.EgressId, createErr) + return nil, createErr } return (*livekit.EgressInfo)(p.Info), nil } -func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) { +func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) error { _, span := tracer.Start(context.Background(), "Service.launchProcess") defer span.End() @@ -102,16 +113,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress if err != nil { span.RecordError(err) logger.Errorw("could not marshal config", err) - s.processEnded(req, info, err) - return + return err } reqString, err := protojson.Marshal(req) if err != nil { span.RecordError(err) logger.Errorw("could not marshal request", err) - s.processEnded(req, info, err) - return + return err } cmd := exec.Command("egress", @@ -125,13 +134,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} if err = s.Launch(context.Background(), handlerID, req, info, cmd); err != nil { - s.processEnded(req, info, err) + return err } else { s.monitor.UpdatePID(info.EgressId, cmd.Process.Pid) go func() { err = cmd.Wait() s.processEnded(req, info, err) }() + return nil } } @@ -145,8 +155,8 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI info.Error = "internal error" info.ErrorCode = int32(http.StatusInternalServerError) _ = s.ioClient.UpdateEgress(context.Background(), info) - logger.Errorw("process failed, shutting down", err) - s.Shutdown(false, false) + + logger.Errorw("process failed", err) } avgCPU, maxCPU := s.monitor.EgressEnded(req)