diff --git a/pkg/peer.go b/pkg/peer.go index 9b9b7ecc6..4ca5f93e8 100644 --- a/pkg/peer.go +++ b/pkg/peer.go @@ -61,8 +61,6 @@ func (p *Peer) Join(sid string, sdp webrtc.SessionDescription) (*webrtc.SessionD log.Debugf("peer already exists") return nil, ErrTransportExists } - p.Lock() - defer p.Unlock() me := MediaEngine{} err := me.PopulateFromSDP(sdp) @@ -151,8 +149,7 @@ func (p *Peer) Answer(sdp webrtc.SessionDescription) (*webrtc.SessionDescription if p.subscriber == nil { return nil, ErrNoTransportEstablished } - p.Lock() - defer p.Unlock() + log.Infof("peer %s got offer", p.id) if p.publisher.SignalingState() != webrtc.SignalingStateStable { diff --git a/pkg/publisher.go b/pkg/publisher.go index d65616b89..ce324c514 100644 --- a/pkg/publisher.go +++ b/pkg/publisher.go @@ -2,14 +2,13 @@ package sfu import ( "sync" + "sync/atomic" log "github.com/pion/ion-log" "github.com/pion/webrtc/v3" ) type Publisher struct { - sync.Mutex - id string pc *webrtc.PeerConnection @@ -18,7 +17,7 @@ type Publisher struct { candidates []webrtc.ICECandidateInit onTrackHandler func(*webrtc.Track, *webrtc.RTPReceiver) - onICEConnectionStateChangeHandler func(webrtc.ICEConnectionState) + onICEConnectionStateChangeHandler atomic.Value // func(webrtc.ICEConnectionState) closeOnce sync.Once } @@ -64,11 +63,10 @@ func NewPublisher(session *Session, id string, me MediaEngine, cfg WebRTCTranspo p.router.Stop() }) } - p.Lock() - if p.onICEConnectionStateChangeHandler != nil { - p.onICEConnectionStateChangeHandler(connectionState) + + if handler, ok := p.onICEConnectionStateChangeHandler.Load().(func()); ok && handler != nil { + handler() } - p.Unlock() }) return p, nil @@ -109,9 +107,7 @@ func (p *Publisher) OnICECandidate(f func(c *webrtc.ICECandidate)) { } func (p *Publisher) OnICEConnectionStateChange(f func(connectionState webrtc.ICEConnectionState)) { - p.Lock() - p.onICEConnectionStateChangeHandler = f - p.Unlock() + p.onICEConnectionStateChangeHandler.Store(f) } func (p *Publisher) SignalingState() webrtc.SignalingState { diff --git a/pkg/receiver.go b/pkg/receiver.go index f95eb56c4..80f7f1673 100644 --- a/pkg/receiver.go +++ b/pkg/receiver.go @@ -34,6 +34,7 @@ type Receiver interface { // WebRTCReceiver receives a video track type WebRTCReceiver struct { sync.RWMutex + rtcpMu sync.RWMutex receiver *webrtc.RTPReceiver track *webrtc.Track diff --git a/pkg/simplesender.go b/pkg/simplesender.go index e8518f6d0..7fbdb4fa8 100644 --- a/pkg/simplesender.go +++ b/pkg/simplesender.go @@ -122,10 +122,6 @@ func (s *SimpleSender) WriteRTP(pkt *rtp.Packet) { s.sdesMidHdrCtr++ } - if pkt.SequenceNumber%500 == 0 { - log.Tracef("rtp write sender %s with ssrc %d", s.id, s.track.SSRC()) - } - if err := s.track.WriteRTP(&rtp.Packet{Header: h, Payload: pkt.Payload}); err != nil { if err == io.ErrClosedPipe { return diff --git a/pkg/subscriber.go b/pkg/subscriber.go index debbf1f67..56173cb15 100644 --- a/pkg/subscriber.go +++ b/pkg/subscriber.go @@ -160,10 +160,10 @@ func (s *Subscriber) SetRemoteDescription(desc webrtc.SessionDescription) error return err } - s.Lock() - defer s.Unlock() if s.pendingSenders.Len() > 0 { pendingStart := make([]Sender, 0, s.pendingSenders.Len()) + + s.Lock() for _, md := range pd.MediaDescriptions { if s.pendingSenders.Len() == 0 { break @@ -181,6 +181,8 @@ func (s *Subscriber) SetRemoteDescription(desc webrtc.SessionDescription) error } } } + s.Unlock() + if len(pendingStart) > 0 { defer func() { if err == nil { @@ -203,7 +205,6 @@ func (s *Subscriber) SetRemoteDescription(desc webrtc.SessionDescription) error log.Errorf("SetRemoteDescription error: %v", err) return err } - return nil }