diff --git a/go.mod b/go.mod index af4f6494..601236e4 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/livekit/livekit-server v1.4.6-0.20230918194757-8a0d417a8c99 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e + github.com/livekit/protocol v1.7.3-0.20230929193653-a020215c5c8e github.com/livekit/psrpc v0.3.3 github.com/livekit/server-sdk-go v1.0.17-0.20230928233454-b49bf45b164b github.com/pion/rtp v1.8.1 diff --git a/go.sum b/go.sum index c8748a5b..c99f69b3 100644 --- a/go.sum +++ b/go.sum @@ -177,8 +177,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg= github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e h1:WEet0iH/JazBFNhhH+YuZHtXpKefb7mnbCC2al3peyA= -github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= +github.com/livekit/protocol v1.7.3-0.20230929193653-a020215c5c8e h1:4Z+XM3ohqxBuyARhQ0Cj8JjAj4XaQDulqPq1Ykvp4BQ= +github.com/livekit/protocol v1.7.3-0.20230929193653-a020215c5c8e/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0= github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo= github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U= github.com/livekit/server-sdk-go v1.0.17-0.20230928233454-b49bf45b164b h1:4rm2CAPOfLG7r7texpyhf0HEE5xvAkFfg+UrcxV4x3U= diff --git a/pkg/config/base.go b/pkg/config/base.go index a8e1f232..6c3e8870 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -86,6 +86,7 @@ type SessionLimits struct { FileOutputMaxDuration time.Duration `yaml:"file_output_max_duration"` StreamOutputMaxDuration time.Duration `yaml:"stream_output_max_duration"` SegmentOutputMaxDuration time.Duration `yaml:"segment_output_max_duration"` + ImageOutputMaxDuration time.Duration `yaml:"image_output_max_duration"` } func (c *BaseConfig) initLogger(values ...interface{}) error { diff --git a/pkg/config/output.go b/pkg/config/output.go index 954c7e79..d6ce31c4 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -37,6 +37,7 @@ type EncodedOutput interface { GetFileOutputs() []*livekit.EncodedFileOutput GetStreamOutputs() []*livekit.StreamOutput GetSegmentOutputs() []*livekit.SegmentedFileOutput + ImageOutput } type EncodedOutputDeprecated interface { @@ -48,6 +49,10 @@ type EncodedOutputDeprecated interface { GetSegmentOutputs() []*livekit.SegmentedFileOutput } +type ImageOutput interface { + GetImageOutputs() []*livekit.ImageOutput +} + func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { files := req.GetFileOutputs() streams := req.GetStreamOutputs() @@ -71,9 +76,12 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { return err } - p.Outputs[types.EgressTypeFile] = conf + p.Outputs[types.EgressTypeFile] = []OutputConfig{conf} p.OutputCount++ p.FinalizationRequired = true + if p.VideoEnabled { + p.VideoEncoding = true + } p.Info.FileResults = []*livekit.FileInfo{conf.FileInfo} if len(streams)+len(segments) == 0 { @@ -100,8 +108,11 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { return err } - p.Outputs[types.EgressTypeStream] = conf + p.Outputs[types.EgressTypeStream] = []OutputConfig{conf} p.OutputCount += len(stream.Urls) + if p.VideoEnabled { + p.VideoEncoding = true + } streamInfoList := make([]*livekit.StreamInfo, 0, len(conf.StreamInfo)) for _, info := range conf.StreamInfo { @@ -137,9 +148,12 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { return err } - p.Outputs[types.EgressTypeSegments] = conf + p.Outputs[types.EgressTypeSegments] = []OutputConfig{conf} p.OutputCount++ p.FinalizationRequired = true + if p.VideoEnabled { + p.VideoEncoding = true + } p.Info.SegmentResults = []*livekit.SegmentsInfo{conf.SegmentsInfo} if len(streams)+len(segments) == 0 { @@ -148,14 +162,19 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error { } } - if segmentConf := p.Outputs[types.EgressTypeSegments]; segmentConf != nil { + if segmentConf := p.Outputs[types.EgressTypeSegments]; segmentConf != nil && len(segmentConf) > 0 { // double the segment length - p.KeyFrameInterval = float64(2 * segmentConf.(*SegmentConfig).SegmentDuration) + p.KeyFrameInterval = float64(2 * segmentConf[0].(*SegmentConfig).SegmentDuration) } else if p.KeyFrameInterval == 0 && p.Outputs[types.EgressTypeStream] != nil { // default 4s for streams p.KeyFrameInterval = 4 } + err := p.updateImageOutputs(req) + if err != nil { + return err + } + if p.OutputCount == 0 { return errors.ErrInvalidInput("output") } @@ -174,7 +193,7 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err p.Info.FileResults = []*livekit.FileInfo{conf.FileInfo} p.Info.Result = &livekit.EgressInfo_File{File: conf.FileInfo} - p.Outputs[types.EgressTypeFile] = conf + p.Outputs[types.EgressTypeFile] = []OutputConfig{conf} p.OutputCount = 1 p.FinalizationRequired = true @@ -191,7 +210,7 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err p.Info.StreamResults = streamInfoList p.Info.Result = &livekit.EgressInfo_Stream{Stream: &livekit.StreamInfoList{Info: streamInfoList}} - p.Outputs[types.EgressTypeWebsocket] = conf + p.Outputs[types.EgressTypeWebsocket] = []OutputConfig{conf} p.OutputCount = 1 default: @@ -201,6 +220,29 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err return nil } +func (p *PipelineConfig) updateImageOutputs(req ImageOutput) error { + images := req.GetImageOutputs() + + if len(images) > 0 && !p.VideoEnabled { + return errors.ErrInvalidInput("audio_only") + } + + for _, img := range images { + conf, err := p.getImageConfig(img) + if err != nil { + return err + } + + p.Outputs[types.EgressTypeImages] = append(p.Outputs[types.EgressTypeImages], conf) + p.OutputCount++ + p.FinalizationRequired = true + + p.Info.ImageResults = append(p.Info.ImageResults, conf.ImagesInfo) + } + + return nil +} + func redactEncodedOutputs(out EncodedOutput) { if files := out.GetFileOutputs(); len(files) == 1 { redactUpload(files[0]) diff --git a/pkg/config/output_file.go b/pkg/config/output_file.go index 68e59461..88ab71a3 100644 --- a/pkg/config/output_file.go +++ b/pkg/config/output_file.go @@ -38,10 +38,10 @@ type FileConfig struct { func (p *PipelineConfig) GetFileConfig() *FileConfig { o, ok := p.Outputs[types.EgressTypeFile] - if !ok { + if !ok || len(o) == 0 { return nil } - return o.(*FileConfig) + return o[0].(*FileConfig) } func (p *PipelineConfig) getEncodedFileConfig(file *livekit.EncodedFileOutput) (*FileConfig, error) { diff --git a/pkg/config/output_image.go b/pkg/config/output_image.go new file mode 100644 index 00000000..823dcd2f --- /dev/null +++ b/pkg/config/output_image.go @@ -0,0 +1,156 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "os" + "path" + "time" + + "github.com/livekit/egress/pkg/errors" + "github.com/livekit/egress/pkg/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/utils" +) + +type ImageConfig struct { + outputConfig + + Id string // Used internally to map a gst Bin/element back to a sink and as part of the path + + ImagesInfo *livekit.ImagesInfo + LocalDir string + StorageDir string + ImagePrefix string + ImageSuffix livekit.ImageFileSuffix + ImageExtension types.FileExtension + + DisableManifest bool + UploadConfig UploadConfig + + CaptureInterval uint32 + Width int32 + Height int32 + ImageOutCodec types.MimeType +} + +func (p *PipelineConfig) GetImageConfigs() []*ImageConfig { + o, _ := p.Outputs[types.EgressTypeImages] + + var configs []*ImageConfig + for _, c := range o { + configs = append(configs, c.(*ImageConfig)) + } + + return configs +} + +func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConfig, error) { + outCodec, outputType, err := getMimeTypes(images.ImageCodec) + if err != nil { + return nil, err + } + + conf := &ImageConfig{ + outputConfig: outputConfig{ + OutputType: outputType, + }, + + Id: utils.NewGuid(""), + ImagesInfo: &livekit.ImagesInfo{}, + ImagePrefix: clean(images.FilenamePrefix), + ImageSuffix: images.FilenameSuffix, + DisableManifest: images.DisableManifest, + UploadConfig: p.getUploadConfig(images), + CaptureInterval: images.CaptureInterval, + Width: images.Width, + Height: images.Height, + ImageOutCodec: outCodec, + } + + if conf.CaptureInterval == 0 { + // 10s by default + conf.CaptureInterval = 10 + } + + // Set default dimensions for RoomComposite and Web. For all SDKs input, default will be + // set from the track dimensions + switch p.Info.Request.(type) { + case *livekit.EgressInfo_RoomComposite, *livekit.EgressInfo_Web: + if conf.Width == 0 { + conf.Width = p.Width + } + if conf.Height == 0 { + conf.Height = p.Height + } + } + + // filename + err = conf.updatePrefix(p) + if err != nil { + return nil, err + } + + return conf, nil +} + +func (o *ImageConfig) updatePrefix(p *PipelineConfig) error { + identifier, replacements := p.getFilenameInfo() + + o.ImagePrefix = stringReplace(o.ImagePrefix, replacements) + + o.ImageExtension = types.FileExtensionForOutputType[o.OutputType] + + imagesDir, imagesPrefix := path.Split(o.ImagePrefix) + + o.StorageDir = imagesDir + + // ensure playlistName + if imagesPrefix == "" { + imagesPrefix = fmt.Sprintf("%s-%s", identifier, time.Now().Format("2006-01-02T150405")) + } + + // update config + o.ImagePrefix = imagesPrefix + + if o.UploadConfig == nil { + o.LocalDir = imagesDir + } else { + // Prepend the configuration base directory and the egress Id, and slug to prevent conflict if + // there is more than one image output + // os.ModeDir creates a directory with mode 000 when mapping the directory outside the container + // Append a "/" to the path for consistency with the "UploadConfig == nil" case + + o.LocalDir = path.Join(TmpDir, p.Info.EgressId, o.Id) + "/" + } + + // create local directories + if o.LocalDir != "" { + if err := os.MkdirAll(o.LocalDir, 0755); err != nil { + return err + } + } + + return nil +} +func getMimeTypes(imageCodec livekit.ImageCodec) (types.MimeType, types.OutputType, error) { + switch imageCodec { + case livekit.ImageCodec_IC_DEFAULT, livekit.ImageCodec_IC_JPEG: + return types.MimeTypeJPEG, types.OutputTypeJPEG, nil + default: + return "", "", errors.ErrNoCompatibleCodec + } +} diff --git a/pkg/config/output_segment.go b/pkg/config/output_segment.go index ba822d46..ef173470 100644 --- a/pkg/config/output_segment.go +++ b/pkg/config/output_segment.go @@ -44,10 +44,10 @@ type SegmentConfig struct { func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig { o, ok := p.Outputs[types.EgressTypeSegments] - if !ok { + if !ok || len(o) == 0 { return nil } - return o.(*SegmentConfig) + return o[0].(*SegmentConfig) } // segments should always be added last, so we can check keyframe interval from file/stream diff --git a/pkg/config/output_stream.go b/pkg/config/output_stream.go index 92940f86..29e8dfc5 100644 --- a/pkg/config/output_stream.go +++ b/pkg/config/output_stream.go @@ -28,18 +28,18 @@ type StreamConfig struct { func (p *PipelineConfig) GetStreamConfig() *StreamConfig { o, ok := p.Outputs[types.EgressTypeStream] - if !ok { + if !ok || len(o) == 0 { return nil } - return o.(*StreamConfig) + return o[0].(*StreamConfig) } func (p *PipelineConfig) GetWebsocketConfig() *StreamConfig { o, ok := p.Outputs[types.EgressTypeWebsocket] - if !ok { + if !ok || len(o) == 0 { return nil } - return o.(*StreamConfig) + return o[0].(*StreamConfig) } func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []string) (*StreamConfig, error) { diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 99abd757..c5b9eee0 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -53,9 +53,9 @@ type PipelineConfig struct { AudioConfig `yaml:"-"` VideoConfig `yaml:"-"` - Outputs map[types.EgressType]OutputConfig `yaml:"-"` - OutputCount int `yaml:"-"` - FinalizationRequired bool `yaml:"-"` + Outputs map[types.EgressType][]OutputConfig `yaml:"-"` + OutputCount int `yaml:"-"` + FinalizationRequired bool `yaml:"-"` OnUpdate func(context.Context, *livekit.EgressInfo) `yaml:"-"` @@ -110,7 +110,8 @@ type AudioConfig struct { type VideoConfig struct { VideoEnabled bool - VideoTranscoding bool + VideoDecoding bool + VideoEncoding bool VideoOutCodec types.MimeType VideoProfile types.Profile Width int32 @@ -128,7 +129,7 @@ func NewPipelineConfig(confString string, req *rpc.StartEgressRequest) (*Pipelin Level: "info", }, }, - Outputs: make(map[types.EgressType]OutputConfig), + Outputs: make(map[types.EgressType][]OutputConfig), } if err := yaml.Unmarshal([]byte(confString), p); err != nil { @@ -153,7 +154,7 @@ func GetValidatedPipelineConfig(conf *ServiceConfig, req *rpc.StartEgressRequest p := &PipelineConfig{ BaseConfig: conf.BaseConfig, - Outputs: make(map[types.EgressType]OutputConfig), + Outputs: make(map[types.EgressType][]OutputConfig), } return p, p.Update(req) @@ -218,7 +219,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { if !req.RoomComposite.AudioOnly { p.VideoEnabled = true p.VideoInCodec = types.MimeTypeRawVideo - p.VideoTranscoding = true + p.VideoDecoding = true } if !p.AudioEnabled && !p.VideoEnabled { return errors.ErrInvalidInput("audio_only and video_only") @@ -267,7 +268,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { if !req.Web.AudioOnly { p.VideoEnabled = true p.VideoInCodec = types.MimeTypeRawVideo - p.VideoTranscoding = true + p.VideoDecoding = true } if !p.AudioEnabled && !p.VideoEnabled { return errors.ErrInvalidInput("audio_only and video_only") @@ -304,7 +305,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { p.AudioEnabled = true p.AudioTranscoding = true p.VideoEnabled = true - p.VideoTranscoding = true + p.VideoDecoding = true p.Identity = req.Participant.Identity if p.Identity == "" { return errors.ErrInvalidInput("identity") @@ -346,7 +347,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { if videoTrackID := req.TrackComposite.VideoTrackId; videoTrackID != "" { p.VideoEnabled = true p.VideoTrackID = videoTrackID - p.VideoTranscoding = true + p.VideoDecoding = true } if !p.AudioEnabled && !p.VideoEnabled { return errors.ErrInvalidInput("audio_track_id or video_track_id") @@ -446,7 +447,8 @@ func (p *PipelineConfig) validateAndUpdateOutputParams() error { // Select a codec compatible with all outputs if p.AudioEnabled { - for _, o := range p.Outputs { + for _, o := range p.GetEncodedOutputs() { + if compatibleAudioCodecs[types.DefaultAudioCodecs[o.GetOutputType()]] { p.AudioOutCodec = types.DefaultAudioCodecs[o.GetOutputType()] break @@ -461,7 +463,7 @@ func (p *PipelineConfig) validateAndUpdateOutputParams() error { } if p.VideoEnabled { - for _, o := range p.Outputs { + for _, o := range p.GetEncodedOutputs() { if compatibleVideoCodecs[types.DefaultVideoCodecs[o.GetOutputType()]] { p.VideoOutCodec = types.DefaultVideoCodecs[o.GetOutputType()] break @@ -490,7 +492,7 @@ func (p *PipelineConfig) validateAndUpdateOutputCodecs() (compatibleAudioCodecs compatibleAudioCodecs[p.AudioOutCodec] = true } - for _, o := range p.Outputs { + for _, o := range p.GetEncodedOutputs() { compatibleAudioCodecs = types.GetMapIntersection(compatibleAudioCodecs, types.CodecCompatibility[o.GetOutputType()]) if len(compatibleAudioCodecs) == 0 { if p.AudioOutCodec == "" { @@ -510,7 +512,7 @@ func (p *PipelineConfig) validateAndUpdateOutputCodecs() (compatibleAudioCodecs compatibleVideoCodecs[p.VideoOutCodec] = true } - for _, o := range p.Outputs { + for _, o := range p.GetEncodedOutputs() { compatibleVideoCodecs = types.GetMapIntersection(compatibleVideoCodecs, types.CodecCompatibility[o.GetOutputType()]) if len(compatibleVideoCodecs) == 0 { if p.AudioOutCodec == "" { @@ -561,14 +563,17 @@ func (p *PipelineConfig) updateOutputType(compatibleAudioCodecs map[types.MimeTy } // used for sdk input source -func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[string]string) error { +func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[string]string, w, h uint32) error { for egressType, c := range p.Outputs { + if len(c) == 0 { + continue + } switch egressType { case types.EgressTypeFile: - return c.(*FileConfig).updateFilepath(p, identifier, replacements) + return c[0].(*FileConfig).updateFilepath(p, identifier, replacements) case types.EgressTypeSegments: - o := c.(*SegmentConfig) + o := c[0].(*SegmentConfig) o.LocalDir = stringReplace(o.LocalDir, replacements) o.StorageDir = stringReplace(o.StorageDir, replacements) o.PlaylistFilename = stringReplace(o.PlaylistFilename, replacements) @@ -576,6 +581,28 @@ func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[s o.SegmentPrefix = stringReplace(o.SegmentPrefix, replacements) o.SegmentsInfo.PlaylistName = stringReplace(o.SegmentsInfo.PlaylistName, replacements) o.SegmentsInfo.LivePlaylistName = stringReplace(o.SegmentsInfo.LivePlaylistName, replacements) + + case types.EgressTypeImages: + for _, ci := range c { + o := ci.(*ImageConfig) + o.LocalDir = stringReplace(o.LocalDir, replacements) + o.StorageDir = stringReplace(o.StorageDir, replacements) + o.ImagePrefix = stringReplace(o.ImagePrefix, replacements) + if o.Width == 0 { + if w != 0 { + o.Width = int32(w) + } else { + o.Width = p.VideoConfig.Width + } + } + if o.Height == 0 { + if h != 0 { + o.Height = int32(h) + } else { + o.Height = p.VideoConfig.Height + } + } + } } } @@ -611,6 +638,16 @@ func (p *PipelineConfig) ValidateUrl(rawUrl string, outputType types.OutputType) } } +func (p *PipelineConfig) GetEncodedOutputs() []OutputConfig { + ret := make([]OutputConfig, 0) + + for _, k := range []types.EgressType{types.EgressTypeFile, types.EgressTypeSegments, types.EgressTypeStream, types.EgressTypeWebsocket} { + ret = append(ret, p.Outputs[k]...) + } + + return ret +} + func stringReplace(s string, replacements map[string]string) string { for template, value := range replacements { s = strings.Replace(s, template, value, -1) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index a9376c31..be1063d4 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -37,6 +37,7 @@ var ( ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU") ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track") ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen") + ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found") ) func New(err string) error { diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index 7a3901ce..4be12803 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -59,7 +59,7 @@ func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error pipeline.AddOnTrackRemoved(b.onTrackRemoved) } - if len(p.Outputs) > 1 { + if len(p.GetEncodedOutputs()) > 1 { tee, err := gst.NewElementWithName("tee", "audio_tee") if err != nil { return err diff --git a/pkg/pipeline/builder/file.go b/pkg/pipeline/builder/file.go index 86be26e3..00f99c93 100644 --- a/pkg/pipeline/builder/file.go +++ b/pkg/pipeline/builder/file.go @@ -59,5 +59,11 @@ func BuildFileBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstr return nil, err } + b.SetGetSrcPad(func(name string) *gst.Pad { + var padName = name + "_%u" + + return mux.GetRequestPad(padName) + }) + return b, nil } diff --git a/pkg/pipeline/builder/image.go b/pkg/pipeline/builder/image.go new file mode 100644 index 00000000..eca0807c --- /dev/null +++ b/pkg/pipeline/builder/image.go @@ -0,0 +1,147 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "fmt" + "path" + "time" + + "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/errors" + "github.com/livekit/egress/pkg/gstreamer" + "github.com/livekit/egress/pkg/types" + "github.com/tinyzimmer/go-gst/gst" +) + +const ( + imageQueueLatency = uint64(200 * time.Millisecond) +) + +func BuildImageBins(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) ([]*gstreamer.Bin, error) { + o := p.GetImageConfigs() + + var bins []*gstreamer.Bin + for _, c := range o { + b, err := BuildImageBin(c, pipeline, p) + if err != nil { + return nil, err + } + + bins = append(bins, b) + } + + return bins, nil +} + +func BuildImageBin(c *config.ImageConfig, pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstreamer.Bin, error) { + b := pipeline.NewBin(fmt.Sprintf("image_%s", c.Id)) + + var err error + var fakeAudio *gst.Element + if p.AudioEnabled { + fakeAudio, err = gst.NewElement("fakesink") + if err != nil { + return nil, err + } + } + + queue, err := gstreamer.BuildQueue(fmt.Sprintf("image_queue_%s", c.Id), imageQueueLatency, true) + if err != nil { + return nil, err + } + if err := b.AddElements(queue); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + + b.SetGetSrcPad(func(name string) *gst.Pad { + if name == "audio" { + return fakeAudio.GetStaticPad("sink") + } else { + return queue.GetStaticPad("sink") + } + }) + + videoRate, err := gst.NewElement("videorate") + if err != nil { + return nil, errors.ErrGstPipelineError(err) + } + if err = videoRate.SetProperty("max-duplication-time", uint64(time.Duration(c.CaptureInterval)*time.Second)); err != nil { + return nil, err + } + if err = videoRate.SetProperty("skip-to-first", true); err != nil { + return nil, err + } + if err := b.AddElements(videoRate); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + + videoScale, err := gst.NewElement("videoscale") + if err != nil { + return nil, errors.ErrGstPipelineError(err) + } + if err := b.AddElements(videoScale); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + + caps, err := gst.NewElement("capsfilter") + if err != nil { + return nil, errors.ErrGstPipelineError(err) + } + err = caps.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf( + "video/x-raw,framerate=1/%d,format=I420,width=%d,height=%d,colorimetry=bt709,chroma-site=mpeg2,pixel-aspect-ratio=1/1", + c.CaptureInterval, c.Width, c.Height))) + if err != nil { + return nil, err + } + if err := b.AddElements(caps); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + + switch c.ImageOutCodec { + case types.MimeTypeJPEG: + enc, err := gst.NewElement("jpegenc") + if err != nil { + return nil, errors.ErrGstPipelineError(err) + } + if err := b.AddElements(enc); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + default: + return nil, errors.ErrNoCompatibleCodec + } + + sink, err := gst.NewElementWithName("multifilesink", fmt.Sprintf("multifilesink_%s", c.Id)) + if err != nil { + return nil, err + } + err = sink.SetProperty("post-messages", true) + if err != nil { + return nil, err + } + + // File will be renamed if the TS prefix is configured + location := fmt.Sprintf("%s_%%05d%s", path.Join(c.StorageDir, c.ImagePrefix), types.FileExtensionForOutputType[c.OutputType]) + + err = sink.SetProperty("location", location) + if err != nil { + return nil, err + } + if err := b.AddElements(sink); err != nil { + return nil, errors.ErrGstPipelineError(err) + } + + return b, nil +} diff --git a/pkg/pipeline/builder/stream.go b/pkg/pipeline/builder/stream.go index e6e2484b..c0ded8aa 100644 --- a/pkg/pipeline/builder/stream.go +++ b/pkg/pipeline/builder/stream.go @@ -98,6 +98,10 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St } } + b.SetGetSrcPad(func(name string) *gst.Pad { + return mux.GetRequestPad(name) + }) + return sb, b, nil } diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 8186c87e..f4c1491d 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -16,6 +16,7 @@ package builder import ( "fmt" + "strings" "sync" "time" "unsafe" @@ -43,9 +44,10 @@ type VideoBin struct { selectedPad string nextPad string - mu sync.Mutex - pads map[string]*gst.Pad - selector *gst.Element + mu sync.Mutex + pads map[string]*gst.Pad + selector *gst.Element + rawVideoTee *gst.Element } func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error { @@ -71,7 +73,8 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error pipeline.AddOnTrackUnmuted(b.onTrackUnmuted) } - if len(p.Outputs) > 1 { + var getPad func() *gst.Pad + if len(p.GetEncodedOutputs()) > 1 { tee, err := gst.NewElementWithName("tee", "video_tee") if err != nil { return err @@ -80,7 +83,11 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error if err = b.bin.AddElement(tee); err != nil { return err } - } else { + + getPad = func() *gst.Pad { + return tee.GetRequestPad("src_%u") + } + } else if len(p.GetEncodedOutputs()) > 0 { queue, err := gstreamer.BuildQueue("video_queue", p.Latency, true) if err != nil { return errors.ErrGstPipelineError(err) @@ -88,8 +95,22 @@ func BuildVideoBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error if err = b.bin.AddElement(queue); err != nil { return err } + + getPad = func() *gst.Pad { + return queue.GetStaticPad("src") + } } + b.bin.SetGetSinkPad(func(name string) *gst.Pad { + if strings.HasPrefix(name, "image") { + return b.rawVideoTee.GetRequestPad("src_%u") + } else if getPad != nil { + return getPad() + } + + return nil + }) + return pipeline.AddSourceBin(b.bin) } @@ -199,11 +220,10 @@ func (b *VideoBin) buildWebInput() error { return err } - if b.conf.VideoTranscoding { - if err = b.addEncoder(); err != nil { - return err - } + if err = b.addDecodedVideoSink(); err != nil { + return err } + return nil } @@ -211,7 +231,7 @@ func (b *VideoBin) buildSDKInput() error { b.pads = make(map[string]*gst.Pad) // add selector first so pads can be created - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { if err := b.addSelector(); err != nil { return err } @@ -223,7 +243,7 @@ func (b *VideoBin) buildSDKInput() error { } } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { b.bin.SetGetSrcPad(b.getSrcPad) b.bin.SetEOSFunc(func() bool { b.mu.Lock() @@ -245,7 +265,7 @@ func (b *VideoBin) buildSDKInput() error { return err } } - if err := b.addEncoder(); err != nil { + if err := b.addDecodedVideoSink(); err != nil { return err } } @@ -259,7 +279,7 @@ func (b *VideoBin) addAppSrcBin(ts *config.TrackSource) error { return err } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { b.createSrcPad(ts.TrackID) } @@ -267,7 +287,7 @@ func (b *VideoBin) addAppSrcBin(ts *config.TrackSource) error { return err } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { return b.setSelectorPad(ts.TrackID) } @@ -312,7 +332,7 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource) (*gstreamer.Bin, error return nil, err } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { avDecH264, err := gst.NewElement("avdec_h264") if err != nil { return nil, errors.ErrGstPipelineError(err) @@ -350,7 +370,7 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource) (*gstreamer.Bin, error return nil, err } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { vp8Dec, err := gst.NewElement("vp8dec") if err != nil { return nil, errors.ErrGstPipelineError(err) @@ -378,7 +398,7 @@ func (b *VideoBin) buildAppSrcBin(ts *config.TrackSource) (*gstreamer.Bin, error return nil, err } - if b.conf.VideoTranscoding { + if b.conf.VideoDecoding { vp9Dec, err := gst.NewElement("vp9dec") if err != nil { return nil, errors.ErrGstPipelineError(err) @@ -573,6 +593,27 @@ func (b *VideoBin) addEncoder() error { default: return errors.ErrNotSupported(fmt.Sprintf("%s encoding", b.conf.VideoOutCodec)) } + +} + +func (b *VideoBin) addDecodedVideoSink() error { + var err error + b.rawVideoTee, err = gst.NewElement("tee") + if err != nil { + return err + } + if err = b.bin.AddElement(b.rawVideoTee); err != nil { + return err + } + + if b.conf.VideoEncoding { + err = b.addEncoder() + if err != nil { + return err + } + } + + return nil } func addVideoConverter(b *gstreamer.Bin, p *config.PipelineConfig) error { diff --git a/pkg/pipeline/builder/websocket.go b/pkg/pipeline/builder/websocket.go index d266a377..41f72492 100644 --- a/pkg/pipeline/builder/websocket.go +++ b/pkg/pipeline/builder/websocket.go @@ -15,6 +15,7 @@ package builder import ( + "github.com/tinyzimmer/go-gst/gst" "github.com/tinyzimmer/go-gst/gst/app" "github.com/livekit/egress/pkg/gstreamer" @@ -33,5 +34,9 @@ func BuildWebsocketBin(pipeline *gstreamer.Pipeline, appSinkCallbacks *app.SinkC return nil, err } + b.SetGetSrcPad(func(name string) *gst.Pad { + return appSink.GetStaticPad("sink") + }) + return b, nil } diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index e3ae73ba..45583c18 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -34,6 +34,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/tracer" "github.com/livekit/protocol/utils" + "github.com/livekit/psrpc" ) const ( @@ -46,7 +47,7 @@ type Controller struct { // gstreamer src source.Source p *gstreamer.Pipeline - sinks map[types.EgressType]sink.Sink + sinks map[types.EgressType][]sink.Sink streamBin *builder.StreamBin callbacks *gstreamer.Callbacks @@ -137,26 +138,42 @@ func (c *Controller) BuildPipeline() error { } } + var sinkBins []*gstreamer.Bin for egressType := range c.Outputs { - var sinkBin *gstreamer.Bin switch egressType { case types.EgressTypeFile: + var sinkBin *gstreamer.Bin sinkBin, err = builder.BuildFileBin(p, c.PipelineConfig) + sinkBins = append(sinkBins, sinkBin) case types.EgressTypeSegments: + var sinkBin *gstreamer.Bin sinkBin, err = builder.BuildSegmentBin(p, c.PipelineConfig) + sinkBins = append(sinkBins, sinkBin) case types.EgressTypeStream: + var sinkBin *gstreamer.Bin c.streamBin, sinkBin, err = builder.BuildStreamBin(p, c.PipelineConfig) + sinkBins = append(sinkBins, sinkBin) case types.EgressTypeWebsocket: - writer := c.sinks[egressType].(*sink.WebsocketSink) + var sinkBin *gstreamer.Bin + writer := c.sinks[egressType][0].(*sink.WebsocketSink) sinkBin, err = builder.BuildWebsocketBin(p, writer.SinkCallbacks()) + sinkBins = append(sinkBins, sinkBin) + + case types.EgressTypeImages: + var bins []*gstreamer.Bin + bins, err = builder.BuildImageBins(p, c.PipelineConfig) + sinkBins = append(sinkBins, bins...) } if err != nil { return err } - if err = p.AddSinkBin(sinkBin); err != nil { + } + + for _, bin := range sinkBins { + if err = p.AddSinkBin(bin); err != nil { return err } } @@ -198,10 +215,12 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { } } - for _, s := range c.sinks { - if err := s.Start(); err != nil { - c.Info.Error = err.Error() - return c.Info + for _, si := range c.sinks { + for _, s := range si { + if err := s.Start(); err != nil { + c.Info.Error = err.Error() + return c.Info + } } } @@ -210,10 +229,12 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { return c.Info } - for _, s := range c.sinks { - if err := s.Close(); err != nil { - c.Info.Error = err.Error() - return c.Info + for _, si := range c.sinks { + for _, s := range si { + if err := s.Close(); err != nil { + c.Info.Error = err.Error() + return c.Info + } } } @@ -290,6 +311,13 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream return errs.ToError() } +func (c *Controller) UpdateOutputs(ctx context.Context, req *livekit.UpdateOutputsRequest) error { + ctx, span := tracer.Start(ctx, "Pipeline.UpdateOutputs") + defer span.End() + + return psrpc.NewErrorf(psrpc.Unimplemented, "Updating outputs unimplemented") +} + func (c *Controller) removeSink(ctx context.Context, url string, streamErr error) error { now := time.Now().UnixNano() @@ -433,8 +461,10 @@ func (c *Controller) Close() { c.Info.Status = livekit.EgressStatus_EGRESS_COMPLETE } - for _, s := range c.sinks { - s.Cleanup() + for _, si := range c.sinks { + for _, s := range si { + s.Cleanup() + } } } @@ -449,6 +479,9 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) { t = c.StreamOutputMaxDuration case types.EgressTypeSegments: t = c.SegmentOutputMaxDuration + case types.EgressTypeImages: + t = c.ImageOutputMaxDuration + } if t > 0 && (timeout == 0 || t < timeout) { timeout = t @@ -473,20 +506,27 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) { func (c *Controller) updateStartTime(startedAt int64) { for egressType, o := range c.Outputs { + if len(o) == 0 { + continue + } switch egressType { case types.EgressTypeStream, types.EgressTypeWebsocket: c.mu.Lock() - for _, streamInfo := range o.(*config.StreamConfig).StreamInfo { + for _, streamInfo := range o[0].(*config.StreamConfig).StreamInfo { streamInfo.Status = livekit.StreamInfo_ACTIVE streamInfo.StartedAt = startedAt } c.mu.Unlock() case types.EgressTypeFile: - o.(*config.FileConfig).FileInfo.StartedAt = startedAt + o[0].(*config.FileConfig).FileInfo.StartedAt = startedAt case types.EgressTypeSegments: - o.(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt + o[0].(*config.SegmentConfig).SegmentsInfo.StartedAt = startedAt + case types.EgressTypeImages: + for _, c := range o { + c.(*config.ImageConfig).ImagesInfo.StartedAt = startedAt + } } } @@ -499,9 +539,12 @@ func (c *Controller) updateStartTime(startedAt int64) { func (c *Controller) updateDuration(endedAt int64) { for egressType, o := range c.Outputs { + if len(o) == 0 { + continue + } switch egressType { case types.EgressTypeStream, types.EgressTypeWebsocket: - for _, info := range o.(*config.StreamConfig).StreamInfo { + for _, info := range o[0].(*config.StreamConfig).StreamInfo { info.Status = livekit.StreamInfo_FINISHED if info.StartedAt == 0 { info.StartedAt = endedAt @@ -511,7 +554,7 @@ func (c *Controller) updateDuration(endedAt int64) { } case types.EgressTypeFile: - fileInfo := o.(*config.FileConfig).FileInfo + fileInfo := o[0].(*config.FileConfig).FileInfo if fileInfo.StartedAt == 0 { fileInfo.StartedAt = endedAt } @@ -519,12 +562,20 @@ func (c *Controller) updateDuration(endedAt int64) { fileInfo.Duration = endedAt - fileInfo.StartedAt case types.EgressTypeSegments: - segmentsInfo := o.(*config.SegmentConfig).SegmentsInfo + segmentsInfo := o[0].(*config.SegmentConfig).SegmentsInfo if segmentsInfo.StartedAt == 0 { segmentsInfo.StartedAt = endedAt } segmentsInfo.EndedAt = endedAt segmentsInfo.Duration = endedAt - segmentsInfo.StartedAt + case types.EgressTypeImages: + for _, c := range o { + imageInfo := c.(*config.ImageConfig).ImagesInfo + if imageInfo.StartedAt == 0 { + imageInfo.StartedAt = endedAt + } + imageInfo.EndedAt = endedAt + } } } } diff --git a/pkg/pipeline/sink/image.go b/pkg/pipeline/sink/image.go new file mode 100644 index 00000000..e9721ea9 --- /dev/null +++ b/pkg/pipeline/sink/image.go @@ -0,0 +1,178 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "fmt" + "os" + "path" + "strings" + "time" + + "github.com/frostbyte73/core" + "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/gstreamer" + "github.com/livekit/egress/pkg/pipeline/sink/uploader" + "github.com/livekit/egress/pkg/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +type ImageSink struct { + uploader.Uploader + + *config.ImageConfig + + conf *config.PipelineConfig + callbacks *gstreamer.Callbacks + + initialized bool + startTime time.Time + startRunningTime uint64 + + manifest *ImageManifest + createdImages chan *imageUpdate + done core.Fuse +} + +type imageUpdate struct { + timestamp uint64 + filename string +} + +func newImageSink(u uploader.Uploader, p *config.PipelineConfig, o *config.ImageConfig, callbacks *gstreamer.Callbacks) (*ImageSink, error) { + return &ImageSink{ + Uploader: u, + ImageConfig: o, + conf: p, + callbacks: callbacks, + + manifest: createImageManifest(p), + createdImages: make(chan *imageUpdate, maxPendingUploads), + done: core.NewFuse(), + }, nil +} + +func (s *ImageSink) Start() error { + go func() { + var err error + defer func() { + if err != nil { + s.callbacks.OnError(err) + } + s.done.Break() + }() + + for update := range s.createdImages { + err = s.handleNewImage(update) + if err != nil { + logger.Errorw("new image handling failed", err) + return + } + } + }() + + return nil +} + +func (s *ImageSink) handleNewImage(update *imageUpdate) error { + s.ImagesInfo.ImageCount++ + + filename := update.filename + ts := s.getImageTime(update.timestamp) + imageLocalPath := path.Join(s.LocalDir, filename) + if s.ImageSuffix == livekit.ImageFileSuffix_IMAGE_SUFFIX_TIMESTAMP { + newFilename := fmt.Sprintf("%s_%s%03d%s", s.ImagePrefix, ts.Format("20060102150405"), ts.UnixMilli()%1000, types.FileExtensionForOutputType[s.OutputType]) + newImageLocalPath := path.Join(s.LocalDir, newFilename) + + err := os.Rename(imageLocalPath, newImageLocalPath) + if err != nil { + return err + } + filename = newFilename + imageLocalPath = newImageLocalPath + + } + + imageStoragePath := path.Join(s.StorageDir, filename) + + _, size, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true) + if err != nil { + return err + } + + if !s.DisableManifest { + err = s.updateManifest(filename, ts, size) + if err != nil { + return err + } + } + + return nil +} + +func (s *ImageSink) getImageTime(pts uint64) time.Time { + if !s.initialized { + s.startTime = time.Now() + s.startRunningTime = pts + s.initialized = true + } + + return s.startTime.Add(time.Duration(pts - s.startRunningTime)) +} + +func (s *ImageSink) updateManifest(filename string, ts time.Time, size int64) error { + s.manifest.imageCreated(filename, ts, size) + + manifestLocalPath := fmt.Sprintf("%s.json", path.Join(s.LocalDir, s.ImagePrefix)) + manifestStoragePath := fmt.Sprintf("%s.json", path.Join(s.StorageDir, s.ImagePrefix)) + return s.manifest.updateManifest(s.Uploader, manifestLocalPath, manifestStoragePath) +} + +func (s *ImageSink) NewImage(filepath string, ts uint64) error { + if !strings.HasPrefix(filepath, s.LocalDir) { + return fmt.Errorf("invalid filepath") + } + + filename := filepath[len(s.LocalDir):] + + s.createdImages <- &imageUpdate{ + filename: filename, + timestamp: ts, + } + + return nil +} + +func (s *ImageSink) Close() error { + close(s.createdImages) + <-s.done.Watch() + + return nil +} + +func (s *ImageSink) Cleanup() { + if s.LocalDir == s.StorageDir { + return + } + + if s.LocalDir != "" { + logger.Debugw("removing temporary directory", "path", s.LocalDir) + if err := os.RemoveAll(s.LocalDir); err != nil { + logger.Errorw("could not delete temp dir", err) + } + } + +} diff --git a/pkg/pipeline/sink/image_manifest.go b/pkg/pipeline/sink/image_manifest.go new file mode 100644 index 00000000..8cc466a3 --- /dev/null +++ b/pkg/pipeline/sink/image_manifest.go @@ -0,0 +1,77 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "encoding/json" + "os" + "time" + + "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/pipeline/sink/uploader" + "github.com/livekit/egress/pkg/types" +) + +type ImageManifest struct { + Manifest `json:",inline"` + + Images []*Image `json:"images"` +} + +type Image struct { + Name string `json:"name"` + Timestamp time.Time `json:"timestamp"` + Size int64 `json:"size"` +} + +func createImageManifest(p *config.PipelineConfig) *ImageManifest { + return &ImageManifest{ + Manifest: initManifest(p), + } +} + +func (m *ImageManifest) imageCreated(filename string, ts time.Time, size int64) { + m.Images = append(m.Images, &Image{ + Name: filename, + Timestamp: ts, + Size: size, + }) +} + +func (m *ImageManifest) updateManifest(u uploader.Uploader, localFilepath, storageFilepath string) error { + manifest, err := os.Create(localFilepath) + if err != nil { + return err + } + + b, err := json.Marshal(m) + if err != nil { + return err + } + + _, err = manifest.Write(b) + if err != nil { + return err + } + + err = manifest.Close() + if err != nil { + return err + } + + _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) + + return err +} diff --git a/pkg/pipeline/sink/manifest.go b/pkg/pipeline/sink/manifest.go index deced515..6d967971 100644 --- a/pkg/pipeline/sink/manifest.go +++ b/pkg/pipeline/sink/manifest.go @@ -60,7 +60,17 @@ func uploadManifest(p *config.PipelineConfig, u uploader.Uploader, localFilepath } func getManifest(p *config.PipelineConfig) ([]byte, error) { - manifest := Manifest{ + manifest := initManifest(p) + + if o := p.GetSegmentConfig(); o != nil { + manifest.SegmentCount = o.SegmentsInfo.SegmentCount + } + + return json.Marshal(manifest) +} + +func initManifest(p *config.PipelineConfig) Manifest { + return Manifest{ EgressID: p.Info.EgressId, RoomID: p.Info.RoomId, RoomName: p.Info.RoomName, @@ -74,10 +84,4 @@ func getManifest(p *config.PipelineConfig) ([]byte, error) { AudioTrackID: p.AudioTrackID, VideoTrackID: p.VideoTrackID, } - - if o := p.GetSegmentConfig(); o != nil { - manifest.SegmentCount = o.SegmentsInfo.SegmentCount - } - - return json.Marshal(manifest) } diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index 05e4a41b..e3951117 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -27,14 +27,17 @@ type Sink interface { Cleanup() } -func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[types.EgressType]Sink, error) { - sinks := make(map[types.EgressType]Sink) +func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[types.EgressType][]Sink, error) { + sinks := make(map[types.EgressType][]Sink) for egressType, c := range p.Outputs { + if len(c) == 0 { + continue + } var s Sink var err error switch egressType { case types.EgressTypeFile: - o := c.(*config.FileConfig) + o := c[0].(*config.FileConfig) u, err := uploader.New(o.UploadConfig, p.BackupStorage) if err != nil { @@ -44,7 +47,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ s = newFileSink(u, p, o) case types.EgressTypeSegments: - o := c.(*config.SegmentConfig) + o := c[0].(*config.SegmentConfig) u, err := uploader.New(o.UploadConfig, p.BackupStorage) if err != nil { @@ -60,16 +63,30 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[ // no sink needed case types.EgressTypeWebsocket: - o := c.(*config.StreamConfig) + o := c[0].(*config.StreamConfig) s, err = newWebsocketSink(o, types.MimeTypeRawAudio, callbacks) if err != nil { return nil, err } + case types.EgressTypeImages: + for _, ci := range c { + o := ci.(*config.ImageConfig) + + u, err := uploader.New(o.UploadConfig, p.BackupStorage) + if err != nil { + return nil, err + } + + s, err = newImageSink(u, p, o, callbacks) + if err != nil { + return nil, err + } + } } if s != nil { - sinks[egressType] = s + sinks[egressType] = append(sinks[egressType], s) } } diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 9fe36821..8af69ec6 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -167,10 +167,11 @@ func (s *SDKSource) joinRoom() error { var fileIdentifier string var err error + var w, h uint32 switch s.RequestType { case types.RequestTypeParticipant: fileIdentifier = s.Identity - err = s.awaitParticipant(s.Identity) + w, h, err = s.awaitParticipant(s.Identity) case types.RequestTypeTrackComposite: fileIdentifier = s.Info.RoomName @@ -181,17 +182,17 @@ func (s *SDKSource) joinRoom() error { if s.VideoEnabled { tracks[s.VideoTrackID] = struct{}{} } - err = s.awaitTracks(tracks) + w, h, err = s.awaitTracks(tracks) case types.RequestTypeTrack: fileIdentifier = s.TrackID - err = s.awaitTracks(map[string]struct{}{s.TrackID: {}}) + w, h, err = s.awaitTracks(map[string]struct{}{s.TrackID: {}}) } if err != nil { return err } - if err = s.UpdateInfoFromSDK(fileIdentifier, s.filenameReplacements); err != nil { + if err = s.UpdateInfoFromSDK(fileIdentifier, s.filenameReplacements, w, h); err != nil { logger.Errorw("could not update file params", err) return err } @@ -199,22 +200,31 @@ func (s *SDKSource) joinRoom() error { return nil } -func (s *SDKSource) awaitParticipant(identity string) error { +func (s *SDKSource) awaitParticipant(identity string) (uint32, uint32, error) { s.errors = make(chan error, 2) rp, err := s.getParticipant(identity) if err != nil { - return err + return 0, 0, err } for trackCount := 0; trackCount == 0 || trackCount < len(rp.Tracks()); trackCount++ { if err = <-s.errors; err != nil { - return err + return 0, 0, err + } + } + + var w, h uint32 + for _, t := range rp.Tracks() { + if t.TrackInfo().Type == livekit.TrackType_VIDEO { + w = t.TrackInfo().Width + h = t.TrackInfo().Height } + } s.initialized.Break() - return nil + return w, h, nil } func (s *SDKSource) getParticipant(identity string) (*lksdk.RemoteParticipant, error) { @@ -230,36 +240,47 @@ func (s *SDKSource) getParticipant(identity string) (*lksdk.RemoteParticipant, e return nil, errors.ErrParticipantNotFound(identity) } -func (s *SDKSource) awaitTracks(expecting map[string]struct{}) error { +func (s *SDKSource) awaitTracks(expecting map[string]struct{}) (uint32, uint32, error) { trackCount := len(expecting) s.errors = make(chan error, trackCount) deadline := time.After(subscriptionTimeout) - if err := s.subscribeToTracks(expecting, deadline); err != nil { - return err + tracks, err := s.subscribeToTracks(expecting, deadline) + if err != nil { + return 0, 0, err } for i := 0; i < trackCount; i++ { select { case err := <-s.errors: if err != nil { - return err + return 0, 0, err } case <-deadline: - return errors.ErrSubscriptionFailed + return 0, 0, errors.ErrSubscriptionFailed + } + } + + var w, h uint32 + for _, t := range tracks { + if t.TrackInfo().Type == livekit.TrackType_VIDEO { + w = t.TrackInfo().Width + h = t.TrackInfo().Height } } s.initialized.Break() - return nil + return w, h, nil } -func (s *SDKSource) subscribeToTracks(expecting map[string]struct{}, deadline <-chan time.Time) error { +func (s *SDKSource) subscribeToTracks(expecting map[string]struct{}, deadline <-chan time.Time) ([]lksdk.TrackPublication, error) { + var tracks []lksdk.TrackPublication + for { select { case <-deadline: for trackID := range expecting { - return errors.ErrTrackNotFound(trackID) + return nil, errors.ErrTrackNotFound(trackID) } default: for _, p := range s.room.GetParticipants() { @@ -267,12 +288,14 @@ func (s *SDKSource) subscribeToTracks(expecting map[string]struct{}, deadline <- trackID := track.SID() if _, ok := expecting[trackID]; ok { if err := s.subscribe(track); err != nil { - return err + return nil, err } + tracks = append(tracks, track) + delete(expecting, track.SID()) if len(expecting) == 0 { - return nil + return tracks, nil } } } @@ -357,7 +380,10 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo s.VideoOutCodec = ts.MimeType } if s.VideoInCodec != s.VideoOutCodec { - s.VideoTranscoding = true + s.VideoDecoding = true + if len(s.GetEncodedOutputs()) > 0 { + s.VideoEncoding = true + } } writer, err := s.createWriter(track, pub, rp, ts) diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 67f3b08d..069b245a 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -46,10 +46,14 @@ const ( msgFirstSampleMetadata = "FirstSampleMetadata" msgFragmentOpened = "splitmuxsink-fragment-opened" msgFragmentClosed = "splitmuxsink-fragment-closed" + msgGstMultiFileSink = "GstMultiFileSink" fragmentLocation = "location" fragmentRunningTime = "running-time" + gstMultiFileSinkFilename = "filename" + gstMultiFileSinkTimestamp = "timestamp" + // common gst errors msgWrongThread = "Called from wrong thread" @@ -278,6 +282,23 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { logger.Errorw("failed to end segment with playlist writer", err, "runningTime", t) return err } + + case msgGstMultiFileSink: + location, ts, err := getImageInformationFromGstStructure(s) + if err != nil { + return err + } + logger.Debugw("received GstMultiFileSink message", "location", location, "timestamp", ts, "source", msg.Source()) + + sink := c.getImageSink(msg.Source()) + if sink == nil { + return errors.ErrSinkNotFound + } + + err = sink.NewImage(location, ts) + if err != nil { + return err + } } } @@ -316,6 +337,52 @@ func getFirstSampleMetadataFromGstStructure(s *gst.Structure) (startDate time.Ti return time.Unix(0, firstSampleMetadata.StartDate), nil } +func getImageInformationFromGstStructure(s *gst.Structure) (string, uint64, error) { + loc, err := s.GetValue(gstMultiFileSinkFilename) + if err != nil { + return "", 0, err + } + filepath, ok := loc.(string) + if !ok { + return "", 0, errors.ErrGstPipelineError(errors.New("invalid type for location")) + } + + t, err := s.GetValue(gstMultiFileSinkTimestamp) + if err != nil { + return "", 0, err + } + ti, ok := t.(uint64) + if !ok { + return "", 0, errors.ErrGstPipelineError(errors.New("invalid type for time")) + } + + return filepath, ti, nil + +} + func (c *Controller) getSegmentSink() *sink.SegmentSink { - return c.sinks[types.EgressTypeSegments].(*sink.SegmentSink) + s := c.sinks[types.EgressTypeSegments] + if len(s) == 0 { + return nil + } + + return s[0].(*sink.SegmentSink) +} + +func (c *Controller) getImageSink(name string) *sink.ImageSink { + id := name[len("multifilesink_"):] + + s := c.sinks[types.EgressTypeImages] + if len(s) == 0 { + return nil + } + + // Use a map here? + for _, si := range s { + if si.(*sink.ImageSink).Id == id { + return si.(*sink.ImageSink) + } + } + + return nil } diff --git a/pkg/service/handler.go b/pkg/service/handler.go index a7040d14..5eb583b8 100644 --- a/pkg/service/handler.go +++ b/pkg/service/handler.go @@ -145,6 +145,21 @@ func (h *Handler) UpdateStream(ctx context.Context, req *livekit.UpdateStreamReq return h.pipeline.Info, nil } +func (h *Handler) UpdateOutputs(ctx context.Context, req *livekit.UpdateOutputsRequest) (*livekit.EgressInfo, error) { + ctx, span := tracer.Start(ctx, "Handler.UpdateOutputs") + defer span.End() + + if h.pipeline == nil { + return nil, errors.ErrEgressNotFound + } + + err := h.pipeline.UpdateOutputs(ctx, req) + if err != nil { + return nil, err + } + return h.pipeline.Info, nil +} + func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) (*livekit.EgressInfo, error) { ctx, span := tracer.Start(ctx, "Handler.StopEgress") defer span.End() diff --git a/pkg/types/types.go b/pkg/types/types.go index a0e66d7e..e98764cf 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -39,6 +39,7 @@ const ( EgressTypeWebsocket EgressType = "websocket" EgressTypeFile EgressType = "file" EgressTypeSegments EgressType = "segments" + EgressTypeImages EgressType = "images" // input types MimeTypeAAC MimeType = "audio/aac" @@ -47,6 +48,7 @@ const ( MimeTypeH264 MimeType = "video/h264" MimeTypeVP8 MimeType = "video/vp8" MimeTypeVP9 MimeType = "video/vp9" + MimeTypeJPEG MimeType = "image/jpeg" MimeTypeRawVideo MimeType = "video/x-raw" // video profiles @@ -62,6 +64,7 @@ const ( OutputTypeMP4 OutputType = "video/mp4" OutputTypeTS OutputType = "video/mp2t" OutputTypeWebM OutputType = "video/webm" + OutputTypeJPEG OutputType = "image/jpeg" OutputTypeRTMP OutputType = "rtmp" OutputTypeHLS OutputType = "application/x-mpegurl" OutputTypeJSON OutputType = "application/json" @@ -75,6 +78,7 @@ const ( FileExtensionTS = ".ts" FileExtensionWebM = ".webm" FileExtensionM3U8 = ".m3u8" + FileExtensionJPEG = ".jpeg" ) var ( @@ -105,6 +109,7 @@ var ( FileExtensionTS: {}, FileExtensionWebM: {}, FileExtensionM3U8: {}, + FileExtensionJPEG: {}, } FileExtensionForOutputType = map[OutputType]FileExtension{ @@ -115,6 +120,7 @@ var ( OutputTypeTS: FileExtensionTS, OutputTypeWebM: FileExtensionWebM, OutputTypeHLS: FileExtensionM3U8, + OutputTypeJPEG: FileExtensionJPEG, } CodecCompatibility = map[OutputType]map[MimeType]bool{ diff --git a/test/ffprobe.go b/test/ffprobe.go index 0be62149..bc7410fd 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -121,7 +121,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre require.NoError(t, err, "input %s does not exist", in) } - switch p.Outputs[egressType].GetOutputType() { + switch p.Outputs[egressType][0].GetOutputType() { case types.OutputTypeRaw: require.Equal(t, 0, info.Format.ProbeScore) case types.OutputTypeIVF: @@ -213,7 +213,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre require.Equal(t, 2, stream.Channels) // audio bitrate - if p.Outputs[egressType].GetOutputType() == types.OutputTypeMP4 { + if p.Outputs[egressType][0].GetOutputType() == types.OutputTypeMP4 { bitrate, err := strconv.Atoi(stream.BitRate) require.NoError(t, err) require.NotZero(t, bitrate) @@ -227,7 +227,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre case types.MimeTypeH264: require.Equal(t, "h264", stream.CodecName) - if p.VideoTranscoding { + if p.VideoEncoding { switch p.VideoProfile { case types.ProfileBaseline: require.Equal(t, "Constrained Baseline", stream.Profile) @@ -243,14 +243,14 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre require.Equal(t, "vp9", stream.CodecName) } - switch p.Outputs[egressType].GetOutputType() { + switch p.Outputs[egressType][0].GetOutputType() { case types.OutputTypeIVF: require.Equal(t, "vp8", stream.CodecName) case types.OutputTypeMP4: require.Equal(t, "h264", stream.CodecName) - if p.VideoTranscoding { + if p.VideoEncoding { // bitrate, not available for HLS or WebM bitrate, err := strconv.Atoi(stream.BitRate) require.NoError(t, err) @@ -274,7 +274,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre case types.OutputTypeHLS: require.Equal(t, "h264", stream.CodecName) - if p.VideoTranscoding { + if p.VideoEncoding { // dimensions require.Equal(t, p.Width, stream.Width) require.Equal(t, p.Height, stream.Height) diff --git a/test/file.go b/test/file.go index 3ab8844b..2257a317 100644 --- a/test/file.go +++ b/test/file.go @@ -50,7 +50,7 @@ func (r *Runner) runFileTest(t *testing.T, req *rpc.StartEgressRequest, test *te p.GetFileConfig().OutputType = test.outputType } - require.Equal(t, test.expectVideoTranscoding, p.VideoTranscoding) + require.Equal(t, test.expectVideoEncoding, p.VideoEncoding) // verify r.verifyFile(t, p, res) diff --git a/test/images.go b/test/images.go new file mode 100644 index 00000000..f0165956 --- /dev/null +++ b/test/images.go @@ -0,0 +1,97 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build integration + +package test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/livekit/egress/pkg/config" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" +) + +func (r *Runner) runImagesTest(t *testing.T, req *rpc.StartEgressRequest, test *testCase) { + egressID := r.startEgress(t, req) + + time.Sleep(time.Second * 10) + if r.Dotfiles { + r.createDotFile(t, egressID) + } + + // stop + time.Sleep(time.Second * 15) + res := r.stopEgress(t, egressID) + + // get params + p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) + require.NoError(t, err) + + r.verifyImages(t, p, test.imageFilenameSuffix, res) +} + +func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.ImageFileSuffix, res *livekit.EgressInfo) { + // egress info + require.Equal(t, res.Error == "", res.Status != livekit.EgressStatus_EGRESS_FAILED) + require.NotZero(t, res.StartedAt) + require.NotZero(t, res.EndedAt) + + // segments info + require.Len(t, res.GetImageResults(), 1) + images := res.GetImageResults()[0] + + require.Greater(t, images.ImageCount, int64(0)) + + // r.verifyImagesOutput(t, p, filenameSuffix, segments.PlaylistName, segments.PlaylistLocation, int(segments.SegmentCount), res, m3u8.PlaylistTypeEvent) + // r.verifyManifest(t, p, segments.PlaylistName) +} + +//func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) { +// localPlaylistPath := fmt.Sprintf("%s/%s", r.FilePrefix, plName) +// +// if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { +// download(t, uploadConfig, localPlaylistPath+".json", plName+".json") +// } +//} + +//func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, plName string, plLocation string, segmentCount int, res *livekit.EgressInfo, plType m3u8.PlaylistType) { +// require.NotEmpty(t, plName) +// require.NotEmpty(t, plLocation) + +// storedPlaylistPath := plName +// localPlaylistPath := plName + +// // download from cloud storage +// if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { +// localPlaylistPath = fmt.Sprintf("%s/%s", r.FilePrefix, storedPlaylistPath) +// download(t, uploadConfig, localPlaylistPath, storedPlaylistPath) +// if plType == m3u8.PlaylistTypeEvent { +// // Only download segments once +// base := storedPlaylistPath[:len(storedPlaylistPath)-5] +// for i := 0; i < int(segmentCount); i++ { +// cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) +// localPath := fmt.Sprintf("%s/%s", r.FilePrefix, cloudPath) +// download(t, uploadConfig, localPath, cloudPath) +// } +// } +// } + +// verify +// verify(t, localPlaylistPath, p, res, types.EgressTypeSegments, r.Muting, r.sourceFramerate, plType == m3u8.PlaylistTypeLive) +//} diff --git a/test/integration.go b/test/integration.go index 08bfe37c..8bf2016a 100644 --- a/test/integration.go +++ b/test/integration.go @@ -79,6 +79,9 @@ type testCase struct { live_playlist string filenameSuffix livekit.SegmentedFileSuffix + // used by images tests + imageFilenameSuffix livekit.ImageFileSuffix + // used by sdk tests audioCodec types.MimeType audioDelay time.Duration @@ -93,7 +96,7 @@ type testCase struct { // used by track tests outputType types.OutputType - expectVideoTranscoding bool + expectVideoEncoding bool } func (r *Runner) awaitIdle(t *testing.T) { diff --git a/test/participant.go b/test/participant.go index 4135527a..f1e56497 100644 --- a/test/participant.go +++ b/test/participant.go @@ -121,7 +121,7 @@ func (r *Runner) testParticipantFile(t *testing.T) { }, } - test.expectVideoTranscoding = true + test.expectVideoEncoding = true r.runFileTest(t, req, test) }) if r.Short { @@ -157,7 +157,7 @@ func (r *Runner) testParticipantStream(t *testing.T) { }, } - r.runStreamTest(t, req, &testCase{expectVideoTranscoding: true}) + r.runStreamTest(t, req, &testCase{expectVideoEncoding: true}) }, ) } @@ -219,7 +219,7 @@ func (r *Runner) testParticipantSegments(t *testing.T) { Participant: trackRequest, }, } - test.expectVideoTranscoding = true + test.expectVideoEncoding = true r.runSegmentsTest(t, req, test) }, diff --git a/test/room_composite.go b/test/room_composite.go index 245e7e16..5ad252c0 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -38,6 +38,7 @@ func (r *Runner) testRoomComposite(t *testing.T) { r.testRoomCompositeFile(t) r.testRoomCompositeStream(t) r.testRoomCompositeSegments(t) + r.testRoomCompositeImages(t) r.testRoomCompositeMulti(t) } @@ -57,9 +58,9 @@ func (r *Runner) testRoomCompositeFile(t *testing.T) { t.Run("RoomComposite/File", func(t *testing.T) { for _, test := range []*testCase{ { - name: "Base", - filename: "r_{room_name}_{time}.mp4", - expectVideoTranscoding: true, + name: "Base", + filename: "r_{room_name}_{time}.mp4", + expectVideoEncoding: true, }, { name: "Video-Only", @@ -67,8 +68,8 @@ func (r *Runner) testRoomCompositeFile(t *testing.T) { options: &livekit.EncodingOptions{ VideoCodec: livekit.VideoCodec_H264_HIGH, }, - filename: "r_{room_name}_video_{time}.mp4", - expectVideoTranscoding: true, + filename: "r_{room_name}_video_{time}.mp4", + expectVideoEncoding: true, }, { name: "Audio-Only", @@ -77,8 +78,8 @@ func (r *Runner) testRoomCompositeFile(t *testing.T) { options: &livekit.EncodingOptions{ AudioCodec: livekit.AudioCodec_OPUS, }, - filename: "r_{room_name}_audio_{time}", - expectVideoTranscoding: false, + filename: "r_{room_name}_audio_{time}", + expectVideoEncoding: false, }, } { r.runRoomTest(t, test.name, types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { @@ -147,7 +148,7 @@ func (r *Runner) testRoomCompositeStream(t *testing.T) { }, } - r.runStreamTest(t, req, &testCase{expectVideoTranscoding: true}) + r.runStreamTest(t, req, &testCase{expectVideoEncoding: true}) }) if r.Short { return @@ -202,11 +203,11 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) { Height: 1080, VideoBitrate: 4500, }, - filename: "r_{room_name}_{time}", - playlist: "r_{room_name}_{time}.m3u8", - live_playlist: "r_live_{room_name}_{time}.m3u8", - filenameSuffix: livekit.SegmentedFileSuffix_TIMESTAMP, - expectVideoTranscoding: true, + filename: "r_{room_name}_{time}", + playlist: "r_{room_name}_{time}.m3u8", + live_playlist: "r_live_{room_name}_{time}.m3u8", + filenameSuffix: livekit.SegmentedFileSuffix_TIMESTAMP, + expectVideoEncoding: true, }, &testCase{ options: &livekit.EncodingOptions{ @@ -255,6 +256,53 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) { }) } +func (r *Runner) testRoomCompositeImages(t *testing.T) { + if !r.runImageTests() { + return + } + + r.runRoomTest(t, "RoomComposite/Images", types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { + for _, test := range []*testCase{ + &testCase{ + options: &livekit.EncodingOptions{ + Width: 640, + Height: 360, + }, + filename: "r_{room_name}_{time}", + imageFilenameSuffix: livekit.ImageFileSuffix_IMAGE_SUFFIX_TIMESTAMP, + }, + } { + imageOutput := &livekit.ImageOutput{ + FilenamePrefix: r.getFilePath(test.filename), + FilenameSuffix: test.imageFilenameSuffix, + } + + // TODO upload + + room := &livekit.RoomCompositeEgressRequest{ + RoomName: r.RoomName, + Layout: "grid-dark", + AudioOnly: test.audioOnly, + ImageOutputs: []*livekit.ImageOutput{imageOutput}, + } + if test.options != nil { + room.Options = &livekit.RoomCompositeEgressRequest_Advanced{ + Advanced: test.options, + } + } + + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_RoomComposite{ + RoomComposite: room, + }, + } + + r.runImagesTest(t, req, test) + } + }) +} + func (r *Runner) testRoomCompositeMulti(t *testing.T) { if !r.runMultiTests() { return diff --git a/test/runner.go b/test/runner.go index a86eee82..b8d44007 100644 --- a/test/runner.go +++ b/test/runner.go @@ -63,6 +63,7 @@ type Runner struct { FileTestsOnly bool `yaml:"file_only"` StreamTestsOnly bool `yaml:"stream_only"` SegmentTestsOnly bool `yaml:"segments_only"` + ImageTestsOnly bool `yaml:"images_only"` MultiTestsOnly bool `yaml:"multi_only"` Muting bool `yaml:"muting"` Dotfiles bool `yaml:"dot_files"` @@ -202,17 +203,21 @@ func (r *Runner) runTrackTests() bool { } func (r *Runner) runFileTests() bool { - return !r.StreamTestsOnly && !r.SegmentTestsOnly && !r.MultiTestsOnly + return !r.StreamTestsOnly && !r.SegmentTestsOnly && !r.MultiTestsOnly && !r.ImageTestsOnly } func (r *Runner) runStreamTests() bool { - return !r.FileTestsOnly && !r.SegmentTestsOnly && !r.MultiTestsOnly + return !r.FileTestsOnly && !r.SegmentTestsOnly && !r.MultiTestsOnly && !r.ImageTestsOnly } func (r *Runner) runSegmentTests() bool { - return !r.FileTestsOnly && !r.StreamTestsOnly && !r.MultiTestsOnly + return !r.FileTestsOnly && !r.StreamTestsOnly && !r.MultiTestsOnly && !r.ImageTestsOnly +} + +func (r *Runner) runImageTests() bool { + return !r.FileTestsOnly && !r.StreamTestsOnly && !r.SegmentTestsOnly && !r.MultiTestsOnly } func (r *Runner) runMultiTests() bool { - return !r.FileTestsOnly && !r.StreamTestsOnly && !r.SegmentTestsOnly + return !r.FileTestsOnly && !r.StreamTestsOnly && !r.SegmentTestsOnly && !r.ImageTestsOnly } diff --git a/test/segments.go b/test/segments.go index d7c250e7..542751dc 100644 --- a/test/segments.go +++ b/test/segments.go @@ -51,7 +51,7 @@ func (r *Runner) runSegmentsTest(t *testing.T, req *rpc.StartEgressRequest, test r.verifySegments(t, p, test.filenameSuffix, res, test.live_playlist != "") if !test.audioOnly { - require.Equal(t, test.expectVideoTranscoding, p.VideoTranscoding) + require.Equal(t, test.expectVideoEncoding, p.VideoEncoding) } } diff --git a/test/stream.go b/test/stream.go index db7b3839..05c3ccaf 100644 --- a/test/stream.go +++ b/test/stream.go @@ -37,7 +37,7 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * // get params p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) require.NoError(t, err) - require.Equal(t, test.expectVideoTranscoding, p.VideoTranscoding) + require.Equal(t, test.expectVideoEncoding, p.VideoEncoding) // verify and check update time.Sleep(time.Second * 5) diff --git a/test/track_composite.go b/test/track_composite.go index 7bda4ab9..cf0aa128 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -34,6 +34,7 @@ func (r *Runner) testTrackComposite(t *testing.T) { r.testTrackCompositeFile(t) r.testTrackCompositeStream(t) r.testTrackCompositeSegments(t) + r.testTrackCompositeImages(t) r.testTrackCompositeMulti(t) } @@ -109,7 +110,7 @@ func (r *Runner) testTrackCompositeFile(t *testing.T) { }, } - test.expectVideoTranscoding = true + test.expectVideoEncoding = true r.runFileTest(t, req, test) }) if r.Short { @@ -140,7 +141,7 @@ func (r *Runner) testTrackCompositeStream(t *testing.T) { }, } - r.runStreamTest(t, req, &testCase{expectVideoTranscoding: true}) + r.runStreamTest(t, req, &testCase{expectVideoEncoding: true}) }, ) } @@ -216,7 +217,7 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { TrackComposite: trackRequest, }, } - test.expectVideoTranscoding = true + test.expectVideoEncoding = true r.runSegmentsTest(t, req, test) }, @@ -228,6 +229,67 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { }) } +func (r *Runner) testTrackCompositeImages(t *testing.T) { + if !r.runImageTests() { + return + } + + t.Run("TrackComposite/Images", func(t *testing.T) { + for _, test := range []*testCase{ + { + name: "VP8", + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + filename: "tcs_{publisher_identity}_vp8_{time}", + }, + } { + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, + func(t *testing.T, audioTrackID, videoTrackID string) { + var aID, vID string + if !test.audioOnly { + vID = videoTrackID + } + if !test.videoOnly { + aID = audioTrackID + } + + imageOutput := &livekit.ImageOutput{ + CaptureInterval: 5, + Width: 1280, + Height: 720, + FilenamePrefix: r.getFilePath(test.filename), + } + + // TODO Upload + + trackRequest := &livekit.TrackCompositeEgressRequest{ + RoomName: r.room.Name(), + AudioTrackId: aID, + VideoTrackId: vID, + ImageOutputs: []*livekit.ImageOutput{imageOutput}, + } + if test.options != nil { + trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ + Advanced: test.options, + } + } + + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_TrackComposite{ + TrackComposite: trackRequest, + }, + } + r.runImagesTest(t, req, test) + }, + ) + if r.Short { + return + } + } + }) +} + func (r *Runner) testTrackCompositeMulti(t *testing.T) { if !r.runMultiTests() { return diff --git a/test/web.go b/test/web.go index c91b6ede..0d31669b 100644 --- a/test/web.go +++ b/test/web.go @@ -71,7 +71,7 @@ func (r *Runner) testWebFile(t *testing.T) { } r.runFileTest(t, req, &testCase{ - expectVideoTranscoding: true, + expectVideoEncoding: true, }) }) } @@ -96,7 +96,7 @@ func (r *Runner) testWebStream(t *testing.T) { } r.runStreamTest(t, req, &testCase{ - expectVideoTranscoding: true, + expectVideoEncoding: true, }) }) } @@ -129,7 +129,7 @@ func (r *Runner) testWebSegments(t *testing.T) { } r.runSegmentsTest(t, req, &testCase{ - expectVideoTranscoding: true, + expectVideoEncoding: true, }) }) }