Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Image output in egress #506

Merged
merged 39 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
d767445
Some condf work
biglittlebigben Sep 20, 2023
4594cfe
WiP
biglittlebigben Sep 20, 2023
07557b1
Array output per type
biglittlebigben Sep 20, 2023
3c333f5
WiP
biglittlebigben Sep 21, 2023
ceffc2c
WiP
biglittlebigben Sep 21, 2023
9b1fad8
WiP
biglittlebigben Sep 22, 2023
e13ffe5
WiP
biglittlebigben Sep 25, 2023
378c1a8
WiP
biglittlebigben Sep 25, 2023
5c591b4
WiP
biglittlebigben Sep 25, 2023
005fd31
Default values
biglittlebigben Sep 26, 2023
e884a76
WiP
biglittlebigben Sep 26, 2023
f843491
Merge remote-tracking branch 'origin/main' into benjamin/image_output
biglittlebigben Sep 26, 2023
70670a4
WiP
biglittlebigben Sep 27, 2023
205f0ca
WiP
biglittlebigben Sep 27, 2023
e436167
WiP
biglittlebigben Sep 27, 2023
54455d5
WiP
biglittlebigben Sep 27, 2023
47f7420
Generates frames
biglittlebigben Sep 28, 2023
f388745
Fix tests
biglittlebigben Sep 28, 2023
f2ae9d4
WiP
biglittlebigben Sep 28, 2023
6ec822c
Add watch
biglittlebigben Sep 28, 2023
3963cb0
Wire sink
biglittlebigben Sep 28, 2023
f5a804d
deadlock
biglittlebigben Sep 28, 2023
cb64833
Merge remote-tracking branch 'origin/main' into benjamin/image_output
biglittlebigben Sep 28, 2023
cf634fa
WiP
biglittlebigben Sep 28, 2023
29de86c
Update module
biglittlebigben Sep 28, 2023
8f7a581
Merge branch 'main' into benjamin/image_output
biglittlebigben Sep 29, 2023
53fe4df
Set SetGetSrcPad on outputs
biglittlebigben Sep 29, 2023
757c736
Merge remote-tracking branch 'origin/benjamin/image_output' into benj…
biglittlebigben Sep 29, 2023
1071471
Merge branch 'main' into benjamin/image_output
biglittlebigben Sep 29, 2023
25a955f
Fix stream
biglittlebigben Sep 29, 2023
16658ea
Merge remote-tracking branch 'origin/benjamin/image_output' into benj…
biglittlebigben Sep 29, 2023
a776b02
Fix audio only
biglittlebigben Sep 29, 2023
23a7c55
WiP
biglittlebigben Sep 29, 2023
7afe0fb
WiP
biglittlebigben Sep 29, 2023
b41807b
WiP
biglittlebigben Sep 29, 2023
48c15e3
Merge remote-tracking branch 'origin/main' into benjamin/image_output
biglittlebigben Sep 29, 2023
3c0376e
Merge remote-tracking branch 'origin/main' into benjamin/image_output
biglittlebigben Oct 2, 2023
da3299f
clean up
biglittlebigben Oct 2, 2023
1751f20
Ensure default value for image dimensions for SDK output
biglittlebigben Oct 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/livekit/livekit-server v1.4.6-0.20230918194757-8a0d417a8c99
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e
github.com/livekit/protocol v1.7.3-0.20230929193653-a020215c5c8e
github.com/livekit/psrpc v0.3.3
github.com/livekit/server-sdk-go v1.0.17-0.20230928233454-b49bf45b164b
github.com/pion/rtp v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f h1:b4ri7hQESRSzJWzXXcmANG2hJ4HTj5LM01Ekm8lnQmg=
github.com/livekit/mediatransportutil v0.0.0-20230906055425-e81fd5f6fb3f/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e h1:WEet0iH/JazBFNhhH+YuZHtXpKefb7mnbCC2al3peyA=
github.com/livekit/protocol v1.7.3-0.20230915202328-cf9f95141e0e/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/protocol v1.7.3-0.20230929193653-a020215c5c8e h1:4Z+XM3ohqxBuyARhQ0Cj8JjAj4XaQDulqPq1Ykvp4BQ=
github.com/livekit/protocol v1.7.3-0.20230929193653-a020215c5c8e/go.mod h1:zbh0QPUcLGOeZeIO/VeigwWWbudz4Lv+Px94FnVfQH0=
github.com/livekit/psrpc v0.3.3 h1:+lltbuN39IdaynXhLLxRShgYqYsRMWeeXKzv60oqyWo=
github.com/livekit/psrpc v0.3.3/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/livekit/server-sdk-go v1.0.17-0.20230928233454-b49bf45b164b h1:4rm2CAPOfLG7r7texpyhf0HEE5xvAkFfg+UrcxV4x3U=
Expand Down
1 change: 1 addition & 0 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type SessionLimits struct {
FileOutputMaxDuration time.Duration `yaml:"file_output_max_duration"`
StreamOutputMaxDuration time.Duration `yaml:"stream_output_max_duration"`
SegmentOutputMaxDuration time.Duration `yaml:"segment_output_max_duration"`
ImageOutputMaxDuration time.Duration `yaml:"image_output_max_duration"`
}

