From 01461da28691bd36d662c0e2f615efe760b8c7bc Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 23 Oct 2024 12:17:10 +0800 Subject: [PATCH] Fast publish (#548) * Process fast publish option * parallel negotiate with signal addtrackrequest --- engine.go | 2 +- localparticipant.go | 35 +++++++++++++++++------------------ transport.go | 2 +- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/engine.go b/engine.go index 4364fcb3..7ff01767 100644 --- a/engine.go +++ b/engine.go @@ -144,7 +144,7 @@ func (e *RTCEngine) Join(url string, token string, params *connectParams) (*live e.client.Start() // send offer - if !res.SubscriberPrimary { + if !res.SubscriberPrimary || res.FastPublish { if publisher, ok := e.Publisher(); ok { publisher.Negotiate() } else { diff --git a/localparticipant.go b/localparticipant.go index c650f088..2ab2053a 100644 --- a/localparticipant.go +++ b/localparticipant.go @@ -55,6 +55,11 @@ func (p *LocalParticipant) PublishTrack(track webrtc.TrackLocal, opts *TrackPubl } } + publisher, ok := p.engine.Publisher() + if !ok { + return nil, ErrNoPeerConnection + } + pub := NewLocalTrackPublication(kind, track, *opts, p.engine.client) pub.onMuteChanged = p.onTrackMuted @@ -88,21 +93,6 @@ func (p *LocalParticipant) PublishTrack(track webrtc.TrackLocal, opts *TrackPubl return nil, err } - pubChan := p.engine.TrackPublishedChan() - var pubRes *livekit.TrackPublishedResponse - - select { - case pubRes = <-pubChan: - break - case <-time.After(trackPublishTimeout): - return nil, ErrTrackPublishTimeout - } - - publisher, ok := p.engine.Publisher() - if !ok { - return nil, ErrNoPeerConnection - } - // add transceivers transceiver, err := publisher.PeerConnection().AddTransceiverFromTrack(track, webrtc.RTPTransceiverInit{ Direction: webrtc.RTPTransceiverDirectionSendonly, @@ -115,11 +105,21 @@ func (p *LocalParticipant) PublishTrack(track webrtc.TrackLocal, opts *TrackPubl _, isSampleTrack := track.(*LocalTrack) pub.setSender(transceiver.Sender(), !isSampleTrack) + publisher.Negotiate() + + pubChan := p.engine.TrackPublishedChan() + var pubRes *livekit.TrackPublishedResponse + + select { + case pubRes = <-pubChan: + break + case <-time.After(trackPublishTimeout): + return nil, ErrTrackPublishTimeout + } + pub.updateInfo(pubRes.Track) p.addPublication(pub) - publisher.Negotiate() - p.engine.log.Infow("published track", "name", opts.Name, "source", opts.Source.String(), "trackID", pubRes.Track.Sid) return pub, nil @@ -497,7 +497,6 @@ func (p *LocalParticipant) onTrackMuted(pub *LocalTrackPublication, muted bool) // `TrackPermission.AllTracks` are false), any newer published tracks // will not grant permissions to any participants and will require a subsequent // permissions update to allow subscription. -// func (p *LocalParticipant) SetSubscriptionPermission(sp *livekit.SubscriptionPermission) { p.lock.Lock() p.subscriptionPermission = proto.Clone(sp).(*livekit.SubscriptionPermission) diff --git a/transport.go b/transport.go index d574908b..15a99670 100644 --- a/transport.go +++ b/transport.go @@ -38,7 +38,7 @@ import ( ) const ( - negotiationFrequency = 150 * time.Millisecond + negotiationFrequency = 20 * time.Millisecond dtlsRetransmissionInterval = 100 * time.Millisecond iceDisconnectedTimeout = 10 * time.Second