Skip to content

Commit

Permalink
fix panic
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Nov 15, 2023
1 parent 31a16af commit 21bffe7
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *Controller) GetGstPipelineDebugDot() string {
}

func (c *Controller) uploadDebugFiles() {
monitor := *stats.NewHandlerMonitor(c.NodeID, c.ClusterID, c.Info.EgressId)
monitor := stats.NewHandlerMonitor(c.NodeID, c.ClusterID, c.Info.EgressId)
u, err := uploader.New(c.Debug.ToUploadConfig(), "", monitor)
if err != nil {
logger.Errorw("failed to create uploader", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type SegmentUpdate struct {
uploadComplete chan struct{}
}

func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks, monitor stats.HandlerMonitor) (*SegmentSink, error) {
func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks, monitor *stats.HandlerMonitor) (*SegmentSink, error) {
playlistName := path.Join(o.LocalDir, o.PlaylistFilename)
playlist, err := m3u8.NewEventPlaylistWriter(playlistName, o.SegmentDuration)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ type Sink interface {

func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks) (map[types.EgressType][]Sink, error) {
sinks := make(map[types.EgressType][]Sink)
monitor := stats.NewHandlerMonitor(p.NodeID, p.ClusterID, p.Info.EgressId)
for egressType, c := range p.Outputs {
if len(c) == 0 {
continue
}

var s Sink
var err error
monitor := *stats.NewHandlerMonitor(p.NodeID, p.ClusterID, p.Info.EgressId)
switch egressType {
case types.EgressTypeFile:
o := c[0].(*config.FileConfig)
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/sink/uploader/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type uploader interface {
upload(string, string, types.OutputType) (string, int64, error)
}

func New(conf config.UploadConfig, backup string, monitor stats.HandlerMonitor) (Uploader, error) {
func New(conf config.UploadConfig, backup string, monitor *stats.HandlerMonitor) (Uploader, error) {
var u uploader
var err error

Expand Down Expand Up @@ -77,7 +77,7 @@ type remoteUploader struct {
uploader

backup string
monitor stats.HandlerMonitor
monitor *stats.HandlerMonitor
}

func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) {
Expand Down
8 changes: 4 additions & 4 deletions test/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (r *Runner) testParticipantFile(t *testing.T) {
return
}

t.Run("Participant/File", func(t *testing.T) {
t.Run("3A/Participant/File", func(t *testing.T) {
for _, test := range []*testCase{
{
name: "VP8",
Expand Down Expand Up @@ -142,7 +142,7 @@ func (r *Runner) testParticipantStream(t *testing.T) {
videoCodec: types.MimeTypeVP8,
}

r.runParticipantTest(t, "Participant/Stream", test,
r.runParticipantTest(t, "3B/Participant/Stream", test,
func(t *testing.T, identity string) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),
Expand All @@ -167,7 +167,7 @@ func (r *Runner) testParticipantSegments(t *testing.T) {
return
}

t.Run("Participant/Segments", func(t *testing.T) {
t.Run("3C/Participant/Segments", func(t *testing.T) {
for _, test := range []*testCase{
{
name: "VP8",
Expand Down Expand Up @@ -243,7 +243,7 @@ func (r *Runner) testParticipantMulti(t *testing.T) {
videoDelay: time.Second * 10,
}

r.runParticipantTest(t, "Participant/Multi", test,
r.runParticipantTest(t, "3D/Participant/Multi", test,
func(t *testing.T, identity string) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),
Expand Down
10 changes: 5 additions & 5 deletions test/room_composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (r *Runner) testRoomCompositeFile(t *testing.T) {
return
}

t.Run("RoomComposite/File", func(t *testing.T) {
t.Run("1A/RoomComposite/File", func(t *testing.T) {
for _, test := range []*testCase{
{
name: "Base",
Expand Down Expand Up @@ -132,7 +132,7 @@ func (r *Runner) testRoomCompositeStream(t *testing.T) {
return
}

t.Run("RoomComposite/Stream", func(t *testing.T) {
t.Run("1B/RoomComposite/Stream", func(t *testing.T) {
r.runRoomTest(t, "Rtmp", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),
Expand Down Expand Up @@ -193,7 +193,7 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) {
return
}

r.runRoomTest(t, "RoomComposite/Segments", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) {
r.runRoomTest(t, "1C/RoomComposite/Segments", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) {
for _, test := range []*testCase{
{
options: &livekit.EncodingOptions{
Expand Down Expand Up @@ -261,7 +261,7 @@ func (r *Runner) testRoomCompositeImages(t *testing.T) {
return
}

r.runRoomTest(t, "RoomComposite/Images", types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) {
r.runRoomTest(t, "1D/RoomComposite/Images", types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) {
for _, test := range []*testCase{
{
options: &livekit.EncodingOptions{
Expand Down Expand Up @@ -308,7 +308,7 @@ func (r *Runner) testRoomCompositeMulti(t *testing.T) {
return
}

r.runRoomTest(t, "RoomComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) {
r.runRoomTest(t, "1E/RoomComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),
Request: &rpc.StartEgressRequest_RoomComposite{
Expand Down
4 changes: 2 additions & 2 deletions test/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (r *Runner) testTrackFile(t *testing.T) {
return
}

t.Run("Track/File", func(t *testing.T) {
t.Run("5A/Track/File", func(t *testing.T) {
for _, test := range []*testCase{
{
name: "OPUS",
Expand Down Expand Up @@ -120,7 +120,7 @@ func (r *Runner) testTrackStream(t *testing.T) {
return
}

t.Run("Track/Stream", func(t *testing.T) {
t.Run("5B/Track/Stream", func(t *testing.T) {
now := time.Now().Unix()
for _, test := range []*testCase{
{
Expand Down
10 changes: 5 additions & 5 deletions test/track_composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r *Runner) testTrackCompositeFile(t *testing.T) {
return
}

t.Run("TrackComposite/File", func(t *testing.T) {
t.Run("4A/TrackComposite/File", func(t *testing.T) {
for _, test := range []*testCase{
{
name: "VP8",
Expand Down Expand Up @@ -125,7 +125,7 @@ func (r *Runner) testTrackCompositeStream(t *testing.T) {
return
}

r.runTrackTest(t, "TrackComposite/Stream", types.MimeTypeOpus, types.MimeTypeVP8,
r.runTrackTest(t, "4B/TrackComposite/Stream", types.MimeTypeOpus, types.MimeTypeVP8,
func(t *testing.T, audioTrackID, videoTrackID string) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),
Expand All @@ -151,7 +151,7 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) {
return
}

t.Run("TrackComposite/Segments", func(t *testing.T) {
t.Run("4C/TrackComposite/Segments", func(t *testing.T) {
for _, test := range []*testCase{
{
name: "VP8",
Expand Down Expand Up @@ -234,7 +234,7 @@ func (r *Runner) testTrackCompositeImages(t *testing.T) {
return
}

t.Run("TrackComposite/Images", func(t *testing.T) {
t.Run("4D/TrackComposite/Images", func(t *testing.T) {
for _, test := range []*testCase{
{
name: "VP8",
Expand Down Expand Up @@ -295,7 +295,7 @@ func (r *Runner) testTrackCompositeMulti(t *testing.T) {
return
}

r.runTrackTest(t, "TrackComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8,
r.runTrackTest(t, "4E/TrackComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8,
func(t *testing.T, audioTrackID, videoTrackID string) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),
Expand Down
8 changes: 4 additions & 4 deletions test/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *Runner) testWebFile(t *testing.T) {
return
}

r.runWebTest(t, "Web/File", func(t *testing.T) {
r.runWebTest(t, "2A/Web/File", func(t *testing.T) {
fileOutput := &livekit.EncodedFileOutput{
Filepath: r.getFilePath("web_{time}"),
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func (r *Runner) testWebStream(t *testing.T) {
return
}

r.runWebTest(t, "Web/Stream", func(t *testing.T) {
r.runWebTest(t, "2B/Web/Stream", func(t *testing.T) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),
Request: &rpc.StartEgressRequest_Web{
Expand All @@ -106,7 +106,7 @@ func (r *Runner) testWebSegments(t *testing.T) {
return
}

r.runWebTest(t, "Web/Segments", func(t *testing.T) {
r.runWebTest(t, "2C/Web/Segments", func(t *testing.T) {
segmentOutput := &livekit.SegmentedFileOutput{
FilenamePrefix: r.getFilePath("web_{time}"),
PlaylistName: "web_{time}.m3u8",
Expand Down Expand Up @@ -139,7 +139,7 @@ func (r *Runner) testWebMulti(t *testing.T) {
return
}

r.runWebTest(t, "Web/Multi", func(t *testing.T) {
r.runWebTest(t, "2D/Web/Multi", func(t *testing.T) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),

Expand Down

0 comments on commit 21bffe7

Please sign in to comment.