From 24d0627d9f36fbc140c2bb35991cfd9440a2c71b Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Wed, 10 Apr 2024 14:34:39 +0300 Subject: [PATCH] Use slog in a backward compatible way. --- engine.go | 57 ++++++++++++++++++++++----------------- go.mod | 11 ++++---- go.sum | 17 +++++++----- integration_test.go | 6 ++--- localparticipant.go | 8 +++--- localtrack.go | 8 +++--- logger.go | 23 +++++++++++----- participant.go | 7 +++++ pkg/jitter/buffer.go | 6 ++--- pkg/jitter/options.go | 13 ++++++++- pkg/synchronizer/track.go | 12 +++++---- publication.go | 3 ++- readersampleprovider.go | 3 ++- remoteparticipant.go | 4 +-- room.go | 15 ++++++++--- signalclient.go | 19 +++++++------ transport.go | 32 +++++++++++++--------- 17 files changed, 156 insertions(+), 88 deletions(-) diff --git a/engine.go b/engine.go index adcc3a42..edfddb36 100644 --- a/engine.go +++ b/engine.go @@ -15,6 +15,7 @@ package lksdk import ( + "log/slog" "sync" "time" @@ -36,6 +37,7 @@ const ( ) type RTCEngine struct { + log *slog.Logger pclock sync.Mutex publisher *PCTransport subscriber *PCTransport @@ -76,6 +78,7 @@ type RTCEngine struct { func NewRTCEngine() *RTCEngine { e := &RTCEngine{ + log: getLogger(), client: NewSignalClient(), trackPublishedChan: make(chan *livekit.TrackPublishedResponse, 1), JoinTimeout: 15 * time.Second, @@ -111,6 +114,10 @@ func NewRTCEngine() *RTCEngine { return e } +func (e *RTCEngine) SetLogger(log *slog.Logger) { + e.log = log +} + func (e *RTCEngine) Join(url string, token string, params *connectParams) (*livekit.JoinResponse, error) { res, err := e.client.Join(url, token, *params) if err != nil { @@ -229,6 +236,7 @@ func (e *RTCEngine) configure( var err error if e.publisher, err = NewPCTransport(PCTransportParams{ + Logger: e.log, Configuration: configuration, RetransmitBufferSize: e.connParams.RetransmitBufferSize, Pacer: e.connParams.Pacer, @@ -239,12 +247,13 @@ func (e *RTCEngine) configure( return err } if e.subscriber, err = NewPCTransport(PCTransportParams{ + Logger: e.log, Configuration: configuration, RetransmitBufferSize: e.connParams.RetransmitBufferSize, }); err != nil { return err } - logger.Debugw("Using ICE servers", "servers", iceServers) + e.log.Debug("Using ICE servers", "servers", iceServers) if subscriberPrimary != nil { e.subscriberPrimary = *subscriberPrimary @@ -257,12 +266,12 @@ func (e *RTCEngine) configure( return } init := candidate.ToJSON() - logger.Debugw("local ICE candidate", + e.log.Debug("local ICE candidate", "target", livekit.SignalTarget_PUBLISHER, "candidate", init.Candidate, ) if err := e.client.SendICECandidate(init, livekit.SignalTarget_PUBLISHER); err != nil { - logger.Errorw("could not send ICE candidates for publisher", err) + e.log.Error("could not send ICE candidates for publisher", "error", err) } }) @@ -272,12 +281,12 @@ func (e *RTCEngine) configure( return } init := candidate.ToJSON() - logger.Debugw("local ICE candidate", + e.log.Debug("local ICE candidate", "target", livekit.SignalTarget_SUBSCRIBER, "candidate", init.Candidate, ) if err := e.client.SendICECandidate(init, livekit.SignalTarget_SUBSCRIBER); err != nil { - logger.Errorw("could not send ICE candidates for subscriber", err) + e.log.Error("could not send ICE candidates for subscriber", "error", err) } }) @@ -288,15 +297,15 @@ func (e *RTCEngine) configure( primaryTransport.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { switch state { case webrtc.ICEConnectionStateConnected: - var fields []interface{} + var fields []any if pair, err := primaryTransport.GetSelectedCandidatePair(); err == nil { fields = append(fields, "iceCandidatePair", pair) } - logger.Debugw("ICE connected", fields...) + e.log.Debug("ICE connected", fields...) case webrtc.ICEConnectionStateDisconnected: - logger.Debugw("ICE disconnected") + e.log.Debug("ICE disconnected") case webrtc.ICEConnectionStateFailed: - logger.Debugw("ICE failed") + e.log.Debug("ICE failed") e.handleDisconnect(false) } }) @@ -323,7 +332,7 @@ func (e *RTCEngine) configure( e.publisher.OnOffer = func(offer webrtc.SessionDescription) { e.hasPublish.Store(true) if err := e.client.SendOffer(offer); err != nil { - logger.Errorw("could not send offer", err) + e.log.Error("could not send offer", "error", err) } } @@ -352,14 +361,14 @@ func (e *RTCEngine) configure( // configure client e.client.OnAnswer = func(sd webrtc.SessionDescription) { if err := e.publisher.SetRemoteDescription(sd); err != nil { - logger.Errorw("could not set remote description", err) + e.log.Error("could not set remote description", "error", err) } else { - logger.Debugw("successfully set publisher answer") + e.log.Debug("successfully set publisher answer") } } e.client.OnTrickle = func(init webrtc.ICECandidateInit, target livekit.SignalTarget) { var err error - logger.Debugw("remote ICE candidate", + e.log.Debug("remote ICE candidate", "target", target, "candidate", init.Candidate, ) @@ -369,13 +378,13 @@ func (e *RTCEngine) configure( err = e.subscriber.AddICECandidate(init) } if err != nil { - logger.Errorw("could not add ICE candidate", err) + e.log.Error("could not add ICE candidate", "error", err) } } e.client.OnOffer = func(sd webrtc.SessionDescription) { - logger.Debugw("received offer for subscriber") + e.log.Debug("received offer for subscriber") if err := e.subscriber.SetRemoteDescription(sd); err != nil { - logger.Errorw("could not set remote description", err) + e.log.Error("could not set remote description", "error", err) return } @@ -535,9 +544,9 @@ func (e *RTCEngine) handleDisconnect(fullReconnect bool) { if reconnectCount == 0 && e.OnRestarting != nil { e.OnRestarting() } - logger.Infow("restarting connection...", "reconnectCount", reconnectCount) + e.log.Info("restarting connection...", "reconnectCount", reconnectCount) if err := e.restartConnection(); err != nil { - logger.Errorw("restart connection failed", err) + e.log.Error("restart connection failed", "error", err) } else { return } @@ -545,9 +554,9 @@ func (e *RTCEngine) handleDisconnect(fullReconnect bool) { if reconnectCount == 0 && e.OnResuming != nil { e.OnResuming() } - logger.Infow("resuming connection...", "reconnectCount", reconnectCount) + e.log.Info("resuming connection...", "reconnectCount", reconnectCount) if err := e.resumeConnection(); err != nil { - logger.Errorw("resume connection failed", err) + e.log.Error("resume connection failed", "error", err) } else { return } @@ -625,15 +634,15 @@ func (e *RTCEngine) restartConnection() error { func (e *RTCEngine) createPublisherAnswerAndSend() error { answer, err := e.subscriber.pc.CreateAnswer(nil) if err != nil { - logger.Errorw("could not create answer", err) + e.log.Error("could not create answer", "error", err) return err } if err := e.subscriber.pc.SetLocalDescription(answer); err != nil { - logger.Errorw("could not set subscriber local description", err) + e.log.Error("could not set subscriber local description", "error", err) return err } if err := e.client.SendAnswer(answer); err != nil { - logger.Errorw("could not send answer for subscriber", err) + e.log.Error("could not send answer for subscriber", "error", err) return err } return nil @@ -643,7 +652,7 @@ func (e *RTCEngine) handleLeave(leave *livekit.LeaveRequest) { if leave.GetCanReconnect() { e.handleDisconnect(true) } else { - logger.Infow("server initiated leave", + e.log.Info("server initiated leave", "reason", leave.GetReason(), "canReconnect", leave.GetCanReconnect(), ) diff --git a/go.mod b/go.mod index dbbd2ec5..394a9409 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,9 @@ go 1.21 require ( github.com/bep/debounce v1.2.1 - github.com/go-logr/logr v1.4.1 - github.com/go-logr/stdr v1.2.2 github.com/gorilla/websocket v1.5.1 github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 - github.com/livekit/protocol v1.12.0 + github.com/livekit/protocol v1.12.1-0.20240409153235-c69c1b0580dc github.com/magefile/mage v1.15.0 github.com/pion/dtls/v2 v2.2.10 github.com/pion/interceptor v0.1.27 @@ -24,6 +22,7 @@ require ( ) require ( + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -34,6 +33,7 @@ require ( github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gammazero/deque v0.2.1 // indirect github.com/go-jose/go-jose/v3 v3.0.3 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.6.0 // indirect github.com/jxskiss/base62 v1.1.0 // indirect @@ -65,10 +65,11 @@ require ( github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.19.0 // indirect + go.uber.org/zap/exp v0.2.0 // indirect + golang.org/x/crypto v0.22.0 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.17.0 // indirect + golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect google.golang.org/grpc v1.62.0 // indirect diff --git a/go.sum b/go.sum index 8a669ed9..c38e7299 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bep/debounce v1.2.1 h1:v67fRdBA9UQu2NhLFXrSg0Brw7CexQekrBwDMM8bzeY= @@ -27,11 +29,8 @@ github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0 github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k= github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ= -github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -77,8 +76,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 h1:xawydPEACNO5Ncs2LgioTjWghXQ0eUN1q1RnVUUyVnI= github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.12.0 h1:B7qsqq5xf9MmyG9WEk9/gMsfMVXuyLNxX5cO6TQil6s= -github.com/livekit/protocol v1.12.0/go.mod h1:G7Pa985GhZv2MCC3UnUocBhZfi3DsWA6WmlSkkpQYTM= +github.com/livekit/protocol v1.12.1-0.20240409153235-c69c1b0580dc h1:l6tUvbsi3NAxkHKrjuNPSwe0n1fGMd+IkUfmD+wq+R4= +github.com/livekit/protocol v1.12.1-0.20240409153235-c69c1b0580dc/go.mod h1:jB6PWwf4tdMAwy+jxexqaVWuQiiklAtO4F5zZzWkTII= github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 h1:253WtQ2VGVHzIIzW9MUZj7vUDDILESU3zsEbiRdxYF0= github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -192,6 +191,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs= +go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -200,8 +201,9 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= -golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -254,8 +256,9 @@ golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/integration_test.go b/integration_test.go index 2452f62e..7a659c37 100644 --- a/integration_test.go +++ b/integration_test.go @@ -79,7 +79,7 @@ func pubNullTrack(t *testing.T, room *Room, name string) *LocalTrackPublication track.OnBind(func() { if err := track.StartWrite(provider, func() {}); err != nil { - logger.Errorw("Could not start writing", err) + room.log.Error("Could not start writing", "error", err) } }) @@ -185,7 +185,7 @@ func TestResume(t *testing.T) { pub.Simulate(SimulateSignalReconnect) require.Eventually(t, func() bool { return reconnected.Load() }, 5*time.Second, 100*time.Millisecond) - logger.Infow("reconnected") + getLogger().Info("reconnected") localPub := pubNullTrack(t, pub, audioTrackName) require.Equal(t, localPub.Name(), audioTrackName) @@ -215,7 +215,7 @@ func TestForceTLS(t *testing.T) { pub.Simulate(SimulateForceTLS) require.Eventually(t, func() bool { return reconnected.Load() && pub.engine.ensurePublisherConnected(true) == nil }, 15*time.Second, 100*time.Millisecond) - logger.Infow("reconnected") + getLogger().Info("reconnected") getSelectedPair := func(pc *webrtc.PeerConnection) (*webrtc.ICECandidatePair, error) { sctp := pc.SCTP() diff --git a/localparticipant.go b/localparticipant.go index a7ec4129..7f435895 100644 --- a/localparticipant.go +++ b/localparticipant.go @@ -119,7 +119,7 @@ func (p *LocalParticipant) PublishTrack(track webrtc.TrackLocal, opts *TrackPubl publisher.Negotiate() - logger.Infow("published track", "name", opts.Name, "source", opts.Source.String(), "trackID", pubRes.Track.Sid) + p.log.Info("published track", "name", opts.Name, "source", opts.Source.String(), "trackID", pubRes.Track.Sid) return pub, nil } @@ -224,7 +224,7 @@ func (p *LocalParticipant) PublishSimulcastTrack(tracks []*LocalTrack, opts *Tra publisher.Negotiate() - logger.Infow("published simulcast track", "name", opts.Name, "source", opts.Source.String(), "trackID", pubRes.Track.Sid) + p.log.Info("published simulcast track", "name", opts.Name, "source", opts.Source.String(), "trackID", pubRes.Track.Sid) return pub, nil } @@ -252,7 +252,7 @@ func (p *LocalParticipant) republishTracks() { } else if track := pub.TrackLocal(); track != nil { p.PublishTrack(track, &opt) } else { - logger.Warnw("could not republish track as no track local found", nil, "track", pub.SID()) + p.log.Warn("could not republish track as no track local found", "track", pub.SID()) } } } @@ -401,7 +401,7 @@ func (p *LocalParticipant) UnpublishTrack(sid string) error { pub.CloseTrack() - logger.Infow("unpublished track", "name", pub.Name(), "sid", sid) + p.log.Info("unpublished track", "name", pub.Name(), "sid", sid) return err } diff --git a/localtrack.go b/localtrack.go index b94844ee..22139070 100644 --- a/localtrack.go +++ b/localtrack.go @@ -496,6 +496,7 @@ func (s *LocalTrack) setMuted(muted bool) { } func (s *LocalTrack) rtcpWorker(rtcpReader interceptor.RTCPReader) { + log := getLogger() // read incoming rtcp packets, interceptors require this b := make([]byte, rtpInboundMTU) rtcpCB := s.onRTCP @@ -510,7 +511,7 @@ func (s *LocalTrack) rtcpWorker(rtcpReader interceptor.RTCPReader) { pkts, err := rtcp.Unmarshal(b[:i]) if err != nil { - logger.Warnw("could not unmarshal rtcp", err) + log.Warn("could not unmarshal rtcp", "error", err) return } for _, packet := range pkts { @@ -535,6 +536,7 @@ func (s *LocalTrack) rtcpWorker(rtcpReader interceptor.RTCPReader) { } func (s *LocalTrack) writeWorker(provider SampleProvider, onComplete func()) { + log := getLogger() s.writeStartupLock.Lock() s.lock.RLock() @@ -573,7 +575,7 @@ func (s *LocalTrack) writeWorker(provider SampleProvider, onComplete func()) { return } if err != nil { - logger.Errorw("could not get sample from provider", err) + log.Error("could not get sample from provider", "error", err) return } @@ -587,7 +589,7 @@ func (s *LocalTrack) writeWorker(provider SampleProvider, onComplete func()) { } if err := s.WriteSample(sample, opts); err != nil { - logger.Errorw("could not write sample", err) + log.Error("could not write sample", "error", err) return } } diff --git a/logger.go b/logger.go index 7a992313..733c3b98 100644 --- a/logger.go +++ b/logger.go @@ -15,17 +15,28 @@ package lksdk import ( - "log" - - "github.com/go-logr/stdr" + "log/slog" protoLogger "github.com/livekit/protocol/logger" ) -var logger protoLogger.Logger = protoLogger.LogRLogger(stdr.New(log.Default())) +var globalLog *slog.Logger + +func getLogger() *slog.Logger { + if globalLog != nil { + return globalLog + } + return slog.Default() +} // SetLogger overrides default logger. To use a [logr](https://github.com/go-logr/logr) compatible logger, -// pass in SetLogger(logger.LogRLogger(logRLogger)) +// pass in SetLogger(logger.LogRLogger(logRLogger)). +// +// If no logger is set, slog.Default will be used. func SetLogger(l protoLogger.Logger) { - logger = l + if l == nil { + globalLog = nil + return + } + globalLog = slog.New(protoLogger.ToSlogHandler(l)) } diff --git a/participant.go b/participant.go index da3aef93..40e8c7d0 100644 --- a/participant.go +++ b/participant.go @@ -15,6 +15,7 @@ package lksdk import ( + "log/slog" "sync" "go.uber.org/atomic" @@ -45,6 +46,7 @@ type Participant interface { } type baseParticipant struct { + log *slog.Logger sid string identity string name string @@ -65,6 +67,7 @@ type baseParticipant struct { func newBaseParticipant(roomCallback *RoomCallback) *baseParticipant { p := &baseParticipant{ + log: getLogger(), audioTracks: &sync.Map{}, videoTracks: &sync.Map{}, tracks: &sync.Map{}, @@ -77,6 +80,10 @@ func newBaseParticipant(roomCallback *RoomCallback) *baseParticipant { return p } +func (p *baseParticipant) SetLogger(log *slog.Logger) { + p.log = log +} + func (p *baseParticipant) SID() string { p.lock.RLock() defer p.lock.RUnlock() diff --git a/pkg/jitter/buffer.go b/pkg/jitter/buffer.go index 930f5e1a..541dd920 100644 --- a/pkg/jitter/buffer.go +++ b/pkg/jitter/buffer.go @@ -15,10 +15,10 @@ package jitter import ( + "log/slog" "sync" "time" - "github.com/go-logr/logr" "github.com/pion/rtp" "github.com/livekit/protocol/logger" @@ -31,7 +31,7 @@ type Buffer struct { onPacketDropped func() packetsDropped int packetsTotal int - logger logger.Logger + logger *slog.Logger mu sync.Mutex pool *packet @@ -57,7 +57,7 @@ func NewBuffer(depacketizer rtp.Depacketizer, clockRate uint32, maxLatency time. depacketizer: depacketizer, maxLate: uint32(float64(maxLatency) / float64(time.Second) * float64(clockRate)), clockRate: clockRate, - logger: logger.LogRLogger(logr.Discard()), + logger: slog.New(logger.NewSlogDiscard()), } for _, opt := range opts { opt(b) diff --git a/pkg/jitter/options.go b/pkg/jitter/options.go index cea0d7a5..c7d778d4 100644 --- a/pkg/jitter/options.go +++ b/pkg/jitter/options.go @@ -14,7 +14,11 @@ package jitter -import "github.com/livekit/protocol/logger" +import ( + "log/slog" + + "github.com/livekit/protocol/logger" +) type Option func(b *Buffer) @@ -28,6 +32,13 @@ func WithPacketDroppedHandler(f func()) Option { // WithLogger sets a logger which will log packets dropped func WithLogger(l logger.Logger) Option { + return func(b *Buffer) { + b.logger = slog.New(logger.ToSlogHandler(l)) + } +} + +// WithSlog sets a slog logger which will log packets dropped +func WithSlog(l *slog.Logger) Option { return func(b *Buffer) { b.logger = l } diff --git a/pkg/synchronizer/track.go b/pkg/synchronizer/track.go index 69ac8822..3e7d7b9b 100644 --- a/pkg/synchronizer/track.go +++ b/pkg/synchronizer/track.go @@ -17,6 +17,7 @@ package synchronizer import ( "errors" "io" + "log/slog" "math" "sync" "time" @@ -26,7 +27,6 @@ import ( "github.com/pion/webrtc/v3" "github.com/livekit/mediatransportutil" - "github.com/livekit/protocol/logger" ) const ( @@ -113,13 +113,14 @@ func (t *TrackSynchronizer) Initialize(pkt *rtp.Packet) { // GetPTS will reset sequence numbers and/or offsets if necessary // Packets are expected to be in order func (t *TrackSynchronizer) GetPTS(pkt *rtp.Packet) (time.Duration, error) { + log := slog.Default() t.Lock() defer t.Unlock() ts, pts, valid := t.adjust(pkt) if pts < t.lastPTS { if t.backwards == 0 { - logger.Warnw("backwards pts", ErrBackwardsPTS, + log.Warn("backwards pts", "error", ErrBackwardsPTS, "timestamp", pkt.Timestamp, "sequence number", pkt.SequenceNumber, "pts", pts, @@ -131,7 +132,7 @@ func (t *TrackSynchronizer) GetPTS(pkt *rtp.Packet) (time.Duration, error) { t.backwards++ return 0, ErrBackwardsPTS } else if t.backwards > 0 { - logger.Debugw("packet dropped", + log.Debug("packet dropped", "count", t.backwards, "reason", "backwards pts", ) @@ -161,6 +162,7 @@ func (t *TrackSynchronizer) GetPTS(pkt *rtp.Packet) (time.Duration, error) { // adjust accounts for uint32 overflow, and will reset sequence numbers or rtp time if necessary func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool) { + log := slog.Default() // adjust sequence number and reset if needed pkt.SequenceNumber += t.snOffset if t.lastTS != 0 && @@ -172,7 +174,7 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool) pkt.SequenceNumber = t.lastSN + 1 // reset RTP timestamps - logger.Debugw("resetting track synchronizer", "reason", "SN gap", "lastSN", t.lastSN, "SN", pkt.SequenceNumber) + log.Debug("resetting track synchronizer", "reason", "SN gap", "lastSN", t.lastSN, "SN", pkt.SequenceNumber) ts, pts := t.resetRTP(pkt) return ts, pts, false } @@ -192,7 +194,7 @@ func (t *TrackSynchronizer) adjust(pkt *rtp.Packet) (int64, time.Duration, bool) pts := t.getElapsed(ts) + t.ptsOffset if expected := time.Since(t.startedAt.Add(t.ptsOffset)); pts > expected+maxTSDiff { // reset RTP timestamps - logger.Debugw("resetting track synchronizer", "reason", "pts out of bounds", "pts", pts, "expected", expected) + log.Debug("resetting track synchronizer", "reason", "pts out of bounds", "pts", pts, "expected", expected) ts, pts = t.resetRTP(pkt) return ts, pts, false } diff --git a/publication.go b/publication.go index 0c77a276..95eaadd6 100644 --- a/publication.go +++ b/publication.go @@ -199,6 +199,7 @@ func (p *RemoteTrackPublication) OnRTCP(cb func(rtcp.Packet)) { } func (p *RemoteTrackPublication) updateSettings() { + log := getLogger() p.lock.RLock() settings := &livekit.UpdateTrackSettings{ TrackSids: []string{p.SID()}, @@ -216,7 +217,7 @@ func (p *RemoteTrackPublication) updateSettings() { p.lock.RUnlock() if err := p.client.SendUpdateTrackSettings(settings); err != nil { - logger.Errorw("could not send track settings", err, "trackID", p.SID()) + log.Error("could not send track settings", "error", err, "trackID", p.SID()) } } diff --git a/readersampleprovider.go b/readersampleprovider.go index e963df78..7ca9a37f 100644 --- a/readersampleprovider.go +++ b/readersampleprovider.go @@ -145,6 +145,7 @@ func NewLocalFileTrack(file string, options ...ReaderSampleProviderOption) (*Loc // NewLocalReaderTrack uses io.ReadCloser interface to adapt to various ingress types // - mime: has to be one of webrtc.MimeType... (e.g. webrtc.MimeTypeOpus) func NewLocalReaderTrack(in io.ReadCloser, mime string, options ...ReaderSampleProviderOption) (*LocalTrack, error) { + log := getLogger() provider := &ReaderSampleProvider{ Mime: mime, reader: in, @@ -170,7 +171,7 @@ func NewLocalReaderTrack(in io.ReadCloser, mime string, options ...ReaderSampleP } track.OnBind(func() { if err := track.StartWrite(provider, provider.OnWriteComplete); err != nil { - logger.Errorw("Could not start writing", err) + log.Error("Could not start writing", "error", err) } }) diff --git a/remoteparticipant.go b/remoteparticipant.go index b568c046..33ded50b 100644 --- a/remoteparticipant.go +++ b/remoteparticipant.go @@ -116,7 +116,7 @@ func (p *RemoteParticipant) addSubscribedMediaTrack(track *webrtc.TrackRemote, t } pub.setReceiverAndTrack(receiver, track) - logger.Infow("track subscribed", + p.log.Info("track subscribed", "participant", p.Identity(), "track", pub.sid.Load(), "kind", pub.kind.Load()) p.Callback.OnTrackSubscribed(track, pub, p) @@ -133,7 +133,7 @@ func (p *RemoteParticipant) getPublication(trackSID string) *RemoteTrackPublicat func (p *RemoteParticipant) unpublishTrack(sid string, sendUnpublish bool) { pub := p.getPublication(sid) if pub == nil { - logger.Warnw("could not find track to unpublish", nil, "sid", sid) + p.log.Warn("could not find track to unpublish", "sid", sid) return } diff --git a/room.go b/room.go index 6b2e62ec..73deebdf 100644 --- a/room.go +++ b/room.go @@ -15,6 +15,7 @@ package lksdk import ( + "log/slog" "reflect" "sort" "strings" @@ -125,6 +126,7 @@ func WithICETransportPolicy(iceTransportPolicy webrtc.ICETransportPolicy) Connec type PLIWriter func(webrtc.SSRC) type Room struct { + log *slog.Logger engine *RTCEngine sid string name string @@ -145,6 +147,7 @@ type Room struct { func NewRoom(callback *RoomCallback) *Room { engine := NewRTCEngine() r := &Room{ + log: getLogger(), engine: engine, remoteParticipants: make(map[livekit.ParticipantIdentity]*RemoteParticipant), sidToIdentity: make(map[livekit.ParticipantID]livekit.ParticipantIdentity), @@ -192,6 +195,12 @@ func ConnectToRoomWithToken(url, token string, callback *RoomCallback, opts ...C return room, nil } +func (r *Room) SetLogger(log *slog.Logger) { + r.log = log + r.engine.SetLogger(log) + r.LocalParticipant.SetLogger(log) +} + func (r *Room) Name() string { r.lock.RLock() defer r.lock.RUnlock() @@ -353,7 +362,7 @@ func (r *Room) handleMediaTrack(track *webrtc.TrackRemote, receiver *webrtc.RTPR rp := r.GetParticipantBySID(participantID) if rp == nil { - logger.Errorw("could not find participant", nil, "participantID", participantID) + r.log.Error("could not find participant", "participantID", participantID) return } rp.addSubscribedMediaTrack(track, trackID, receiver) @@ -498,7 +507,7 @@ func (r *Room) handleConnectionQualityUpdate(updates []*livekit.ConnectionQualit if p != nil { p.setConnectionQualityInfo(update) } else { - logger.Debugw("could not find participant", "sid", update.ParticipantSid, + r.log.Debug("could not find participant", "sid", update.ParticipantSid, "localParticipant", r.LocalParticipant.SID()) } } @@ -532,7 +541,7 @@ func (r *Room) handleTrackRemoteMuted(msg *livekit.MuteTrackRequest) { func (r *Room) handleLocalTrackUnpublished(msg *livekit.TrackUnpublishedResponse) { err := r.LocalParticipant.UnpublishTrack(msg.TrackSid) if err != nil { - logger.Errorw("could not unpublish track", err, "trackID", msg.TrackSid) + r.log.Error("could not unpublish track", "error", err, "trackID", msg.TrackSid) } } diff --git a/signalclient.go b/signalclient.go index 88c4b356..1d988e55 100644 --- a/signalclient.go +++ b/signalclient.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "log/slog" "net/http" "net/url" "runtime" @@ -36,6 +37,7 @@ import ( const PROTOCOL = 12 type SignalClient struct { + log *slog.Logger conn atomic.Pointer[websocket.Conn] lock sync.Mutex isStarted atomic.Bool @@ -58,7 +60,7 @@ type SignalClient struct { } func NewSignalClient() *SignalClient { - c := &SignalClient{} + c := &SignalClient{log: getLogger()} return c } @@ -98,7 +100,7 @@ func (c *SignalClient) Reconnect(urlPrefix string, token string, params connectP } c.pendingResponse = nil - logger.Debugw("reconnect received response", "response", res.String()) + c.log.Debug("reconnect received response", "response", res.String()) if res != nil { switch msg := res.Message.(type) { case *livekit.SignalResponse_Reconnect: @@ -139,11 +141,12 @@ func (c *SignalClient) connect(urlPrefix string, token string, params connectPar header := newHeaderWithToken(token) conn, hresp, err := websocket.DefaultDialer.Dial(u.String(), header) if err != nil { - var fields []interface{} + var fields []any + fields = append(fields, "error", err) if hresp != nil { fields = append(fields, "status", hresp.StatusCode) } - logger.Errorw("error establishing signal connection", err, fields...) + c.log.Error("error establishing signal connection", fields...) if strings.HasSuffix(err.Error(), ":53: server misbehaving") { // DNS issue, abort @@ -154,17 +157,17 @@ func (c *SignalClient) connect(urlPrefix string, token string, params connectPar validateReq, err1 := http.NewRequest(http.MethodGet, ToHttpURL(urlPrefix)+validateSuffix, nil) if err1 != nil { - logger.Errorw("error creating validate request", err1) + c.log.Error("error creating validate request", "error", err1) return nil, ErrCannotDialSignal } validateReq.Header = header hresp, err := http.DefaultClient.Do(validateReq) if err != nil { - logger.Errorw("error getting validation", err, "httpResponse", hresp) + c.log.Error("error getting validation", "error", err, "httpResponse", hresp) return nil, ErrCannotDialSignal } else if hresp.StatusCode == http.StatusOK { // no specific errors to return if validate succeeds - logger.Infow("validate succeeded") + c.log.Info("validate succeeded") return nil, ErrCannotConnectSignal } else { var errString string @@ -386,7 +389,7 @@ func (c *SignalClient) readWorker(readerClosedCh chan struct{}) { res, err := c.readResponse() if err != nil { if !isIgnoredWebsocketError(err) { - logger.Infow("error while reading from signal client", "err", err) + c.log.Info("error while reading from signal client", "error", err) } return } diff --git a/transport.go b/transport.go index d3156380..9485c660 100644 --- a/transport.go +++ b/transport.go @@ -17,6 +17,7 @@ package lksdk import ( "errors" "fmt" + "log/slog" "sync" "sync/atomic" "time" @@ -32,6 +33,7 @@ import ( lkinterceptor "github.com/livekit/mediatransportutil/pkg/interceptor" "github.com/livekit/mediatransportutil/pkg/pacer" lksdp "github.com/livekit/protocol/sdp" + sdkinterceptor "github.com/livekit/server-sdk-go/v2/pkg/interceptor" ) @@ -46,7 +48,8 @@ const ( // PCTransport is a wrapper around PeerConnection, with some helper methods type PCTransport struct { - pc *webrtc.PeerConnection + log *slog.Logger + pc *webrtc.PeerConnection lock sync.Mutex pendingCandidates []webrtc.ICECandidateInit @@ -65,6 +68,7 @@ type PCTransport struct { } type PCTransportParams struct { + Logger *slog.Logger Configuration webrtc.Configuration RetransmitBufferSize uint16 @@ -128,6 +132,9 @@ func (t *PCTransport) registerDefaultInterceptors(params PCTransportParams, i *i } func NewPCTransport(params PCTransportParams) (*PCTransport, error) { + if params.Logger == nil { + params.Logger = getLogger() + } m := &webrtc.MediaEngine{} if err := m.RegisterDefaultCodecs(); err != nil { return nil, err @@ -148,6 +155,7 @@ func NewPCTransport(params PCTransportParams) (*PCTransport, error) { i := &interceptor.Registry{} t := &PCTransport{ + log: params.Logger, debouncedNegotiate: debounce.New(negotiationFrequency), onRTTUpdate: params.OnRTTUpdate, } @@ -211,17 +219,17 @@ func (t *PCTransport) onICEGatheringStateChange(state webrtc.ICEGathererState) { t.lock.Lock() if t.restartAfterGathering { t.lock.Unlock() - logger.Debugw("restarting ICE after ICE gathering") + t.log.Debug("restarting ICE after ICE gathering") if err := t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true}); err != nil { - logger.Errorw("could not restart ICE", err) + t.log.Error("could not restart ICE", "error", err) } } else if t.pendingRestartIceOffer != nil { - logger.Debugw("accept remote restart ice offer after ICE gathering") + t.log.Debug("accept remote restart ice offer after ICE gathering") offer := t.pendingRestartIceOffer t.pendingRestartIceOffer = nil t.lock.Unlock() if err := t.SetRemoteDescription(*offer); err != nil { - logger.Errorw("could not accept remote restart ICE offer", err) + t.log.Error("could not accept remote restart ICE offer", "error", err) } } else { t.lock.Unlock() @@ -275,14 +283,14 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error { var err error iceCredential, offerRestartICE, err = t.isRemoteOfferRestartICE(sd) if err != nil { - logger.Errorw("check remote offer restart ICE failed", err) + t.log.Error("check remote offer restart ICE failed", "error", err) t.lock.Unlock() return err } } if offerRestartICE && t.pc.ICEGatheringState() == webrtc.ICEGatheringStateGathering { - logger.Debugw("remote offer restart ice while ice gathering") + t.log.Debug("remote offer restart ice while ice gathering") t.pendingRestartIceOffer = &sd t.lock.Unlock() return nil @@ -379,7 +387,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { t.restartAfterGathering = true return nil } - logger.Debugw("restarting ICE") + t.log.Debug("restarting ICE") } if t.pc.SignalingState() == webrtc.SignalingStateHaveLocalOffer { if iceRestart { @@ -395,15 +403,15 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { } } - logger.Debugw("starting to negotiate") + t.log.Debug("starting to negotiate") offer, err := t.pc.CreateOffer(options) - logger.Debugw("create offer", "offer", offer.SDP) + t.log.Debug("create offer", "offer", offer.SDP) if err != nil { - logger.Errorw("could not negotiate", err) + t.log.Error("could not negotiate", "error", err) return err } if err := t.pc.SetLocalDescription(offer); err != nil { - logger.Errorw("could not set local description", err) + t.log.Error("could not set local description", "error", err) return err } t.restartAfterGathering = false