From 21bffe780ce255e8eabf38a66e52080d250f34f6 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 14 Nov 2023 16:21:17 -0800 Subject: [PATCH] fix panic --- pkg/pipeline/debug.go | 2 +- pkg/pipeline/sink/segments.go | 2 +- pkg/pipeline/sink/sink.go | 3 ++- pkg/pipeline/sink/uploader/uploader.go | 4 ++-- test/participant.go | 8 ++++---- test/room_composite.go | 10 +++++----- test/track.go | 4 ++-- test/track_composite.go | 10 +++++----- test/web.go | 8 ++++---- 9 files changed, 26 insertions(+), 25 deletions(-) diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index bed3b3e8..ec24b420 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -38,7 +38,7 @@ func (c *Controller) GetGstPipelineDebugDot() string { } func (c *Controller) uploadDebugFiles() { - monitor := *stats.NewHandlerMonitor(c.NodeID, c.ClusterID, c.Info.EgressId) + monitor := stats.NewHandlerMonitor(c.NodeID, c.ClusterID, c.Info.EgressId) u, err := uploader.New(c.Debug.ToUploadConfig(), "", monitor) if err != nil { logger.Errorw("failed to create uploader", err) diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 33d06442..2ad0ec07 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -71,7 +71,7 @@ type SegmentUpdate struct { uploadComplete chan struct{} } -func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks, monitor stats.HandlerMonitor) (*SegmentSink, error) { +func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (*SegmentSink, error) { playlistName := path.Join(o.LocalDir, o.PlaylistFilename) playlist, err := m3u8.NewEventPlaylistWriter(playlistName, o.SegmentDuration) if err != nil { diff --git a/pkg/pipeline/sink/sink.go b/pkg/pipeline/sink/sink.go index edf5805b..7ac6b5ca 100644 --- a/pkg/pipeline/sink/sink.go +++ b/pkg/pipeline/sink/sink.go @@ -30,13 +30,14 @@ type Sink interface { func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[types.EgressType][]Sink, error) { sinks := make(map[types.EgressType][]Sink) + monitor := stats.NewHandlerMonitor(p.NodeID, p.ClusterID, p.Info.EgressId) for egressType, c := range p.Outputs { if len(c) == 0 { continue } + var s Sink var err error - monitor := *stats.NewHandlerMonitor(p.NodeID, p.ClusterID, p.Info.EgressId) switch egressType { case types.EgressTypeFile: o := c[0].(*config.FileConfig) diff --git a/pkg/pipeline/sink/uploader/uploader.go b/pkg/pipeline/sink/uploader/uploader.go index 04760cd1..cb5cde09 100644 --- a/pkg/pipeline/sink/uploader/uploader.go +++ b/pkg/pipeline/sink/uploader/uploader.go @@ -42,7 +42,7 @@ type uploader interface { upload(string, string, types.OutputType) (string, int64, error) } -func New(conf config.UploadConfig, backup string, monitor stats.HandlerMonitor) (Uploader, error) { +func New(conf config.UploadConfig, backup string, monitor *stats.HandlerMonitor) (Uploader, error) { var u uploader var err error @@ -77,7 +77,7 @@ type remoteUploader struct { uploader backup string - monitor stats.HandlerMonitor + monitor *stats.HandlerMonitor } func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) { diff --git a/test/participant.go b/test/participant.go index f1e56497..49d1a11c 100644 --- a/test/participant.go +++ b/test/participant.go @@ -61,7 +61,7 @@ func (r *Runner) testParticipantFile(t *testing.T) { return } - t.Run("Participant/File", func(t *testing.T) { + t.Run("3A/Participant/File", func(t *testing.T) { for _, test := range []*testCase{ { name: "VP8", @@ -142,7 +142,7 @@ func (r *Runner) testParticipantStream(t *testing.T) { videoCodec: types.MimeTypeVP8, } - r.runParticipantTest(t, "Participant/Stream", test, + r.runParticipantTest(t, "3B/Participant/Stream", test, func(t *testing.T, identity string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), @@ -167,7 +167,7 @@ func (r *Runner) testParticipantSegments(t *testing.T) { return } - t.Run("Participant/Segments", func(t *testing.T) { + t.Run("3C/Participant/Segments", func(t *testing.T) { for _, test := range []*testCase{ { name: "VP8", @@ -243,7 +243,7 @@ func (r *Runner) testParticipantMulti(t *testing.T) { videoDelay: time.Second * 10, } - r.runParticipantTest(t, "Participant/Multi", test, + r.runParticipantTest(t, "3D/Participant/Multi", test, func(t *testing.T, identity string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), diff --git a/test/room_composite.go b/test/room_composite.go index 30f665eb..83e177a7 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -55,7 +55,7 @@ func (r *Runner) testRoomCompositeFile(t *testing.T) { return } - t.Run("RoomComposite/File", func(t *testing.T) { + t.Run("1A/RoomComposite/File", func(t *testing.T) { for _, test := range []*testCase{ { name: "Base", @@ -132,7 +132,7 @@ func (r *Runner) testRoomCompositeStream(t *testing.T) { return } - t.Run("RoomComposite/Stream", func(t *testing.T) { + t.Run("1B/RoomComposite/Stream", func(t *testing.T) { r.runRoomTest(t, "Rtmp", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), @@ -193,7 +193,7 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) { return } - r.runRoomTest(t, "RoomComposite/Segments", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { + r.runRoomTest(t, "1C/RoomComposite/Segments", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { for _, test := range []*testCase{ { options: &livekit.EncodingOptions{ @@ -261,7 +261,7 @@ func (r *Runner) testRoomCompositeImages(t *testing.T) { return } - r.runRoomTest(t, "RoomComposite/Images", types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { + r.runRoomTest(t, "1D/RoomComposite/Images", types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { for _, test := range []*testCase{ { options: &livekit.EncodingOptions{ @@ -308,7 +308,7 @@ func (r *Runner) testRoomCompositeMulti(t *testing.T) { return } - r.runRoomTest(t, "RoomComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { + r.runRoomTest(t, "1E/RoomComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), Request: &rpc.StartEgressRequest_RoomComposite{ diff --git a/test/track.go b/test/track.go index 3f116cb7..b20f7119 100644 --- a/test/track.go +++ b/test/track.go @@ -52,7 +52,7 @@ func (r *Runner) testTrackFile(t *testing.T) { return } - t.Run("Track/File", func(t *testing.T) { + t.Run("5A/Track/File", func(t *testing.T) { for _, test := range []*testCase{ { name: "OPUS", @@ -120,7 +120,7 @@ func (r *Runner) testTrackStream(t *testing.T) { return } - t.Run("Track/Stream", func(t *testing.T) { + t.Run("5B/Track/Stream", func(t *testing.T) { now := time.Now().Unix() for _, test := range []*testCase{ { diff --git a/test/track_composite.go b/test/track_composite.go index cf0aa128..60d10e79 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -54,7 +54,7 @@ func (r *Runner) testTrackCompositeFile(t *testing.T) { return } - t.Run("TrackComposite/File", func(t *testing.T) { + t.Run("4A/TrackComposite/File", func(t *testing.T) { for _, test := range []*testCase{ { name: "VP8", @@ -125,7 +125,7 @@ func (r *Runner) testTrackCompositeStream(t *testing.T) { return } - r.runTrackTest(t, "TrackComposite/Stream", types.MimeTypeOpus, types.MimeTypeVP8, + r.runTrackTest(t, "4B/TrackComposite/Stream", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T, audioTrackID, videoTrackID string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), @@ -151,7 +151,7 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { return } - t.Run("TrackComposite/Segments", func(t *testing.T) { + t.Run("4C/TrackComposite/Segments", func(t *testing.T) { for _, test := range []*testCase{ { name: "VP8", @@ -234,7 +234,7 @@ func (r *Runner) testTrackCompositeImages(t *testing.T) { return } - t.Run("TrackComposite/Images", func(t *testing.T) { + t.Run("4D/TrackComposite/Images", func(t *testing.T) { for _, test := range []*testCase{ { name: "VP8", @@ -295,7 +295,7 @@ func (r *Runner) testTrackCompositeMulti(t *testing.T) { return } - r.runTrackTest(t, "TrackComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8, + r.runTrackTest(t, "4E/TrackComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T, audioTrackID, videoTrackID string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), diff --git a/test/web.go b/test/web.go index 0d31669b..73f564ad 100644 --- a/test/web.go +++ b/test/web.go @@ -48,7 +48,7 @@ func (r *Runner) testWebFile(t *testing.T) { return } - r.runWebTest(t, "Web/File", func(t *testing.T) { + r.runWebTest(t, "2A/Web/File", func(t *testing.T) { fileOutput := &livekit.EncodedFileOutput{ Filepath: r.getFilePath("web_{time}"), } @@ -81,7 +81,7 @@ func (r *Runner) testWebStream(t *testing.T) { return } - r.runWebTest(t, "Web/Stream", func(t *testing.T) { + r.runWebTest(t, "2B/Web/Stream", func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), Request: &rpc.StartEgressRequest_Web{ @@ -106,7 +106,7 @@ func (r *Runner) testWebSegments(t *testing.T) { return } - r.runWebTest(t, "Web/Segments", func(t *testing.T) { + r.runWebTest(t, "2C/Web/Segments", func(t *testing.T) { segmentOutput := &livekit.SegmentedFileOutput{ FilenamePrefix: r.getFilePath("web_{time}"), PlaylistName: "web_{time}.m3u8", @@ -139,7 +139,7 @@ func (r *Runner) testWebMulti(t *testing.T) { return } - r.runWebTest(t, "Web/Multi", func(t *testing.T) { + r.runWebTest(t, "2D/Web/Multi", func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix),