diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index f64bc4de..9fe36821 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -334,7 +334,7 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo } s.AudioTranscoding = true - writer, err := s.createWriter(track, rp, ts) + writer, err := s.createWriter(track, pub, rp, ts) if err != nil { onSubscribeErr = err return @@ -360,7 +360,7 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo s.VideoTranscoding = true } - writer, err := s.createWriter(track, rp, ts) + writer, err := s.createWriter(track, pub, rp, ts) if err != nil { onSubscribeErr = err return @@ -417,6 +417,7 @@ func (s *SDKSource) onTrackSubscribed(track *webrtc.TrackRemote, pub *lksdk.Remo func (s *SDKSource) createWriter( track *webrtc.TrackRemote, + pub lksdk.TrackPublication, rp *lksdk.RemoteParticipant, ts *config.TrackSource, ) (*sdk.AppWriter, error) { @@ -435,7 +436,7 @@ func (s *SDKSource) createWriter( } ts.AppSrc = app.SrcFromElement(src) - writer, err := sdk.NewAppWriter(track, rp, ts, s.sync, s.callbacks, logFilename) + writer, err := sdk.NewAppWriter(track, pub, rp, ts, s.sync, s.callbacks, logFilename) if err != nil { return nil, err } @@ -505,6 +506,7 @@ func (s *SDKSource) onTrackFinished(trackID string) { func (s *SDKSource) onParticipantDisconnected(rp *lksdk.RemoteParticipant) { if rp.Identity() == s.Identity { + logger.Debugw("Participant disconnected") s.onDisconnected() } } diff --git a/pkg/pipeline/source/sdk/appwriter.go b/pkg/pipeline/source/sdk/appwriter.go index 6e86c19b..e64d851b 100644 --- a/pkg/pipeline/source/sdk/appwriter.go +++ b/pkg/pipeline/source/sdk/appwriter.go @@ -56,6 +56,7 @@ const ( type AppWriter struct { logger logger.Logger logFile *os.File + pub lksdk.TrackPublication track *webrtc.TrackRemote codec types.MimeType src *app.Source @@ -83,6 +84,7 @@ type AppWriter struct { func NewAppWriter( track *webrtc.TrackRemote, + pub lksdk.TrackPublication, rp *lksdk.RemoteParticipant, ts *config.TrackSource, sync *synchronizer.Synchronizer, @@ -92,6 +94,7 @@ func NewAppWriter( w := &AppWriter{ logger: logger.GetLogger().WithValues("trackID", track.ID(), "kind", track.Kind().String()), track: track, + pub: pub, codec: ts.MimeType, src: ts.AppSrc, callbacks: callbacks, @@ -154,10 +157,18 @@ func (w *AppWriter) TrackID() string { } func (w *AppWriter) Play() { - w.playing.Break() + w.playing.Once(func() { + if w.pub.IsMuted() { + w.SetTrackMuted(true) + } + }) } func (w *AppWriter) SetTrackMuted(muted bool) { + if !w.playing.IsBroken() { + return + } + w.muted.Store(muted) if muted { w.logger.Debugw("track muted", "timestamp", time.Since(w.startTime).Seconds()) diff --git a/test/integration.go b/test/integration.go index 20273b9d..502389c8 100644 --- a/test/integration.go +++ b/test/integration.go @@ -157,7 +157,6 @@ func (r *Runner) publishSampleToRoom(t *testing.T, codec types.MimeType, withMut if withMuting { go func() { muted := false - time.Sleep(time.Second * 15) for { select { case <-done: