Skip to content

Commit

Permalink
conference: use TrackID as identifier for tracks
Browse files Browse the repository at this point in the history
Theoretically we don't need to use a combination of TrackID and StreamID
to uniquely identify tracks inside as long as the GUIDs are used for the
tracks.

Closes #56.
  • Loading branch information
daniel-abramov committed Dec 5, 2022
1 parent 43be4d6 commit ec5841b
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pkg/conference/data_channel_message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (c *Conference) processSelectDCMessage(participant *Participant, msg event.
if len(tracks) != len(msg.Start) {
for _, expected := range msg.Start {
found := slices.IndexFunc(tracks, func(track *webrtc.TrackLocalStaticRTP) bool {
return track.StreamID() == expected.StreamID && track.ID() == expected.TrackID
return track.ID() == expected.TrackID
})

if found == -1 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/conference/matrix_message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent *
logger: logger,
remoteSessionID: inviteEvent.SenderSessionID,
streamMetadata: inviteEvent.SDPStreamMetadata,
publishedTracks: make(map[event.SFUTrackDescription]PublishedTrack),
publishedTracks: make(map[string]PublishedTrack),
}

c.participants[participantID] = participant
Expand Down
2 changes: 1 addition & 1 deletion pkg/conference/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Participant struct {
peer *peer.Peer[ParticipantID]
remoteSessionID id.SessionID
streamMetadata event.CallSDPStreamMetadata
publishedTracks map[event.SFUTrackDescription]PublishedTrack
publishedTracks map[string]PublishedTrack
}

func (p *Participant) asMatrixRecipient() signaling.MatrixRecipient {
Expand Down
21 changes: 7 additions & 14 deletions pkg/conference/peer_message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,19 @@ func (c *Conference) processLeftTheCallMessage(participant *Participant, msg pee

func (c *Conference) processNewTrackPublishedMessage(participant *Participant, msg peer.NewTrackPublished) {
participant.logger.Infof("Published new track: %s", msg.Track.ID())
key := event.SFUTrackDescription{
StreamID: msg.Track.StreamID(),
TrackID: msg.Track.ID(),
}

if _, ok := participant.publishedTracks[key]; ok {
c.logger.Errorf("Track already published: %v", key)
if _, ok := participant.publishedTracks[msg.Track.ID()]; ok {
c.logger.Errorf("Track already published: %v", msg.Track.ID())
return
}

participant.publishedTracks[key] = PublishedTrack{track: msg.Track}
participant.publishedTracks[msg.Track.ID()] = PublishedTrack{track: msg.Track}
c.resendMetadataToAllExcept(participant.id)
}

func (c *Conference) processPublishedTrackFailedMessage(participant *Participant, msg peer.PublishedTrackFailed) {
participant.logger.Infof("Failed published track: %s", msg.Track.ID())
delete(participant.publishedTracks, event.SFUTrackDescription{
StreamID: msg.Track.StreamID(),
TrackID: msg.Track.ID(),
})
delete(participant.publishedTracks, msg.Track.ID())

for _, otherParticipant := range c.participants {
if otherParticipant.id == participant.id {
Expand Down Expand Up @@ -116,9 +109,9 @@ func (c *Conference) processDataChannelAvailableMessage(participant *Participant

func (c *Conference) processForwardRTCPMessage(msg peer.RTCPReceived) {
for _, participant := range c.participants {
for _, publishedTrack := range participant.publishedTracks {
if publishedTrack.track.StreamID() == msg.StreamID && publishedTrack.track.ID() == msg.TrackID {
err := participant.peer.WriteRTCP(msg.Packets, msg.StreamID, msg.TrackID, publishedTrack.lastPLITimestamp)
for id, publishedTrack := range participant.publishedTracks {
if id == msg.TrackID {
err := participant.peer.WriteRTCP(msg.Packets, msg.TrackID, publishedTrack.lastPLITimestamp)
if err == nil {
publishedTrack.lastPLITimestamp = time.Now()
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/conference/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *Conference) removeParticipant(participantID ParticipantID) {
for _, publishedTrack := range participant.publishedTracks {
obsoleteTracks = append(obsoleteTracks, publishedTrack.track)
}

for _, otherParticipant := range c.participants {
otherParticipant.peer.UnsubscribeFrom(obsoleteTracks)
}
Expand Down Expand Up @@ -98,7 +99,7 @@ func (c *Conference) getTracks(identifiers []event.SFUTrackDescription) []*webrt
for _, participant := range c.participants {
// Check if this participant has any of the tracks that we're looking for.
for _, identifier := range identifiers {
if track, ok := participant.publishedTracks[identifier]; ok {
if track, ok := participant.publishedTracks[identifier.TrackID]; ok {
tracks = append(tracks, track.track)
}
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/peer/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type DataChannelMessage struct {
type DataChannelAvailable struct{}

type RTCPReceived struct {
Packets []rtcp.Packet
StreamID string
TrackID string
TrackID string
Packets []rtcp.Packet
}
6 changes: 3 additions & 3 deletions pkg/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,21 @@ func (p *Peer[ID]) SubscribeTo(track *webrtc.TrackLocalStaticRTP) error {
p.logger.WithError(err).Warn("failed to read RTCP on track")
}

p.sink.Send(RTCPReceived{Packets: packets, TrackID: track.ID(), StreamID: track.StreamID()})
p.sink.Send(RTCPReceived{Packets: packets, TrackID: track.ID()})
}
}()

return nil
}

func (p *Peer[ID]) WriteRTCP(packets []rtcp.Packet, streamID string, trackID string, lastPLITimestamp time.Time) error {
func (p *Peer[ID]) WriteRTCP(packets []rtcp.Packet, trackID string, lastPLITimestamp time.Time) error {
const minimalPLIInterval = time.Millisecond * 500

packetsToSend := []rtcp.Packet{}
var mediaSSRC uint32
receivers := p.peerConnection.GetReceivers()
receiverIndex := slices.IndexFunc(receivers, func(receiver *webrtc.RTPReceiver) bool {
return receiver.Track().ID() == trackID && receiver.Track().StreamID() == streamID
return receiver.Track().ID() == trackID
})

if receiverIndex == -1 {
Expand Down

0 comments on commit ec5841b

Please sign in to comment.