From 426541fd947870e54d117d3eccfda08c4cfd2a98 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 17 Oct 2024 15:23:20 -0400 Subject: [PATCH 1/9] manifest updates --- go.mod | 6 +- go.sum | 12 +- pkg/config/manifest.go | 161 ++++++++++++++++++++ pkg/config/pipeline.go | 3 +- pkg/config/storage.go | 2 + pkg/pipeline/controller.go | 49 ++++++ pkg/pipeline/sink/file.go | 27 +++- pkg/pipeline/sink/image.go | 33 ++-- pkg/pipeline/sink/image_manifest.go | 77 ---------- pkg/pipeline/sink/manifest.go | 88 ----------- pkg/pipeline/sink/segments.go | 62 +++++--- pkg/pipeline/sink/sink.go | 1 + pkg/pipeline/sink/uploader/alioss.go | 25 +-- pkg/pipeline/sink/uploader/azure.go | 31 ++-- pkg/pipeline/sink/uploader/gcp.go | 31 ++-- pkg/pipeline/sink/uploader/local.go | 19 +-- pkg/pipeline/sink/uploader/s3.go | 47 ++++-- pkg/pipeline/sink/uploader/uploader.go | 62 +++++--- pkg/pipeline/sink/uploader/uploader_test.go | 65 ++++++++ pkg/pipeline/sink/websocket.go | 4 + 20 files changed, 498 insertions(+), 307 deletions(-) create mode 100644 pkg/config/manifest.go delete mode 100644 pkg/pipeline/sink/image_manifest.go delete mode 100644 pkg/pipeline/sink/manifest.go create mode 100644 pkg/pipeline/sink/uploader/uploader_test.go diff --git a/go.mod b/go.mod index 8e3cd5ba..0f7e9510 100644 --- a/go.mod +++ b/go.mod @@ -23,8 +23,8 @@ require ( github.com/gorilla/websocket v1.5.3 github.com/livekit/livekit-server v1.7.2 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.21.0 - github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a + github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752 + github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/livekit/server-sdk-go/v2 v2.2.1 github.com/pion/rtp v1.8.9 github.com/pion/webrtc/v3 v3.3.1 @@ -75,8 +75,6 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/eapache/channels v1.1.0 // indirect - github.com/eapache/queue v1.1.0 // indirect github.com/elliotchance/orderedmap/v2 v2.2.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect diff --git a/go.sum b/go.sum index 8cb43dee..8f30c5c3 100644 --- a/go.sum +++ b/go.sum @@ -106,10 +106,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4k= -github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0= -github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= -github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk= github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -222,10 +218,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.21.0 h1:qb+c8LwLsVdVm/RGwIG60K4g2q3xpCF4vr5xPxBfrL4= -github.com/livekit/protocol v1.21.0/go.mod h1:AFuwk3+uIWFeO5ohKjx5w606Djl940+wktaZ441VoCI= -github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= -github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= +github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752 h1:EgULMfdFSW/3ZZckhiF+CwDApYTD3SkqR3MYazKeE5w= +github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= +github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= +github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.2.1 h1:MK6H52X/k5FA3d613qAawAVvWPwyzxS5CLpxfKd/pD4= github.com/livekit/server-sdk-go/v2 v2.2.1/go.mod h1:Oqv4EYIqLWjdgWXw8HuxN79QDy77vws1vI9YMkBxSMM= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/config/manifest.go b/pkg/config/manifest.go new file mode 100644 index 00000000..619f1a32 --- /dev/null +++ b/pkg/config/manifest.go @@ -0,0 +1,161 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "encoding/json" + "sync" + "time" +) + +type Manifest struct { + EgressID string `json:"egress_id,omitempty"` + RoomID string `json:"room_id,omitempty"` + RoomName string `json:"room_name,omitempty"` + Url string `json:"url,omitempty"` + StartedAt int64 `json:"started_at,omitempty"` + EndedAt int64 `json:"ended_at,omitempty"` + PublisherIdentity string `json:"publisher_identity,omitempty"` + TrackID string `json:"track_id,omitempty"` + TrackKind string `json:"track_kind,omitempty"` + TrackSource string `json:"track_source,omitempty"` + AudioTrackID string `json:"audio_track_id,omitempty"` + VideoTrackID string `json:"video_track_id,omitempty"` + + mu sync.Mutex + Files []*File `json:"files,omitempty"` + Playlists []*Playlist `json:"playlists,omitempty"` + Images []*Image `json:"images,omitempty"` +} + +type File struct { + Filename string `json:"filename,omitempty"` + Location string `json:"location,omitempty"` + PresignedUrl string `json:"presigned_url,omitempty"` +} + +type Playlist struct { + mu sync.Mutex + PlaylistLocation string `json:"playlist_location"` + PlaylistPresignedUrl string `json:"playlist_presigned_url"` + Segments []*Segment `json:"segments,omitempty"` +} + +type Segment struct { + Filename string `json:"filename,omitempty"` + Location string `json:"location,omitempty"` + PresignedUrl string `json:"presigned_url,omitempty"` +} + +type Image struct { + Filename string `json:"filename,omitempty"` + Timestamp time.Time `json:"timestamp,omitempty"` + Location string `json:"location,omitempty"` + PresignedUrl string `json:"presigned_url,omitempty"` +} + +func (p *PipelineConfig) initManifest() { + if p.shouldCreateManifest() { + p.Manifest = &Manifest{ + EgressID: p.Info.EgressId, + RoomID: p.Info.RoomId, + RoomName: p.Info.RoomName, + Url: p.WebUrl, + StartedAt: p.Info.StartedAt, + PublisherIdentity: p.Identity, + TrackID: p.TrackID, + TrackKind: p.TrackKind, + TrackSource: p.TrackSource, + AudioTrackID: p.AudioTrackID, + VideoTrackID: p.VideoTrackID, + } + + if p.GetSegmentConfig() != nil { + p.Manifest.Playlists = append(p.Manifest.Playlists, &Playlist{}) + } + } +} + +func (p *PipelineConfig) shouldCreateManifest() bool { + if p.BackupConfig != nil { + return true + } + if fc := p.GetFileConfig(); fc != nil && !fc.DisableManifest { + return true + } + if sc := p.GetSegmentConfig(); sc != nil && !sc.DisableManifest { + return true + } + for _, ic := range p.GetImageConfigs() { + if !ic.DisableManifest { + return true + } + } + return false +} + +func (m *Manifest) AddFile(filename, location, presignedUrl string) { + m.mu.Lock() + m.Files = append(m.Files, &File{ + Filename: filename, + Location: location, + PresignedUrl: presignedUrl, + }) + m.mu.Unlock() +} + +func (m *Manifest) AddPlaylist() *Playlist { + p := &Playlist{} + + m.mu.Lock() + m.Playlists = append(m.Playlists, p) + m.mu.Unlock() + + return p +} + +func (p *Playlist) UpdateLocation(location, presignedUrl string) { + p.mu.Lock() + p.PlaylistLocation = location + p.PlaylistPresignedUrl = presignedUrl + p.mu.Unlock() +} + +func (p *Playlist) AddSegment(filename, location, presignedUrl string) { + p.mu.Lock() + p.Segments = append(p.Segments, &Segment{ + Filename: filename, + Location: location, + PresignedUrl: presignedUrl, + }) + p.mu.Unlock() +} + +func (m *Manifest) AddImage(filename string, ts time.Time, location, presignedUrl string) { + m.mu.Lock() + m.Images = append(m.Images, &Image{ + Filename: filename, + Timestamp: ts, + Location: location, + PresignedUrl: presignedUrl, + }) + m.mu.Unlock() +} + +func (m *Manifest) Close(endedAt int64) ([]byte, error) { + m.EndedAt = endedAt + + return json.Marshal(m) +} diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 1c8ae580..eda9a444 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -55,7 +55,8 @@ type PipelineConfig struct { OutputCount atomic.Int32 `yaml:"-"` FinalizationRequired bool `yaml:"-"` - Info *info.EgressInfo `yaml:"-"` + Info *info.EgressInfo `yaml:"-"` + Manifest *Manifest `yaml:"-"` } type SourceConfig struct { diff --git a/pkg/config/storage.go b/pkg/config/storage.go index 942f1a0f..4b40ebb3 100644 --- a/pkg/config/storage.go +++ b/pkg/config/storage.go @@ -27,6 +27,8 @@ type StorageConfig struct { Azure *AzureConfig `yaml:"azure"` // upload to azure GCP *GCPConfig `yaml:"gcp"` // upload to gcp AliOSS *S3Config `yaml:"alioss"` // upload to aliyun + + GeneratePresignedUrl bool `yaml:"generate_presigned_url"` } type S3Config struct { diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 05b8890b..a098acdb 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -17,6 +17,8 @@ package pipeline import ( "context" "fmt" + "os" + "path" "sync" "time" @@ -457,6 +459,9 @@ func (c *Controller) Close() { livekit.EgressStatus_EGRESS_ENDING: c.Info.SetComplete() } + + // upload manifest and add location to egress info + c.uploadManifest() } func (c *Controller) startSessionLimitTimer(ctx context.Context) { @@ -614,3 +619,47 @@ func (c *Controller) updateEndTime() { } } } + +// uploadManifest happens last, after all sinks have finished +func (c *Controller) uploadManifest() { + if c.Manifest == nil { + return + } + + b, err := c.Manifest.Close(c.Info.EndedAt) + if err != nil { + logger.Errorw("failed to close manifest", err) + return + } + + manifestPath := path.Join(c.TmpDir, fmt.Sprintf("%s.json", c.Info.EgressId)) + f, err := os.Create(manifestPath) + if err != nil { + logger.Errorw("failed to create manifest file", err) + return + } + + _, err = f.Write(b) + if err != nil { + logger.Errorw("failed to write to manifest file", err) + return + } + _ = f.Close() + + infoUpdated := false + for _, si := range c.sinks { + for _, s := range si { + location, presignedUrl, uploaded, err := s.UploadManifest(manifestPath) + if err != nil { + logger.Errorw("failed to upload manifest", err) + continue + } + + if !infoUpdated && uploaded { + c.Info.ManifestLocation = location + c.Info.ManifestPresignedUrl = presignedUrl + infoUpdated = true + } + } + } +} diff --git a/pkg/pipeline/sink/file.go b/pkg/pipeline/sink/file.go index a1aa73e0..77637340 100644 --- a/pkg/pipeline/sink/file.go +++ b/pkg/pipeline/sink/file.go @@ -15,10 +15,11 @@ package sink import ( - "fmt" + "path" "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/pipeline/sink/uploader" + "github.com/livekit/egress/pkg/types" ) type FileSink struct { @@ -41,7 +42,7 @@ func (s *FileSink) Start() error { } func (s *FileSink) Close() error { - location, size, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false) + location, size, presignedUrl, err := s.Upload(s.LocalFilepath, s.StorageFilepath, s.OutputType, false) if err != nil { return err } @@ -49,13 +50,23 @@ func (s *FileSink) Close() error { s.FileInfo.Location = location s.FileInfo.Size = size - if !s.DisableManifest { - manifestLocalPath := fmt.Sprintf("%s.json", s.LocalFilepath) - manifestStoragePath := fmt.Sprintf("%s.json", s.StorageFilepath) - if err = uploadManifest(s.conf, s.Uploader, manifestLocalPath, manifestStoragePath); err != nil { - return err - } + if s.conf.Manifest != nil { + s.conf.Manifest.AddFile(s.StorageFilepath, location, presignedUrl) } return nil } + +func (s *FileSink) UploadManifest(filepath string) (string, string, bool, error) { + if s.DisableManifest && !s.ManifestRequired() { + return "", "", false, nil + } + + storagePath := path.Join(path.Dir(s.StorageFilepath), path.Base(filepath)) + location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false) + if err != nil { + return "", "", false, err + } + + return location, presignedUrl, true, nil +} diff --git a/pkg/pipeline/sink/image.go b/pkg/pipeline/sink/image.go index 6d114dde..2076a0a5 100644 --- a/pkg/pipeline/sink/image.go +++ b/pkg/pipeline/sink/image.go @@ -43,7 +43,6 @@ type ImageSink struct { startTime time.Time startRunningTime uint64 - manifest *ImageManifest createdImages chan *imageUpdate done core.Fuse } @@ -61,7 +60,6 @@ func newImageSink(u *uploader.Uploader, p *config.PipelineConfig, o *config.Imag conf: p, callbacks: callbacks, - manifest: createImageManifest(p), createdImages: make(chan *imageUpdate, maxPendingUploads), }, nil } @@ -108,16 +106,13 @@ func (s *ImageSink) handleNewImage(update *imageUpdate) error { imageStoragePath := path.Join(s.StorageDir, filename) - _, size, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true) + location, _, presignedUrl, err := s.Upload(imageLocalPath, imageStoragePath, s.OutputType, true) if err != nil { return err } - if !s.DisableManifest { - err = s.updateManifest(filename, ts, size) - if err != nil { - return err - } + if s.conf.Manifest != nil { + s.conf.Manifest.AddImage(imageStoragePath, ts, location, presignedUrl) } return nil @@ -133,14 +128,6 @@ func (s *ImageSink) getImageTime(pts uint64) time.Time { return s.startTime.Add(time.Duration(pts - s.startRunningTime)) } -func (s *ImageSink) updateManifest(filename string, ts time.Time, size int64) error { - s.manifest.imageCreated(filename, ts, size) - - manifestLocalPath := fmt.Sprintf("%s.json", path.Join(s.LocalDir, s.ImagePrefix)) - manifestStoragePath := fmt.Sprintf("%s.json", path.Join(s.StorageDir, s.ImagePrefix)) - return s.manifest.updateManifest(s.Uploader, manifestLocalPath, manifestStoragePath) -} - func (s *ImageSink) NewImage(filepath string, ts uint64) error { if !strings.HasPrefix(filepath, s.LocalDir) { return fmt.Errorf("invalid filepath") @@ -162,3 +149,17 @@ func (s *ImageSink) Close() error { return nil } + +func (s *ImageSink) UploadManifest(filepath string) (string, string, bool, error) { + if s.DisableManifest && !s.ManifestRequired() { + return "", "", false, nil + } + + storagePath := path.Join(s.StorageDir, path.Base(filepath)) + location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false) + if err != nil { + return "", "", false, err + } + + return location, presignedUrl, true, nil +} diff --git a/pkg/pipeline/sink/image_manifest.go b/pkg/pipeline/sink/image_manifest.go deleted file mode 100644 index 95cb88d8..00000000 --- a/pkg/pipeline/sink/image_manifest.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sink - -import ( - "encoding/json" - "os" - "time" - - "github.com/livekit/egress/pkg/config" - "github.com/livekit/egress/pkg/pipeline/sink/uploader" - "github.com/livekit/egress/pkg/types" -) - -type ImageManifest struct { - Manifest `json:",inline"` - - Images []*Image `json:"images"` -} - -type Image struct { - Name string `json:"name"` - Timestamp time.Time `json:"timestamp"` - Size int64 `json:"size"` -} - -func createImageManifest(p *config.PipelineConfig) *ImageManifest { - return &ImageManifest{ - Manifest: initManifest(p), - } -} - -func (m *ImageManifest) imageCreated(filename string, ts time.Time, size int64) { - m.Images = append(m.Images, &Image{ - Name: filename, - Timestamp: ts, - Size: size, - }) -} - -func (m *ImageManifest) updateManifest(u *uploader.Uploader, localFilepath, storageFilepath string) error { - manifest, err := os.Create(localFilepath) - if err != nil { - return err - } - - b, err := json.Marshal(m) - if err != nil { - return err - } - - _, err = manifest.Write(b) - if err != nil { - return err - } - - err = manifest.Close() - if err != nil { - return err - } - - _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) - - return err -} diff --git a/pkg/pipeline/sink/manifest.go b/pkg/pipeline/sink/manifest.go deleted file mode 100644 index 6594d107..00000000 --- a/pkg/pipeline/sink/manifest.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sink - -import ( - "encoding/json" - "os" - - "github.com/livekit/egress/pkg/config" - "github.com/livekit/egress/pkg/pipeline/sink/uploader" - "github.com/livekit/egress/pkg/types" -) - -type Manifest struct { - EgressID string `json:"egress_id,omitempty"` - RoomID string `json:"room_id,omitempty"` - RoomName string `json:"room_name,omitempty"` - Url string `json:"url,omitempty"` - StartedAt int64 `json:"started_at,omitempty"` - EndedAt int64 `json:"ended_at,omitempty"` - PublisherIdentity string `json:"publisher_identity,omitempty"` - TrackID string `json:"track_id,omitempty"` - TrackKind string `json:"track_kind,omitempty"` - TrackSource string `json:"track_source,omitempty"` - AudioTrackID string `json:"audio_track_id,omitempty"` - VideoTrackID string `json:"video_track_id,omitempty"` - SegmentCount int64 `json:"segment_count,omitempty"` -} - -func uploadManifest(p *config.PipelineConfig, u *uploader.Uploader, localFilepath, storageFilepath string) error { - manifest, err := os.Create(localFilepath) - if err != nil { - return err - } - - b, err := getManifest(p) - if err != nil { - return err - } - - _, err = manifest.Write(b) - if err != nil { - return err - } - - _, _, err = u.Upload(localFilepath, storageFilepath, types.OutputTypeJSON, false) - - return err -} - -func getManifest(p *config.PipelineConfig) ([]byte, error) { - manifest := initManifest(p) - - if o := p.GetSegmentConfig(); o != nil { - manifest.SegmentCount = o.SegmentsInfo.SegmentCount - } - - return json.Marshal(manifest) -} - -func initManifest(p *config.PipelineConfig) Manifest { - return Manifest{ - EgressID: p.Info.EgressId, - RoomID: p.Info.RoomId, - RoomName: p.Info.RoomName, - Url: p.WebUrl, - StartedAt: p.Info.StartedAt, - EndedAt: p.Info.EndedAt, - PublisherIdentity: p.Identity, - TrackID: p.TrackID, - TrackKind: p.TrackKind, - TrackSource: p.TrackSource, - AudioTrackID: p.AudioTrackID, - VideoTrackID: p.VideoTrackID, - } -} diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index c28dfb82..daa834a8 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -41,8 +41,9 @@ type SegmentSink struct { *uploader.Uploader *config.SegmentConfig - conf *config.PipelineConfig - callbacks *gstreamer.Callbacks + conf *config.PipelineConfig + manifestPlaylist *config.Playlist + callbacks *gstreamer.Callbacks segmentCount int playlist m3u8.PlaylistWriter @@ -104,6 +105,10 @@ func newSegmentSink(u *uploader.Uploader, p *config.PipelineConfig, o *config.Se playlistUpdates: make(chan SegmentUpdate, maxPendingUploads), } + if p.Manifest != nil { + s.manifestPlaylist = p.Manifest.AddPlaylist() + } + // Register gauges that track the number of segments and playlist updates pending upload monitor.RegisterPlaylistChannelSizeGauge(s.conf.NodeID, s.conf.ClusterID, s.conf.Info.EgressId, func() float64 { @@ -149,7 +154,7 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) { go func() { defer close(update.uploadComplete) - _, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true) + location, size, presignedUrl, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true) if err != nil { s.callbacks.OnError(err) return @@ -159,6 +164,9 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) { s.infoLock.Lock() s.SegmentsInfo.SegmentCount++ s.SegmentsInfo.Size += size + if s.manifestPlaylist != nil { + s.manifestPlaylist.AddSegment(segmentStoragePath, location, presignedUrl) + } s.infoLock.Unlock() }() } @@ -188,18 +196,16 @@ func (s *SegmentSink) handlePlaylistUpdates(update SegmentUpdate) error { s.segmentCount++ if s.shouldUploadPlaylist() { - if err := s.uploadPlaylist(); err != nil { - s.callbacks.OnError(err) - } + // ignore playlist upload failures until close + _ = s.uploadPlaylist() } if s.livePlaylist != nil { if err := s.livePlaylist.Append(segmentStartTime, duration, update.filename); err != nil { return err } - if err := s.uploadLivePlaylist(); err != nil { - s.callbacks.OnError(err) - } + // ignore playlist upload failures until close + _ = s.uploadLivePlaylist() } return nil @@ -214,18 +220,26 @@ func (s *SegmentSink) shouldUploadPlaylist() bool { } func (s *SegmentSink) uploadPlaylist() error { - var err error playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + playlistLocation, _, presignedUrl, err := s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + if err == nil { + s.SegmentsInfo.PlaylistLocation = playlistLocation + if s.manifestPlaylist != nil { + s.manifestPlaylist.PlaylistLocation = playlistLocation + s.manifestPlaylist.PlaylistPresignedUrl = presignedUrl + } + } return err } func (s *SegmentSink) uploadLivePlaylist() error { - var err error liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) - s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) + livePlaylistLocation, _, _, err := s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) + if err == nil { + s.SegmentsInfo.LivePlaylistLocation = livePlaylistLocation + } return err } @@ -305,15 +319,19 @@ func (s *SegmentSink) Close() error { } } - if !s.DisableManifest { - playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) - playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - manifestLocalPath := fmt.Sprintf("%s.json", playlistLocalPath) - manifestStoragePath := fmt.Sprintf("%s.json", playlistStoragePath) - if err := uploadManifest(s.conf, s.Uploader, manifestLocalPath, manifestStoragePath); err != nil { - return err - } + return nil +} + +func (s *SegmentSink) UploadManifest(filepath string) (string, string, bool, error) { + if s.DisableManifest && !s.ManifestRequired() { + return "", "", false, nil } - return nil + storagePath := path.Join(s.StorageDir, path.Base(filepath)) + location, _, presignedUrl, err := s.Upload(filepath, storagePath, types.OutputTypeJSON, false) + if err != nil { + return "", "", false, err + } + + return location, presignedUrl, true, nil } diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index cc675da2..dcae7ed4 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -25,6 +25,7 @@ import ( type Sink interface { Start() error Close() error + UploadManifest(string) (string, string, bool, error) } func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (map[types.EgressType][]Sink, error) { diff --git a/pkg/pipeline/sink/uploader/alioss.go b/pkg/pipeline/sink/uploader/alioss.go index 5139a2c5..5e416186 100644 --- a/pkg/pipeline/sink/uploader/alioss.go +++ b/pkg/pipeline/sink/uploader/alioss.go @@ -27,39 +27,42 @@ import ( ) type AliOSSUploader struct { - conf *config.S3Config - prefix string + conf *config.S3Config + prefix string + generatePresignedUrl bool } -func newAliOSSUploader(conf *config.S3Config, prefix string) (uploader, error) { +func newAliOSSUploader(c *config.StorageConfig) (uploader, error) { + conf := c.AliOSS return &AliOSSUploader{ - conf: conf, - prefix: prefix, + conf: conf, + prefix: c.PathPrefix, + generatePresignedUrl: c.GeneratePresignedUrl, }, nil } -func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { +func (u *AliOSSUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, string, error) { storageFilepath = path.Join(u.prefix, storageFilepath) stat, err := os.Stat(localFilepath) if err != nil { - return "", 0, errors.ErrUploadFailed("AliOSS", err) + return "", 0, "", errors.ErrUploadFailed("AliOSS", err) } client, err := oss.New(u.conf.Endpoint, u.conf.AccessKey, u.conf.Secret) if err != nil { - return "", 0, errors.ErrUploadFailed("AliOSS", err) + return "", 0, "", errors.ErrUploadFailed("AliOSS", err) } bucket, err := client.Bucket(u.conf.Bucket) if err != nil { - return "", 0, errors.ErrUploadFailed("AliOSS", err) + return "", 0, "", errors.ErrUploadFailed("AliOSS", err) } err = bucket.PutObjectFromFile(storageFilepath, localFilepath) if err != nil { - return "", 0, errors.ErrUploadFailed("AliOSS", err) + return "", 0, "", errors.ErrUploadFailed("AliOSS", err) } - return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), nil + return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, storageFilepath), stat.Size(), "", nil } diff --git a/pkg/pipeline/sink/uploader/azure.go b/pkg/pipeline/sink/uploader/azure.go index 5c3d9f95..a85026b4 100644 --- a/pkg/pipeline/sink/uploader/azure.go +++ b/pkg/pipeline/sink/uploader/azure.go @@ -29,20 +29,23 @@ import ( ) type AzureUploader struct { - conf *config.AzureConfig - prefix string - container string + conf *config.AzureConfig + prefix string + container string + generatePresignedUrl bool } -func newAzureUploader(conf *config.AzureConfig, prefix string) (uploader, error) { +func newAzureUploader(c *config.StorageConfig) (uploader, error) { + conf := c.Azure return &AzureUploader{ - conf: conf, - prefix: prefix, - container: fmt.Sprintf("https://%s.blob.core.windows.net/%s", conf.AccountName, conf.ContainerName), + conf: conf, + prefix: c.PathPrefix, + generatePresignedUrl: c.GeneratePresignedUrl, + container: fmt.Sprintf("https://%s.blob.core.windows.net/%s", conf.AccountName, conf.ContainerName), }, nil } -func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { +func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, string, error) { storageFilepath = path.Join(u.prefix, storageFilepath) credential, err := azblob.NewSharedKeyCredential( @@ -50,12 +53,12 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType u.conf.AccountKey, ) if err != nil { - return "", 0, errors.ErrUploadFailed("Azure", err) + return "", 0, "", errors.ErrUploadFailed("Azure", err) } azUrl, err := url.Parse(u.container) if err != nil { - return "", 0, errors.ErrUploadFailed("Azure", err) + return "", 0, "", errors.ErrUploadFailed("Azure", err) } pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{ @@ -71,7 +74,7 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType file, err := os.Open(localFilepath) if err != nil { - return "", 0, errors.ErrUploadFailed("Azure", err) + return "", 0, "", errors.ErrUploadFailed("Azure", err) } defer func() { _ = file.Close() @@ -79,7 +82,7 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType stat, err := file.Stat() if err != nil { - return "", 0, errors.ErrUploadFailed("Azure", err) + return "", 0, "", errors.ErrUploadFailed("Azure", err) } // upload blocks in parallel for optimal performance @@ -90,8 +93,8 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType Parallelism: 16, }) if err != nil { - return "", 0, errors.ErrUploadFailed("Azure", err) + return "", 0, "", errors.ErrUploadFailed("Azure", err) } - return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil + return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), "", nil } diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index 64acc7fc..3f26b8e6 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -37,15 +37,18 @@ import ( const storageScope = "https://www.googleapis.com/auth/devstorage.read_write" type GCPUploader struct { - conf *config.GCPConfig - prefix string - client *storage.Client + conf *config.GCPConfig + prefix string + generatePresignedUrl bool + client *storage.Client } -func newGCPUploader(conf *config.GCPConfig, prefix string) (uploader, error) { +func newGCPUploader(c *config.StorageConfig) (uploader, error) { + conf := c.GCP u := &GCPUploader{ - conf: conf, - prefix: prefix, + conf: conf, + prefix: c.PathPrefix, + generatePresignedUrl: c.GeneratePresignedUrl, } var opts []option.ClientOption @@ -73,7 +76,7 @@ func newGCPUploader(conf *config.GCPConfig, prefix string) (uploader, error) { defaultTransport.ProxyConnectHeader.Add("Proxy-Authorization", basicAuth) } } - c, err := storage.NewClient(context.Background(), opts...) + client, err := storage.NewClient(context.Background(), opts...) // restore default transport http.DefaultTransport = transportClone @@ -81,16 +84,16 @@ func newGCPUploader(conf *config.GCPConfig, prefix string) (uploader, error) { return nil, err } - u.client = c + u.client = client return u, nil } -func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { +func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, string, error) { storageFilepath = path.Join(u.prefix, storageFilepath) file, err := os.Open(localFilepath) if err != nil { - return "", 0, errors.ErrUploadFailed("GCP", err) + return "", 0, "", errors.ErrUploadFailed("GCP", err) } defer func() { _ = file.Close() @@ -98,7 +101,7 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp stat, err := file.Stat() if err != nil { - return "", 0, errors.ErrUploadFailed("GCP", err) + return "", 0, "", errors.ErrUploadFailed("GCP", err) } wc := u.client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer( @@ -113,12 +116,12 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp wc.ChunkRetryDeadline = 0 if _, err = io.Copy(wc, file); err != nil { - return "", 0, errors.ErrUploadFailed("GCP", err) + return "", 0, "", errors.ErrUploadFailed("GCP", err) } if err = wc.Close(); err != nil { - return "", 0, errors.ErrUploadFailed("GCP", err) + return "", 0, "", errors.ErrUploadFailed("GCP", err) } - return fmt.Sprintf("https://%s.storage.googleapis.com/%s", u.conf.Bucket, storageFilepath), stat.Size(), nil + return fmt.Sprintf("https://%s.storage.googleapis.com/%s", u.conf.Bucket, storageFilepath), stat.Size(), "", nil } diff --git a/pkg/pipeline/sink/uploader/local.go b/pkg/pipeline/sink/uploader/local.go index dc7d6750..d3b0aabc 100644 --- a/pkg/pipeline/sink/uploader/local.go +++ b/pkg/pipeline/sink/uploader/local.go @@ -19,6 +19,7 @@ import ( "os" "path" + "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/types" ) @@ -26,39 +27,39 @@ type localUploader struct { prefix string } -func newLocalUploader(prefix string) (*localUploader, error) { - return &localUploader{prefix: prefix}, nil +func newLocalUploader(c *config.StorageConfig) (*localUploader, error) { + return &localUploader{prefix: c.PathPrefix}, nil } -func (u *localUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) { +func (u *localUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, string, error) { storageFilepath = path.Join(u.prefix, storageFilepath) stat, err := os.Stat(localFilepath) if err != nil { - return "", 0, err + return "", 0, "", err } dir, _ := path.Split(storageFilepath) if err = os.MkdirAll(dir, 0755); err != nil { - return "", 0, err + return "", 0, "", err } local, err := os.Open(localFilepath) if err != nil { - return "", 0, err + return "", 0, "", err } defer local.Close() storage, err := os.Create(storageFilepath) if err != nil { - return "", 0, err + return "", 0, "", err } defer storage.Close() _, err = io.Copy(storage, local) if err != nil { - return "", 0, err + return "", 0, "", err } - return storageFilepath, stat.Size(), nil + return storageFilepath, stat.Size(), "", nil } diff --git a/pkg/pipeline/sink/uploader/s3.go b/pkg/pipeline/sink/uploader/s3.go index 0f4490c3..a3e612b7 100644 --- a/pkg/pipeline/sink/uploader/s3.go +++ b/pkg/pipeline/sink/uploader/s3.go @@ -45,12 +45,14 @@ const ( ) type S3Uploader struct { - conf *config.S3Config - prefix string - awsConf *aws.Config + conf *config.S3Config + prefix string + generatePresignedUrl bool + awsConf *aws.Config } -func newS3Uploader(conf *config.S3Config, prefix string) (uploader, error) { +func newS3Uploader(c *config.StorageConfig) (uploader, error) { + conf := c.S3 opts := func(o *awsConfig.LoadOptions) error { if conf.Region != "" { o.Region = conf.Region @@ -97,7 +99,7 @@ func newS3Uploader(conf *config.S3Config, prefix string) (uploader, error) { awsConf, err := awsConfig.LoadDefaultConfig(context.Background(), opts) if err != nil { - return nil, err + return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "failed to load aws config: %v", err) } if conf.Endpoint != "" { @@ -109,9 +111,10 @@ func newS3Uploader(conf *config.S3Config, prefix string) (uploader, error) { } return &S3Uploader{ - conf: conf, - prefix: prefix, - awsConf: &awsConf, + conf: conf, + prefix: c.PathPrefix, + generatePresignedUrl: c.GeneratePresignedUrl, + awsConf: &awsConf, }, nil } @@ -132,12 +135,16 @@ func updateRegion(awsConf *aws.Config, bucket string) error { return nil } -func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) { +func (u *S3Uploader) upload( + localFilepath, storageFilepath string, + outputType types.OutputType, +) (string, int64, string, error) { + storageFilepath = path.Join(u.prefix, storageFilepath) file, err := os.Open(localFilepath) if err != nil { - return "", 0, errors.ErrUploadFailed("S3", err) + return "", 0, "", errors.ErrUploadFailed("S3", err) } defer func() { _ = file.Close() @@ -145,7 +152,7 @@ func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType ty stat, err := file.Stat() if err != nil { - return "", 0, errors.ErrUploadFailed("S3", err) + return "", 0, "", errors.ErrUploadFailed("S3", err) } l := &s3Logger{ @@ -159,7 +166,7 @@ func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType ty input := &s3.PutObjectInput{ Body: file, - Bucket: &u.conf.Bucket, + Bucket: aws.String(u.conf.Bucket), ContentType: aws.String(string(outputType)), Key: aws.String(storageFilepath), Metadata: u.conf.Metadata, @@ -176,7 +183,7 @@ func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType ty if _, err = manager.NewUploader(client).Upload(context.Background(), input); err != nil { l.log() - return "", 0, errors.ErrUploadFailed("S3", err) + return "", 0, "", errors.ErrUploadFailed("S3", err) } endpoint := "s3.amazonaws.com" @@ -191,7 +198,19 @@ func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType ty location = fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, endpoint, storageFilepath) } - return location, stat.Size(), nil + if !u.generatePresignedUrl { + return location, stat.Size(), "", nil + } + + res, err := s3.NewPresignClient(client).PresignGetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String(u.conf.Bucket), + Key: aws.String(storageFilepath), + }) + if err != nil { + return "", 0, "", errors.ErrUploadFailed("S3", err) + } + + return location, stat.Size(), res.URL, nil } // s3Logger only logs aws messages on upload failure diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index d106a54e..65c4afed 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -32,13 +32,14 @@ const ( ) type uploader interface { - upload(string, string, types.OutputType) (string, int64, error) + upload(string, string, types.OutputType) (string, int64, string, error) } type Uploader struct { - primary uploader - backup uploader - monitor *stats.HandlerMonitor + primary uploader + backup uploader + backupUsed bool + monitor *stats.HandlerMonitor } func New(conf, backup *config.StorageConfig, monitor *stats.HandlerMonitor) (*Uploader, error) { @@ -67,45 +68,64 @@ func New(conf, backup *config.StorageConfig, monitor *stats.HandlerMonitor) (*Up func getUploader(conf *config.StorageConfig) (uploader, error) { switch { case conf == nil: - return newLocalUploader("") + return newLocalUploader(&config.StorageConfig{}) case conf.S3 != nil: - return newS3Uploader(conf.S3, conf.PathPrefix) + return newS3Uploader(conf) case conf.GCP != nil: - return newGCPUploader(conf.GCP, conf.PathPrefix) + return newGCPUploader(conf) case conf.Azure != nil: - return newAzureUploader(conf.Azure, conf.PathPrefix) + return newAzureUploader(conf) case conf.AliOSS != nil: - return newAliOSSUploader(conf.AliOSS, conf.PathPrefix) + return newAliOSSUploader(conf) default: - return newLocalUploader(conf.PathPrefix) + return newLocalUploader(conf) } } -func (u *Uploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool) (string, int64, error) { +func (u *Uploader) Upload( + localFilepath, storageFilepath string, + outputType types.OutputType, + deleteAfterUpload bool, +) (string, int64, string, error) { + start := time.Now() - location, size, primaryErr := u.primary.upload(localFilepath, storageFilepath, outputType) + location, size, presignedUrl, primaryErr := u.primary.upload(localFilepath, storageFilepath, outputType) elapsed := time.Since(start) if primaryErr == nil { // success - u.monitor.IncUploadCountSuccess(string(outputType), float64(elapsed.Milliseconds())) + if u.monitor != nil { + u.monitor.IncUploadCountSuccess(string(outputType), float64(elapsed.Milliseconds())) + } if deleteAfterUpload { _ = os.Remove(localFilepath) } - return location, size, nil + return location, size, presignedUrl, nil } - u.monitor.IncUploadCountFailure(string(outputType), float64(elapsed.Milliseconds())) + if u.monitor != nil { + u.monitor.IncUploadCountFailure(string(outputType), float64(elapsed.Milliseconds())) + } if u.backup != nil { - location, size, backupErr := u.backup.upload(localFilepath, storageFilepath, outputType) + location, size, presignedUrl, backupErr := u.backup.upload(localFilepath, storageFilepath, outputType) if backupErr == nil { - u.monitor.IncBackupStorageWrites(string(outputType)) - return location, size, nil + u.backupUsed = true + if u.monitor != nil { + u.monitor.IncBackupStorageWrites(string(outputType)) + } + if deleteAfterUpload { + _ = os.Remove(localFilepath) + } + return location, size, presignedUrl, nil } - return "", 0, psrpc.NewErrorf(psrpc.InvalidArgument, - "primary and backup uploads failed: %s\n%s", primaryErr.Error(), backupErr.Error()) + return "", 0, "", psrpc.NewErrorf(psrpc.InvalidArgument, + "primary: %s\nbackup: %s", primaryErr.Error(), backupErr.Error()) } - return "", 0, primaryErr + return "", 0, "", primaryErr +} + +func (u *Uploader) ManifestRequired() bool { + return u.backupUsed } diff --git a/pkg/pipeline/sink/uploader/uploader_test.go b/pkg/pipeline/sink/uploader/uploader_test.go new file mode 100644 index 00000000..c1f5db52 --- /dev/null +++ b/pkg/pipeline/sink/uploader/uploader_test.go @@ -0,0 +1,65 @@ +package uploader + +import ( + "fmt" + "io" + "net/http" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/livekit/egress/pkg/config" +) + +func TestUploader(t *testing.T) { + key := os.Getenv("AWS_ACCESS_KEY") + secret := os.Getenv("AWS_SECRET") + region := os.Getenv("AWS_REGION") + bucket := os.Getenv("AWS_BUCKET") + + primary := &config.StorageConfig{ + S3: &config.S3Config{ + AccessKey: "nonsense", + Secret: "public", + Region: "us-east-1", + Bucket: "fake-bucket", + }, + } + backup := &config.StorageConfig{ + PathPrefix: "testProject", + S3: &config.S3Config{ + AccessKey: key, + Secret: secret, + Region: region, + Bucket: bucket, + }, + GeneratePresignedUrl: true, + } + + u, err := New(primary, backup, nil) + require.NoError(t, err) + + filepath := "uploader_test.go" + storagePath := "uploader_test.go" + + location, size, presignedUrl, err := u.Upload(filepath, storagePath, "test/plain", false) + require.NoError(t, err) + + expectedLocation := fmt.Sprintf("https://%s.s3.amazonaws.com/testProject/uploader_test.go", bucket) + + require.Equal(t, expectedLocation, location) + require.NotZero(t, size) + require.NotEmpty(t, presignedUrl) + + response, err := http.Get(presignedUrl) + require.NoError(t, err) + defer response.Body.Close() + + require.Equal(t, http.StatusOK, response.StatusCode) + b, err := io.ReadAll(response.Body) + require.NoError(t, err) + + require.True(t, strings.HasPrefix(string(b), "package uploader")) +} diff --git a/pkg/pipeline/sink/websocket.go b/pkg/pipeline/sink/websocket.go index 8f3d95c5..67cc7f05 100644 --- a/pkg/pipeline/sink/websocket.go +++ b/pkg/pipeline/sink/websocket.go @@ -217,3 +217,7 @@ func (s *WebsocketSink) Close() error { return nil } + +func (s *WebsocketSink) UploadManifest(_ string) (string, string, bool, error) { + return "", "", false, nil +} From 6e8006c613c68f4bdfda4dbc0d8771da6f880993 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 17 Oct 2024 15:30:12 -0400 Subject: [PATCH 2/9] init manifest --- pkg/config/manifest.go | 4 ---- pkg/config/pipeline.go | 3 ++- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/config/manifest.go b/pkg/config/manifest.go index 619f1a32..fa8642f9 100644 --- a/pkg/config/manifest.go +++ b/pkg/config/manifest.go @@ -81,10 +81,6 @@ func (p *PipelineConfig) initManifest() { AudioTrackID: p.AudioTrackID, VideoTrackID: p.VideoTrackID, } - - if p.GetSegmentConfig() != nil { - p.Manifest.Playlists = append(p.Manifest.Playlists, &Playlist{}) - } } } diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index eda9a444..30dfa1b2 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -381,7 +381,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { } if err := p.updateDirectOutput(req.Track); err != nil { - return nil + return err } default: @@ -422,6 +422,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { } } + p.initManifest() return nil } From 184d765ac56d8931bee2541b24266cc164da8606 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 17 Oct 2024 15:52:04 -0400 Subject: [PATCH 3/9] update deps --- go.mod | 6 +++--- go.sum | 8 ++++---- pkg/config/storage.go | 6 +++--- pkg/pipeline/sink/uploader/alioss.go | 4 ++++ pkg/pipeline/sink/uploader/azure.go | 4 ++++ pkg/pipeline/sink/uploader/gcp.go | 4 ++++ 6 files changed, 22 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 0f7e9510..3343622f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/livekit/egress -go 1.22.1 +go 1.23.1 require ( cloud.google.com/go/storage v1.43.0 @@ -25,7 +25,7 @@ require ( github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752 github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 - github.com/livekit/server-sdk-go/v2 v2.2.1 + github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b github.com/pion/rtp v1.8.9 github.com/pion/webrtc/v3 v3.3.1 github.com/prometheus/client_golang v1.20.3 @@ -39,7 +39,7 @@ require ( golang.org/x/oauth2 v0.23.0 google.golang.org/api v0.196.0 google.golang.org/grpc v1.66.1 - google.golang.org/protobuf v1.34.2 + google.golang.org/protobuf v1.35.1 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 8f30c5c3..ba9282bd 100644 --- a/go.sum +++ b/go.sum @@ -222,8 +222,8 @@ github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752 h1:EgULMfdFSW/ github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= -github.com/livekit/server-sdk-go/v2 v2.2.1 h1:MK6H52X/k5FA3d613qAawAVvWPwyzxS5CLpxfKd/pD4= -github.com/livekit/server-sdk-go/v2 v2.2.1/go.mod h1:Oqv4EYIqLWjdgWXw8HuxN79QDy77vws1vI9YMkBxSMM= +github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o= +github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b/go.mod h1:m2IukIyPCvJCdx04mdWIud9FoCBUAWc3526x3KgT8qY= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -488,8 +488,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/pkg/config/storage.go b/pkg/config/storage.go index 4b40ebb3..d96590ab 100644 --- a/pkg/config/storage.go +++ b/pkg/config/storage.go @@ -21,14 +21,13 @@ import ( ) type StorageConfig struct { - PathPrefix string `yaml:"prefix"` // prefix applied to all filenames + PathPrefix string `yaml:"prefix"` // prefix applied to all filenames + GeneratePresignedUrl bool `yaml:"generate_presigned_url"` S3 *S3Config `yaml:"s3"` // upload to s3 Azure *AzureConfig `yaml:"azure"` // upload to azure GCP *GCPConfig `yaml:"gcp"` // upload to gcp AliOSS *S3Config `yaml:"alioss"` // upload to aliyun - - GeneratePresignedUrl bool `yaml:"generate_presigned_url"` } type S3Config struct { @@ -66,6 +65,7 @@ func (p *PipelineConfig) getStorageConfig(req egress.UploadRequest) *StorageConf sc := &StorageConfig{} if p.StorageConfig != nil { sc.PathPrefix = p.StorageConfig.PathPrefix + sc.GeneratePresignedUrl = p.StorageConfig.GeneratePresignedUrl } if s3 := req.GetS3(); s3 != nil { diff --git a/pkg/pipeline/sink/uploader/alioss.go b/pkg/pipeline/sink/uploader/alioss.go index 5e416186..6e60fcad 100644 --- a/pkg/pipeline/sink/uploader/alioss.go +++ b/pkg/pipeline/sink/uploader/alioss.go @@ -33,6 +33,10 @@ type AliOSSUploader struct { } func newAliOSSUploader(c *config.StorageConfig) (uploader, error) { + if c.GeneratePresignedUrl { + return nil, errors.ErrUploadFailed("AliOSS", fmt.Errorf("presigned URLs not supported")) + } + conf := c.AliOSS return &AliOSSUploader{ conf: conf, diff --git a/pkg/pipeline/sink/uploader/azure.go b/pkg/pipeline/sink/uploader/azure.go index a85026b4..d8d46c41 100644 --- a/pkg/pipeline/sink/uploader/azure.go +++ b/pkg/pipeline/sink/uploader/azure.go @@ -36,6 +36,10 @@ type AzureUploader struct { } func newAzureUploader(c *config.StorageConfig) (uploader, error) { + if c.GeneratePresignedUrl { + return nil, errors.ErrUploadFailed("Azure", fmt.Errorf("presigned URLs not supported")) + } + conf := c.Azure return &AzureUploader{ conf: conf, diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index 3f26b8e6..4d0114fb 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -44,6 +44,10 @@ type GCPUploader struct { } func newGCPUploader(c *config.StorageConfig) (uploader, error) { + if c.GeneratePresignedUrl { + return nil, errors.ErrUploadFailed("GCP", fmt.Errorf("presigned URLs not supported")) + } + conf := c.GCP u := &GCPUploader{ conf: conf, From 39fc3c71e45c8816db386744c242018e6cec2e23 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 17 Oct 2024 16:05:36 -0400 Subject: [PATCH 4/9] update livekit server --- go.mod | 40 ++++++++++++++-------------- go.sum | 83 +++++++++++++++++++++++++++++----------------------------- 2 files changed, 62 insertions(+), 61 deletions(-) diff --git a/go.mod b/go.mod index 3343622f..54fc2336 100644 --- a/go.mod +++ b/go.mod @@ -21,24 +21,24 @@ require ( github.com/go-logr/logr v1.4.2 github.com/googleapis/gax-go/v2 v2.13.0 github.com/gorilla/websocket v1.5.3 - github.com/livekit/livekit-server v1.7.2 + github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/protocol v1.26.1-0.20241017190602-ef6fc8f9c752 github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b github.com/pion/rtp v1.8.9 - github.com/pion/webrtc/v3 v3.3.1 - github.com/prometheus/client_golang v1.20.3 + github.com/pion/webrtc/v3 v3.3.4 + github.com/prometheus/client_golang v1.20.4 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.59.1 github.com/stretchr/testify v1.9.0 github.com/urfave/cli/v2 v2.27.4 go.uber.org/atomic v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 + golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c golang.org/x/oauth2 v0.23.0 google.golang.org/api v0.196.0 - google.golang.org/grpc v1.66.1 + google.golang.org/grpc v1.67.1 google.golang.org/protobuf v1.35.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -75,7 +75,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/elliotchance/orderedmap/v2 v2.2.0 // indirect + github.com/elliotchance/orderedmap/v2 v2.4.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gammazero/deque v0.2.1 // indirect @@ -89,10 +89,10 @@ require ( github.com/google/s2a-go v0.1.8 // indirect github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.3 // indirect - github.com/jellydator/ttlcache/v3 v3.2.0 // indirect + github.com/jellydator/ttlcache/v3 v3.3.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/jxskiss/base62 v1.1.0 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 // indirect @@ -102,18 +102,18 @@ require ( github.com/mattn/go-ieproxy v0.0.1 // indirect github.com/mattn/go-pointer v0.0.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/nats-io/nats.go v1.36.0 // indirect + github.com/nats-io/nats.go v1.37.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect - github.com/pion/datachannel v1.5.8 // indirect + github.com/pion/datachannel v1.5.9 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect - github.com/pion/ice/v2 v2.3.34 // indirect - github.com/pion/interceptor v0.1.30 // indirect + github.com/pion/ice/v2 v2.3.36 // indirect + github.com/pion/interceptor v0.1.37 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.12 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtcp v1.2.14 // indirect - github.com/pion/sctp v1.8.20 // indirect + github.com/pion/sctp v1.8.33 // indirect github.com/pion/sdp/v3 v3.0.9 // indirect github.com/pion/srtp/v2 v2.0.20 // indirect github.com/pion/stun v0.6.1 // indirect @@ -123,11 +123,11 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect - github.com/redis/go-redis/v9 v9.6.1 // indirect + github.com/redis/go-redis/v9 v9.6.2 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/twitchtv/twirp v8.1.3+incompatible // indirect - github.com/wlynxg/anet v0.0.3 // indirect + github.com/wlynxg/anet v0.0.5 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opencensus.io v0.24.0 // indirect @@ -138,13 +138,13 @@ require ( go.opentelemetry.io/otel/trace v1.29.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap/exp v0.2.0 // indirect - golang.org/x/crypto v0.26.0 // indirect - golang.org/x/net v0.28.0 // indirect + golang.org/x/crypto v0.28.0 // indirect + golang.org/x/net v0.30.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/sys v0.24.0 // indirect - golang.org/x/text v0.17.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect golang.org/x/time v0.6.0 // indirect google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect ) diff --git a/go.sum b/go.sum index ba9282bd..87f2c7ed 100644 --- a/go.sum +++ b/go.sum @@ -106,14 +106,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk= -github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= +github.com/elliotchance/orderedmap/v2 v2.4.0 h1:6tUmMwD9F998FNpwFxA5E6NQvSpk2PVw7RKsVq3+2Cw= +github.com/elliotchance/orderedmap/v2 v2.4.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= -github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= +github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= @@ -188,14 +188,14 @@ github.com/googleapis/gax-go/v2 v2.13.0 h1:yitjD5f7jQHhyDsnhKEBU52NdvvdSeGzlAnDP github.com/googleapis/gax-go/v2 v2.13.0/go.mod h1:Z/fvTZXF8/uw7Xu5GuslPw+bplx6SS338j1Is2S+B7A= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE= -github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= +github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc= +github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -212,8 +212,8 @@ github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80 h1:6Yzfa6GP0rIo/kUL github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs= github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw7k08o4c= github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= -github.com/livekit/livekit-server v1.7.2 h1:h0tQLWQySeArLl5JEhBUbHrHojCV/cVsFxnXvW4W2Nc= -github.com/livekit/livekit-server v1.7.2/go.mod h1:Kb1PcXIR2/VaJW1ExW/Lugah4Dw8ZuWShbTcv2fNM80= +github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7 h1:F1Y2BKevmuszSfQUqRrCPlfxL0wwTflY1BF84+uQgsY= +github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7/go.mod h1:du80dGfNEghRhYuGba78p0UOgrBgS9HktKXk1BNTlnk= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= @@ -236,23 +236,23 @@ github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= -github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde h1:x0TT0RDC7UhAVbbWWBzr41ElhJx5tXPWkIHA2HWPRuw= github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0= -github.com/pion/datachannel v1.5.8 h1:ph1P1NsGkazkjrvyMfhRBUAWMxugJjq2HfQifaOoSNo= -github.com/pion/datachannel v1.5.8/go.mod h1:PgmdpoaNBLX9HNzNClmdki4DYW5JtI7Yibu8QzbL3tI= +github.com/pion/datachannel v1.5.9 h1:LpIWAOYPyDrXtU+BW7X0Yt/vGtYxtXQ8ql7dFfYUVZA= +github.com/pion/datachannel v1.5.9/go.mod h1:kDUuk4CU4Uxp82NH4LQZbISULkX/HtzKa4P7ldf9izE= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.34 h1:Ic1ppYCj4tUOcPAp76U6F3fVrlSw8A9JtRXLqw6BbUM= -github.com/pion/ice/v2 v2.3.34/go.mod h1:mBF7lnigdqgtB+YHkaY/Y6s6tsyRyo4u4rPGRuOjUBQ= -github.com/pion/interceptor v0.1.30 h1:au5rlVHsgmxNi+v/mjOPazbW1SHzfx7/hYOEYQnUcxA= -github.com/pion/interceptor v0.1.30/go.mod h1:RQuKT5HTdkP2Fi0cuOS5G5WNymTjzXaGF75J4k7z2nc= +github.com/pion/ice/v2 v2.3.36 h1:SopeXiVbbcooUg2EIR8sq4b13RQ8gzrkkldOVg+bBsc= +github.com/pion/ice/v2 v2.3.36/go.mod h1:mBF7lnigdqgtB+YHkaY/Y6s6tsyRyo4u4rPGRuOjUBQ= +github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI= +github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8= @@ -265,8 +265,8 @@ github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9 github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= github.com/pion/rtp v1.8.9 h1:E2HX740TZKaqdcPmf4pw6ZZuG8u5RlMMt+l3dxeu6Wk= github.com/pion/rtp v1.8.9/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= -github.com/pion/sctp v1.8.20 h1:sOc3lkV/tQaP57ZUEXIMdM2V92IIB2ia5v/ygnBxaEg= -github.com/pion/sctp v1.8.20/go.mod h1:oTxw8i5m+WbDHZJL/xUpe6CPIn1Y0GIKKwTLF4h53H8= +github.com/pion/sctp v1.8.33 h1:dSE4wX6uTJBcNm8+YlMg7lw1wqyKHggsP5uKbdj+NZw= +github.com/pion/sctp v1.8.33/go.mod h1:beTnqSzewI53KWoG3nqB282oDMGrhNxBdb+JZnkCwRM= github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY= github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M= github.com/pion/srtp/v2 v2.0.20 h1:HNNny4s+OUmG280ETrCdgFndp4ufx3/uy85EawYEhTk= @@ -284,14 +284,14 @@ github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uP github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= github.com/pion/turn/v2 v2.1.6 h1:Xr2niVsiPTB0FPtt+yAWKFUkU1eotQbGgpTIld4x1Gc= github.com/pion/turn/v2 v2.1.6/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= -github.com/pion/webrtc/v3 v3.3.1 h1:VAJr70z+YQ5sNwMhYA7HgRFfu9qlHWKRtNPTU7EY71s= -github.com/pion/webrtc/v3 v3.3.1/go.mod h1:hVmrDJvwhEertRWObeb1xzulzHGeVUoPlWvxdGzcfU0= +github.com/pion/webrtc/v3 v3.3.4 h1:v2heQVnXTSqNRXcaFQVOhIOYkLMxOu1iJG8uy1djvkk= +github.com/pion/webrtc/v3 v3.3.4/go.mod h1:liNa+E1iwyzyXqNUwvoMRNQ10x8h8FOeJKL8RkIbamE= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.20.3 h1:oPksm4K8B+Vt35tUhw6GbSNSgVlVSBH0qELP/7u83l4= -github.com/prometheus/client_golang v1.20.3/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_golang v1.20.4 h1:Tgh3Yr67PaOv/uTqloMsCEdeuFTatm5zIq5+qNN23vI= +github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= @@ -301,8 +301,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4= github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= -github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= -github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk= +github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -325,8 +325,9 @@ github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJX github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A= github.com/urfave/cli/v2 v2.27.4 h1:o1owoI+02Eb+K107p27wEX9Bb8eqIoZCfLXloLUSWJ8= github.com/urfave/cli/v2 v2.27.4/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ= -github.com/wlynxg/anet v0.0.3 h1:PvR53psxFXstc12jelG6f1Lv4MWqE0tI76/hHGjh9rg= github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= +github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU= +github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= @@ -367,11 +368,11 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= -golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= +golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY= +golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -393,8 +394,8 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= -golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= @@ -424,8 +425,8 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -443,8 +444,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -468,15 +469,15 @@ google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 h1:BulPr26Jqjnd4eY google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:hL97c3SYopEHblzpxRL4lSs523++l8DYxGM1FQiYmb4= google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed h1:3RgNmBoI9MZhsj3QxC+AP/qQhNwpCLOvYDYYsFrhFt0= google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:OCdP9MfskevB/rbYvHTsXTtKC+3bHWajPdoKgjcYkfo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 h1:X58yt85/IXCx0Y3ZwN6sEIKZzQtDEYaBWrDvErdXrRE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.66.1 h1:hO5qAXR19+/Z44hmvIM4dQFMSYX9XcWsByfoxutBpAM= -google.golang.org/grpc v1.66.1/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= From 1f00aa0da3c49b020f8b0ef417df7f546280cfb5 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 17 Oct 2024 16:24:15 -0400 Subject: [PATCH 5/9] fix debug uploads --- pkg/pipeline/debug.go | 4 ++-- test/download.go | 15 +++++++++++++++ test/file.go | 6 +++++- test/segments.go | 25 ++++++++++--------------- 4 files changed, 32 insertions(+), 18 deletions(-) diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index 4ccb9d4a..a037fa8f 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -90,7 +90,7 @@ func (c *Controller) uploadTrackFiles(u *uploader.Uploader) { if strings.HasSuffix(f.Name(), ".csv") { local := path.Join(c.TmpDir, f.Name()) storage := path.Join(c.Info.EgressId, f.Name()) - _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false) + _, _, _, err = u.Upload(local, storage, types.OutputTypeBlob, false) if err != nil { logger.Errorw("failed to upload debug file", err) return @@ -131,7 +131,7 @@ func (c *Controller) uploadDebugFile(u *uploader.Uploader, data []byte, fileExte return } - _, _, err = u.Upload(local, path.Join(storageDir, filename), types.OutputTypeBlob, false) + _, _, _, err = u.Upload(local, path.Join(storageDir, filename), types.OutputTypeBlob, false) if err != nil { logger.Errorw("failed to upload debug file", err) return diff --git a/test/download.go b/test/download.go index 0d82b927..97611c4d 100644 --- a/test/download.go +++ b/test/download.go @@ -18,6 +18,7 @@ package test import ( "context" + "encoding/json" "fmt" "io" "net/url" @@ -39,6 +40,20 @@ import ( "github.com/livekit/protocol/logger" ) +func loadManifest(t *testing.T, c *config.StorageConfig, localFilepath, storageFilepath string) *config.Manifest { + download(t, c, localFilepath, storageFilepath) + defer os.Remove(localFilepath) + + b, err := os.ReadFile(localFilepath) + require.NoError(t, err) + + m := &config.Manifest{} + err = json.Unmarshal(b, m) + require.NoError(t, err) + + return m +} + func download(t *testing.T, c *config.StorageConfig, localFilepath, storageFilepath string) { if c != nil { if c.S3 != nil { diff --git a/test/file.go b/test/file.go index 9e350daf..569c4fb6 100644 --- a/test/file.go +++ b/test/file.go @@ -266,7 +266,11 @@ func (r *Runner) verifyFile(t *testing.T, p *config.PipelineConfig, res *livekit // download from cloud storage localPath = path.Join(r.FilePrefix, storageFilename) download(t, p.GetFileConfig().StorageConfig, localPath, storagePath) - download(t, p.GetFileConfig().StorageConfig, localPath+".json", storagePath+".json") + + manifestLocal := path.Join(path.Dir(localPath), res.EgressId+".json") + manifestStorage := path.Join(path.Dir(storagePath), res.EgressId+".json") + manifest := loadManifest(t, p.GetFileConfig().StorageConfig, manifestLocal, manifestStorage) + require.NotNil(t, manifest) // verify verify(t, localPath, p, res, types.EgressTypeFile, r.Muting, r.sourceFramerate, false) diff --git a/test/segments.go b/test/segments.go index f9f5a587..6c6e2687 100644 --- a/test/segments.go +++ b/test/segments.go @@ -17,7 +17,6 @@ package test import ( - "fmt" "os" "path" "strconv" @@ -195,18 +194,11 @@ func (r *Runner) verifySegments(t *testing.T, p *config.PipelineConfig, filename require.Greater(t, segments.Duration, int64(0)) r.verifySegmentOutput(t, p, filenameSuffix, segments.PlaylistName, segments.PlaylistLocation, int(segments.SegmentCount), res, m3u8.PlaylistTypeEvent) - r.verifyManifest(t, p, segments.PlaylistName) if enableLivePlaylist { r.verifySegmentOutput(t, p, filenameSuffix, segments.LivePlaylistName, segments.LivePlaylistLocation, 5, res, m3u8.PlaylistTypeLive) } } -func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) { - localPlaylistPath := path.Join(r.FilePrefix, path.Base(plName)) - - download(t, p.GetSegmentConfig().StorageConfig, localPlaylistPath+".json", plName+".json") -} - func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, plName string, plLocation string, segmentCount int, res *livekit.EgressInfo, plType m3u8.PlaylistType) { require.NotEmpty(t, plName) require.NotEmpty(t, plLocation) @@ -217,13 +209,16 @@ func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, fil // download from cloud storage localPlaylistPath = path.Join(r.FilePrefix, path.Base(storedPlaylistPath)) download(t, p.GetSegmentConfig().StorageConfig, localPlaylistPath, storedPlaylistPath) - if plType == m3u8.PlaylistTypeEvent { - // Only download segments once - base := storedPlaylistPath[:len(storedPlaylistPath)-5] - for i := 0; i < segmentCount; i++ { - cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) - localPath := path.Join(r.FilePrefix, path.Base(cloudPath)) - download(t, p.GetSegmentConfig().StorageConfig, localPath, cloudPath) + + manifestLocal := path.Join(path.Dir(localPlaylistPath), res.EgressId+".json") + manifestStorage := path.Join(path.Dir(storedPlaylistPath), res.EgressId+".json") + manifest := loadManifest(t, p.GetSegmentConfig().StorageConfig, manifestLocal, manifestStorage) + + for _, playlist := range manifest.Playlists { + require.Equal(t, segmentCount, len(playlist.Segments)) + for _, segment := range playlist.Segments { + localPath := path.Join(r.FilePrefix, path.Base(segment.Filename)) + download(t, p.GetSegmentConfig().StorageConfig, localPath, segment.Filename) } } From a47f750c16d501d88c4fc3f4bde86678e96ccf67 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 17 Oct 2024 16:41:48 -0400 Subject: [PATCH 6/9] fix live playlist tests --- test/builder.go | 2 +- test/segments.go | 20 +++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/test/builder.go b/test/builder.go index 4489aca5..b9700f61 100644 --- a/test/builder.go +++ b/test/builder.go @@ -289,7 +289,7 @@ func (r *Runner) buildStreamOutputs(o *streamOptions) []*livekit.StreamOutput { } func (r *Runner) buildSegmentOutputs(o *segmentOptions) []*livekit.SegmentedFileOutput { - if u := r.getUploadConfig(); u != nil && o.suffix == livekit.SegmentedFileSuffix_INDEX { + if u := r.getUploadConfig(); u != nil { output := &livekit.SegmentedFileOutput{ FilenamePrefix: path.Join(uploadPrefix, o.prefix), PlaylistName: o.playlist, diff --git a/test/segments.go b/test/segments.go index 6c6e2687..7ecc2ea3 100644 --- a/test/segments.go +++ b/test/segments.go @@ -210,15 +210,17 @@ func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, fil localPlaylistPath = path.Join(r.FilePrefix, path.Base(storedPlaylistPath)) download(t, p.GetSegmentConfig().StorageConfig, localPlaylistPath, storedPlaylistPath) - manifestLocal := path.Join(path.Dir(localPlaylistPath), res.EgressId+".json") - manifestStorage := path.Join(path.Dir(storedPlaylistPath), res.EgressId+".json") - manifest := loadManifest(t, p.GetSegmentConfig().StorageConfig, manifestLocal, manifestStorage) - - for _, playlist := range manifest.Playlists { - require.Equal(t, segmentCount, len(playlist.Segments)) - for _, segment := range playlist.Segments { - localPath := path.Join(r.FilePrefix, path.Base(segment.Filename)) - download(t, p.GetSegmentConfig().StorageConfig, localPath, segment.Filename) + if plType == m3u8.PlaylistTypeEvent { + manifestLocal := path.Join(path.Dir(localPlaylistPath), res.EgressId+".json") + manifestStorage := path.Join(path.Dir(storedPlaylistPath), res.EgressId+".json") + manifest := loadManifest(t, p.GetSegmentConfig().StorageConfig, manifestLocal, manifestStorage) + + for _, playlist := range manifest.Playlists { + require.Equal(t, segmentCount, len(playlist.Segments)) + for _, segment := range playlist.Segments { + localPath := path.Join(r.FilePrefix, path.Base(segment.Filename)) + download(t, p.GetSegmentConfig().StorageConfig, localPath, segment.Filename) + } } } From 3a65ca7268691a4ea9d2873226987f08169b384e Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 17 Oct 2024 17:25:14 -0400 Subject: [PATCH 7/9] fix file/segment multi test --- test/download.go | 40 +++++++++++++++++++++++----------------- test/file.go | 2 +- test/images.go | 2 +- test/segments.go | 18 ++++++++++++++---- 4 files changed, 39 insertions(+), 23 deletions(-) diff --git a/test/download.go b/test/download.go index 97611c4d..7c16deaf 100644 --- a/test/download.go +++ b/test/download.go @@ -41,7 +41,7 @@ import ( ) func loadManifest(t *testing.T, c *config.StorageConfig, localFilepath, storageFilepath string) *config.Manifest { - download(t, c, localFilepath, storageFilepath) + download(t, c, localFilepath, storageFilepath, false) defer os.Remove(localFilepath) b, err := os.ReadFile(localFilepath) @@ -54,22 +54,22 @@ func loadManifest(t *testing.T, c *config.StorageConfig, localFilepath, storageF return m } -func download(t *testing.T, c *config.StorageConfig, localFilepath, storageFilepath string) { +func download(t *testing.T, c *config.StorageConfig, localFilepath, storageFilepath string, delete bool) { if c != nil { if c.S3 != nil { logger.Debugw("s3 download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) - downloadS3(t, c.S3, localFilepath, storageFilepath) + downloadS3(t, c.S3, localFilepath, storageFilepath, delete) } else if c.GCP != nil { logger.Debugw("gcp download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) - downloadGCP(t, c.GCP, localFilepath, storageFilepath) + downloadGCP(t, c.GCP, localFilepath, storageFilepath, delete) } else if c.Azure != nil { logger.Debugw("azure download", "localFilepath", localFilepath, "storageFilepath", storageFilepath) - downloadAzure(t, c.Azure, localFilepath, storageFilepath) + downloadAzure(t, c.Azure, localFilepath, storageFilepath, delete) } } } -func downloadS3(t *testing.T, conf *config.S3Config, localFilepath, storageFilepath string) { +func downloadS3(t *testing.T, conf *config.S3Config, localFilepath, storageFilepath string, delete bool) { file, err := os.Create(localFilepath) require.NoError(t, err) defer file.Close() @@ -99,14 +99,16 @@ func downloadS3(t *testing.T, conf *config.S3Config, localFilepath, storageFilep ) require.NoError(t, err) - _, err = s3Client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ - Bucket: aws.String(conf.Bucket), - Key: aws.String(storageFilepath), - }) - require.NoError(t, err) + if delete { + _, err = s3Client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: aws.String(conf.Bucket), + Key: aws.String(storageFilepath), + }) + require.NoError(t, err) + } } -func downloadAzure(t *testing.T, conf *config.AzureConfig, localFilepath, storageFilepath string) { +func downloadAzure(t *testing.T, conf *config.AzureConfig, localFilepath, storageFilepath string, delete bool) { credential, err := azblob.NewSharedKeyCredential( conf.AccountName, conf.AccountKey, @@ -140,11 +142,13 @@ func downloadAzure(t *testing.T, conf *config.AzureConfig, localFilepath, storag }) require.NoError(t, err) - _, err = blobURL.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) - require.NoError(t, err) + if delete { + _, err = blobURL.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) + require.NoError(t, err) + } } -func downloadGCP(t *testing.T, conf *config.GCPConfig, localFilepath, storageFilepath string) { +func downloadGCP(t *testing.T, conf *config.GCPConfig, localFilepath, storageFilepath string, delete bool) { ctx := context.Background() var client *storage.Client @@ -176,6 +180,8 @@ func downloadGCP(t *testing.T, conf *config.GCPConfig, localFilepath, storageFil _ = rc.Close() require.NoError(t, err) - err = client.Bucket(conf.Bucket).Object(storageFilepath).Delete(context.Background()) - require.NoError(t, err) + if delete { + err = client.Bucket(conf.Bucket).Object(storageFilepath).Delete(context.Background()) + require.NoError(t, err) + } } diff --git a/test/file.go b/test/file.go index 569c4fb6..8e4dae63 100644 --- a/test/file.go +++ b/test/file.go @@ -265,7 +265,7 @@ func (r *Runner) verifyFile(t *testing.T, p *config.PipelineConfig, res *livekit // download from cloud storage localPath = path.Join(r.FilePrefix, storageFilename) - download(t, p.GetFileConfig().StorageConfig, localPath, storagePath) + download(t, p.GetFileConfig().StorageConfig, localPath, storagePath, true) manifestLocal := path.Join(path.Dir(localPath), res.EgressId+".json") manifestStorage := path.Join(path.Dir(storagePath), res.EgressId+".json") diff --git a/test/images.go b/test/images.go index 9c4202e9..5d39a0d1 100644 --- a/test/images.go +++ b/test/images.go @@ -114,6 +114,6 @@ func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, res *livek for i := range images.ImageCount { storagePath := fmt.Sprintf("%s_%05d%s", images.FilenamePrefix, i, imageConfig.ImageExtension) localPath := path.Join(r.FilePrefix, path.Base(storagePath)) - download(t, imageConfig.StorageConfig, localPath, storagePath) + download(t, imageConfig.StorageConfig, localPath, storagePath, true) } } diff --git a/test/segments.go b/test/segments.go index 7ecc2ea3..33897ce5 100644 --- a/test/segments.go +++ b/test/segments.go @@ -180,7 +180,11 @@ func (r *Runner) runSegmentsTest(t *testing.T, test *testCase) { r.verifySegments(t, p, test.segmentOptions.suffix, res, test.livePlaylist != "") } -func (r *Runner) verifySegments(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, res *livekit.EgressInfo, enableLivePlaylist bool) { +func (r *Runner) verifySegments( + t *testing.T, p *config.PipelineConfig, + filenameSuffix livekit.SegmentedFileSuffix, + res *livekit.EgressInfo, enableLivePlaylist bool, +) { // egress info require.Equal(t, res.Error == "", res.Status != livekit.EgressStatus_EGRESS_FAILED) require.NotZero(t, res.StartedAt) @@ -199,7 +203,13 @@ func (r *Runner) verifySegments(t *testing.T, p *config.PipelineConfig, filename } } -func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, plName string, plLocation string, segmentCount int, res *livekit.EgressInfo, plType m3u8.PlaylistType) { +func (r *Runner) verifySegmentOutput( + t *testing.T, p *config.PipelineConfig, + filenameSuffix livekit.SegmentedFileSuffix, + plName string, plLocation string, segmentCount int, + res *livekit.EgressInfo, plType m3u8.PlaylistType, +) { + require.NotEmpty(t, plName) require.NotEmpty(t, plLocation) @@ -208,7 +218,7 @@ func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, fil // download from cloud storage localPlaylistPath = path.Join(r.FilePrefix, path.Base(storedPlaylistPath)) - download(t, p.GetSegmentConfig().StorageConfig, localPlaylistPath, storedPlaylistPath) + download(t, p.GetSegmentConfig().StorageConfig, localPlaylistPath, storedPlaylistPath, true) if plType == m3u8.PlaylistTypeEvent { manifestLocal := path.Join(path.Dir(localPlaylistPath), res.EgressId+".json") @@ -219,7 +229,7 @@ func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, fil require.Equal(t, segmentCount, len(playlist.Segments)) for _, segment := range playlist.Segments { localPath := path.Join(r.FilePrefix, path.Base(segment.Filename)) - download(t, p.GetSegmentConfig().StorageConfig, localPath, segment.Filename) + download(t, p.GetSegmentConfig().StorageConfig, localPath, segment.Filename, true) } } } From c22eb3d0f0be2707f9cc0a828621c732cc84f2ad Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 17 Oct 2024 20:15:48 -0400 Subject: [PATCH 8/9] omitempty --- pkg/config/manifest.go | 12 ++++++------ pkg/pipeline/sink/segments.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/config/manifest.go b/pkg/config/manifest.go index fa8642f9..04c099ba 100644 --- a/pkg/config/manifest.go +++ b/pkg/config/manifest.go @@ -47,10 +47,10 @@ type File struct { } type Playlist struct { - mu sync.Mutex - PlaylistLocation string `json:"playlist_location"` - PlaylistPresignedUrl string `json:"playlist_presigned_url"` - Segments []*Segment `json:"segments,omitempty"` + mu sync.Mutex + Location string `json:"location,omitempty"` + PresignedUrl string `json:"presigned_url,omitempty"` + Segments []*Segment `json:"segments,omitempty"` } type Segment struct { @@ -124,8 +124,8 @@ func (m *Manifest) AddPlaylist() *Playlist { func (p *Playlist) UpdateLocation(location, presignedUrl string) { p.mu.Lock() - p.PlaylistLocation = location - p.PlaylistPresignedUrl = presignedUrl + p.Location = location + p.PresignedUrl = presignedUrl p.mu.Unlock() } diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index daa834a8..119ae5be 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -226,8 +226,8 @@ func (s *SegmentSink) uploadPlaylist() error { if err == nil { s.SegmentsInfo.PlaylistLocation = playlistLocation if s.manifestPlaylist != nil { - s.manifestPlaylist.PlaylistLocation = playlistLocation - s.manifestPlaylist.PlaylistPresignedUrl = presignedUrl + s.manifestPlaylist.Location = playlistLocation + s.manifestPlaylist.PresignedUrl = presignedUrl } } return err From ce5abc12e7efbaa8efc3dbc8a8578a1d6cf6e027 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 21 Oct 2024 16:09:58 -0400 Subject: [PATCH 9/9] CBR for rtmp only --- pkg/pipeline/builder/video.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 4e6ada55..d9ff57ab 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -564,7 +564,7 @@ func (b *VideoBin) addEncoder() error { return errors.ErrGstPipelineError(err) } - if b.conf.GetStreamConfig() != nil { + if sc := b.conf.GetStreamConfig(); sc != nil && sc.OutputType == types.OutputTypeRTMP { options = append(options, "nal-hrd=cbr") } if len(options) > 0 {