func (c *BaseConfig) initLogger(values ...interface{}) error {
Expand Down
56 changes: 49 additions & 7 deletions pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type EncodedOutput interface {
GetFileOutputs() []*livekit.EncodedFileOutput
GetStreamOutputs() []*livekit.StreamOutput
GetSegmentOutputs() []*livekit.SegmentedFileOutput
ImageOutput
}

type EncodedOutputDeprecated interface {
Expand All @@ -48,6 +49,10 @@ type EncodedOutputDeprecated interface {
GetSegmentOutputs() []*livekit.SegmentedFileOutput
}

type ImageOutput interface {
GetImageOutputs() []*livekit.ImageOutput
}

func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error {
files := req.GetFileOutputs()
streams := req.GetStreamOutputs()
Expand All @@ -71,9 +76,12 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error {
return err
}

p.Outputs[types.EgressTypeFile] = conf
p.Outputs[types.EgressTypeFile] = []OutputConfig{conf}
p.OutputCount++
p.FinalizationRequired = true
if p.VideoEnabled {
p.VideoEncoding = true
}

p.Info.FileResults = []*livekit.FileInfo{conf.FileInfo}
if len(streams)+len(segments) == 0 {
Expand All @@ -100,8 +108,11 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error {
return err
}

p.Outputs[types.EgressTypeStream] = conf
p.Outputs[types.EgressTypeStream] = []OutputConfig{conf}
p.OutputCount += len(stream.Urls)
if p.VideoEnabled {
p.VideoEncoding = true
}

streamInfoList := make([]*livekit.StreamInfo, 0, len(conf.StreamInfo))
for _, info := range conf.StreamInfo {
Expand Down Expand Up @@ -137,9 +148,12 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error {
return err
}

p.Outputs[types.EgressTypeSegments] = conf
p.Outputs[types.EgressTypeSegments] = []OutputConfig{conf}
p.OutputCount++
p.FinalizationRequired = true
if p.VideoEnabled {
p.VideoEncoding = true
}

p.Info.SegmentResults = []*livekit.SegmentsInfo{conf.SegmentsInfo}
if len(streams)+len(segments) == 0 {
Expand All @@ -148,14 +162,19 @@ func (p *PipelineConfig) updateEncodedOutputs(req EncodedOutput) error {
}
}

if segmentConf := p.Outputs[types.EgressTypeSegments]; segmentConf != nil {
if segmentConf := p.Outputs[types.EgressTypeSegments]; segmentConf != nil && len(segmentConf) > 0 {
// double the segment length
p.KeyFrameInterval = float64(2 * segmentConf.(*SegmentConfig).SegmentDuration)
p.KeyFrameInterval = float64(2 * segmentConf[0].(*SegmentConfig).SegmentDuration)
} else if p.KeyFrameInterval == 0 && p.Outputs[types.EgressTypeStream] != nil {
// default 4s for streams
p.KeyFrameInterval = 4
}

err := p.updateImageOutputs(req)
if err != nil {
return err
}

if p.OutputCount == 0 {
return errors.ErrInvalidInput("output")
}
Expand All @@ -174,7 +193,7 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err
p.Info.FileResults = []*livekit.FileInfo{conf.FileInfo}
p.Info.Result = &livekit.EgressInfo_File{File: conf.FileInfo}

p.Outputs[types.EgressTypeFile] = conf
p.Outputs[types.EgressTypeFile] = []OutputConfig{conf}
p.OutputCount = 1
p.FinalizationRequired = true

Expand All @@ -191,7 +210,7 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err
p.Info.StreamResults = streamInfoList
p.Info.Result = &livekit.EgressInfo_Stream{Stream: &livekit.StreamInfoList{Info: streamInfoList}}

p.Outputs[types.EgressTypeWebsocket] = conf
p.Outputs[types.EgressTypeWebsocket] = []OutputConfig{conf}
p.OutputCount = 1

default:
Expand All @@ -201,6 +220,29 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err
return nil
}

func (p *PipelineConfig) updateImageOutputs(req ImageOutput) error {
images := req.GetImageOutputs()

if len(images) > 0 && !p.VideoEnabled {
return errors.ErrInvalidInput("audio_only")
}

for _, img := range images {
conf, err := p.getImageConfig(img)
if err != nil {
return err
}

p.Outputs[types.EgressTypeImages] = append(p.Outputs[types.EgressTypeImages], conf)
p.OutputCount++
p.FinalizationRequired = true

p.Info.ImageResults = append(p.Info.ImageResults, conf.ImagesInfo)
}

return nil
}

func redactEncodedOutputs(out EncodedOutput) {
if files := out.GetFileOutputs(); len(files) == 1 {
redactUpload(files[0])
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ type FileConfig struct {

func (p *PipelineConfig) GetFileConfig() *FileConfig {
o, ok := p.Outputs[types.EgressTypeFile]
if !ok {
if !ok || len(o) == 0 {
return nil
}
return o.(*FileConfig)
return o[0].(*FileConfig)
}

func (p *PipelineConfig) getEncodedFileConfig(file *livekit.EncodedFileOutput) (*FileConfig, error) {
Expand Down
156 changes: 156 additions & 0 deletions pkg/config/output_image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"fmt"
"os"
"path"
"time"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
)

type ImageConfig struct {
outputConfig

Id string // Used internally to map a gst Bin/element back to a sink and as part of the path

ImagesInfo *livekit.ImagesInfo
LocalDir string
StorageDir string
ImagePrefix string
ImageSuffix livekit.ImageFileSuffix
ImageExtension types.FileExtension

DisableManifest bool
UploadConfig UploadConfig

CaptureInterval uint32
Width int32
Height int32
ImageOutCodec types.MimeType
}

func (p *PipelineConfig) GetImageConfigs() []*ImageConfig {
o, _ := p.Outputs[types.EgressTypeImages]

var configs []*ImageConfig
for _, c := range o {
configs = append(configs, c.(*ImageConfig))
}

return configs
}

func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConfig, error) {
outCodec, outputType, err := getMimeTypes(images.ImageCodec)
if err != nil {
return nil, err
}

conf := &ImageConfig{
outputConfig: outputConfig{
OutputType: outputType,
},

Id: utils.NewGuid(""),
ImagesInfo: &livekit.ImagesInfo{},
ImagePrefix: clean(images.FilenamePrefix),
ImageSuffix: images.FilenameSuffix,
DisableManifest: images.DisableManifest,
UploadConfig: p.getUploadConfig(images),
CaptureInterval: images.CaptureInterval,
Width: images.Width,
Height: images.Height,
ImageOutCodec: outCodec,
}

if conf.CaptureInterval == 0 {
// 10s by default
conf.CaptureInterval = 10
}

// Set default dimensions for RoomComposite and Web. For all SDKs input, default will be
// set from the track dimensions
switch p.Info.Request.(type) {
case *livekit.EgressInfo_RoomComposite, *livekit.EgressInfo_Web:
if conf.Width == 0 {
conf.Width = p.Width
}
if conf.Height == 0 {
conf.Height = p.Height
}
}

// filename
err = conf.updatePrefix(p)
if err != nil {
return nil, err
}

return conf, nil
}

func (o *ImageConfig) updatePrefix(p *PipelineConfig) error {
identifier, replacements := p.getFilenameInfo()

o.ImagePrefix = stringReplace(o.ImagePrefix, replacements)

o.ImageExtension = types.FileExtensionForOutputType[o.OutputType]

imagesDir, imagesPrefix := path.Split(o.ImagePrefix)

o.StorageDir = imagesDir

// ensure playlistName
if imagesPrefix == "" {
imagesPrefix = fmt.Sprintf("%s-%s", identifier, time.Now().Format("2006-01-02T150405"))
}

// update config
o.ImagePrefix = imagesPrefix

if o.UploadConfig == nil {
o.LocalDir = imagesDir
} else {
// Prepend the configuration base directory and the egress Id, and slug to prevent conflict if
// there is more than one image output
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
// Append a "/" to the path for consistency with the "UploadConfig == nil" case

o.LocalDir = path.Join(TmpDir, p.Info.EgressId, o.Id) + "/"
}

// create local directories
if o.LocalDir != "" {
if err := os.MkdirAll(o.LocalDir, 0755); err != nil {
return err
}
}

return nil
}
func getMimeTypes(imageCodec livekit.ImageCodec) (types.MimeType, types.OutputType, error) {
switch imageCodec {
case livekit.ImageCodec_IC_DEFAULT, livekit.ImageCodec_IC_JPEG:
return types.MimeTypeJPEG, types.OutputTypeJPEG, nil
default:
return "", "", errors.ErrNoCompatibleCodec
}
}
4 changes: 2 additions & 2 deletions pkg/config/output_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type SegmentConfig struct {

func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig {
o, ok := p.Outputs[types.EgressTypeSegments]
if !ok {
if !ok || len(o) == 0 {
return nil
}
return o.(*SegmentConfig)
return o[0].(*SegmentConfig)
}

// segments should always be added last, so we can check keyframe interval from file/stream
Expand Down
8 changes: 4 additions & 4 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ type StreamConfig struct {

func (p *PipelineConfig) GetStreamConfig() *StreamConfig {
o, ok := p.Outputs[types.EgressTypeStream]
if !ok {
if !ok || len(o) == 0 {
return nil
}
return o.(*StreamConfig)
return o[0].(*StreamConfig)
}

func (p *PipelineConfig) GetWebsocketConfig() *StreamConfig {
o, ok := p.Outputs[types.EgressTypeWebsocket]
if !ok {
if !ok || len(o) == 0 {
return nil
}
return o.(*StreamConfig)
return o[0].(*StreamConfig)
}

func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []string) (*StreamConfig, error) {
Expand Down
Loading