diff --git a/examples/echotest/index.html b/examples/echotest/index.html index d329b21b7..af8cd0a05 100644 --- a/examples/echotest/index.html +++ b/examples/echotest/index.html @@ -63,6 +63,9 @@

Pion

+
+
Media
+
Pion

           
+
+
Data
+
+
+
+ + +
+
+

+        
+
@@ -286,13 +328,16 @@

Pion

integrity="sha384-B4gt1jrGC7Jh4AgTPSdUtOBvfO8shuf57BaghqFfPlYxofvL8/KUEfYiJOMMV+rV" crossorigin="anonymous" > - + - + - + - - - - - - diff --git a/pkg/api.go b/pkg/api.go index 62692d122..0c7efd16f 100644 --- a/pkg/api.go +++ b/pkg/api.go @@ -8,7 +8,7 @@ import ( ) const ( - channelLabel = "ion-sfu" + apiChannelLabel = "ion-sfu" videoHighQuality = "high" videoMediumQuality = "medium" diff --git a/pkg/errors.go b/pkg/errors.go index e7aac1e79..d0006a3e2 100644 --- a/pkg/errors.go +++ b/pkg/errors.go @@ -5,6 +5,7 @@ import "errors" var ( errPeerConnectionInitFailed = errors.New("pc init failed") errPtNotSupported = errors.New("payload type not supported") + errCreatingDataChannel = errors.New("failed to create data channel") // router errors errNoReceiverFound = errors.New("no receiver found") // Helpers errors diff --git a/pkg/publisher.go b/pkg/publisher.go index b266b3f47..89be48b27 100644 --- a/pkg/publisher.go +++ b/pkg/publisher.go @@ -16,7 +16,6 @@ type Publisher struct { session *Session candidates []webrtc.ICECandidateInit - onTrackHandler func(*webrtc.Track, *webrtc.RTPReceiver) onICEConnectionStateChangeHandler atomic.Value // func(webrtc.ICEConnectionState) closeOnce sync.Once @@ -44,9 +43,14 @@ func NewPublisher(session *Session, id string, me MediaEngine, cfg WebRTCTranspo if rr := p.router.AddReceiver(track, receiver); rr != nil { p.session.Publish(p.router, rr) } - if p.onTrackHandler != nil { - p.onTrackHandler(track, receiver) + }) + + pc.OnDataChannel(func(dc *webrtc.DataChannel) { + if dc.Label() == apiChannelLabel { + // terminate api data channel + return } + p.session.AddDatachannel(id, dc) }) pc.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { diff --git a/pkg/session.go b/pkg/session.go index 176bc6ecb..c7c60dbbb 100644 --- a/pkg/session.go +++ b/pkg/session.go @@ -4,6 +4,7 @@ import ( "sync" log "github.com/pion/ion-log" + "github.com/pion/webrtc/v3" ) // Session represents a set of peers. Transports inside a session @@ -47,6 +48,56 @@ func (s *Session) RemovePeer(pid string) { } } +func (s *Session) onMessage(origin, label string, msg webrtc.DataChannelMessage) { + s.mu.RLock() + defer s.mu.RUnlock() + for pid, p := range s.peers { + if origin == pid { + continue + } + + dc := p.subscriber.channels[label] + if dc != nil && dc.ReadyState() == webrtc.DataChannelStateOpen { + if msg.IsString { + dc.SendText(string(msg.Data)) + } else { + dc.Send(msg.Data) + } + } + } +} + +func (s *Session) AddDatachannel(owner string, dc *webrtc.DataChannel) { + label := dc.Label() + + s.mu.RLock() + defer s.mu.RUnlock() + + s.peers[owner].subscriber.channels[label] = dc + + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + s.onMessage(owner, label, msg) + }) + + for pid, p := range s.peers { + // Don't add to self + if owner == pid { + continue + } + dc, err := p.subscriber.AddDataChannel(label) + + if err != nil { + log.Errorf("error adding datachannel: %s", err) + continue + } + + pid := pid + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + s.onMessage(pid, label, msg) + }) + } +} + // Publish will add a Sender to all peers in current Session from given // Receiver func (s *Session) Publish(router Router, rr *receiverRouter) { diff --git a/pkg/subscriber.go b/pkg/subscriber.go index 047167a9f..cfef139b2 100644 --- a/pkg/subscriber.go +++ b/pkg/subscriber.go @@ -23,6 +23,7 @@ type Subscriber struct { me MediaEngine session *Session + channels map[string]*webrtc.DataChannel senders map[string][]Sender candidates []webrtc.ICECandidateInit @@ -44,14 +45,15 @@ func NewSubscriber(session *Session, id string, me MediaEngine, cfg WebRTCTransp } s := &Subscriber{ - id: id, - me: me, - pc: pc, - session: session, - senders: make(map[string][]Sender), + id: id, + me: me, + pc: pc, + session: session, + channels: make(map[string]*webrtc.DataChannel), + senders: make(map[string][]Sender), } - dc, err := pc.CreateDataChannel(channelLabel, &webrtc.DataChannelInit{}) + dc, err := pc.CreateDataChannel(apiChannelLabel, &webrtc.DataChannelInit{}) if err != nil { log.Errorf("DC creation error: %v", err) return nil, errPeerConnectionInitFailed @@ -151,6 +153,25 @@ func (s *Subscriber) AddSender(streamID string, sender Sender) { } } +func (s *Subscriber) AddDataChannel(label string) (*webrtc.DataChannel, error) { + s.Lock() + defer s.Unlock() + + if s.channels[label] != nil { + return s.channels[label], nil + } + + dc, err := s.pc.CreateDataChannel(label, &webrtc.DataChannelInit{}) + if err != nil { + log.Errorf("dc creation error: %v", err) + return nil, errCreatingDataChannel + } + + s.channels[label] = dc + + return dc, nil +} + // SetRemoteDescription sets the SessionDescription of the remote peer func (s *Subscriber) SetRemoteDescription(desc webrtc.SessionDescription) error { var err error