Skip to content

Commit

Permalink
add disallow-local config option
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Oct 23, 2024
1 parent 1f35c4b commit ffdc501
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 31 deletions.
11 changes: 6 additions & 5 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ type BaseConfig struct {
WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL)

// optional
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes
DisallowLocalStorage bool `yaml:"disallow_local_storage"` // require an upload config for all requests

SessionLimits `yaml:"session_limits"` // session duration limits
StorageConfig *StorageConfig `yaml:"storage,omitempty"` // storage config
Expand Down
7 changes: 6 additions & 1 deletion pkg/config/output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,17 @@ type fileRequest interface {
}

func (p *PipelineConfig) getFileConfig(outputType types.OutputType, req fileRequest) (*FileConfig, error) {
sc, err := p.getStorageConfig(req)
if err != nil {
return nil, err
}

conf := &FileConfig{
outputConfig: outputConfig{OutputType: outputType},
FileInfo: &livekit.FileInfo{},
StorageFilepath: clean(req.GetFilepath()),
DisableManifest: req.GetDisableManifest(),
StorageConfig: p.getStorageConfig(req),
StorageConfig: sc,
}

// filename
Expand Down
7 changes: 6 additions & 1 deletion pkg/config/output_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf
return nil, err
}

sc, err := p.getStorageConfig(images)
if err != nil {
return nil, err
}

filenamePrefix := clean(images.FilenamePrefix)
conf := &ImageConfig{
outputConfig: outputConfig{
Expand All @@ -77,7 +82,7 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf
ImagePrefix: filenamePrefix,
ImageSuffix: images.FilenameSuffix,
DisableManifest: images.DisableManifest,
StorageConfig: p.getStorageConfig(images),
StorageConfig: sc,
CaptureInterval: images.CaptureInterval,
Width: images.Width,
Height: images.Height,
Expand Down
10 changes: 7 additions & 3 deletions pkg/config/output_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig {

// segments should always be added last, so we can check keyframe interval from file/stream
func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) (*SegmentConfig, error) {
sc, err := p.getStorageConfig(segments)
if err != nil {
return nil, err
}

conf := &SegmentConfig{
SegmentsInfo: &livekit.SegmentsInfo{},
SegmentPrefix: clean(segments.FilenamePrefix),
Expand All @@ -60,7 +65,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput)
LivePlaylistFilename: clean(segments.LivePlaylistName),
SegmentDuration: int(segments.SegmentDuration),
DisableManifest: segments.DisableManifest,
StorageConfig: p.getStorageConfig(segments),
StorageConfig: sc,
}

if conf.SegmentDuration == 0 {
Expand All @@ -74,8 +79,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput)
}

// filename
err := conf.updatePrefixAndPlaylist(p)
if err != nil {
if err = conf.updatePrefixAndPlaylist(p); err != nil {
return nil, err
}

Expand Down
22 changes: 16 additions & 6 deletions pkg/config/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package config
import (
"time"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/egress"
)

Expand Down Expand Up @@ -61,7 +62,7 @@ type GCPConfig struct {
ProxyConfig *ProxyConfig `yaml:"proxy_config"`
}

func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConfig {
func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) (*StorageConfig, error) {
sc := &StorageConfig{}
if p.StorageConfig != nil {
sc.PathPrefix = p.StorageConfig.PathPrefix
Expand Down Expand Up @@ -102,7 +103,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
if sc.S3.MinRetryDelay == 0 {
sc.S3.MinRetryDelay = time.Millisecond * 100
}
return sc
return sc, nil
}

if gcp := req.GetGcp(); gcp != nil {
Expand All @@ -117,7 +118,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
Password: gcp.Proxy.Password,
}
}
return sc
return sc, nil
}

if azure := req.GetAzure(); azure != nil {
Expand All @@ -126,7 +127,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
AccountKey: azure.AccountKey,
ContainerName: azure.ContainerName,
}
return sc
return sc, nil
}

if ali := req.GetAliOSS(); ali != nil {
Expand All @@ -137,8 +138,17 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf
Endpoint: ali.Endpoint,
Bucket: ali.Bucket,
}
return sc
return sc, nil
}

return p.StorageConfig
sc = p.StorageConfig
if p.DisallowLocalStorage && (sc == nil || sc.IsLocal()) {
return nil, errors.ErrInvalidInput("output")
}

return sc, nil
}

func (c *StorageConfig) IsLocal() bool {
return c.S3 == nil && c.GCP == nil && c.Azure == nil && c.AliOSS == nil
}
40 changes: 25 additions & 15 deletions pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,30 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (
"request", p.Info.Request,
)

errChan := s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info))
s.launchProcess(req, (*livekit.EgressInfo)(p.Info))
if err = <-errChan; err != nil {
s.AbortProcess(req.EgressId, err)
s.monitor.EgressAborted(req)
s.activeRequests.Dec()
return nil, err
info := (*livekit.EgressInfo)(p.Info)

errChan := s.ioClient.CreateEgress(ctx, info)
launchErr := s.launchProcess(req, info)
createErr := <-errChan

if launchErr != nil {
if createErr == nil {
// send failed update if it was saved to db
s.processEnded(req, info, launchErr)
}
return nil, launchErr
} else if createErr != nil {
// launched but failed to save - abort and return error
info.Error = createErr.Error()
info.ErrorCode = int32(http.StatusInternalServerError)
s.AbortProcess(req.EgressId, createErr)
return nil, createErr
}

return (*livekit.EgressInfo)(p.Info), nil
}

func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) {
func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) error {
_, span := tracer.Start(context.Background(), "Service.launchProcess")
defer span.End()

Expand All @@ -102,16 +113,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
if err != nil {
span.RecordError(err)
logger.Errorw("could not marshal config", err)
s.processEnded(req, info, err)
return
return err
}

reqString, err := protojson.Marshal(req)
if err != nil {
span.RecordError(err)
logger.Errorw("could not marshal request", err)
s.processEnded(req, info, err)
return
return err
}

cmd := exec.Command("egress",
Expand All @@ -125,13 +134,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}

if err = s.Launch(context.Background(), handlerID, req, info, cmd); err != nil {
s.processEnded(req, info, err)
return err
} else {
s.monitor.UpdatePID(info.EgressId, cmd.Process.Pid)
go func() {
err = cmd.Wait()
s.processEnded(req, info, err)
}()
return nil
}
}

Expand All @@ -145,8 +155,8 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI
info.Error = "internal error"
info.ErrorCode = int32(http.StatusInternalServerError)
_ = s.ioClient.UpdateEgress(context.Background(), info)
logger.Errorw("process failed, shutting down", err)
s.Shutdown(false, false)

logger.Errorw("process failed", err)
}

avgCPU, maxCPU := s.monitor.EgressEnded(req)
Expand Down

0 comments on commit ffdc501

Please sign in to comment.