Skip to content

Commit

Permalink
Support SIP DTMF data messages (#1130)
Browse files Browse the repository at this point in the history
* Support SIP DTMF data messages
  • Loading branch information
dennwc authored May 17, 2024
1 parent 48c1bbd commit 1e8465a
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 12 deletions.
5 changes: 5 additions & 0 deletions .changeset/fifty-steaks-fly.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-client": minor
---

Support SIP DTMF data messages.
29 changes: 23 additions & 6 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
TrackUnpublishedResponse,
Transcription,
UpdateSubscription,
UserPacket,
type UserPacket,
} from '@livekit/protocol';
import { EventEmitter } from 'events';
import type { MediaAttributes } from 'sdp-transform';
Expand Down Expand Up @@ -648,10 +648,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
if (dp.value?.case === 'speaker') {
// dispatch speaker updates
this.emit(EngineEvent.ActiveSpeakersUpdate, dp.value.value.speakers);
} else if (dp.value?.case === 'user') {
this.emit(EngineEvent.DataPacketReceived, dp.value.value, dp.kind);
} else if (dp.value?.case === 'transcription') {
this.emit(EngineEvent.TranscriptionReceived, dp.value.value);
} else {
if (dp.value?.case === 'user') {
// compatibility
applyUserDataCompat(dp, dp.value.value);
}
this.emit(EngineEvent.DataPacketReceived, dp);
}
} finally {
unlock();
Expand Down Expand Up @@ -1392,7 +1394,7 @@ export type EngineEventCallbacks = {
receiver?: RTCRtpReceiver,
) => void;
activeSpeakersUpdate: (speakers: Array<SpeakerInfo>) => void;
dataPacketReceived: (userPacket: UserPacket, kind: DataPacket_Kind) => void;
dataPacketReceived: (packet: DataPacket) => void;
transcriptionReceived: (transcription: Transcription) => void;
transportsCreated: (publisher: PCTransport, subscriber: PCTransport) => void;
/** @internal */
Expand All @@ -1415,3 +1417,18 @@ export type EngineEventCallbacks = {
function supportOptionalDatachannel(protocol: number | undefined): boolean {
return protocol !== undefined && protocol > 13;
}

function applyUserDataCompat(newObj: DataPacket, oldObj: UserPacket) {
const participantIdentity = newObj.participantIdentity
? newObj.participantIdentity
: oldObj.participantIdentity;
newObj.participantIdentity = participantIdentity;
oldObj.participantIdentity = participantIdentity;

const destinationIdentities =
newObj.destinationIdentities.length !== 0
? newObj.destinationIdentities
: oldObj.destinationIdentities;
newObj.destinationIdentities = destinationIdentities;
oldObj.destinationIdentities = destinationIdentities;
}
35 changes: 30 additions & 5 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
ConnectionQualityUpdate,
type DataPacket,
DataPacket_Kind,
DisconnectReason,
JoinResponse,
Expand All @@ -11,6 +12,7 @@ import {
Room as RoomModel,
ServerInfo,
SimulateScenario,
SipDTMF,
SpeakerInfo,
StreamStateUpdate,
SubscriptionError,
Expand Down Expand Up @@ -334,7 +336,6 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
})
.on(EngineEvent.ActiveSpeakersUpdate, this.handleActiveSpeakersUpdate)
.on(EngineEvent.DataPacketReceived, this.handleDataPacket)
.on(EngineEvent.TranscriptionReceived, this.handleTranscription)
.on(EngineEvent.Resuming, () => {
this.clearConnectionReconcile();
this.isResuming = true;
Expand Down Expand Up @@ -1472,24 +1473,47 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
pub.setSubscriptionError(update.err);
};

private handleDataPacket = (userPacket: UserPacket, kind: DataPacket_Kind) => {
private handleDataPacket = (packet: DataPacket) => {
// find the participant
const participant = this.remoteParticipants.get(userPacket.participantIdentity);
const participant = this.remoteParticipants.get(packet.participantIdentity);
if (packet.value.case === 'user') {
this.handleUserPacket(participant, packet.value.value, packet.kind);
} else if (packet.value.case === 'transcription') {
this.handleTranscription(participant, packet.value.value);
} else if (packet.value.case === 'sipDtmf') {
this.handleSipDtmf(participant, packet.value.value);
}
};

private handleUserPacket = (
participant: RemoteParticipant | undefined,
userPacket: UserPacket,
kind: DataPacket_Kind,
) => {
this.emit(RoomEvent.DataReceived, userPacket.payload, participant, kind, userPacket.topic);

// also emit on the participant
participant?.emit(ParticipantEvent.DataReceived, userPacket.payload, kind);
};

private handleSipDtmf = (participant: RemoteParticipant | undefined, dtmf: SipDTMF) => {
this.emit(RoomEvent.SipDTMFReceived, dtmf, participant);

// also emit on the participant
participant?.emit(ParticipantEvent.SipDTMFReceived, dtmf);
};

bufferedSegments: Map<string, TranscriptionSegmentModel> = new Map();

private handleTranscription = (transcription: TranscriptionModel) => {
private handleTranscription = (
remoteParticipant: RemoteParticipant | undefined,
transcription: TranscriptionModel,
) => {
// find the participant
const participant =
transcription.participantIdentity === this.localParticipant.identity
? this.localParticipant
: this.remoteParticipants.get(transcription.participantIdentity);
: remoteParticipant;
const publication = participant?.trackPublications.get(transcription.trackId);

const segments = extractTranscriptionSegments(transcription);
Expand Down Expand Up @@ -2099,6 +2123,7 @@ export type RoomEventCallbacks = {
kind?: DataPacket_Kind,
topic?: string,
) => void;
sipDTMFReceived: (dtmf: SipDTMF, participant?: RemoteParticipant) => void;
transcriptionReceived: (
transcription: TranscriptionSegment[],
participant?: Participant,
Expand Down
15 changes: 14 additions & 1 deletion src/room/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ export enum RoomEvent {
*/
DataReceived = 'dataReceived',

/**
* SIP DTMF tones received from another participant.
*
* args: (participant: [[Participant]], dtmf: [[DataPacket_Kind]])
*/
SipDTMFReceived = 'sipDTMFReceived',

/**
* Transcription received from a participant's track.
* @beta
Expand Down Expand Up @@ -408,6 +415,13 @@ export enum ParticipantEvent {
*/
DataReceived = 'dataReceived',

/**
* SIP DTMF tones received from this participant as sender.
*
* args: (dtmf: [[DataPacket_Kind]])
*/
SipDTMFReceived = 'sipDTMFReceived',

/**
* Transcription received from this participant as data source.
* @beta
Expand Down Expand Up @@ -491,7 +505,6 @@ export enum EngineEvent {
MediaTrackAdded = 'mediaTrackAdded',
ActiveSpeakersUpdate = 'activeSpeakersUpdate',
DataPacketReceived = 'dataPacketReceived',
TranscriptionReceived = 'transcriptionReceived',
RTPVideoMapUpdate = 'rtpVideoMapUpdate',
DCBufferStatusChanged = 'dcBufferStatusChanged',
ParticipantUpdate = 'participantUpdate',
Expand Down
2 changes: 2 additions & 0 deletions src/room/participant/Participant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
ParticipantInfo,
ParticipantPermission,
ConnectionQuality as ProtoQuality,
type SipDTMF,
SubscriptionError,
} from '@livekit/protocol';
import { EventEmitter } from 'events';
Expand Down Expand Up @@ -329,6 +330,7 @@ export type ParticipantEventCallbacks = {
participantMetadataChanged: (prevMetadata: string | undefined, participant?: any) => void;
participantNameChanged: (name: string) => void;
dataReceived: (payload: Uint8Array, kind: DataPacket_Kind) => void;
sipDTMFReceived: (dtmf: SipDTMF) => void;
transcriptionReceived: (
transcription: TranscriptionSegment[],
publication?: TrackPublication,
Expand Down

0 comments on commit 1e8465a

Please sign in to comment.