Skip to content

Commit

Permalink
Include PR livekit#3050
Browse files Browse the repository at this point in the history
  • Loading branch information
pabloFuente committed Oct 10, 2024
1 parent 1a1f98b commit a1166c7
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 6 deletions.
27 changes: 24 additions & 3 deletions pkg/rtc/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ type ParticipantImpl struct {

tracksQuality map[livekit.TrackID]livekit.ConnectionQuality

// reliable data packets received while transitioning from JOINED to ACTIVE state. Stored for later delivery
reliableDataPacketsQueue []*livekit.DataPacket

// loggers for publisher and subscriber
pubLogger logger.Logger
subLogger logger.Logger
Expand Down Expand Up @@ -270,9 +273,10 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID),
params.SID,
params.Telemetry),
tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality),
pubLogger: params.Logger.WithComponent(sutils.ComponentPub),
subLogger: params.Logger.WithComponent(sutils.ComponentSub),
tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality),
reliableDataPacketsQueue: make([]*livekit.DataPacket, 0),
pubLogger: params.Logger.WithComponent(sutils.ComponentPub),
subLogger: params.Logger.WithComponent(sutils.ComponentSub),
}
if !params.DisableSupervisor {
p.supervisor = supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger})
Expand Down Expand Up @@ -2741,3 +2745,20 @@ func (p *ParticipantImpl) UpdateVideoTrack(update *livekit.UpdateLocalVideoTrack
p.pubLogger.Debugw("could not locate track", "trackID", update.TrackSid)
return errors.New("could not find track")
}

func (p *ParticipantImpl) StoreReliableDataPacketForLaterDelivery(dp *livekit.DataPacket) {
p.reliableDataPacketsQueue = append(p.reliableDataPacketsQueue, dp)
}

func (p *ParticipantImpl) DeliverStoredReliableDataPackets() {
for _, dp := range p.reliableDataPacketsQueue {
var dpData, err = proto.Marshal(dp)
if err != nil {
logger.Errorw("failed to marshal data packet", err)
continue
}
p.GetLogger().Debugw("resending stored reliable data packet", "source", dp.ParticipantIdentity, "destinationIdentities", dp.DestinationIdentities)
p.SendDataPacket(livekit.DataPacket_RELIABLE, dpData)
}
p.reliableDataPacketsQueue = p.reliableDataPacketsQueue[:0]
}
12 changes: 9 additions & 3 deletions pkg/rtc/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
r.broadcastParticipantState(p, broadcastOptions{skipSource: true})

if state == livekit.ParticipantInfo_ACTIVE {
// send stored data packets received when the participant was in state JOINED waiting to be ACTIVE
p.DeliverStoredReliableDataPackets()

// subscribe participant to existing published tracks
r.subscribeToExistingTracks(p)

Expand Down Expand Up @@ -1765,9 +1768,6 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, kin

var dpData []byte
for _, op := range participants {
if op.State() != livekit.ParticipantInfo_ACTIVE {
continue
}
if source != nil && op.ID() == source.ID() {
continue
}
Expand All @@ -1776,6 +1776,12 @@ func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, kin
continue
}
}
if op.State() == livekit.ParticipantInfo_JOINED && kind == livekit.DataPacket_RELIABLE {
// If the destination participant is in state JOINED transitioning to ACTIVE and the data
// packet is of kind reliable store it for later delivery just after the participant becomes ACTIVE
op.StoreReliableDataPacketForLaterDelivery(dp)
continue
}
if dpData == nil {
var err error
dpData, err = proto.Marshal(dp)
Expand Down
5 changes: 5 additions & 0 deletions pkg/rtc/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,11 @@ type LocalParticipant interface {
GetPacer() pacer.Pacer

GetDisableSenderReportPassThrough() bool

// reliable data packets may be lost for participants currently in transition from JOINED to ACTIVE state
// these methods allow the participant to buffer these packets for later delivery just after reaching ACTIVE state
StoreReliableDataPacketForLaterDelivery(dpData *livekit.DataPacket)
DeliverStoredReliableDataPackets()
}

// Room is a container of participants, and can provide room-level actions
Expand Down
8 changes: 8 additions & 0 deletions pkg/rtc/types/typesfakes/fake_local_participant.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a1166c7

Please sign in to comment.