diff --git a/go.mod b/go.mod index 1481315..056b2b7 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,8 @@ require ( ) require ( + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.3.0 // indirect github.com/pion/datachannel v1.5.2 // indirect github.com/pion/dtls/v2 v2.2.4 // indirect @@ -34,6 +36,8 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/sjson v1.2.4 // indirect + go.opentelemetry.io/otel v1.14.0 // indirect + go.opentelemetry.io/otel/trace v1.14.0 // indirect golang.org/x/crypto v0.5.0 // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/sys v0.5.0 // indirect diff --git a/go.sum b/go.sum index 0b62d00..635a130 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,11 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -95,6 +100,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo= github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= @@ -106,6 +112,10 @@ github.com/tidwall/sjson v1.2.4 h1:cuiLzLnaMeBhRmEv00Lpk3tkYrcxpmbU81tAY4Dw0tc= github.com/tidwall/sjson v1.2.4/go.mod h1:098SZ494YoMWPmMO6ct4dcFnqxwj9r/gF0Etp19pSNM= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/otel v1.14.0 h1:/79Huy8wbf5DnIPhemGB+zEPVwnN6fuQybr/SRXa6hM= +go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU= +go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M= +go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/pkg/conference/matrix_message_processing.go b/pkg/conference/matrix_message_processing.go index 8931301..c8ab4ad 100644 --- a/pkg/conference/matrix_message_processing.go +++ b/pkg/conference/matrix_message_processing.go @@ -7,8 +7,11 @@ import ( "github.com/matrix-org/waterfall/pkg/conference/participant" "github.com/matrix-org/waterfall/pkg/peer" "github.com/matrix-org/waterfall/pkg/signaling" + "github.com/matrix-org/waterfall/pkg/telemetry" "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "maunium.net/go/mautrix/event" ) @@ -52,6 +55,8 @@ func (c *Conference) onNewParticipant(id participant.ID, inviteEvent *event.Call peerConnection, answer, err := peer.NewPeer(c.connectionFactory, inviteEvent.Offer.SDP, messageSink, logger) if err != nil { logger.WithError(err).Errorf("Failed to process SDP offer") + telemetrySpan := trace.SpanFromContext(c.telemetryContext) + telemetrySpan.RecordError(err) return err } @@ -67,12 +72,17 @@ func (c *Conference) onNewParticipant(id participant.ID, inviteEvent *event.Call OnTimeout: func() { messageSink.Send(peer.LeftTheCall{event.CallHangupKeepAliveTimeout}) }, } + participantContext, participantSpan := telemetry.TRACER.Start(c.telemetryContext, "Participant") + participantSpan.SetAttributes(attribute.String("user_id", id.UserID.String())) + participantSpan.SetAttributes(attribute.String("device_id", id.DeviceID.String())) + p = &participant.Participant{ - ID: id, - Peer: peerConnection, - Logger: logger, - RemoteSessionID: inviteEvent.SenderSessionID, - Pong: heartbeat.Start(), + ID: id, + Peer: peerConnection, + Logger: logger, + RemoteSessionID: inviteEvent.SenderSessionID, + Pong: heartbeat.Start(), + TelemetryContext: participantContext, } c.tracker.AddParticipant(p) @@ -100,6 +110,8 @@ func (c *Conference) onNewParticipant(id participant.ID, inviteEvent *event.Call func (c *Conference) onCandidates(id participant.ID, ev *event.CallCandidatesEventContent) { if participant := c.getParticipant(id); participant != nil { participant.Logger.Debug("Received remote ICE candidates") + telemetryContext := trace.SpanFromContext(participant.TelemetryContext) + telemetryContext.AddEvent("Received remote ICE candidates") // Convert the candidates to the WebRTC format. candidates := make([]webrtc.ICECandidateInit, len(ev.Candidates)) @@ -122,6 +134,8 @@ func (c *Conference) onCandidates(id participant.ID, ev *event.CallCandidatesEve func (c *Conference) onSelectAnswer(id participant.ID, ev *event.CallSelectAnswerEventContent) { if participant := c.getParticipant(id); participant != nil { participant.Logger.Info("Received remote answer selection") + telemetryContext := trace.SpanFromContext(participant.TelemetryContext) + telemetryContext.AddEvent("Received remote answer selection") if ev.SelectedPartyID != string(c.matrixWorker.deviceID) { c.logger.WithFields(logrus.Fields{ @@ -137,6 +151,8 @@ func (c *Conference) onSelectAnswer(id participant.ID, ev *event.CallSelectAnswe func (c *Conference) onHangup(id participant.ID, ev *event.CallHangupEventContent) { if participant := c.getParticipant(id); participant != nil { participant.Logger.Info("Received remote hangup") + telemetryContext := trace.SpanFromContext(participant.TelemetryContext) + telemetryContext.AddEvent("Received remote hangup") c.removeParticipant(id) } } diff --git a/pkg/conference/participant/participant.go b/pkg/conference/participant/participant.go index 08f13ce..e16e44c 100644 --- a/pkg/conference/participant/participant.go +++ b/pkg/conference/participant/participant.go @@ -1,6 +1,8 @@ package participant import ( + "context" + "github.com/matrix-org/waterfall/pkg/peer" "github.com/matrix-org/waterfall/pkg/signaling" "github.com/sirupsen/logrus" @@ -16,13 +18,19 @@ type ID struct { CallID string } +func (id ID) String() string { + return string(id.UserID) + "/" + string(id.DeviceID) +} + // Participant represents a participant in the conference. type Participant struct { ID ID - Logger *logrus.Entry Peer *peer.Peer[ID] RemoteSessionID id.SessionID Pong chan<- Pong + + Logger *logrus.Entry + TelemetryContext context.Context } func (p *Participant) AsMatrixRecipient() signaling.MatrixRecipient { diff --git a/pkg/conference/participant/tracker.go b/pkg/conference/participant/tracker.go index 7b0a402..7113ecf 100644 --- a/pkg/conference/participant/tracker.go +++ b/pkg/conference/participant/tracker.go @@ -6,6 +6,7 @@ import ( "github.com/matrix-org/waterfall/pkg/conference/track" "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/pion/webrtc/v3" + "go.opentelemetry.io/otel/trace" ) type TrackStoppedMessage struct { @@ -62,6 +63,9 @@ func (t *Tracker) RemoveParticipant(participantID ID) map[string]bool { return make(map[string]bool) } + telemetrySpan := trace.SpanFromContext(participant.TelemetryContext) + defer telemetrySpan.End() + // Terminate the participant and remove it from the list. participant.Peer.Terminate() close(participant.Pong) @@ -113,6 +117,7 @@ func (t *Tracker) AddPublishedTrack( remoteTrack, metadata, participant.Logger, + participant.TelemetryContext, ) if err != nil { return err diff --git a/pkg/conference/peer_message_processing.go b/pkg/conference/peer_message_processing.go index c37e52d..ec34d3d 100644 --- a/pkg/conference/peer_message_processing.go +++ b/pkg/conference/peer_message_processing.go @@ -5,11 +5,18 @@ import ( published "github.com/matrix-org/waterfall/pkg/conference/track" "github.com/matrix-org/waterfall/pkg/peer" "github.com/matrix-org/waterfall/pkg/signaling" + "go.opentelemetry.io/otel/trace" "maunium.net/go/mautrix/event" ) func (c *Conference) processJoinedTheCallMessage(sender participant.ID, message peer.JoinedTheCall) { c.newLogger(sender).Info("Joined the call") + + if p := c.getParticipant(sender); p != nil { + telemetrySpan := trace.SpanFromContext(p.TelemetryContext) + telemetrySpan.AddEvent("joined the call") + return + } } func (c *Conference) processLeftTheCallMessage(sender participant.ID, msg peer.LeftTheCall) { @@ -48,6 +55,8 @@ func (c *Conference) processNewICECandidateMessage(sender participant.ID, msg pe } p.Logger.Debug("Received a new local ICE candidate") + telemetrySpan := trace.SpanFromContext(p.TelemetryContext) + telemetrySpan.AddEvent("received a new local ICE candidate") // Convert WebRTC ICE candidate to Matrix ICE candidate. jsonCandidate := msg.Candidate.ToJSON() @@ -80,6 +89,8 @@ func (c *Conference) processRenegotiationRequiredMessage(sender participant.ID, streamsMetadata := c.getAvailableStreamsFor(p.ID) p.Logger.Infof("Renegotiating, sending SDP offer (%d streams)", len(streamsMetadata)) + telemetrySpan := trace.SpanFromContext(p.TelemetryContext) + telemetrySpan.AddEvent("renegotiating, sending SDP offer") p.SendDataChannelMessage(event.Event{ Type: event.FocusCallNegotiate, @@ -175,6 +186,8 @@ func (c *Conference) processNegotiateMessage(p *participant.Participant, msg eve case event.CallDataTypeOffer: p.Logger.Info("New offer from peer received") p.Logger.WithField("SDP", msg.Description.SDP).Trace("Received SDP offer over DC") + telemetryContext := trace.SpanFromContext(p.TelemetryContext) + telemetryContext.AddEvent("new offer from peer received") answer, err := p.Peer.ProcessSDPOffer(msg.Description.SDP) if err != nil { @@ -197,6 +210,8 @@ func (c *Conference) processNegotiateMessage(p *participant.Participant, msg eve case event.CallDataTypeAnswer: p.Logger.Info("Renegotiation answer received") p.Logger.WithField("SDP", msg.Description.SDP).Trace("Received SDP answer over DC") + telemetryContext := trace.SpanFromContext(p.TelemetryContext) + telemetryContext.AddEvent("renegotiation answer received") if err := p.Peer.ProcessSDPAnswer(msg.Description.SDP); err != nil { p.Logger.Errorf("Failed to set SDP answer: %v", err) diff --git a/pkg/conference/processing.go b/pkg/conference/processing.go index 06208b1..e05dca2 100644 --- a/pkg/conference/processing.go +++ b/pkg/conference/processing.go @@ -4,16 +4,18 @@ import ( "github.com/matrix-org/waterfall/pkg/channel" "github.com/matrix-org/waterfall/pkg/conference/participant" "github.com/matrix-org/waterfall/pkg/peer" + "go.opentelemetry.io/otel/trace" "maunium.net/go/mautrix/event" ) // Listen on messages from incoming channels and process them. // This is essentially the main loop of the conference. // If this function returns, the conference is over. -func (c *Conference) processMessages(signalDone chan struct{}) { +func (c *Conference) processMessages(signalDone chan struct{}, telemetrySpan trace.Span) { // When the main loop of the conference ends, clean up the resources. defer close(signalDone) defer c.matrixWorker.stop() + defer telemetrySpan.End() for { select { diff --git a/pkg/conference/start.go b/pkg/conference/start.go index 521a97d..a55db07 100644 --- a/pkg/conference/start.go +++ b/pkg/conference/start.go @@ -17,12 +17,16 @@ limitations under the License. package conference import ( + "context" + "github.com/matrix-org/waterfall/pkg/channel" "github.com/matrix-org/waterfall/pkg/conference/participant" "github.com/matrix-org/waterfall/pkg/peer" "github.com/matrix-org/waterfall/pkg/signaling" + "github.com/matrix-org/waterfall/pkg/telemetry" "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" ) @@ -39,13 +43,17 @@ func StartConference( inviteEvent *event.CallInviteEventContent, ) (<-chan struct{}, error) { signalDone := make(chan struct{}) - tracker, publishedTrackStopped := participant.NewParticipantTracker(signalDone) + + telemetryCtx, telemetrySpan := telemetry.TRACER.Start(context.Background(), "Conference") + telemetrySpan.SetAttributes(attribute.String("conference_id", confID)) + conference := &Conference{ id: confID, config: config, connectionFactory: peerConnectionFactory, logger: logrus.WithFields(logrus.Fields{"conf_id": confID}), + telemetryContext: telemetryCtx, matrixWorker: newMatrixWorker(signaling), tracker: tracker, streamsMetadata: make(event.CallSDPStreamMetadata), @@ -60,7 +68,7 @@ func StartConference( } // Start conference "main loop". - go conference.processMessages(signalDone) + go conference.processMessages(signalDone, telemetrySpan) return signalDone, nil } diff --git a/pkg/conference/state.go b/pkg/conference/state.go index 8c58f05..f2b7468 100644 --- a/pkg/conference/state.go +++ b/pkg/conference/state.go @@ -1,6 +1,8 @@ package conference import ( + "context" + "github.com/matrix-org/waterfall/pkg/channel" "github.com/matrix-org/waterfall/pkg/conference/participant" published "github.com/matrix-org/waterfall/pkg/conference/track" @@ -14,7 +16,9 @@ import ( type Conference struct { id string config Config - logger *logrus.Entry + + logger *logrus.Entry + telemetryContext context.Context connectionFactory *webrtc_ext.PeerConnectionFactory matrixWorker *matrixWorker diff --git a/pkg/conference/track/internal.go b/pkg/conference/track/internal.go index 4bf959b..e337a23 100644 --- a/pkg/conference/track/internal.go +++ b/pkg/conference/track/internal.go @@ -5,6 +5,8 @@ import ( "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/matrix-org/waterfall/pkg/worker" "github.com/pion/webrtc/v3" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type trackOwner[SubscriberID comparable] struct { @@ -52,6 +54,12 @@ func (p *PublishedTrack[SubscriberID]) addVideoPublisher(track *webrtc.TrackRemo simulcast := webrtc_ext.RIDToSimulcastLayer(track.RID()) p.video.publishers[simulcast] = pub + telemetrySpan := trace.SpanFromContext(p.telemetryContext) + defer telemetrySpan.AddEvent( + "video publisher added", + trace.WithAttributes(attribute.String("simulcast", simulcast.String())), + ) + // Listen on `done` and remove the track once it's done. p.activePublishers.Add(1) go func() { diff --git a/pkg/conference/track/track.go b/pkg/conference/track/track.go index fd71d51..480896f 100644 --- a/pkg/conference/track/track.go +++ b/pkg/conference/track/track.go @@ -1,24 +1,36 @@ package track import ( + "context" "fmt" "sync" "time" "github.com/matrix-org/waterfall/pkg/conference/publisher" "github.com/matrix-org/waterfall/pkg/conference/subscription" + "github.com/matrix-org/waterfall/pkg/telemetry" "github.com/matrix-org/waterfall/pkg/webrtc_ext" "github.com/matrix-org/waterfall/pkg/worker" "github.com/pion/webrtc/v3" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) +type SubscriberIdentifier interface { + comparable + fmt.Stringer +} + type TrackID = string // Represents a track that a peer has published (has already started sending to the SFU). -type PublishedTrack[SubscriberID comparable] struct { +type PublishedTrack[SubscriberID SubscriberIdentifier] struct { // Logger. logger *logrus.Entry + // Telemetry context. + telemetryContext context.Context // Info about the track. info webrtc_ext.TrackInfo // Owner of a published track. @@ -43,16 +55,22 @@ type PublishedTrack[SubscriberID comparable] struct { done chan struct{} } -func NewPublishedTrack[SubscriberID comparable]( +func NewPublishedTrack[SubscriberID SubscriberIdentifier]( ownerID SubscriberID, requestKeyFrame func(track *webrtc.TrackRemote) error, track *webrtc.TrackRemote, metadata TrackMetadata, logger *logrus.Entry, + parentTelemetryContext context.Context, ) (*PublishedTrack[SubscriberID], error) { + telemetryContext, telemetrySpan := telemetry.TRACER.Start(parentTelemetryContext, "PublishedTrack") + telemetrySpan.SetAttributes(attribute.String("track_id", track.ID())) + telemetrySpan.SetAttributes(attribute.String("type", track.Kind().String())) + published := &PublishedTrack[SubscriberID]{ logger: logger.WithField("track", track.ID()), info: webrtc_ext.TrackInfoFromTrack(track), + telemetryContext: telemetryContext, owner: trackOwner[SubscriberID]{ownerID, requestKeyFrame}, subscriptions: make(map[SubscriberID]subscription.Subscription), audio: &audioTrack{outputTrack: nil}, @@ -73,6 +91,9 @@ func NewPublishedTrack[SubscriberID comparable]( track.StreamID(), ) if err != nil { + telemetrySpan.SetStatus(codes.Error, err.Error()) + telemetrySpan.RecordError(err) + telemetrySpan.End() return nil, err } @@ -98,7 +119,7 @@ func NewPublishedTrack[SubscriberID comparable]( }, } - worker := worker.StartWorker[webrtc_ext.SimulcastLayer](workerConfig) + worker := worker.StartWorker(workerConfig) published.video.keyframeHandler = worker // Start video publisher. @@ -108,6 +129,7 @@ func NewPublishedTrack[SubscriberID comparable]( // Wait for all publishers to stop. go func() { defer close(published.done) + defer telemetrySpan.End() published.activePublishers.Wait() }() @@ -180,12 +202,22 @@ func (p *PublishedTrack[SubscriberID]) Subscribe( layer = getOptimalLayer(layers, p.metadata, requirements.MaxWidth, requirements.MaxHeight) } + telemetrySpan := trace.SpanFromContext(p.telemetryContext) + telemetryAttributes := trace.WithAttributes( + attribute.String("id", subscriberID.String()), + attribute.String("layer", layer.String()), + ) + + defer telemetrySpan.AddEvent("new subscriber", telemetryAttributes) + // If the subscription exists, let's see if we need to update it. if sub := p.subscriptions[subscriberID]; sub != nil { currentLayer := sub.Simulcast() // If we do, let's switch the layer. if currentLayer != layer { + defer telemetrySpan.AddEvent("switched layer", telemetryAttributes) + p.video.publishers[currentLayer].RemoveSubscription(sub) sub.SwitchLayer(layer) p.video.publishers[layer].AddSubscription(sub) @@ -213,6 +245,8 @@ func (p *PublishedTrack[SubscriberID]) Subscribe( // If there was an error, let's return it. if err != nil { + telemetrySpan.AddEvent("could not create subscription", telemetryAttributes) + telemetrySpan.RecordError(err) return err } @@ -234,6 +268,12 @@ func (p *PublishedTrack[SubscriberID]) Unsubscribe(subscriberID SubscriberID) { p.mutex.Lock() defer p.mutex.Unlock() + telemetrySpan := trace.SpanFromContext(p.telemetryContext) + defer telemetrySpan.AddEvent( + "unsubscribed", + trace.WithAttributes(attribute.String("id", subscriberID.String())), + ) + if sub := p.subscriptions[subscriberID]; sub != nil { sub.Unsubscribe() delete(p.subscriptions, subscriberID) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go new file mode 100644 index 0000000..f507832 --- /dev/null +++ b/pkg/telemetry/telemetry.go @@ -0,0 +1,7 @@ +package telemetry + +import "go.opentelemetry.io/otel" + +const PACKAGE = "waterfall" + +var TRACER = otel.Tracer(PACKAGE)