From d7674458c023ff4b80e4e6df40b6a012ce4ee3ab Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 20 Sep 2023 14:32:17 -0700 Subject: [PATCH 01/31] Some condf work --- go.mod | 2 + pkg/config/output_image.go | 135 +++++++++++++++++++++++++++++++++++++ pkg/pipeline/controller.go | 7 ++ pkg/service/handler.go | 15 +++++ pkg/types/types.go | 5 ++ 5 files changed, 164 insertions(+) create mode 100644 pkg/config/output_image.go diff --git a/go.mod b/go.mod index e18f5575..7f6eba39 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/livekit/egress go 1.20 +replace github.com/livekit/protocol => ../protocol + replace github.com/tinyzimmer/go-glib v0.0.25 => github.com/livekit/go-glib v0.0.0-20230223001336-834490045522 replace github.com/tinyzimmer/go-gst v0.2.33 => github.com/livekit/go-gst v0.2.34-0.20230901155409-dd09095b979e diff --git a/pkg/config/output_image.go b/pkg/config/output_image.go new file mode 100644 index 00000000..e3e4cc87 --- /dev/null +++ b/pkg/config/output_image.go @@ -0,0 +1,135 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "os" + "path" + "time" + + "github.com/livekit/egress/pkg/errors" + "github.com/livekit/egress/pkg/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/utils" +) + +type ImageConfig struct { + outputConfig + + ImagesInfo *livekit.ImagesInfo + LocalDir string + StorageDir string + ImagePrefix string + ImageSuffix livekit.ImageFileSuffix + ImageExtension types.FileExtension + + DisableManifest bool + UploadConfig UploadConfig + + CaptureInterval uint32 + Width int32 + Height int32 + ImageOutCodec types.MimeType +} + +func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConfig, error) { + outCodec, outputType, err := getMimeTypes(images.ImageCodec) + if err != nil { + return nil, err + } + + conf := &ImageConfig{ + outputConfig: outputConfig{ + OutputType: outputType, + }, + ImagesInfo: &livekit.ImagesInfo{}, + ImagePrefix: clean(images.FilenamePrefix), + ImageSuffix: images.FilenameSuffix, + DisableManifest: images.DisableManifest, + UploadConfig: p.getUploadConfig(images), + CaptureInterval: images.CaptureInterval, + Width: images.Width, + Height: images.Height, + ImageOutCodec: outCodec, + } + + if conf.CaptureInterval == 0 { + // 10s by default + conf.CaptureInterval = 10 + } + + // filename + err = conf.updatePrefix(p) + if err != nil { + return nil, err + } + + return conf, nil +} + +func (o *ImageConfig) updatePrefix(p *PipelineConfig) error { + identifier, replacements := p.getFilenameInfo() + + o.ImagePrefix = stringReplace(o.ImagePrefix, replacements) + + o.ImageExtension = types.FileExtensionForOutputType[o.OutputType] + + imagesDir, imagesPrefix := path.Split(o.ImagePrefix) + + o.StorageDir = imagesDir + + // ensure playlistName + if imagesPrefix == "" { + imagesPrefix = fmt.Sprintf("%s-%s", identifier, time.Now().Format("2006-01-02T150405")) + } + + // update config + o.ImagePrefix = fmt.Sprintf("%s%s", imagesDir, imagesPrefix) + + if o.UploadConfig == nil { + o.LocalDir = imagesDir + } else { + // Prepend the configuration base directory and the egress Id, and slug to prevent conflict if + // there is more than one image output + // os.ModeDir creates a directory with mode 000 when mapping the directory outside the container + // Append a "/" to the path for consistency with the "UploadConfig == nil" case + + slug := utils.NewGuid("") + + o.LocalDir = path.Join(TmpDir, p.Info.EgressId, slug) + "/" + } + + // create local directories + if imagesDir != "" { + if err := os.MkdirAll(path.Join(o.LocalDir, imagesDir), 0755); err != nil { + return err + } + } else if o.LocalDir != "" { + if err := os.MkdirAll(o.LocalDir, 0755); err != nil { + return err + } + } + + return nil +} +func getMimeTypes(imageCodec livekit.ImageCodec) (types.MimeType, types.OutputType, error) { + switch imageCodec { + case livekit.ImageCodec_IC_DEFAULT, livekit.ImageCodec_IC_JPEG: + return types.MimeTypeJPEG, types.OutputTypeJPEG, nil + default: + return "", "", errors.ErrNoCompatibleCodec + } +} diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index e3ae73ba..12afe860 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -290,6 +290,13 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream return errs.ToError() } +func (c *Controller) UpdateOutputs(ctx context.Context, req *livekit.UpdateOutputsRequest) error { + ctx, span := tracer.Start(ctx, "Pipeline.UpdateOutputs") + defer span.End() + + return nil +} + func (c *Controller) removeSink(ctx context.Context, url string, streamErr error) error { now := time.Now().UnixNano() diff --git a/pkg/service/handler.go b/pkg/service/handler.go index a7040d14..5eb583b8 100644 --- a/pkg/service/handler.go +++ b/pkg/service/handler.go @@ -145,6 +145,21 @@ func (h *Handler) UpdateStream(ctx context.Context, req *livekit.UpdateStreamReq return h.pipeline.Info, nil } +func (h *Handler) UpdateOutputs(ctx context.Context, req *livekit.UpdateOutputsRequest) (*livekit.EgressInfo, error) { + ctx, span := tracer.Start(ctx, "Handler.UpdateOutputs") + defer span.End() + + if h.pipeline == nil { + return nil, errors.ErrEgressNotFound + } + + err := h.pipeline.UpdateOutputs(ctx, req) + if err != nil { + return nil, err + } + return h.pipeline.Info, nil +} + func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) (*livekit.EgressInfo, error) { ctx, span := tracer.Start(ctx, "Handler.StopEgress") defer span.End() diff --git a/pkg/types/types.go b/pkg/types/types.go index a0e66d7e..a1d421e7 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -47,6 +47,7 @@ const ( MimeTypeH264 MimeType = "video/h264" MimeTypeVP8 MimeType = "video/vp8" MimeTypeVP9 MimeType = "video/vp9" + MimeTypeJPEG MimeType = "image/jpeg" MimeTypeRawVideo MimeType = "video/x-raw" // video profiles @@ -62,6 +63,7 @@ const ( OutputTypeMP4 OutputType = "video/mp4" OutputTypeTS OutputType = "video/mp2t" OutputTypeWebM OutputType = "video/webm" + OutputTypeJPEG OutputType = "image/jpeg" OutputTypeRTMP OutputType = "rtmp" OutputTypeHLS OutputType = "application/x-mpegurl" OutputTypeJSON OutputType = "application/json" @@ -75,6 +77,7 @@ const ( FileExtensionTS = ".ts" FileExtensionWebM = ".webm" FileExtensionM3U8 = ".m3u8" + FileExtensionJPEG = ".jpeg" ) var ( @@ -105,6 +108,7 @@ var ( FileExtensionTS: {}, FileExtensionWebM: {}, FileExtensionM3U8: {}, + FileExtensionJPEG: {}, } FileExtensionForOutputType = map[OutputType]FileExtension{ @@ -115,6 +119,7 @@ var ( OutputTypeTS: FileExtensionTS, OutputTypeWebM: FileExtensionWebM, OutputTypeHLS: FileExtensionM3U8, + OutputTypeJPEG: FileExtensionJPEG, } CodecCompatibility = map[OutputType]map[MimeType]bool{ From 4594cfecfee6a6ee079892a7245181789ca05946 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 20 Sep 2023 15:14:14 -0700 Subject: [PATCH 02/31] WiP --- pkg/config/output.go | 19 +++++++++++++++++++ pkg/types/types.go | 1 + 2 files changed, 20 insertions(+) diff --git a/pkg/config/output.go b/pkg/config/output.go index 7f862698..60aab060 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -48,6 +48,10 @@ type EncodedOutputDeprecated interface { GetSegmentOutputs() []*livekit.SegmentedFileOutput } +type ImageOutput interface { + GetImageOutputs() []*livekit.ImageOutput +} + func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { files := req.GetFileOutputs() streams := req.GetStreamOutputs() @@ -193,6 +197,21 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err return nil } +func (p *PipelineConfig) updateImageOutputs(req ImageOutput) error { + images := req.GetImageOutputs() + + for _, img := range images { + conf, err := p.getImageConfig(img) + if err != nil { + return err + } + + p.Outputs[types.EgressTypeImages] = conf + } + + return nil +} + func redactEncodedOutputs(out EncodedOutput) { if files := out.GetFileOutputs(); len(files) == 1 { redactUpload(files[0]) diff --git a/pkg/types/types.go b/pkg/types/types.go index a1d421e7..e98764cf 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -39,6 +39,7 @@ const ( EgressTypeWebsocket EgressType = "websocket" EgressTypeFile EgressType = "file" EgressTypeSegments EgressType = "segments" + EgressTypeImages EgressType = "images" // input types MimeTypeAAC MimeType = "audio/aac" From 07557b19256b590d6f18173ce3c38ea2e6adede0 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 20 Sep 2023 16:59:47 -0700 Subject: [PATCH 03/31] Array output per type --- pkg/config/output.go | 12 +++++------ pkg/config/output_file.go | 4 ++-- pkg/config/output_segment.go | 4 ++-- pkg/config/output_stream.go | 8 ++++---- pkg/config/pipeline.go | 38 +++++++++++++++++++++++++---------- pkg/pipeline/builder/audio.go | 2 +- pkg/pipeline/builder/video.go | 2 +- pkg/pipeline/controller.go | 23 +++++++++++++++------ pkg/pipeline/sink/sink.go | 10 ++++++--- 9 files changed, 67 insertions(+), 36 deletions(-) diff --git a/pkg/config/output.go b/pkg/config/output.go index 60aab060..3142a356 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -75,7 +75,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { return err } - p.Outputs[types.EgressTypeFile] = conf + p.Outputs[types.EgressTypeFile] = []OutputConfig{conf} p.OutputCount++ p.FinalizationRequired = true @@ -104,7 +104,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { return err } - p.Outputs[types.EgressTypeStream] = conf + p.Outputs[types.EgressTypeStream] = []OutputConfig{conf} p.OutputCount += len(stream.Urls) streamInfoList := make([]*livekit.StreamInfo, 0, len(conf.StreamInfo)) @@ -141,7 +141,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { return err } - p.Outputs[types.EgressTypeSegments] = conf + p.Outputs[types.EgressTypeSegments] = []OutputConfig{conf} p.OutputCount++ p.FinalizationRequired = true @@ -170,7 +170,7 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err p.Info.FileResults = []*livekit.FileInfo{conf.FileInfo} p.Info.Result = &livekit.EgressInfo_File{File: conf.FileInfo} - p.Outputs[types.EgressTypeFile] = conf + p.Outputs[types.EgressTypeFile] = []OutputConfig{conf} p.OutputCount = 1 p.FinalizationRequired = true @@ -187,7 +187,7 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err p.Info.StreamResults = streamInfoList p.Info.Result = &livekit.EgressInfo_Stream{Stream: &livekit.StreamInfoList{Info: streamInfoList}} - p.Outputs[types.EgressTypeWebsocket] = conf + p.Outputs[types.EgressTypeWebsocket] = []OutputConfig{conf} p.OutputCount = 1 default: @@ -206,7 +206,7 @@ func (p *PipelineConfig) updateImageOutputs(req ImageOutput) error { return err } - p.Outputs[types.EgressTypeImages] = conf + p.Outputs[types.EgressTypeImages] = append(p.Outputs[types.EgressTypeImages], conf) } return nil diff --git a/pkg/config/output_file.go b/pkg/config/output_file.go index 68e59461..88ab71a3 100644 --- a/pkg/config/output_file.go +++ b/pkg/config/output_file.go @@ -38,10 +38,10 @@ type FileConfig struct { func (p *PipelineConfig) GetFileConfig() *FileConfig { o, ok := p.Outputs[types.EgressTypeFile] - if !ok { + if !ok || len(o) == 0 { return nil } - return o.(*FileConfig) + return o[0].(*FileConfig) } func (p *PipelineConfig) getEncodedFileConfig(file *livekit.EncodedFileOutput) (*FileConfig, error) { diff --git a/pkg/config/output_segment.go b/pkg/config/output_segment.go index 99dfcf53..a473b238 100644 --- a/pkg/config/output_segment.go +++ b/pkg/config/output_segment.go @@ -42,10 +42,10 @@ type SegmentConfig struct { func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig { o, ok := p.Outputs[types.EgressTypeSegments] - if !ok { + if !ok || len(o) == 0 { return nil } - return o.(*SegmentConfig) + return o[0].(*SegmentConfig) } // segments should always be added last, so we can check keyframe interval from file/stream diff --git a/pkg/config/output_stream.go b/pkg/config/output_stream.go index a32fea05..74f5f3e0 100644 --- a/pkg/config/output_stream.go +++ b/pkg/config/output_stream.go @@ -28,18 +28,18 @@ type StreamConfig struct { func (p *PipelineConfig) GetStreamConfig() *StreamConfig { o, ok := p.Outputs[types.EgressTypeStream] - if !ok { + if !ok || len(o) == 0 { return nil } - return o.(*StreamConfig) + return o[0].(*StreamConfig) } func (p *PipelineConfig) GetWebsocketConfig() *StreamConfig { o, ok := p.Outputs[types.EgressTypeWebsocket] - if !ok { + if !ok || len(o) == 0 { return nil } - return o.(*StreamConfig) + return o[0].(*StreamConfig) } func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []string) (*StreamConfig, error) { diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 31bceb81..4af731e3 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -53,9 +53,9 @@ type PipelineConfig struct { AudioConfig `yaml:"-"` VideoConfig `yaml:"-"` - Outputs map[types.EgressType]OutputConfig `yaml:"-"` - OutputCount int `yaml:"-"` - FinalizationRequired bool `yaml:"-"` + Outputs map[types.EgressType][]OutputConfig `yaml:"-"` + OutputCount int `yaml:"-"` + FinalizationRequired bool `yaml:"-"` OnUpdate func(context.Context, *livekit.EgressInfo) `yaml:"-"` @@ -128,7 +128,7 @@ func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*Pipelin Level: "info", }, }, - Outputs: make(map[types.EgressType]OutputConfig), + Outputs: make(map[types.EgressType][]OutputConfig), } if err := yaml.Unmarshal([]byte(confString), p); err != nil { @@ -153,7 +153,7 @@ func GetValidatedPipelineConfig(conf *ServiceConfig, req *rpc.StartEgressRequest p := &PipelineConfig{ BaseConfig: conf.BaseConfig, - Outputs: make(map[types.EgressType]OutputConfig), + Outputs: make(map[types.EgressType][]OutputConfig), } return p, p.Update(req) @@ -446,7 +446,8 @@ func (p *PipelineConfig) validateAndUpdateOutputParams() error { // Select a codec compatible with all outputs if p.AudioEnabled { - for _, o := range p.Outputs { + for _, o := range p.GetEncodedOutputs() { + if compatibleAudioCodecs[types.DefaultAudioCodecs[o.GetOutputType()]] { p.AudioOutCodec = types.DefaultAudioCodecs[o.GetOutputType()] break @@ -461,7 +462,7 @@ func (p *PipelineConfig) validateAndUpdateOutputParams() error { } if p.VideoEnabled { - for _, o := range p.Outputs { + for _, o := range p.GetEncodedOutputs() { if compatibleVideoCodecs[types.DefaultVideoCodecs[o.GetOutputType()]] { p.VideoOutCodec = types.DefaultVideoCodecs[o.GetOutputType()] break @@ -490,7 +491,7 @@ func (p *PipelineConfig) validateAndUpdateOutputCodecs() (compatibleAudioCodecs compatibleAudioCodecs[p.AudioOutCodec] = true } - for _, o := range p.Outputs { + for _, o := range p.GetEncodedOutputs() { compatibleAudioCodecs = types.GetMapIntersection(compatibleAudioCodecs, types.CodecCompatibility[o.GetOutputType()]) if len(compatibleAudioCodecs) == 0 { if p.AudioOutCodec == "" { @@ -510,7 +511,7 @@ func (p *PipelineConfig) validateAndUpdateOutputCodecs() (compatibleAudioCodecs compatibleVideoCodecs[p.VideoOutCodec] = true } - for _, o := range p.Outputs { + for _, o := range p.GetEncodedOutputs() { compatibleVideoCodecs = types.GetMapIntersection(compatibleVideoCodecs, types.CodecCompatibility[o.GetOutputType()]) if len(compatibleVideoCodecs) == 0 { if p.AudioOutCodec == "" { @@ -563,17 +564,22 @@ func (p *PipelineConfig) updateOutputType(compatibleAudioCodecs map[types.MimeTy // used for sdk input source func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[string]string) error { for egressType, c := range p.Outputs { + if len(c) == 0 { + continue + } switch egressType { case types.EgressTypeFile: - return c.(*FileConfig).updateFilepath(p, identifier, replacements) + return c[0].(*FileConfig).updateFilepath(p, identifier, replacements) case types.EgressTypeSegments: - o := c.(*SegmentConfig) + o := c[0].(*SegmentConfig) o.LocalDir = stringReplace(o.LocalDir, replacements) o.StorageDir = stringReplace(o.StorageDir, replacements) o.PlaylistFilename = stringReplace(o.PlaylistFilename, replacements) o.SegmentPrefix = stringReplace(o.SegmentPrefix, replacements) o.SegmentsInfo.PlaylistName = stringReplace(o.SegmentsInfo.PlaylistName, replacements) + + // TODO Images } } @@ -609,6 +615,16 @@ func (p *PipelineConfig) ValidateUrl(rawUrl string, outputType types.OutputType) } } +func (p *PipelineConfig) GetEncodedOutputs() []OutputConfig { + ret := make([]OutputConfig, 0) + + for _, k := range []types.EgressType{types.EgressTypeFile, types.EgressTypeSegments, types.EgressTypeStream, types.EgressTypeWebsocket} { + ret = append(ret, p.Outputs[k]...) + } + + return ret +} + func stringReplace(s string, replacements map[string]string) string { for template, value := range replacements { s = strings.Replace(s, template, value, -1) diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index 7a3901ce..4be12803 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -59,7 +59,7 @@ func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error pipeline.AddOnTrackRemoved(b.onTrackRemoved) } - if len(p.Outputs) > 1 { + if len(p.GetEncodedOutputs()) > 1 { tee, err := gst.NewElementWithName("tee", "audio_tee") if err != nil { return err diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 8186c87e..4232dc7e 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -71,7 +71,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error pipeline.AddOnTrackUnmuted(b.onTrackUnmuted) } - if len(p.Outputs) > 1 { + if len(p.GetEncodedOutputs()) > 1 { tee, err := gst.NewElementWithName("tee", "video_tee") if err != nil { return err diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 12afe860..659ce57c 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -152,6 +152,8 @@ func (c *Controller) BuildPipeline() error { case types.EgressTypeWebsocket: writer := c.sinks[egressType].(*sink.WebsocketSink) sinkBin, err = builder.BuildWebsocketBin(p, writer.SinkCallbacks()) + + // TODO Add Images } if err != nil { return err @@ -457,6 +459,7 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) { case types.EgressTypeSegments: t = c.SegmentOutputMaxDuration } + // TODO Images if t > 0 && (timeout == 0 || t < timeout) { timeout = t } @@ -480,20 +483,24 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) { func (c *Controller) updateStartTime(startedAt int64) { for egressType, o := range c.Outputs { + if len(o) == 0 { + continue + } switch egressType { case types.EgressTypeStream, types.EgressTypeWebsocket: c.mu.Lock() - for _, streamInfo := range o.(*config.StreamConfig).StreamInfo { + for _, streamInfo := range o[0].(*config.StreamConfig).StreamInfo { streamInfo.Status = livekit.StreamInfo_ACTIVE streamInfo.StartedAt = startedAt } c.mu.Unlock() case types.EgressTypeFile: - o.(*config.FileConfig).FileInfo.StartedAt = startedAt + o[0].(*config.FileConfig).FileInfo.StartedAt = startedAt case types.EgressTypeSegments: - o.(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt + o[0].(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt + // TODO Images } } @@ -506,9 +513,12 @@ func (c *Controller) updateStartTime(startedAt int64) { func (c *Controller) updateDuration(endedAt int64) { for egressType, o := range c.Outputs { + if len(o) == 0 { + continue + } switch egressType { case types.EgressTypeStream, types.EgressTypeWebsocket: - for _, info := range o.(*config.StreamConfig).StreamInfo { + for _, info := range o[0].(*config.StreamConfig).StreamInfo { info.Status = livekit.StreamInfo_FINISHED if info.StartedAt == 0 { info.StartedAt = endedAt @@ -518,7 +528,7 @@ func (c *Controller) updateDuration(endedAt int64) { } case types.EgressTypeFile: - fileInfo := o.(*config.FileConfig).FileInfo + fileInfo := o[0].(*config.FileConfig).FileInfo if fileInfo.StartedAt == 0 { fileInfo.StartedAt = endedAt } @@ -526,12 +536,13 @@ func (c *Controller) updateDuration(endedAt int64) { fileInfo.Duration = endedAt - fileInfo.StartedAt case types.EgressTypeSegments: - segmentsInfo := o.(*config.SegmentConfig).SegmentsInfo + segmentsInfo := o[0].(*config.SegmentConfig).SegmentsInfo if segmentsInfo.StartedAt == 0 { segmentsInfo.StartedAt = endedAt } segmentsInfo.EndedAt = endedAt segmentsInfo.Duration = endedAt - segmentsInfo.StartedAt + // TODO Images } } } diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index 05e4a41b..227cd4aa 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -30,11 +30,14 @@ type Sink interface { func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[types.EgressType]Sink, error) { sinks := make(map[types.EgressType]Sink) for egressType, c := range p.Outputs { + if len(c) == 0 { + continue + } var s Sink var err error switch egressType { case types.EgressTypeFile: - o := c.(*config.FileConfig) + o := c[0].(*config.FileConfig) u, err := uploader.New(o.UploadConfig, p.BackupStorage) if err != nil { @@ -44,7 +47,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ s = newFileSink(u, p, o) case types.EgressTypeSegments: - o := c.(*config.SegmentConfig) + o := c[0].(*config.SegmentConfig) u, err := uploader.New(o.UploadConfig, p.BackupStorage) if err != nil { @@ -60,12 +63,13 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ // no sink needed case types.EgressTypeWebsocket: - o := c.(*config.StreamConfig) + o := c[0].(*config.StreamConfig) s, err = newWebsocketSink(o, types.MimeTypeRawAudio, callbacks) if err != nil { return nil, err } + // TODO Images } if s != nil { From 3c333f573dfc9734c39939b2b4867263a31f9431 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 20 Sep 2023 17:17:16 -0700 Subject: [PATCH 04/31] WiP --- pkg/config/output.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/config/output.go b/pkg/config/output.go index 3142a356..1b293168 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -37,6 +37,7 @@ type EncodedOutput interface { GetFileOutputs() []*livekit.EncodedFileOutput GetStreamOutputs() []*livekit.StreamOutput GetSegmentOutputs() []*livekit.SegmentedFileOutput + ImageOutput } type EncodedOutputDeprecated interface { @@ -152,6 +153,11 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { } } + err := p.updateImageOutputs(req) + if err != nil { + return err + } + if p.OutputCount == 0 { return errors.ErrInvalidInput("output") } @@ -200,6 +206,10 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err func (p *PipelineConfig) updateImageOutputs(req ImageOutput) error { images := req.GetImageOutputs() + if !p.VideoEnabled { + return errors.ErrInvalidInput("audio_only") + } + for _, img := range images { conf, err := p.getImageConfig(img) if err != nil { @@ -207,6 +217,10 @@ func (p *PipelineConfig) updateImageOutputs(req ImageOutput) error { } p.Outputs[types.EgressTypeImages] = append(p.Outputs[types.EgressTypeImages], conf) + p.OutputCount++ + p.FinalizationRequired = true + + p.Info.ImagesResults = append(p.Info.ImagesResults, conf.ImagesInfo) } return nil From ceffc2cf59276649db60f55d81f5f2f7fdb09e50 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 21 Sep 2023 15:43:47 -0700 Subject: [PATCH 05/31] WiP --- pkg/pipeline/controller.go | 30 ++++++++++++++++++------------ pkg/pipeline/sink/sink.go | 21 +++++++++++++++++---- pkg/pipeline/watch.go | 7 ++++++- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 659ce57c..c74c4c20 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -46,7 +46,7 @@ type Controller struct { // gstreamer src source.Source p *gstreamer.Pipeline - sinks map[types.EgressType]sink.Sink + sinks map[types.EgressType][]sink.Sink streamBin *builder.StreamBin callbacks *gstreamer.Callbacks @@ -150,7 +150,7 @@ func (c *Controller) BuildPipeline() error { c.streamBin, sinkBin, err = builder.BuildStreamBin(p, c.PipelineConfig) case types.EgressTypeWebsocket: - writer := c.sinks[egressType].(*sink.WebsocketSink) + writer := c.sinks[egressType][0].(*sink.WebsocketSink) sinkBin, err = builder.BuildWebsocketBin(p, writer.SinkCallbacks()) // TODO Add Images @@ -200,10 +200,12 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { } } - for _, s := range c.sinks { - if err := s.Start(); err != nil { - c.Info.Error = err.Error() - return c.Info + for _, si := range c.sinks { + for _, s := range si { + if err := s.Start(); err != nil { + c.Info.Error = err.Error() + return c.Info + } } } @@ -212,10 +214,12 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { return c.Info } - for _, s := range c.sinks { - if err := s.Close(); err != nil { - c.Info.Error = err.Error() - return c.Info + for _, si := range c.sinks { + for _, s := range si { + if err := s.Close(); err != nil { + c.Info.Error = err.Error() + return c.Info + } } } @@ -442,8 +446,10 @@ func (c *Controller) Close() { c.Info.Status = livekit.EgressStatus_EGRESS_COMPLETE } - for _, s := range c.sinks { - s.Cleanup() + for _, si := range c.sinks { + for _, s := range si { + s.Cleanup() + } } } diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index 227cd4aa..e3951117 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -27,8 +27,8 @@ type Sink interface { Cleanup() } -func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[types.EgressType]Sink, error) { - sinks := make(map[types.EgressType]Sink) +func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[types.EgressType][]Sink, error) { + sinks := make(map[types.EgressType][]Sink) for egressType, c := range p.Outputs { if len(c) == 0 { continue @@ -69,11 +69,24 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ if err != nil { return nil, err } - // TODO Images + case types.EgressTypeImages: + for _, ci := range c { + o := ci.(*config.ImageConfig) + + u, err := uploader.New(o.UploadConfig, p.BackupStorage) + if err != nil { + return nil, err + } + + s, err = newImageSink(u, p, o, callbacks) + if err != nil { + return nil, err + } + } } if s != nil { - sinks[egressType] = s + sinks[egressType] = append(sinks[egressType], s) } } diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 4d7d20a6..d3145167 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -323,5 +323,10 @@ func getFirstSampleMetadataFromGstStructure(s *gst.Structure) (startDate time.Ti } func (c *Controller) getSegmentSink() *sink.SegmentSink { - return c.sinks[types.EgressTypeSegments].(*sink.SegmentSink) + s := c.sinks[types.EgressTypeSegments] + if len(s) == 0 { + return nil + } + + return s[0].(*sink.SegmentSink) } From 9b1fad89a65dc56d8451410af97c33316fe29c01 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 22 Sep 2023 13:29:08 -0700 Subject: [PATCH 06/31] WiP --- pkg/pipeline/builder/video.go | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 4232dc7e..7d75f480 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -43,9 +43,10 @@ type VideoBin struct { selectedPad string nextPad string - mu sync.Mutex - pads map[string]*gst.Pad - selector *gst.Element + mu sync.Mutex + pads map[string]*gst.Pad + selector *gst.Element + rawVideoTee *gst.Element } func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error { @@ -71,6 +72,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error pipeline.AddOnTrackUnmuted(b.onTrackUnmuted) } + var getPad func() *gst.Pad if len(p.GetEncodedOutputs()) > 1 { tee, err := gst.NewElementWithName("tee", "video_tee") if err != nil { @@ -80,6 +82,10 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error if err = b.bin.AddElement(tee); err != nil { return err } + + getPad = func() *gst.Pad { + return tee.GetRequestPad("src_%u") + } } else { queue, err := gstreamer.BuildQueue("video_queue", p.Latency, true) if err != nil { @@ -88,8 +94,20 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error if err = b.bin.AddElement(queue); err != nil { return err } + + getPad = func() *gst.Pad { + return queue.GetStaticPad("src") + } } + b.bin.SetGetSinkPad(func(name string) *gst.Pad { + if name == "image" { + return rawVideoTee.GetRequestPad("src_%u") + } else { + return getPad() + } + }) + return pipeline.AddSourceBin(b.bin) } @@ -479,6 +497,15 @@ func (b *VideoBin) addSelector() error { } func (b *VideoBin) addEncoder() error { + var err error + b.rawVideoTee, err = gst.NewElement("tee") + if err != nil { + return err + } + if err = b.bin.AddElement(rawVideoTee); err != nil { + return err + } + videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", b.conf.Latency, false) if err != nil { return err From e13ffe54afe4692cd2c21ef9c61cd4251b7f042e Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 25 Sep 2023 12:48:57 -0700 Subject: [PATCH 07/31] WiP --- pkg/config/output_image.go | 11 +++++++++++ pkg/pipeline/builder/video.go | 7 ++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/config/output_image.go b/pkg/config/output_image.go index e3e4cc87..bb0de89d 100644 --- a/pkg/config/output_image.go +++ b/pkg/config/output_image.go @@ -45,6 +45,17 @@ type ImageConfig struct { ImageOutCodec types.MimeType } +func (p *PipelineConfig) GetImageConfigs() []*ImageConfig { + o, _ := p.Outputs[types.EgressTypeImages] + + var configs []*ImageConfig + for _, c := range o { + configs = append(configs, c.(*ImageConfig)) + } + + return configs +} + func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConfig, error) { outCodec, outputType, err := getMimeTypes(images.ImageCodec) if err != nil { diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 7d75f480..0f09a49d 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -16,6 +16,7 @@ package builder import ( "fmt" + "strings" "sync" "time" "unsafe" @@ -101,8 +102,8 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error } b.bin.SetGetSinkPad(func(name string) *gst.Pad { - if name == "image" { - return rawVideoTee.GetRequestPad("src_%u") + if strings.HasPrefix(name, "image") { + return b.rawVideoTee.GetRequestPad("src_%u") } else { return getPad() } @@ -502,7 +503,7 @@ func (b *VideoBin) addEncoder() error { if err != nil { return err } - if err = b.bin.AddElement(rawVideoTee); err != nil { + if err = b.bin.AddElement(b.rawVideoTee); err != nil { return err } From 378c1a8dfe6868ce2527468d1469f9a90c80bda6 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 25 Sep 2023 14:11:45 -0700 Subject: [PATCH 08/31] WiP --- pkg/pipeline/builder/image.go | 132 ++++++++++++++++++++++++++++++++++ pkg/pipeline/sink/images.go | 60 ++++++++++++++++ 2 files changed, 192 insertions(+) create mode 100644 pkg/pipeline/builder/image.go create mode 100644 pkg/pipeline/sink/images.go diff --git a/pkg/pipeline/builder/image.go b/pkg/pipeline/builder/image.go new file mode 100644 index 00000000..3822d636 --- /dev/null +++ b/pkg/pipeline/builder/image.go @@ -0,0 +1,132 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "fmt" + "time" + + "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/errors" + "github.com/livekit/egress/pkg/gstreamer" + "github.com/livekit/egress/pkg/types" + "github.com/livekit/protocol/utils" + "github.com/tinyzimmer/go-gst/gst" +) + +const ( + imageQueueLatency = uint64(200 * time.Millisecond) +) + +func BuildImageBins(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) ([]*gstreamer.Bin, error) { + o := p.GetImageConfigs() + + var bins []*gstreamer.Bin + for _, c := range o { + b, err := BuildImageBin(c, pipeline, p) + if err != nil { + return nil, err + } + + bins = append(bins, b) + } + + return bins, nil +} + +func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstreamer.Bin, error) { + id := utils.NewGuid("") + + b := pipeline.NewBin(fmt.Sprintf("image_%s", id)) + + queue, err := gstreamer.BuildQueue(fmt.Sprintf("image_queue_%s", id), imageQueueLatency, true) + if err != nil { + return nil, err + } + if err := b.AddElements(queue); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + + videoRate, err := gst.NewElement("videorate") + if err != nil { + return nil, errors.ErrGstPipelineError(err) + } + if err = videoRate.SetProperty("max-duplication-time", uint64(time.Duration(c.CaptureInterval)*time.Second)); err != nil { + return nil, err + } + if err = videoRate.SetProperty("skip-to-first", true); err != nil { + return nil, err + } + if err := b.AddElements(videoRate); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + + videoScale, err := gst.NewElement("videoscale") + if err != nil { + return nil, errors.ErrGstPipelineError(err) + } + if err := b.AddElements(videoScale); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + + caps, err := gst.NewElement("capsfilter") + if err != nil { + return nil, errors.ErrGstPipelineError(err) + } + err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + "video/x-raw,framerate=1/%d,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1", + c.CaptureInterval, c.Width, c.Height))) + if err != nil { + return nil, err + } + if err := b.AddElements(caps); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + + switch c.ImageOutCodec { + case types.MimeTypeJPEG: + enc, err := gst.NewElement("jpegenc") + if err != nil { + return nil, errors.ErrGstPipelineError(err) + } + if err := b.AddElements(enc); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + default: + return nil, errors.ErrNoCompatibleCodec + } + + sink, err := gst.NewElement("multifilesink") + if err != nil { + return nil, err + } + err = sink.SetProperty("post-messages", true) + if err != nil { + return nil, err + } + + // File will be renamed if the TS prefix is configured + location := fmt.Sprintf("%s_%%05d.ts", c.ImagePrefix) + + err = sink.SetProperty("location", location) + if err != nil { + return nil, err + } + if err := b.AddElements(sink); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + + return b, nil +} diff --git a/pkg/pipeline/sink/images.go b/pkg/pipeline/sink/images.go new file mode 100644 index 00000000..9ac78c29 --- /dev/null +++ b/pkg/pipeline/sink/images.go @@ -0,0 +1,60 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/gstreamer" + "github.com/livekit/egress/pkg/pipeline/sink/uploader" +) + +type ImageSink struct { + uploader.Uploader + + *config.ImageConfig + + conf *config.PipelineConfig + callbacks *gstreamer.Callbacks +} + +func newImageSink(u uploader.Uploader, p *config.PipelineConfig, o *config.ImageConfig, callbacks *gstreamer.Callbacks) (*ImageSink, error) { + return &ImageSink{ + Uploader: u, + ImageConfig: o, + conf: p, + callbacks: callbacks, + }, nil +} + +func (s *ImageSink) Start() error { + // TODO setup gst pipeline + // TODO filename + + return nil +} + +func (s *ImageSink) NewImage(filepath string, ts uint64) error { + // TODO rename file, upload + + return nil +} + +func (s *ImageSink) Close() error { + + return nil +} + +func (*Sink) Cleanup() { +} From 5c591b4586e91280767ec45189cd8f58dcfd7389 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 25 Sep 2023 14:49:15 -0700 Subject: [PATCH 09/31] WiP --- pkg/pipeline/controller.go | 20 +++++++++++++++++--- pkg/pipeline/sink/images.go | 2 +- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index c74c4c20..3c3312bc 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -137,28 +137,42 @@ func (c *Controller) BuildPipeline() error { } } + var sinkBins []*gstreamer.Bin for egressType := range c.Outputs { - var sinkBin *gstreamer.Bin switch egressType { case types.EgressTypeFile: + var sinkBin *gstreamer.Bin sinkBin, err = builder.BuildFileBin(p, c.PipelineConfig) + sinkBins = append(sinkBins, sinkBin) case types.EgressTypeSegments: + var sinkBin *gstreamer.Bin sinkBin, err = builder.BuildSegmentBin(p, c.PipelineConfig) + sinkBins = append(sinkBins, sinkBin) case types.EgressTypeStream: + var sinkBin *gstreamer.Bin c.streamBin, sinkBin, err = builder.BuildStreamBin(p, c.PipelineConfig) + sinkBins = append(sinkBins, sinkBin) case types.EgressTypeWebsocket: + var sinkBin *gstreamer.Bin writer := c.sinks[egressType][0].(*sink.WebsocketSink) sinkBin, err = builder.BuildWebsocketBin(p, writer.SinkCallbacks()) + sinkBins = append(sinkBins, sinkBin) - // TODO Add Images + case types.EgressTypeImages: + var bins []*gstreamer.Bin + bins, err = builder.BuildImageBins(p, c.PipelineConfig) + sinkBins = append(sinkBins, bins...) } if err != nil { return err } - if err = p.AddSinkBin(sinkBin); err != nil { + } + + for _, bin := range sinkBins { + if err = p.AddSinkBin(bin); err != nil { return err } } diff --git a/pkg/pipeline/sink/images.go b/pkg/pipeline/sink/images.go index 9ac78c29..89194d29 100644 --- a/pkg/pipeline/sink/images.go +++ b/pkg/pipeline/sink/images.go @@ -56,5 +56,5 @@ func (s *ImageSink) Close() error { return nil } -func (*Sink) Cleanup() { +func (*ImageSink) Cleanup() { } From 005fd314ec4db6fb435a4da11bd413e53f06c16b Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Tue, 26 Sep 2023 12:53:19 -0700 Subject: [PATCH 10/31] Default values --- pkg/config/base.go | 1 + pkg/config/output_image.go | 12 ++++++++ pkg/config/pipeline.go | 20 ++++++++++++-- pkg/pipeline/controller.go | 18 ++++++++++-- pkg/pipeline/source/sdk.go | 56 ++++++++++++++++++++++++++------------ 5 files changed, 84 insertions(+), 23 deletions(-) diff --git a/pkg/config/base.go b/pkg/config/base.go index a8e1f232..6c3e8870 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -86,6 +86,7 @@ type SessionLimits struct { FileOutputMaxDuration time.Duration `yaml:"file_output_max_duration"` StreamOutputMaxDuration time.Duration `yaml:"stream_output_max_duration"` SegmentOutputMaxDuration time.Duration `yaml:"segment_output_max_duration"` + ImageOutputMaxDuration time.Duration `yaml:"image_output_max_duration"` } func (c *BaseConfig) initLogger(values ...interface{}) error { diff --git a/pkg/config/output_image.go b/pkg/config/output_image.go index bb0de89d..d586300f 100644 --- a/pkg/config/output_image.go +++ b/pkg/config/output_image.go @@ -82,6 +82,18 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf conf.CaptureInterval = 10 } + // Set default dimensions for RoomComposite and Web. For all SDKs input, default will be + // set from the track dimensions + switch p.Info.Request.(type) { + case *livekit.EgressInfo_RoomComposite, *livekit.EgressInfo_Web: + if conf.Width == 0 { + conf.Width = p.Width + } + if conf.Height == 0 { + conf.Height = p.Height + } + } + // filename err = conf.updatePrefix(p) if err != nil { diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 4af731e3..57473d92 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -562,7 +562,7 @@ func (p *PipelineConfig) updateOutputType(compatibleAudioCodecs map[types.MimeTy } // used for sdk input source -func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[string]string) error { +func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[string]string, w, h uint32) error { for egressType, c := range p.Outputs { if len(c) == 0 { continue @@ -578,14 +578,28 @@ func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[s o.PlaylistFilename = stringReplace(o.PlaylistFilename, replacements) o.SegmentPrefix = stringReplace(o.SegmentPrefix, replacements) o.SegmentsInfo.PlaylistName = stringReplace(o.SegmentsInfo.PlaylistName, replacements) - - // TODO Images + case types.EgressTypeImages: + for _, ci := range c { + o := ci.(*ImageConfig) + o.LocalDir = stringReplace(o.LocalDir, replacements) + o.StorageDir = stringReplace(o.StorageDir, replacements) + o.ImagePrefix = stringReplace(o.ImagePrefix, replacements) + if o.Width == 0 { + o.Width = int32(w) + } + if o.Height == 0 { + o.Height = int32(h) + } + } } } return nil } +// TODO Images room composite default dimensions +// TODO Images sdk dimensions + func (p *PipelineConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) { parsed, err := url.Parse(rawUrl) if err != nil { diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 3c3312bc..295efdfc 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -478,8 +478,10 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) { t = c.StreamOutputMaxDuration case types.EgressTypeSegments: t = c.SegmentOutputMaxDuration + case types.EgressTypeImages: + t = c.ImageOutputMaxDuration + } - // TODO Images if t > 0 && (timeout == 0 || t < timeout) { timeout = t } @@ -520,7 +522,10 @@ func (c *Controller) updateStartTime(startedAt int64) { case types.EgressTypeSegments: o[0].(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt - // TODO Images + case types.EgressTypeImages: + for _, c := range o { + c.(*config.ImageConfig).ImagesInfo.StartedAt = startedAt + } } } @@ -562,7 +567,14 @@ func (c *Controller) updateDuration(endedAt int64) { } segmentsInfo.EndedAt = endedAt segmentsInfo.Duration = endedAt - segmentsInfo.StartedAt - // TODO Images + case types.EgressTypeImages: + for _, c := range o { + imageInfo := c.(*config.ImageConfig).ImagesInfo + if imageInfo.StartedAt == 0 { + imageInfo.StartedAt = endedAt + } + imageInfo.EndedAt = endedAt + } } } } diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 9fe36821..4683a6d0 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -167,10 +167,11 @@ func (s *SDKSource) joinRoom() error { var fileIdentifier string var err error + var w, h uint32 switch s.RequestType { case types.RequestTypeParticipant: fileIdentifier = s.Identity - err = s.awaitParticipant(s.Identity) + w, h, err = s.awaitParticipant(s.Identity) case types.RequestTypeTrackComposite: fileIdentifier = s.Info.RoomName @@ -181,17 +182,17 @@ func (s *SDKSource) joinRoom() error { if s.VideoEnabled { tracks[s.VideoTrackID] = struct{}{} } - err = s.awaitTracks(tracks) + w, h, err = s.awaitTracks(tracks) case types.RequestTypeTrack: fileIdentifier = s.TrackID - err = s.awaitTracks(map[string]struct{}{s.TrackID: {}}) + w, h, err = s.awaitTracks(map[string]struct{}{s.TrackID: {}}) } if err != nil { return err } - if err = s.UpdateInfoFromSDK(fileIdentifier, s.filenameReplacements); err != nil { + if err = s.UpdateInfoFromSDK(fileIdentifier, s.filenameReplacements, w, h); err != nil { logger.Errorw("could not update file params", err) return err } @@ -199,22 +200,30 @@ func (s *SDKSource) joinRoom() error { return nil } -func (s *SDKSource) awaitParticipant(identity string) error { +func (s *SDKSource) awaitParticipant(identity string) (uint32, uint32, error) { s.errors = make(chan error, 2) rp, err := s.getParticipant(identity) if err != nil { - return err + return 0, 0, err } for trackCount := 0; trackCount == 0 || trackCount < len(rp.Tracks()); trackCount++ { if err = <-s.errors; err != nil { - return err + return 0, 0, err } } + for _, t := range rp.Tracks() { + if t.TrackInfo().Type == livekit.TrackType_VIDEO { + w = t.TrackInfo().Width + h = t.TrackInfo().Height + } + + } + s.initialized.Break() - return nil + return w, h, nil } func (s *SDKSource) getParticipant(identity string) (*lksdk.RemoteParticipant, error) { @@ -230,12 +239,13 @@ func (s *SDKSource) getParticipant(identity string) (*lksdk.RemoteParticipant, e return nil, errors.ErrParticipantNotFound(identity) } -func (s *SDKSource) awaitTracks(expecting map[string]struct{}) error { +func (s *SDKSource) awaitTracks(expecting map[string]struct{}) (uint32, uint32, error) { trackCount := len(expecting) s.errors = make(chan error, trackCount) deadline := time.After(subscriptionTimeout) - if err := s.subscribeToTracks(expecting, deadline); err != nil { + tracks, err := s.subscribeToTracks(expecting, deadline) + if err != nil { return err } @@ -243,23 +253,33 @@ func (s *SDKSource) awaitTracks(expecting map[string]struct{}) error { select { case err := <-s.errors: if err != nil { - return err + return 0, 0, err } case <-deadline: - return errors.ErrSubscriptionFailed + return 0, 0, errors.ErrSubscriptionFailed + } + } + + var w, h uint32 + for _, t := range tracks { + if t.TrackInfo().Type() == livekit.TrackType_VIDEO { + w = t.TrackInfo().Width + h = t.TrackInfo().Height } } s.initialized.Break() - return nil + return w, h, nil } -func (s *SDKSource) subscribeToTracks(expecting map[string]struct{}, deadline <-chan time.Time) error { +func (s *SDKSource) subscribeToTracks(expecting map[string]struct{}, deadline <-chan time.Time) ([]lksdk.TrackPublication, error) { + var tracks []lksdk.TrackPublication + for { select { case <-deadline: for trackID := range expecting { - return errors.ErrTrackNotFound(trackID) + return nil, errors.ErrTrackNotFound(trackID) } default: for _, p := range s.room.GetParticipants() { @@ -267,12 +287,14 @@ func (s *SDKSource) subscribeToTracks(expecting map[string]struct{}, deadline <- trackID := track.SID() if _, ok := expecting[trackID]; ok { if err := s.subscribe(track); err != nil { - return err + return nil, err } + tracks = append(tracks, track) + delete(expecting, track.SID()) if len(expecting) == 0 { - return nil + return tracks, nil } } } From e884a7668f78ddf78a74327e62035191c73a3233 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Tue, 26 Sep 2023 14:04:30 -0700 Subject: [PATCH 11/31] WiP --- pkg/pipeline/builder/image.go | 2 +- pkg/pipeline/sink/images.go | 61 +++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/builder/image.go b/pkg/pipeline/builder/image.go index 3822d636..512efc22 100644 --- a/pkg/pipeline/builder/image.go +++ b/pkg/pipeline/builder/image.go @@ -118,7 +118,7 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi } // File will be renamed if the TS prefix is configured - location := fmt.Sprintf("%s_%%05d.ts", c.ImagePrefix) + location := fmt.Sprintf("%s_%%05d%s", c.ImagePrefix, types.FileExtensionForOutputType[c.OutputType]) err = sink.SetProperty("location", location) if err != nil { diff --git a/pkg/pipeline/sink/images.go b/pkg/pipeline/sink/images.go index 89194d29..baea4675 100644 --- a/pkg/pipeline/sink/images.go +++ b/pkg/pipeline/sink/images.go @@ -15,9 +15,16 @@ package sink import ( + "fmt" + "path" + + "github.com/frostbyte73/core" "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/gstreamer" "github.com/livekit/egress/pkg/pipeline/sink/uploader" + "github.com/livekit/egress/pkg/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" ) type ImageSink struct { @@ -27,6 +34,14 @@ type ImageSink struct { conf *config.PipelineConfig callbacks *gstreamer.Callbacks + + createdImages chan ImageUpdate + done core.Fuse +} + +type ImageUpdate struct { + timestamp int64 + filename string } func newImageSink(u uploader.Uploader, p *config.PipelineConfig, o *config.ImageConfig, callbacks *gstreamer.Callbacks) (*ImageSink, error) { @@ -35,10 +50,56 @@ func newImageSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Image ImageConfig: o, conf: p, callbacks: callbacks, + + createdImages: make(chan ImageUpdate, maxPendingUploads), + done: core.NewFuse(), }, nil } func (s *ImageSink) Start() error { + go func() { + var err error + defer func() { + if err != nil { + s.callbacks.OnError(err) + } + s.done.Break() + }() + + for update := range s.createdImages { + s.ImagesInfo.ImagesCount++ + + filename := update.filename + imageLocalPath := path.Join(s.LocalDir, filename) + if s.ImageSuffix == livekit.ImageFileSuffix_IMAGE_SUFFIX_TIMESTAMP { + location := fmt.Sprintf("%s_%%05d%s", c.ImagePrefix, types.FileExtensionForOutputType[c.OutputType]) + newFilemame := fmt.Sprintf("%s_%s%03d.ts", s.ImagePrefix, ts.Format("20060102150405"), ts.UnixMilli()%1000, types.FileExtensionForOutputType[s.OutputType]) + } + + imageStoragePath := path.Join(s.StorageDir, filename) + + _, size, err = s.Upload(imageLocalPath, imageStoragePath, s.getImageOutputType(), true) + if err != nil { + return + } + + s.SegmentsInfo.Size += size + + err = s.endSegment(update.filename, update.endTime) + if err != nil { + logger.Errorw("failed to end segment", err, "path", segmentLocalPath) + return + } + + playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) + playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) + s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + if err != nil { + return + } + } + }() + // TODO setup gst pipeline // TODO filename From 70670a459ea1cc04a7a1c999fe0397e4ed95d517 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Tue, 26 Sep 2023 17:48:46 -0700 Subject: [PATCH 12/31] WiP --- pkg/pipeline/sink/images.go | 85 ++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 29 deletions(-) diff --git a/pkg/pipeline/sink/images.go b/pkg/pipeline/sink/images.go index baea4675..0c6def40 100644 --- a/pkg/pipeline/sink/images.go +++ b/pkg/pipeline/sink/images.go @@ -16,7 +16,10 @@ package sink import ( "fmt" + "os" "path" + "strings" + "time" "github.com/frostbyte73/core" "github.com/livekit/egress/pkg/config" @@ -35,12 +38,16 @@ type ImageSink struct { conf *config.PipelineConfig callbacks *gstreamer.Callbacks + initialized bool + startTime time.Time + startRunningTime uint64 + createdImages chan ImageUpdate done core.Fuse } type ImageUpdate struct { - timestamp int64 + timestamp uint64 filename string } @@ -67,47 +74,67 @@ func (s *ImageSink) Start() error { }() for update := range s.createdImages { - s.ImagesInfo.ImagesCount++ - - filename := update.filename - imageLocalPath := path.Join(s.LocalDir, filename) - if s.ImageSuffix == livekit.ImageFileSuffix_IMAGE_SUFFIX_TIMESTAMP { - location := fmt.Sprintf("%s_%%05d%s", c.ImagePrefix, types.FileExtensionForOutputType[c.OutputType]) - newFilemame := fmt.Sprintf("%s_%s%03d.ts", s.ImagePrefix, ts.Format("20060102150405"), ts.UnixMilli()%1000, types.FileExtensionForOutputType[s.OutputType]) - } - - imageStoragePath := path.Join(s.StorageDir, filename) - - _, size, err = s.Upload(imageLocalPath, imageStoragePath, s.getImageOutputType(), true) + err = s.handleNewImage(update) if err != nil { + logger.Errorw("new image handling failed", err) return } + } + }() - s.SegmentsInfo.Size += size + // TODO update manifest - err = s.endSegment(update.filename, update.endTime) - if err != nil { - logger.Errorw("failed to end segment", err, "path", segmentLocalPath) - return - } + return nil +} - playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) - playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) - if err != nil { - return - } +func (s *ImageSink) handleNewImage(update *ImageUpdate) error { + s.ImagesInfo.ImagesCount++ + + filename := update.filename + ts := s.getImageTime(update.timestamp) + imageLocalPath := path.Join(s.LocalDir, filename) + if s.ImageSuffix == livekit.ImageFileSuffix_IMAGE_SUFFIX_TIMESTAMP { + newFilemame := fmt.Sprintf("%s_%s%03d%s", s.ImagePrefix, ts.Format("20060102150405"), ts.UnixMilli()%1000, types.FileExtensionForOutputType[s.OutputType]) + newImageLocalPath = path.Join(s.LocalDir, newFilename) + + err = os.Rename(imageLocalPath, newImageLocalPath) + if err != nil { + return err } - }() + filename = newFilemame + imageLocalPath = newImageLocalPath - // TODO setup gst pipeline - // TODO filename + } + + imageStoragePath := path.Join(s.StorageDir, filename) + + _, size, err = s.Upload(imageLocalPath, imageStoragePath, s.getImageOutputType(), true) + if err != nil { + return err + } + + err = s.updateManifest(filename, ts, size) + if err != nil { + return err + } return nil } +func (s *ImageSink) getImageTime(pts uint64) time.Time { + if !s.initialized { + s.startTime = time.Now() + s.startRunningTime = pts + s.initialized = true + } + + return s.startTime.Add(time.Duration(pts - s.startRunningTime)) +} + func (s *ImageSink) NewImage(filepath string, ts uint64) error { - // TODO rename file, upload + if !strings.HasPrefix(filepath, s.LocalDir) { + return fmt.Errorf("invalid filepath") + } return nil } From 205f0ca379f565ba9b278fc115695650b5ee4ea4 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 27 Sep 2023 13:45:49 -0700 Subject: [PATCH 13/31] WiP --- pkg/config/output_image.go | 8 +-- pkg/pipeline/builder/image.go | 3 +- pkg/pipeline/sink/image_manifest.go | 77 +++++++++++++++++++++++++++++ pkg/pipeline/sink/images.go | 60 ++++++++++++++++------ pkg/pipeline/sink/manifest.go | 18 ++++--- 5 files changed, 137 insertions(+), 29 deletions(-) create mode 100644 pkg/pipeline/sink/image_manifest.go diff --git a/pkg/config/output_image.go b/pkg/config/output_image.go index d586300f..cd458d54 100644 --- a/pkg/config/output_image.go +++ b/pkg/config/output_image.go @@ -120,7 +120,7 @@ func (o *ImageConfig) updatePrefix(p *PipelineConfig) error { } // update config - o.ImagePrefix = fmt.Sprintf("%s%s", imagesDir, imagesPrefix) + o.ImagePrefix = imagesPrefix if o.UploadConfig == nil { o.LocalDir = imagesDir @@ -136,11 +136,7 @@ func (o *ImageConfig) updatePrefix(p *PipelineConfig) error { } // create local directories - if imagesDir != "" { - if err := os.MkdirAll(path.Join(o.LocalDir, imagesDir), 0755); err != nil { - return err - } - } else if o.LocalDir != "" { + if o.LocalDir != "" { if err := os.MkdirAll(o.LocalDir, 0755); err != nil { return err } diff --git a/pkg/pipeline/builder/image.go b/pkg/pipeline/builder/image.go index 512efc22..cc3ef449 100644 --- a/pkg/pipeline/builder/image.go +++ b/pkg/pipeline/builder/image.go @@ -16,6 +16,7 @@ package builder import ( "fmt" + "path" "time" "github.com/livekit/egress/pkg/config" @@ -118,7 +119,7 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi } // File will be renamed if the TS prefix is configured - location := fmt.Sprintf("%s_%%05d%s", c.ImagePrefix, types.FileExtensionForOutputType[c.OutputType]) + location := fmt.Sprintf("%s_%%05d%s", path.Join(c.StorageDir, c.ImagePrefix), types.FileExtensionForOutputType[c.OutputType]) err = sink.SetProperty("location", location) if err != nil { diff --git a/pkg/pipeline/sink/image_manifest.go b/pkg/pipeline/sink/image_manifest.go new file mode 100644 index 00000000..f7a455d0 --- /dev/null +++ b/pkg/pipeline/sink/image_manifest.go @@ -0,0 +1,77 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "encoding/json" + "os" + "time" + + "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/pipeline/sink/uploader" + "github.com/livekit/egress/pkg/types" +) + +type ImageManifest struct { + Manifest `json:",inline"` + + Images []*Image `json:"images"` +} + +type Image struct { + Name string `json:"name"` + Timestamp time.Time `json:"timestamp"` + Size int64 `json:"size"` +} + +func createImageManifest(p *config.PipelineConfig) *ImageManifest { + return &ImageManifest{ + Manifest: initManifest(p), + } +} + +func (m *ImageManifest) imageCreated(filename string, ts time.Time, size int64) { + m.Images = append(m.Images, &Image{ + Name: filename, + Timestamp: ts, + Size: size, + }) +} + +func (m *ImageManifest) updateManifest(u uploader.Uploader, localFilepath, storageFilepath string) error { + manifest, err := os.Create(localFilepath) + if err != nil { + return err + } + + b, err := json.Marshal(manifest) + if err != nil { + return err + } + + _, err = manifest.Write(b) + if err != nil { + return err + } + + err = manifest.Close() + if err != nil { + return err + } + + _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) + + return err +} diff --git a/pkg/pipeline/sink/images.go b/pkg/pipeline/sink/images.go index 0c6def40..60659bbc 100644 --- a/pkg/pipeline/sink/images.go +++ b/pkg/pipeline/sink/images.go @@ -42,11 +42,12 @@ type ImageSink struct { startTime time.Time startRunningTime uint64 - createdImages chan ImageUpdate + manifest *ImageManifest + createdImages chan *imageUpdate done core.Fuse } -type ImageUpdate struct { +type imageUpdate struct { timestamp uint64 filename string } @@ -58,7 +59,8 @@ func newImageSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Image conf: p, callbacks: callbacks, - createdImages: make(chan ImageUpdate, maxPendingUploads), + manifest: createImageManifest(p), + createdImages: make(chan *imageUpdate, maxPendingUploads), done: core.NewFuse(), }, nil } @@ -82,40 +84,40 @@ func (s *ImageSink) Start() error { } }() - // TODO update manifest - return nil } -func (s *ImageSink) handleNewImage(update *ImageUpdate) error { +func (s *ImageSink) handleNewImage(update *imageUpdate) error { s.ImagesInfo.ImagesCount++ filename := update.filename ts := s.getImageTime(update.timestamp) imageLocalPath := path.Join(s.LocalDir, filename) if s.ImageSuffix == livekit.ImageFileSuffix_IMAGE_SUFFIX_TIMESTAMP { - newFilemame := fmt.Sprintf("%s_%s%03d%s", s.ImagePrefix, ts.Format("20060102150405"), ts.UnixMilli()%1000, types.FileExtensionForOutputType[s.OutputType]) - newImageLocalPath = path.Join(s.LocalDir, newFilename) + newFilename := fmt.Sprintf("%s_%s%03d%s", s.ImagePrefix, ts.Format("20060102150405"), ts.UnixMilli()%1000, types.FileExtensionForOutputType[s.OutputType]) + newImageLocalPath := path.Join(s.LocalDir, newFilename) - err = os.Rename(imageLocalPath, newImageLocalPath) + err := os.Rename(imageLocalPath, newImageLocalPath) if err != nil { return err } - filename = newFilemame + filename = newFilename imageLocalPath = newImageLocalPath } imageStoragePath := path.Join(s.StorageDir, filename) - _, size, err = s.Upload(imageLocalPath, imageStoragePath, s.getImageOutputType(), true) + _, size, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true) if err != nil { return err } - err = s.updateManifest(filename, ts, size) - if err != nil { - return err + if !s.DisableManifest { + err = s.updateManifest(filename, ts, size) + if err != nil { + return err + } } return nil @@ -131,18 +133,46 @@ func (s *ImageSink) getImageTime(pts uint64) time.Time { return s.startTime.Add(time.Duration(pts - s.startRunningTime)) } +func (s *ImageSink) updateManifest(filename string, ts time.Time, size int64) error { + s.manifest.imageCreated(filename, ts, size) + + manifestLocalPath := fmt.Sprintf("%s.json", path.Join(s.LocalDir, s.ImagePrefix)) + manifestStoragePath := fmt.Sprintf("%s.json", path.Join(s.StorageDir, s.ImagePrefix)) + return s.manifest.updateManifest(s.Uploader, manifestLocalPath, manifestStoragePath) +} + func (s *ImageSink) NewImage(filepath string, ts uint64) error { if !strings.HasPrefix(filepath, s.LocalDir) { return fmt.Errorf("invalid filepath") } + filename := filepath[len(s.LocalDir):] + + s.createdImages <- &imageUpdate{ + filename: filename, + timestamp: ts, + } + return nil } func (s *ImageSink) Close() error { + close(s.createdImages) + <-s.done.Watch() return nil } -func (*ImageSink) Cleanup() { +func (s *ImageSink) Cleanup() { + if s.LocalDir == s.StorageDir { + return + } + + if s.LocalDir != "" { + logger.Debugw("removing temporary directory", "path", s.LocalDir) + if err := os.RemoveAll(s.LocalDir); err != nil { + logger.Errorw("could not delete temp dir", err) + } + } + } diff --git a/pkg/pipeline/sink/manifest.go b/pkg/pipeline/sink/manifest.go index deced515..6d967971 100644 --- a/pkg/pipeline/sink/manifest.go +++ b/pkg/pipeline/sink/manifest.go @@ -60,7 +60,17 @@ func uploadManifest(p *config.PipelineConfig, u uploader.Uploader, localFilepath } func getManifest(p *config.PipelineConfig) ([]byte, error) { - manifest := Manifest{ + manifest := initManifest(p) + + if o := p.GetSegmentConfig(); o != nil { + manifest.SegmentCount = o.SegmentsInfo.SegmentCount + } + + return json.Marshal(manifest) +} + +func initManifest(p *config.PipelineConfig) Manifest { + return Manifest{ EgressID: p.Info.EgressId, RoomID: p.Info.RoomId, RoomName: p.Info.RoomName, @@ -74,10 +84,4 @@ func getManifest(p *config.PipelineConfig) ([]byte, error) { AudioTrackID: p.AudioTrackID, VideoTrackID: p.VideoTrackID, } - - if o := p.GetSegmentConfig(); o != nil { - manifest.SegmentCount = o.SegmentsInfo.SegmentCount - } - - return json.Marshal(manifest) } From e436167c9061799a55b558bb2abddffe17f1bc41 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 27 Sep 2023 15:33:05 -0700 Subject: [PATCH 14/31] WiP --- test/images.go | 97 +++++++++++++++++++++++++++++++++++++++++ test/track_composite.go | 59 +++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 test/images.go diff --git a/test/images.go b/test/images.go new file mode 100644 index 00000000..f0380514 --- /dev/null +++ b/test/images.go @@ -0,0 +1,97 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build integration + +package test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/livekit/egress/pkg/config" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" +) + +func (r *Runner) runImagesTest(t *testing.T, req *rpc.StartEgressRequest, test *testCase) { + egressID := r.startEgress(t, req) + + time.Sleep(time.Second * 10) + if r.Dotfiles { + r.createDotFile(t, egressID) + } + + // stop + time.Sleep(time.Second * 15) + res := r.stopEgress(t, egressID) + + // get params + p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) + require.NoError(t, err) + + r.verifyImages(t, p, test.imageFilenameSuffix, res) +} + +func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.ImageFileSuffix, res *livekit.EgressInfo) { + // egress info + require.Equal(t, res.Error == "", res.Status != livekit.EgressStatus_EGRESS_FAILED) + require.NotZero(t, res.StartedAt) + require.NotZero(t, res.EndedAt) + + // segments info + require.Len(t, res.GetImagesResults(), 1) + images := res.GetImagesResults()[0] + + require.Greater(t, segments.ImagesCount, int64(0)) + + // r.verifyImagesOutput(t, p, filenameSuffix, segments.PlaylistName, segments.PlaylistLocation, int(segments.SegmentCount), res, m3u8.PlaylistTypeEvent) + // r.verifyManifest(t, p, segments.PlaylistName) +} + +//func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) { +// localPlaylistPath := fmt.Sprintf("%s/%s", r.FilePrefix, plName) +// +// if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { +// download(t, uploadConfig, localPlaylistPath+".json", plName+".json") +// } +//} + +//func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, plName string, plLocation string, segmentCount int, res *livekit.EgressInfo, plType m3u8.PlaylistType) { +// require.NotEmpty(t, plName) +// require.NotEmpty(t, plLocation) + +// storedPlaylistPath := plName +// localPlaylistPath := plName + +// // download from cloud storage +// if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { +// localPlaylistPath = fmt.Sprintf("%s/%s", r.FilePrefix, storedPlaylistPath) +// download(t, uploadConfig, localPlaylistPath, storedPlaylistPath) +// if plType == m3u8.PlaylistTypeEvent { +// // Only download segments once +// base := storedPlaylistPath[:len(storedPlaylistPath)-5] +// for i := 0; i < int(segmentCount); i++ { +// cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) +// localPath := fmt.Sprintf("%s/%s", r.FilePrefix, cloudPath) +// download(t, uploadConfig, localPath, cloudPath) +// } +// } +// } + +// verify +// verify(t, localPlaylistPath, p, res, types.EgressTypeSegments, r.Muting, r.sourceFramerate, plType == m3u8.PlaylistTypeLive) +//} diff --git a/test/track_composite.go b/test/track_composite.go index 7bda4ab9..b5caa9b5 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -228,6 +228,65 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { }) } +func (r *Runner) testTrackCompositeImages(t *testing.T) { + if !r.runImagesTests() { + return + } + + t.Run("TrackComposite/Images", func(t *testing.T) { + for _, test := range []*testCase{ + { + name: "VP8", + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + filename: "tcs_{publisher_identity}_vp8_{time}", + }, + } { + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, + func(t *testing.T, audioTrackID, videoTrackID string) { + var aID, vID string + if !test.audioOnly { + vID = videoTrackID + } + if !test.videoOnly { + aID = audioTrackID + } + + imageOutput := &livekit.ImageOutput{ + CaptureInterval: 5, + Width: 1280, + Height: 720, + FilenamePrefix: test.filename, + } + + trackRequest := &livekit.TrackCompositeEgressRequest{ + RoomName: r.room.Name(), + AudioTrackId: aID, + VideoTrackId: vID, + ImageOutputs: []*livekit.ImageOutput{imageOutput}, + } + if test.options != nil { + trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ + Advanced: test.options, + } + } + + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_TrackComposite{ + TrackComposite: trackRequest, + }, + } + r.runImagessTest(t, req, test) + }, + ) + if r.Short { + return + } + } + }) +} + func (r *Runner) testTrackCompositeMulti(t *testing.T) { if !r.runMultiTests() { return From 54455d583123f71f9d7950d92a3752799eb27539 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 27 Sep 2023 16:06:11 -0700 Subject: [PATCH 15/31] WiP --- go.mod | 4 +--- go.sum | 6 ++++-- pkg/pipeline/source/sdk.go | 5 +++-- test/ffprobe.go | 6 +++--- test/images.go | 2 +- test/integration.go | 3 +++ test/runner.go | 13 +++++++++---- test/track_composite.go | 4 ++-- 8 files changed, 26 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 7f6eba39..60151e32 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/livekit/egress go 1.20 -replace github.com/livekit/protocol => ../protocol - replace github.com/tinyzimmer/go-glib v0.0.25 => github.com/livekit/go-glib v0.0.0-20230223001336-834490045522 replace github.com/tinyzimmer/go-gst v0.2.33 => github.com/livekit/go-gst v0.2.34-0.20230901155409-dd09095b979e @@ -21,7 +19,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/livekit/livekit-server v1.4.6-0.20230918194757-8a0d417a8c99 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e + github.com/livekit/protocol v1.7.3-0.20230927224423-14bc77f4821b github.com/livekit/psrpc v0.3.3 github.com/livekit/server-sdk-go v1.0.17-0.20230918212012-3a26309be9c5 github.com/pion/rtp v1.8.1 diff --git a/go.sum b/go.sum index 4468f45b..24cdb134 100644 --- a/go.sum +++ b/go.sum @@ -177,8 +177,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e h1:WEet0iH/JazBFNhhH+YuZHtXpKefb7mnbCC2al3peyA= -github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.7.3-0.20230927223354-d7401d265a2f h1:YicTs1L+qnxFgJ2IrW2nwgPDbigY6OEMHqvSHm1KHV4= +github.com/livekit/protocol v1.7.3-0.20230927223354-d7401d265a2f/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.7.3-0.20230927224423-14bc77f4821b h1:TcOck4b6Z5cWDfeMUSOFQ+UuQPyZppuTZZI9wvhTj50= +github.com/livekit/protocol v1.7.3-0.20230927224423-14bc77f4821b/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/livekit/server-sdk-go v1.0.17-0.20230918212012-3a26309be9c5 h1:JZttHKxyzGFuMpsl3Mmt84WU+DZsZg00FCOPfr6CmBY= diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 4683a6d0..9d3163ce 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -214,6 +214,7 @@ func (s *SDKSource) awaitParticipant(identity string) (uint32, uint32, error) { } } + var w, h uint32 for _, t := range rp.Tracks() { if t.TrackInfo().Type == livekit.TrackType_VIDEO { w = t.TrackInfo().Width @@ -246,7 +247,7 @@ func (s *SDKSource) awaitTracks(expecting map[string]struct{}) (uint32, uint32, deadline := time.After(subscriptionTimeout) tracks, err := s.subscribeToTracks(expecting, deadline) if err != nil { - return err + return 0, 0, err } for i := 0; i < trackCount; i++ { @@ -262,7 +263,7 @@ func (s *SDKSource) awaitTracks(expecting map[string]struct{}) (uint32, uint32, var w, h uint32 for _, t := range tracks { - if t.TrackInfo().Type() == livekit.TrackType_VIDEO { + if t.TrackInfo().Type == livekit.TrackType_VIDEO { w = t.TrackInfo().Width h = t.TrackInfo().Height } diff --git a/test/ffprobe.go b/test/ffprobe.go index 0be62149..d5576f08 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -121,7 +121,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre require.NoError(t, err, "input %s does not exist", in) } - switch p.Outputs[egressType].GetOutputType() { + switch p.Outputs[egressType][0].GetOutputType() { case types.OutputTypeRaw: require.Equal(t, 0, info.Format.ProbeScore) case types.OutputTypeIVF: @@ -213,7 +213,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre require.Equal(t, 2, stream.Channels) // audio bitrate - if p.Outputs[egressType].GetOutputType() == types.OutputTypeMP4 { + if p.Outputs[egressType][0].GetOutputType() == types.OutputTypeMP4 { bitrate, err := strconv.Atoi(stream.BitRate) require.NoError(t, err) require.NotZero(t, bitrate) @@ -243,7 +243,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre require.Equal(t, "vp9", stream.CodecName) } - switch p.Outputs[egressType].GetOutputType() { + switch p.Outputs[egressType][0].GetOutputType() { case types.OutputTypeIVF: require.Equal(t, "vp8", stream.CodecName) diff --git a/test/images.go b/test/images.go index f0380514..f526862b 100644 --- a/test/images.go +++ b/test/images.go @@ -56,7 +56,7 @@ func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, filenameSu require.Len(t, res.GetImagesResults(), 1) images := res.GetImagesResults()[0] - require.Greater(t, segments.ImagesCount, int64(0)) + require.Greater(t, images.ImagesCount, int64(0)) // r.verifyImagesOutput(t, p, filenameSuffix, segments.PlaylistName, segments.PlaylistLocation, int(segments.SegmentCount), res, m3u8.PlaylistTypeEvent) // r.verifyManifest(t, p, segments.PlaylistName) diff --git a/test/integration.go b/test/integration.go index 08bfe37c..75046048 100644 --- a/test/integration.go +++ b/test/integration.go @@ -79,6 +79,9 @@ type testCase struct { live_playlist string filenameSuffix livekit.SegmentedFileSuffix + // used by images tests + imageFilenameSuffix livekit.ImageFileSuffix + // used by sdk tests audioCodec types.MimeType audioDelay time.Duration diff --git a/test/runner.go b/test/runner.go index a86eee82..b8d44007 100644 --- a/test/runner.go +++ b/test/runner.go @@ -63,6 +63,7 @@ type Runner struct { FileTestsOnly bool `yaml:"file_only"` StreamTestsOnly bool `yaml:"stream_only"` SegmentTestsOnly bool `yaml:"segments_only"` + ImageTestsOnly bool `yaml:"images_only"` MultiTestsOnly bool `yaml:"multi_only"` Muting bool `yaml:"muting"` Dotfiles bool `yaml:"dot_files"` @@ -202,17 +203,21 @@ func (r *Runner) runTrackTests() bool { } func (r *Runner) runFileTests() bool { - return !r.StreamTestsOnly && !r.SegmentTestsOnly && !r.MultiTestsOnly + return !r.StreamTestsOnly && !r.SegmentTestsOnly && !r.MultiTestsOnly && !r.ImageTestsOnly } func (r *Runner) runStreamTests() bool { - return !r.FileTestsOnly && !r.SegmentTestsOnly && !r.MultiTestsOnly + return !r.FileTestsOnly && !r.SegmentTestsOnly && !r.MultiTestsOnly && !r.ImageTestsOnly } func (r *Runner) runSegmentTests() bool { - return !r.FileTestsOnly && !r.StreamTestsOnly && !r.MultiTestsOnly + return !r.FileTestsOnly && !r.StreamTestsOnly && !r.MultiTestsOnly && !r.ImageTestsOnly +} + +func (r *Runner) runImageTests() bool { + return !r.FileTestsOnly && !r.StreamTestsOnly && !r.SegmentTestsOnly && !r.MultiTestsOnly } func (r *Runner) runMultiTests() bool { - return !r.FileTestsOnly && !r.StreamTestsOnly && !r.SegmentTestsOnly + return !r.FileTestsOnly && !r.StreamTestsOnly && !r.SegmentTestsOnly && !r.ImageTestsOnly } diff --git a/test/track_composite.go b/test/track_composite.go index b5caa9b5..541e63e8 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -229,7 +229,7 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { } func (r *Runner) testTrackCompositeImages(t *testing.T) { - if !r.runImagesTests() { + if !r.runImageTests() { return } @@ -277,7 +277,7 @@ func (r *Runner) testTrackCompositeImages(t *testing.T) { TrackComposite: trackRequest, }, } - r.runImagessTest(t, req, test) + r.runImagesTest(t, req, test) }, ) if r.Short { From 47f742092c6ab28e719cbf76a46fc30b4af8f6a0 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 27 Sep 2023 19:35:41 -0700 Subject: [PATCH 16/31] Generates frames --- pkg/gstreamer/pads.go | 1 + pkg/pipeline/builder/image.go | 13 +++++++++++++ pkg/pipeline/builder/video.go | 1 + test/track_composite.go | 3 ++- 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pkg/gstreamer/pads.go b/pkg/gstreamer/pads.go index 3b202550..347b1baa 100644 --- a/pkg/gstreamer/pads.go +++ b/pkg/gstreamer/pads.go @@ -98,6 +98,7 @@ func createGhostPadsLocked(src, sink *Bin, queue *gst.Element) (*gst.GhostPad, * } func matchPadsLocked(src, sink *Bin) (*gst.Pad, *gst.Pad, error) { + fmt.Println("matchPadsLocked", src.bin.GetName(), sink.bin.GetName()) var srcPad, sinkPad *gst.Pad var srcTemplates, sinkTemplates []*padTemplate if src.getSinkPad != nil { diff --git a/pkg/pipeline/builder/image.go b/pkg/pipeline/builder/image.go index cc3ef449..d0268172 100644 --- a/pkg/pipeline/builder/image.go +++ b/pkg/pipeline/builder/image.go @@ -52,6 +52,11 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi b := pipeline.NewBin(fmt.Sprintf("image_%s", id)) + fakeAudio, err := gst.NewElement("fakesink") + if err != nil { + return nil, err + } + queue, err := gstreamer.BuildQueue(fmt.Sprintf("image_queue_%s", id), imageQueueLatency, true) if err != nil { return nil, err @@ -60,6 +65,14 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi return nil, errors.ErrGstPipelineError(err) } + b.SetGetSrcPad(func(name string) *gst.Pad { + if name == "audio" { + return fakeAudio.GetStaticPad("sink") + } else { + return queue.GetStaticPad("sink") + } + }) + videoRate, err := gst.NewElement("videorate") if err != nil { return nil, errors.ErrGstPipelineError(err) diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 0f09a49d..3dcc1280 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -102,6 +102,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error } b.bin.SetGetSinkPad(func(name string) *gst.Pad { + fmt.Println("FOO PAD", name) if strings.HasPrefix(name, "image") { return b.rawVideoTee.GetRequestPad("src_%u") } else { diff --git a/test/track_composite.go b/test/track_composite.go index 541e63e8..80a9d960 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -34,6 +34,7 @@ func (r *Runner) testTrackComposite(t *testing.T) { r.testTrackCompositeFile(t) r.testTrackCompositeStream(t) r.testTrackCompositeSegments(t) + r.testTrackCompositeImages(t) r.testTrackCompositeMulti(t) } @@ -256,7 +257,7 @@ func (r *Runner) testTrackCompositeImages(t *testing.T) { CaptureInterval: 5, Width: 1280, Height: 720, - FilenamePrefix: test.filename, + FilenamePrefix: r.getFilePath(test.filename), } trackRequest := &livekit.TrackCompositeEgressRequest{ From f3887454498dcd2cf99621c4d16c2958a8b0aef4 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 28 Sep 2023 09:42:55 -0700 Subject: [PATCH 17/31] Fix tests --- pkg/config/output.go | 13 +++++++++ pkg/config/pipeline.go | 11 ++++---- pkg/pipeline/builder/image.go | 10 +++++-- pkg/pipeline/builder/video.go | 53 +++++++++++++++++++++-------------- pkg/pipeline/source/sdk.go | 5 +++- test/ffprobe.go | 6 ++-- test/file.go | 2 +- test/integration.go | 2 +- test/participant.go | 6 ++-- test/room_composite.go | 26 ++++++++--------- test/segments.go | 2 +- test/stream.go | 2 +- test/track_composite.go | 6 ++-- test/web.go | 6 ++-- 14 files changed, 91 insertions(+), 59 deletions(-) diff --git a/pkg/config/output.go b/pkg/config/output.go index 1b293168..c9171551 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -79,6 +79,9 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { p.Outputs[types.EgressTypeFile] = []OutputConfig{conf} p.OutputCount++ p.FinalizationRequired = true + if p.VideoEnabled { + p.VideoEncoding = true + } p.Info.FileResults = []*livekit.FileInfo{conf.FileInfo} if len(streams)+len(segments) == 0 { @@ -107,6 +110,9 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { p.Outputs[types.EgressTypeStream] = []OutputConfig{conf} p.OutputCount += len(stream.Urls) + if p.VideoEnabled { + p.VideoEncoding = true + } streamInfoList := make([]*livekit.StreamInfo, 0, len(conf.StreamInfo)) for _, info := range conf.StreamInfo { @@ -145,6 +151,9 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { p.Outputs[types.EgressTypeSegments] = []OutputConfig{conf} p.OutputCount++ p.FinalizationRequired = true + if p.VideoEnabled { + p.VideoEncoding = true + } p.Info.SegmentResults = []*livekit.SegmentsInfo{conf.SegmentsInfo} if len(streams)+len(segments) == 0 { @@ -165,6 +174,10 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { return nil } +func (p *PipelineConfig) HasEncodedOutput() bool { + return p.GetFileConfig() != nil || p.GetStreamConfig() != nil || p.GetSegmentConfig() != nil +} + func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) error { switch o := req.Output.(type) { case *livekit.TrackEgressRequest_File: diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 45783419..8af23294 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -110,7 +110,8 @@ type AudioConfig struct { type VideoConfig struct { VideoEnabled bool - VideoTranscoding bool + VideoDecoding bool + VideoEncoding bool VideoOutCodec types.MimeType VideoProfile types.Profile Width int32 @@ -218,7 +219,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { if !req.RoomComposite.AudioOnly { p.VideoEnabled = true p.VideoInCodec = types.MimeTypeRawVideo - p.VideoTranscoding = true + p.VideoDecoding = true } if !p.AudioEnabled && !p.VideoEnabled { return errors.ErrInvalidInput("audio_only and video_only") @@ -267,7 +268,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { if !req.Web.AudioOnly { p.VideoEnabled = true p.VideoInCodec = types.MimeTypeRawVideo - p.VideoTranscoding = true + p.VideoDecoding = true } if !p.AudioEnabled && !p.VideoEnabled { return errors.ErrInvalidInput("audio_only and video_only") @@ -304,7 +305,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { p.AudioEnabled = true p.AudioTranscoding = true p.VideoEnabled = true - p.VideoTranscoding = true + p.VideoDecoding = true p.Identity = req.Participant.Identity if p.Identity == "" { return errors.ErrInvalidInput("identity") @@ -346,7 +347,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { if videoTrackID := req.TrackComposite.VideoTrackId; videoTrackID != "" { p.VideoEnabled = true p.VideoTrackID = videoTrackID - p.VideoTranscoding = true + p.VideoDecoding = true } if !p.AudioEnabled && !p.VideoEnabled { return errors.ErrInvalidInput("audio_track_id or video_track_id") diff --git a/pkg/pipeline/builder/image.go b/pkg/pipeline/builder/image.go index d0268172..7be20cb1 100644 --- a/pkg/pipeline/builder/image.go +++ b/pkg/pipeline/builder/image.go @@ -52,9 +52,13 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi b := pipeline.NewBin(fmt.Sprintf("image_%s", id)) - fakeAudio, err := gst.NewElement("fakesink") - if err != nil { - return nil, err + var err error + var fakeAudio *gst.Element + if p.AudioEnabled { + fakeAudio, err = gst.NewElement("fakesink") + if err != nil { + return nil, err + } } queue, err := gstreamer.BuildQueue(fmt.Sprintf("image_queue_%s", id), imageQueueLatency, true) diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 3dcc1280..9648c309 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -219,11 +219,10 @@ func (b *VideoBin) buildWebInput() error { return err } - if b.conf.VideoTranscoding { - if err = b.addEncoder(); err != nil { - return err - } + if err = b.addDecodedVideoSink(); err != nil { + return err } + return nil } @@ -231,7 +230,7 @@ func (b *VideoBin) buildSDKInput() error { b.pads = make(map[string]*gst.Pad) // add selector first so pads can be created - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { if err := b.addSelector(); err != nil { return err } @@ -243,7 +242,7 @@ func (b *VideoBin) buildSDKInput() error { } } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { b.bin.SetGetSrcPad(b.getSrcPad) b.bin.SetEOSFunc(func() bool { b.mu.Lock() @@ -265,7 +264,7 @@ func (b *VideoBin) buildSDKInput() error { return err } } - if err := b.addEncoder(); err != nil { + if err := b.addDecodedVideoSink(); err != nil { return err } } @@ -279,7 +278,7 @@ func (b *VideoBin) addAppSrcBin(ts *config.TrackSource) error { return err } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { b.createSrcPad(ts.TrackID) } @@ -287,7 +286,7 @@ func (b *VideoBin) addAppSrcBin(ts *config.TrackSource) error { return err } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { return b.setSelectorPad(ts.TrackID) } @@ -332,7 +331,7 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource) (*gstreamer.Bin, error return nil, err } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { avDecH264, err := gst.NewElement("avdec_h264") if err != nil { return nil, errors.ErrGstPipelineError(err) @@ -370,7 +369,7 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource) (*gstreamer.Bin, error return nil, err } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { vp8Dec, err := gst.NewElement("vp8dec") if err != nil { return nil, errors.ErrGstPipelineError(err) @@ -398,7 +397,7 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource) (*gstreamer.Bin, error return nil, err } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { vp9Dec, err := gst.NewElement("vp9dec") if err != nil { return nil, errors.ErrGstPipelineError(err) @@ -499,15 +498,6 @@ func (b *VideoBin) addSelector() error { } func (b *VideoBin) addEncoder() error { - var err error - b.rawVideoTee, err = gst.NewElement("tee") - if err != nil { - return err - } - if err = b.bin.AddElement(b.rawVideoTee); err != nil { - return err - } - videoQueue, err := gstreamer.BuildQueue("video_encoder_queue", b.conf.Latency, false) if err != nil { return err @@ -602,6 +592,27 @@ func (b *VideoBin) addEncoder() error { default: return errors.ErrNotSupported(fmt.Sprintf("%s encoding", b.conf.VideoOutCodec)) } + +} + +func (b *VideoBin) addDecodedVideoSink() error { + var err error + b.rawVideoTee, err = gst.NewElement("tee") + if err != nil { + return err + } + if err = b.bin.AddElement(b.rawVideoTee); err != nil { + return err + } + + if b.conf.VideoEncoding { + err = b.addEncoder() + if err != nil { + return err + } + } + + return nil } func addVideoConverter(b *gstreamer.Bin, p *config.PipelineConfig) error { diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 9d3163ce..61538d5d 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -380,7 +380,10 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo s.VideoOutCodec = ts.MimeType } if s.VideoInCodec != s.VideoOutCodec { - s.VideoTranscoding = true + s.VideoDecoding = true + if s.HasEncodedOutput() { + s.VideoEncoding = true + } } writer, err := s.createWriter(track, pub, rp, ts) diff --git a/test/ffprobe.go b/test/ffprobe.go index d5576f08..bc7410fd 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -227,7 +227,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre case types.MimeTypeH264: require.Equal(t, "h264", stream.CodecName) - if p.VideoTranscoding { + if p.VideoEncoding { switch p.VideoProfile { case types.ProfileBaseline: require.Equal(t, "Constrained Baseline", stream.Profile) @@ -250,7 +250,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre case types.OutputTypeMP4: require.Equal(t, "h264", stream.CodecName) - if p.VideoTranscoding { + if p.VideoEncoding { // bitrate, not available for HLS or WebM bitrate, err := strconv.Atoi(stream.BitRate) require.NoError(t, err) @@ -274,7 +274,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre case types.OutputTypeHLS: require.Equal(t, "h264", stream.CodecName) - if p.VideoTranscoding { + if p.VideoEncoding { // dimensions require.Equal(t, p.Width, stream.Width) require.Equal(t, p.Height, stream.Height) diff --git a/test/file.go b/test/file.go index 3ab8844b..2257a317 100644 --- a/test/file.go +++ b/test/file.go @@ -50,7 +50,7 @@ func (r *Runner) runFileTest(t *testing.T, req *rpc.StartEgressRequest, test *te p.GetFileConfig().OutputType = test.outputType } - require.Equal(t, test.expectVideoTranscoding, p.VideoTranscoding) + require.Equal(t, test.expectVideoEncoding, p.VideoEncoding) // verify r.verifyFile(t, p, res) diff --git a/test/integration.go b/test/integration.go index 75046048..8bf2016a 100644 --- a/test/integration.go +++ b/test/integration.go @@ -96,7 +96,7 @@ type testCase struct { // used by track tests outputType types.OutputType - expectVideoTranscoding bool + expectVideoEncoding bool } func (r *Runner) awaitIdle(t *testing.T) { diff --git a/test/participant.go b/test/participant.go index 4135527a..f1e56497 100644 --- a/test/participant.go +++ b/test/participant.go @@ -121,7 +121,7 @@ func (r *Runner) testParticipantFile(t *testing.T) { }, } - test.expectVideoTranscoding = true + test.expectVideoEncoding = true r.runFileTest(t, req, test) }) if r.Short { @@ -157,7 +157,7 @@ func (r *Runner) testParticipantStream(t *testing.T) { }, } - r.runStreamTest(t, req, &testCase{expectVideoTranscoding: true}) + r.runStreamTest(t, req, &testCase{expectVideoEncoding: true}) }, ) } @@ -219,7 +219,7 @@ func (r *Runner) testParticipantSegments(t *testing.T) { Participant: trackRequest, }, } - test.expectVideoTranscoding = true + test.expectVideoEncoding = true r.runSegmentsTest(t, req, test) }, diff --git a/test/room_composite.go b/test/room_composite.go index 245e7e16..b8e2b32b 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -57,9 +57,9 @@ func (r *Runner) testRoomCompositeFile(t *testing.T) { t.Run("RoomComposite/File", func(t *testing.T) { for _, test := range []*testCase{ { - name: "Base", - filename: "r_{room_name}_{time}.mp4", - expectVideoTranscoding: true, + name: "Base", + filename: "r_{room_name}_{time}.mp4", + expectVideoEncoding: true, }, { name: "Video-Only", @@ -67,8 +67,8 @@ func (r *Runner) testRoomCompositeFile(t *testing.T) { options: &livekit.EncodingOptions{ VideoCodec: livekit.VideoCodec_H264_HIGH, }, - filename: "r_{room_name}_video_{time}.mp4", - expectVideoTranscoding: true, + filename: "r_{room_name}_video_{time}.mp4", + expectVideoEncoding: true, }, { name: "Audio-Only", @@ -77,8 +77,8 @@ func (r *Runner) testRoomCompositeFile(t *testing.T) { options: &livekit.EncodingOptions{ AudioCodec: livekit.AudioCodec_OPUS, }, - filename: "r_{room_name}_audio_{time}", - expectVideoTranscoding: false, + filename: "r_{room_name}_audio_{time}", + expectVideoEncoding: false, }, } { r.runRoomTest(t, test.name, types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { @@ -147,7 +147,7 @@ func (r *Runner) testRoomCompositeStream(t *testing.T) { }, } - r.runStreamTest(t, req, &testCase{expectVideoTranscoding: true}) + r.runStreamTest(t, req, &testCase{expectVideoEncoding: true}) }) if r.Short { return @@ -202,11 +202,11 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) { Height: 1080, VideoBitrate: 4500, }, - filename: "r_{room_name}_{time}", - playlist: "r_{room_name}_{time}.m3u8", - live_playlist: "r_live_{room_name}_{time}.m3u8", - filenameSuffix: livekit.SegmentedFileSuffix_TIMESTAMP, - expectVideoTranscoding: true, + filename: "r_{room_name}_{time}", + playlist: "r_{room_name}_{time}.m3u8", + live_playlist: "r_live_{room_name}_{time}.m3u8", + filenameSuffix: livekit.SegmentedFileSuffix_TIMESTAMP, + expectVideoEncoding: true, }, &testCase{ options: &livekit.EncodingOptions{ diff --git a/test/segments.go b/test/segments.go index d7c250e7..542751dc 100644 --- a/test/segments.go +++ b/test/segments.go @@ -51,7 +51,7 @@ func (r *Runner) runSegmentsTest(t *testing.T, req *rpc.StartEgressRequest, test r.verifySegments(t, p, test.filenameSuffix, res, test.live_playlist != "") if !test.audioOnly { - require.Equal(t, test.expectVideoTranscoding, p.VideoTranscoding) + require.Equal(t, test.expectVideoEncoding, p.VideoEncoding) } } diff --git a/test/stream.go b/test/stream.go index db7b3839..05c3ccaf 100644 --- a/test/stream.go +++ b/test/stream.go @@ -37,7 +37,7 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * // get params p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) require.NoError(t, err) - require.Equal(t, test.expectVideoTranscoding, p.VideoTranscoding) + require.Equal(t, test.expectVideoEncoding, p.VideoEncoding) // verify and check update time.Sleep(time.Second * 5) diff --git a/test/track_composite.go b/test/track_composite.go index 80a9d960..458a986a 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -110,7 +110,7 @@ func (r *Runner) testTrackCompositeFile(t *testing.T) { }, } - test.expectVideoTranscoding = true + test.expectVideoEncoding = true r.runFileTest(t, req, test) }) if r.Short { @@ -141,7 +141,7 @@ func (r *Runner) testTrackCompositeStream(t *testing.T) { }, } - r.runStreamTest(t, req, &testCase{expectVideoTranscoding: true}) + r.runStreamTest(t, req, &testCase{expectVideoEncoding: true}) }, ) } @@ -217,7 +217,7 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { TrackComposite: trackRequest, }, } - test.expectVideoTranscoding = true + test.expectVideoEncoding = true r.runSegmentsTest(t, req, test) }, diff --git a/test/web.go b/test/web.go index c91b6ede..0d31669b 100644 --- a/test/web.go +++ b/test/web.go @@ -71,7 +71,7 @@ func (r *Runner) testWebFile(t *testing.T) { } r.runFileTest(t, req, &testCase{ - expectVideoTranscoding: true, + expectVideoEncoding: true, }) }) } @@ -96,7 +96,7 @@ func (r *Runner) testWebStream(t *testing.T) { } r.runStreamTest(t, req, &testCase{ - expectVideoTranscoding: true, + expectVideoEncoding: true, }) }) } @@ -129,7 +129,7 @@ func (r *Runner) testWebSegments(t *testing.T) { } r.runSegmentsTest(t, req, &testCase{ - expectVideoTranscoding: true, + expectVideoEncoding: true, }) }) } From f2ae9d41d1fecb7a66faea4b1978029e653fb039 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 28 Sep 2023 11:10:54 -0700 Subject: [PATCH 18/31] WiP --- pkg/config/output.go | 4 ---- pkg/pipeline/builder/video.go | 3 +-- pkg/pipeline/source/sdk.go | 2 +- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/pkg/config/output.go b/pkg/config/output.go index c9171551..ccdfbf44 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -174,10 +174,6 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { return nil } -func (p *PipelineConfig) HasEncodedOutput() bool { - return p.GetFileConfig() != nil || p.GetStreamConfig() != nil || p.GetSegmentConfig() != nil -} - func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) error { switch o := req.Output.(type) { case *livekit.TrackEgressRequest_File: diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 9648c309..f9f4d29e 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -87,7 +87,7 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error getPad = func() *gst.Pad { return tee.GetRequestPad("src_%u") } - } else { + } else if len(p.GetEncodedOutputs()) > 0 { queue, err := gstreamer.BuildQueue("video_queue", p.Latency, true) if err != nil { return errors.ErrGstPipelineError(err) @@ -102,7 +102,6 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error } b.bin.SetGetSinkPad(func(name string) *gst.Pad { - fmt.Println("FOO PAD", name) if strings.HasPrefix(name, "image") { return b.rawVideoTee.GetRequestPad("src_%u") } else { diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 61538d5d..8af69ec6 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -381,7 +381,7 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo } if s.VideoInCodec != s.VideoOutCodec { s.VideoDecoding = true - if s.HasEncodedOutput() { + if len(s.GetEncodedOutputs()) > 0 { s.VideoEncoding = true } } From 6ec822ca059f1749a9bd354b7cc4bc30b0fe3397 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 28 Sep 2023 13:07:55 -0700 Subject: [PATCH 19/31] Add watch --- pkg/pipeline/watch.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 7ea40b2c..33788d9a 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -46,10 +46,14 @@ const ( msgFirstSampleMetadata = "FirstSampleMetadata" msgFragmentOpened = "splitmuxsink-fragment-opened" msgFragmentClosed = "splitmuxsink-fragment-closed" + msgGstMultiFileSink = "GstMultiFileSink" fragmentLocation = "location" fragmentRunningTime = "running-time" + gstMultiFileSinkFilename = "filename" + gstMultiFileSinkTimestamp = "timestamp" + // common gst errors msgWrongThread = "Called from wrong thread" @@ -284,6 +288,13 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { logger.Debugw("received FirstSampleMetadata message", "startDate", startDate) c.getSegmentSink().UpdateStartDate(startDate) + + case msgGstMultiFileSink: + location, ts, err := getImageInformationFromGstStructure(s) + if err != nil { + return err + } + logger.Debugw("received GstMultiFileSink message", "location", location, "timestamp", ts, "source", msg.Source()) } } @@ -322,6 +333,29 @@ func getFirstSampleMetadataFromGstStructure(s *gst.Structure) (startDate time.Ti return time.Unix(0, firstSampleMetadata.StartDate), nil } +func getImageInformationFromGstStructure(s *gst.Structure) (string, uint64, error) { + loc, err := s.GetValue(gstMultiFileSinkFilename) + if err != nil { + return "", 0, err + } + filepath, ok := loc.(string) + if !ok { + return "", 0, errors.ErrGstPipelineError(errors.New("invalid type for location")) + } + + t, err := s.GetValue(gstMultiFileSinkTimestamp) + if err != nil { + return "", 0, err + } + ti, ok := t.(uint64) + if !ok { + return "", 0, errors.ErrGstPipelineError(errors.New("invalid type for time")) + } + + return filepath, ti, nil + +} + func (c *Controller) getSegmentSink() *sink.SegmentSink { s := c.sinks[types.EgressTypeSegments] if len(s) == 0 { From 3963cb086e59a36e926ce6fc09116ae6db79a8f7 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 28 Sep 2023 13:49:54 -0700 Subject: [PATCH 20/31] Wire sink --- pkg/config/output_image.go | 8 +++++--- pkg/errors/errors.go | 1 + pkg/pipeline/builder/image.go | 9 +++------ pkg/pipeline/sink/image_manifest.go | 2 +- pkg/pipeline/watch.go | 28 ++++++++++++++++++++++++++++ 5 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pkg/config/output_image.go b/pkg/config/output_image.go index cd458d54..823dcd2f 100644 --- a/pkg/config/output_image.go +++ b/pkg/config/output_image.go @@ -29,6 +29,8 @@ import ( type ImageConfig struct { outputConfig + Id string // Used internally to map a gst Bin/element back to a sink and as part of the path + ImagesInfo *livekit.ImagesInfo LocalDir string StorageDir string @@ -66,6 +68,8 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf outputConfig: outputConfig{ OutputType: outputType, }, + + Id: utils.NewGuid(""), ImagesInfo: &livekit.ImagesInfo{}, ImagePrefix: clean(images.FilenamePrefix), ImageSuffix: images.FilenameSuffix, @@ -130,9 +134,7 @@ func (o *ImageConfig) updatePrefix(p *PipelineConfig) error { // os.ModeDir creates a directory with mode 000 when mapping the directory outside the container // Append a "/" to the path for consistency with the "UploadConfig == nil" case - slug := utils.NewGuid("") - - o.LocalDir = path.Join(TmpDir, p.Info.EgressId, slug) + "/" + o.LocalDir = path.Join(TmpDir, p.Info.EgressId, o.Id) + "/" } // create local directories diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index a9376c31..be1063d4 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -37,6 +37,7 @@ var ( ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU") ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track") ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen") + ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found") ) func New(err string) error { diff --git a/pkg/pipeline/builder/image.go b/pkg/pipeline/builder/image.go index 7be20cb1..eca0807c 100644 --- a/pkg/pipeline/builder/image.go +++ b/pkg/pipeline/builder/image.go @@ -23,7 +23,6 @@ import ( "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/gstreamer" "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/utils" "github.com/tinyzimmer/go-gst/gst" ) @@ -48,9 +47,7 @@ func BuildImageBins(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) ([]* } func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstreamer.Bin, error) { - id := utils.NewGuid("") - - b := pipeline.NewBin(fmt.Sprintf("image_%s", id)) + b := pipeline.NewBin(fmt.Sprintf("image_%s", c.Id)) var err error var fakeAudio *gst.Element @@ -61,7 +58,7 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi } } - queue, err := gstreamer.BuildQueue(fmt.Sprintf("image_queue_%s", id), imageQueueLatency, true) + queue, err := gstreamer.BuildQueue(fmt.Sprintf("image_queue_%s", c.Id), imageQueueLatency, true) if err != nil { return nil, err } @@ -126,7 +123,7 @@ func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *confi return nil, errors.ErrNoCompatibleCodec } - sink, err := gst.NewElement("multifilesink") + sink, err := gst.NewElementWithName("multifilesink", fmt.Sprintf("multifilesink_%s", c.Id)) if err != nil { return nil, err } diff --git a/pkg/pipeline/sink/image_manifest.go b/pkg/pipeline/sink/image_manifest.go index f7a455d0..8cc466a3 100644 --- a/pkg/pipeline/sink/image_manifest.go +++ b/pkg/pipeline/sink/image_manifest.go @@ -56,7 +56,7 @@ func (m *ImageManifest) updateManifest(u uploader.Uploader, localFilepath, stora return err } - b, err := json.Marshal(manifest) + b, err := json.Marshal(m) if err != nil { return err } diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 33788d9a..a6421959 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -295,6 +295,16 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { return err } logger.Debugw("received GstMultiFileSink message", "location", location, "timestamp", ts, "source", msg.Source()) + + sink := c.getImageSink(msg.Source()) + if sink == nil { + return errors.ErrSinkNotFound + } + + err = sink.NewImage(location, ts) + if err != nil { + return err + } } } @@ -364,3 +374,21 @@ func (c *Controller) getSegmentSink() *sink.SegmentSink { return s[0].(*sink.SegmentSink) } + +func (c *Controller) getImageSink(name string) *sink.ImageSink { + id := name[len("multifilesink_"):] + + s := c.sinks[types.EgressTypeImages] + if len(s) == 0 { + return nil + } + + // Use a map here? + for _, si := range s { + if si.(*sink.ImageSink).Id == id { + return si.(*sink.ImageSink) + } + } + + return nil +} From f5a804df7d036b58140f7d6a5aca99ed7cf309c2 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 28 Sep 2023 14:31:45 -0700 Subject: [PATCH 21/31] deadlock --- pkg/pipeline/watch.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index a6421959..c40b11c0 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -113,12 +113,6 @@ func (c *Controller) gstLog(level gst.DebugLevel, file, function string, line in msg = fmt.Sprintf("[gst %s] %s", lvl, message) } args := []interface{}{"caller", fmt.Sprintf("%s:%d", file, line)} - if obj != nil { - name, err := obj.GetProperty("name") - if err == nil { - args = append(args, "object", name.(string)) - } - } c.gstLogger.Debugw(msg, args...) } From cf634fa21685fdf6ea47d88ccb46416393b81fbf Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 28 Sep 2023 15:41:43 -0700 Subject: [PATCH 22/31] WiP --- pkg/pipeline/builder/video.go | 4 +++- pkg/pipeline/controller.go | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index f9f4d29e..f4c1491d 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -104,9 +104,11 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error b.bin.SetGetSinkPad(func(name string) *gst.Pad { if strings.HasPrefix(name, "image") { return b.rawVideoTee.GetRequestPad("src_%u") - } else { + } else if getPad != nil { return getPad() } + + return nil }) return pipeline.AddSourceBin(b.bin) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 295efdfc..45583c18 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -34,6 +34,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/tracer" "github.com/livekit/protocol/utils" + "github.com/livekit/psrpc" ) const ( @@ -314,7 +315,7 @@ func (c *Controller) UpdateOutputs(ctx context.Context, req *livekit.UpdateOutpu ctx, span := tracer.Start(ctx, "Pipeline.UpdateOutputs") defer span.End() - return nil + return psrpc.NewErrorf(psrpc.Unimplemented, "Updating outputs unimplemented") } func (c *Controller) removeSink(ctx context.Context, url string, streamErr error) error { From 29de86cbb27a2af195b00a4278c2dee414e3acf5 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 28 Sep 2023 15:43:29 -0700 Subject: [PATCH 23/31] Update module --- go.mod | 2 +- go.sum | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 60151e32..6b480b26 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/livekit/livekit-server v1.4.6-0.20230918194757-8a0d417a8c99 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.7.3-0.20230927224423-14bc77f4821b + github.com/livekit/protocol v1.7.3-0.20230928220138-a73f47fb8bc8 github.com/livekit/psrpc v0.3.3 github.com/livekit/server-sdk-go v1.0.17-0.20230918212012-3a26309be9c5 github.com/pion/rtp v1.8.1 diff --git a/go.sum b/go.sum index 24cdb134..d7e2ec5c 100644 --- a/go.sum +++ b/go.sum @@ -177,10 +177,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.7.3-0.20230927223354-d7401d265a2f h1:YicTs1L+qnxFgJ2IrW2nwgPDbigY6OEMHqvSHm1KHV4= -github.com/livekit/protocol v1.7.3-0.20230927223354-d7401d265a2f/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= -github.com/livekit/protocol v1.7.3-0.20230927224423-14bc77f4821b h1:TcOck4b6Z5cWDfeMUSOFQ+UuQPyZppuTZZI9wvhTj50= -github.com/livekit/protocol v1.7.3-0.20230927224423-14bc77f4821b/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.7.3-0.20230928220138-a73f47fb8bc8 h1:qgXAnLeeM7O2lnN87XRabwdlb7B537LfpNqBQcdZ4vU= +github.com/livekit/protocol v1.7.3-0.20230928220138-a73f47fb8bc8/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/livekit/server-sdk-go v1.0.17-0.20230918212012-3a26309be9c5 h1:JZttHKxyzGFuMpsl3Mmt84WU+DZsZg00FCOPfr6CmBY= From 53fe4df6d5b195f2bcdc020b7de4a051b16f65b4 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 29 Sep 2023 13:31:21 -0700 Subject: [PATCH 24/31] Set SetGetSrcPad on outputs --- pkg/gstreamer/pads.go | 1 - pkg/pipeline/builder/file.go | 6 ++++++ pkg/pipeline/builder/stream.go | 4 ++++ pkg/pipeline/builder/websocket.go | 5 +++++ 4 files changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/gstreamer/pads.go b/pkg/gstreamer/pads.go index 347b1baa..3b202550 100644 --- a/pkg/gstreamer/pads.go +++ b/pkg/gstreamer/pads.go @@ -98,7 +98,6 @@ func createGhostPadsLocked(src, sink *Bin, queue *gst.Element) (*gst.GhostPad, * } func matchPadsLocked(src, sink *Bin) (*gst.Pad, *gst.Pad, error) { - fmt.Println("matchPadsLocked", src.bin.GetName(), sink.bin.GetName()) var srcPad, sinkPad *gst.Pad var srcTemplates, sinkTemplates []*padTemplate if src.getSinkPad != nil { diff --git a/pkg/pipeline/builder/file.go b/pkg/pipeline/builder/file.go index 86be26e3..00f99c93 100644 --- a/pkg/pipeline/builder/file.go +++ b/pkg/pipeline/builder/file.go @@ -59,5 +59,11 @@ func BuildFileBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstr return nil, err } + b.SetGetSrcPad(func(name string) *gst.Pad { + var padName = name + "_%u" + + return mux.GetRequestPad(padName) + }) + return b, nil } diff --git a/pkg/pipeline/builder/stream.go b/pkg/pipeline/builder/stream.go index e6e2484b..67ad39d9 100644 --- a/pkg/pipeline/builder/stream.go +++ b/pkg/pipeline/builder/stream.go @@ -98,6 +98,10 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St } } + b.SetGetSrcPad(func(name string) *gst.Pad { + return mux.GetStaticPad(name) + }) + return sb, b, nil } diff --git a/pkg/pipeline/builder/websocket.go b/pkg/pipeline/builder/websocket.go index d266a377..41f72492 100644 --- a/pkg/pipeline/builder/websocket.go +++ b/pkg/pipeline/builder/websocket.go @@ -15,6 +15,7 @@ package builder import ( + "github.com/tinyzimmer/go-gst/gst" "github.com/tinyzimmer/go-gst/gst/app" "github.com/livekit/egress/pkg/gstreamer" @@ -33,5 +34,9 @@ func BuildWebsocketBin(pipeline *gstreamer.Pipeline, appSinkCallbacks *app.SinkC return nil, err } + b.SetGetSrcPad(func(name string) *gst.Pad { + return appSink.GetStaticPad("sink") + }) + return b, nil } From 25a955f3193ddc974248ae96e67729b12ece23d4 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 29 Sep 2023 13:43:08 -0700 Subject: [PATCH 25/31] Fix stream --- pkg/pipeline/builder/stream.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/builder/stream.go b/pkg/pipeline/builder/stream.go index 67ad39d9..eb0d2c1c 100644 --- a/pkg/pipeline/builder/stream.go +++ b/pkg/pipeline/builder/stream.go @@ -99,7 +99,9 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St } b.SetGetSrcPad(func(name string) *gst.Pad { - return mux.GetStaticPad(name) + fmt.Println("NAME", name) + + return mux.GetRequestPad(name) }) return sb, b, nil From a776b027fba93b460075f971ee4ce07922ad98c8 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 29 Sep 2023 14:01:41 -0700 Subject: [PATCH 26/31] Fix audio only --- pkg/config/output.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/output.go b/pkg/config/output.go index ccdfbf44..086720a6 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -215,7 +215,7 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err func (p *PipelineConfig) updateImageOutputs(req ImageOutput) error { images := req.GetImageOutputs() - if !p.VideoEnabled { + if len(images) > 0 && !p.VideoEnabled { return errors.ErrInvalidInput("audio_only") } From 23a7c5524e22b58d340f8db93e9fd9eb9719135d Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 29 Sep 2023 14:47:24 -0700 Subject: [PATCH 27/31] WiP --- test/room_composite.go | 47 +++++++++++++++++++++++++++++++++++++++++ test/track_composite.go | 2 ++ 2 files changed, 49 insertions(+) diff --git a/test/room_composite.go b/test/room_composite.go index b8e2b32b..76cb985a 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -255,6 +255,53 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) { }) } +func (r *Runner) testRoomCompositeImages(t *testing.T) { + if !r.runImageTests() { + return + } + + r.runRoomTest(t, "RoomComposite/Images", types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { + for _, test := range []*testCase{ + &testCase{ + options: &livekit.EncodingOptions{ + Width: 640, + Height: 360, + }, + filename: "r_{room_name}_{time}", + imageFilenameSuffix: livekit.ImageFileSuffix_IMAGE_SUFFIX_TIMESTAMP, + }, + } { + imageOutput := &livekit.ImageOutput{ + FilenamePrefix: r.getFilePath(test.filename), + FilenameSuffix: test.imageFilenameSuffix, + } + + // TODO upload + + room := &livekit.RoomCompositeEgressRequest{ + RoomName: r.RoomName, + Layout: "grid-dark", + AudioOnly: test.audioOnly, + SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, + } + if test.options != nil { + room.Options = &livekit.RoomCompositeEgressRequest_Advanced{ + Advanced: test.options, + } + } + + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_RoomComposite{ + RoomComposite: room, + }, + } + + r.runImagesTest(t, req, test) + } + }) +} + func (r *Runner) testRoomCompositeMulti(t *testing.T) { if !r.runMultiTests() { return diff --git a/test/track_composite.go b/test/track_composite.go index 458a986a..cf0aa128 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -260,6 +260,8 @@ func (r *Runner) testTrackCompositeImages(t *testing.T) { FilenamePrefix: r.getFilePath(test.filename), } + // TODO Upload + trackRequest := &livekit.TrackCompositeEgressRequest{ RoomName: r.room.Name(), AudioTrackId: aID, From 7afe0fbb6106655a3f3b77ee30e8ff18369cf1d7 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 29 Sep 2023 15:11:55 -0700 Subject: [PATCH 28/31] WiP --- go.mod | 2 +- go.sum | 4 ++-- pkg/config/output.go | 2 +- pkg/pipeline/sink/{images.go => image.go} | 2 +- test/room_composite.go | 1 + 5 files changed, 6 insertions(+), 5 deletions(-) rename pkg/pipeline/sink/{images.go => image.go} (99%) diff --git a/go.mod b/go.mod index 255017e5..601236e4 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/livekit/livekit-server v1.4.6-0.20230918194757-8a0d417a8c99 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.7.3-0.20230928220138-a73f47fb8bc8 + github.com/livekit/protocol v1.7.3-0.20230929193653-a020215c5c8e github.com/livekit/psrpc v0.3.3 github.com/livekit/server-sdk-go v1.0.17-0.20230928233454-b49bf45b164b github.com/pion/rtp v1.8.1 diff --git a/go.sum b/go.sum index 046720d1..c99f69b3 100644 --- a/go.sum +++ b/go.sum @@ -177,8 +177,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.7.3-0.20230928220138-a73f47fb8bc8 h1:qgXAnLeeM7O2lnN87XRabwdlb7B537LfpNqBQcdZ4vU= -github.com/livekit/protocol v1.7.3-0.20230928220138-a73f47fb8bc8/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.7.3-0.20230929193653-a020215c5c8e h1:4Z+XM3ohqxBuyARhQ0Cj8JjAj4XaQDulqPq1Ykvp4BQ= +github.com/livekit/protocol v1.7.3-0.20230929193653-a020215c5c8e/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/livekit/server-sdk-go v1.0.17-0.20230928233454-b49bf45b164b h1:4rm2CAPOfLG7r7texpyhf0HEE5xvAkFfg+UrcxV4x3U= diff --git a/pkg/config/output.go b/pkg/config/output.go index 086720a6..f2262c79 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -229,7 +229,7 @@ func (p *PipelineConfig) updateImageOutputs(req ImageOutput) error { p.OutputCount++ p.FinalizationRequired = true - p.Info.ImagesResults = append(p.Info.ImagesResults, conf.ImagesInfo) + p.Info.ImageResults = append(p.Info.ImageResults, conf.ImagesInfo) } return nil diff --git a/pkg/pipeline/sink/images.go b/pkg/pipeline/sink/image.go similarity index 99% rename from pkg/pipeline/sink/images.go rename to pkg/pipeline/sink/image.go index 60659bbc..e9721ea9 100644 --- a/pkg/pipeline/sink/images.go +++ b/pkg/pipeline/sink/image.go @@ -88,7 +88,7 @@ func (s *ImageSink) Start() error { } func (s *ImageSink) handleNewImage(update *imageUpdate) error { - s.ImagesInfo.ImagesCount++ + s.ImagesInfo.ImageCount++ filename := update.filename ts := s.getImageTime(update.timestamp) diff --git a/test/room_composite.go b/test/room_composite.go index 76cb985a..611f2788 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -38,6 +38,7 @@ func (r *Runner) testRoomComposite(t *testing.T) { r.testRoomCompositeFile(t) r.testRoomCompositeStream(t) r.testRoomCompositeSegments(t) + r.testRoomCompositeImages(t) r.testRoomCompositeMulti(t) } From b41807b9d0bcccd07b13d8c919f7104b6ad63432 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 29 Sep 2023 15:21:55 -0700 Subject: [PATCH 29/31] WiP --- test/images.go | 6 +++--- test/room_composite.go | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/test/images.go b/test/images.go index f526862b..f0165956 100644 --- a/test/images.go +++ b/test/images.go @@ -53,10 +53,10 @@ func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, filenameSu require.NotZero(t, res.EndedAt) // segments info - require.Len(t, res.GetImagesResults(), 1) - images := res.GetImagesResults()[0] + require.Len(t, res.GetImageResults(), 1) + images := res.GetImageResults()[0] - require.Greater(t, images.ImagesCount, int64(0)) + require.Greater(t, images.ImageCount, int64(0)) // r.verifyImagesOutput(t, p, filenameSuffix, segments.PlaylistName, segments.PlaylistLocation, int(segments.SegmentCount), res, m3u8.PlaylistTypeEvent) // r.verifyManifest(t, p, segments.PlaylistName) diff --git a/test/room_composite.go b/test/room_composite.go index 611f2788..5ad252c0 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -280,10 +280,10 @@ func (r *Runner) testRoomCompositeImages(t *testing.T) { // TODO upload room := &livekit.RoomCompositeEgressRequest{ - RoomName: r.RoomName, - Layout: "grid-dark", - AudioOnly: test.audioOnly, - SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, + RoomName: r.RoomName, + Layout: "grid-dark", + AudioOnly: test.audioOnly, + ImageOutputs: []*livekit.ImageOutput{imageOutput}, } if test.options != nil { room.Options = &livekit.RoomCompositeEgressRequest_Advanced{ From da3299f16482eafe820f4da4d22a200951fd7db4 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 2 Oct 2023 14:34:26 -0700 Subject: [PATCH 30/31] clean up --- pkg/pipeline/builder/stream.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/pipeline/builder/stream.go b/pkg/pipeline/builder/stream.go index eb0d2c1c..c0ded8aa 100644 --- a/pkg/pipeline/builder/stream.go +++ b/pkg/pipeline/builder/stream.go @@ -99,8 +99,6 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St } b.SetGetSrcPad(func(name string) *gst.Pad { - fmt.Println("NAME", name) - return mux.GetRequestPad(name) }) From 1751f2056f012feb43119df921167d8939d94957 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 2 Oct 2023 16:56:49 -0700 Subject: [PATCH 31/31] Ensure default value for image dimensions for SDK output --- pkg/config/pipeline.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 8af23294..c5b9eee0 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -589,10 +589,18 @@ func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[s o.StorageDir = stringReplace(o.StorageDir, replacements) o.ImagePrefix = stringReplace(o.ImagePrefix, replacements) if o.Width == 0 { - o.Width = int32(w) + if w != 0 { + o.Width = int32(w) + } else { + o.Width = p.VideoConfig.Width + } } if o.Height == 0 { - o.Height = int32(h) + if h != 0 { + o.Height = int32(h) + } else { + o.Height = p.VideoConfig.Height + } } } }