Skip to content

Commit

Permalink
Add support for new chat API (#979)
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO authored Sep 25, 2024
1 parent 95ab29a commit 22fa65e
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 129 deletions.
6 changes: 6 additions & 0 deletions .changeset/rotten-bikes-burn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@livekit/components-core": patch
"@livekit/components-react": patch
---

Add support for new chat API
4 changes: 4 additions & 0 deletions .npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dedupe-peer-dependents=true
resolve-peers-from-workspace-root=true
manage-package-manager-versions=true
package-manager-strict-version=true
2 changes: 1 addition & 1 deletion docs/storybook/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"dependencies": {
"@livekit/components-react": "workspace:*",
"@livekit/components-styles": "workspace:*",
"livekit-client": "^2.4.0",
"livekit-client": "^2.5.4",
"react": "^18.2.0",
"react-dom": "^18.2.0"
},
Expand Down
2 changes: 1 addition & 1 deletion examples/nextjs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"@livekit/components-react": "workspace:*",
"@livekit/components-styles": "workspace:*",
"@livekit/track-processors": "^0.3.2",
"livekit-client": "^2.4.0",
"livekit-client": "^2.5.4",
"livekit-server-sdk": "^2.6.1",
"next": "^12.3.4",
"react": "^18.2.0",
Expand Down
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@
"turbo": "^2.1.1",
"typescript": "5.4.2"
},
"dependencies": {
"livekit-client": "^2.5.4"
},
"engines": {
"node": ">=18"
},
"packageManager": "pnpm@9.2.0"
"packageManager": "pnpm@9.10.0"
}
61 changes: 42 additions & 19 deletions packages/core/etc/components-core.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

import type { AudioCaptureOptions } from 'livekit-client';
import { BehaviorSubject } from 'rxjs';
import { ChatMessage } from 'livekit-client';
import { ConnectionQuality } from 'livekit-client';
import { ConnectionState } from 'livekit-client';
import { DataPacket_Kind } from 'livekit-client';
import type { DataPublishOptions } from 'livekit-client';
import { LocalAudioTrack } from 'livekit-client';
import type { LocalParticipant } from 'livekit-client';
import { LocalParticipant } from 'livekit-client';
import { LocalVideoTrack } from 'livekit-client';
import loglevel from 'loglevel';
import { Observable } from 'rxjs';
Expand Down Expand Up @@ -72,20 +73,12 @@ export interface BaseDataMessage<T extends string | undefined> {
// @public (undocumented)
export type CaptureOptionsBySource<T extends ToggleSource> = T extends Track.Source.Camera ? VideoCaptureOptions : T extends Track.Source.Microphone ? AudioCaptureOptions : T extends Track.Source.ScreenShare ? ScreenShareCaptureOptions : never;

// @public (undocumented)
export interface ChatMessage {
// (undocumented)
id: string;
// (undocumented)
message: string;
// (undocumented)
timestamp: number;
}
export { ChatMessage }

// @public (undocumented)
export type ChatOptions = {
messageEncoder?: (message: ChatMessage) => Uint8Array;
messageDecoder?: (message: Uint8Array) => ReceivedChatMessage;
messageEncoder?: (message: LegacyChatMessage) => Uint8Array;
messageDecoder?: (message: Uint8Array) => LegacyReceivedChatMessage;
channelTopic?: string;
updateChannelTopic?: string;
};
Expand Down Expand Up @@ -115,6 +108,9 @@ export function connectionStateObserver(room: Room): Observable<ConnectionState>
// @public (undocumented)
export function createActiveDeviceObservable(room: Room, kind: MediaDeviceKind): Observable<string | undefined>;

// @public (undocumented)
export function createChatObserver(room: Room): Observable<[message: ChatMessage, participant?: LocalParticipant | RemoteParticipant | undefined]>;

// @public (undocumented)
export function createConnectionQualityObserver(participant: Participant): Observable<ConnectionQuality>;

Expand Down Expand Up @@ -263,6 +259,18 @@ export function isTrackReferencePlaceholder(trackReference?: TrackReferenceOrPla
// @internal (undocumented)
export function isWeb(): boolean;

// @public (undocumented)
export interface LegacyChatMessage extends ChatMessage {
// (undocumented)
ignore?: true;
}

// @public (undocumented)
export interface LegacyReceivedChatMessage extends ReceivedChatMessage {
// (undocumented)
ignore?: true;
}

// @alpha
export function loadUserChoices(defaults?: Partial<LocalUserChoices>,
preventLoad?: boolean): LocalUserChoices;
Expand All @@ -287,11 +295,11 @@ export type MediaToggleType<T extends ToggleSource> = {
enabledObserver: Observable<boolean>;
};

// @public (undocumented)
export type MessageDecoder = (message: Uint8Array) => ReceivedChatMessage;
// @public @deprecated (undocumented)
export type MessageDecoder = (message: Uint8Array) => LegacyReceivedChatMessage;

// @public (undocumented)
export type MessageEncoder = (message: ChatMessage) => Uint8Array;
// @public @deprecated (undocumented)
export type MessageEncoder = (message: LegacyChatMessage) => Uint8Array;

// @public (undocumented)
export function mutedObserver(trackRef: TrackReferenceOrPlaceholder): Observable<boolean>;
Expand Down Expand Up @@ -400,8 +408,6 @@ export type PinState = TrackReferenceOrPlaceholder[];

// @public (undocumented)
export interface ReceivedChatMessage extends ChatMessage {
// (undocumented)
editTimestamp?: number;
// (undocumented)
from?: Participant;
}
Expand Down Expand Up @@ -492,7 +498,24 @@ export function setupChat(room: Room, options?: ChatOptions): {
messageObservable: Observable<ReceivedChatMessage[]>;
isSendingObservable: BehaviorSubject<boolean>;
send: (message: string) => Promise<ChatMessage>;
update: (message: string, messageId: string) => Promise<ChatMessage>;
update: (message: string, originalMessageOrId: string | ChatMessage) => Promise<{
readonly message: string;
readonly editTimestamp: number;
readonly id: string;
readonly timestamp: number;
}>;
};

// @public (undocumented)
export function setupChatMessageHandler(room: Room): {
chatObservable: Observable<[message: ChatMessage, participant?: LocalParticipant | RemoteParticipant | undefined]>;
send: (text: string) => Promise<ChatMessage>;
edit: (text: string, originalMsg: ChatMessage) => Promise<{
readonly message: string;
readonly editTimestamp: number;
readonly id: string;
readonly timestamp: number;
}>;
};

// @public (undocumented)
Expand Down
4 changes: 2 additions & 2 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
"rxjs": "7.8.1"
},
"peerDependencies": {
"livekit-client": "^2.4.0",
"@livekit/protocol": "^1.20.1",
"livekit-client": "^2.5.4",
"tslib": "^2.6.2"
},
"devDependencies": {
"@livekit/components-styles": "workspace:*",
"@livekit/protocol": "^1.22.0",
"@microsoft/api-extractor": "^7.36.0",
"@size-limit/file": "^11.0.2",
"@size-limit/webpack": "^11.0.2",
Expand Down
130 changes: 84 additions & 46 deletions packages/core/src/components/chat.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,49 @@
/* eslint-disable camelcase */
import type { Participant, Room } from 'livekit-client';
import type { Participant, Room, ChatMessage } from 'livekit-client';
import { RoomEvent } from 'livekit-client';
import { BehaviorSubject, Subject, scan, map, takeUntil } from 'rxjs';
import { DataTopic, sendMessage, setupDataMessageHandler } from '../observables/dataChannel';
import { BehaviorSubject, Subject, scan, map, takeUntil, merge } from 'rxjs';
import {
DataTopic,
sendMessage,
setupChatMessageHandler,
setupDataMessageHandler,
} from '../observables/dataChannel';

/** @public */
export interface ChatMessage {
id: string;
timestamp: number;
message: string;
}
export type { ChatMessage };

/** @public */
export interface ReceivedChatMessage extends ChatMessage {
from?: Participant;
editTimestamp?: number;
}

/** @public */
export type MessageEncoder = (message: ChatMessage) => Uint8Array;
/** @public */
export type MessageDecoder = (message: Uint8Array) => ReceivedChatMessage;
export interface LegacyChatMessage extends ChatMessage {
ignore?: true;
}

export interface LegacyReceivedChatMessage extends ReceivedChatMessage {
ignore?: true;
}

/**
* @public
* @deprecated the new chat API doesn't rely on encoders and decoders anymore and uses a dedicated chat API instead
*/
export type MessageEncoder = (message: LegacyChatMessage) => Uint8Array;
/**
* @public
* @deprecated the new chat API doesn't rely on encoders and decoders anymore and uses a dedicated chat API instead
*/
export type MessageDecoder = (message: Uint8Array) => LegacyReceivedChatMessage;
/** @public */
export type ChatOptions = {
messageEncoder?: (message: ChatMessage) => Uint8Array;
messageDecoder?: (message: Uint8Array) => ReceivedChatMessage;
/** @deprecated the new chat API doesn't rely on encoders and decoders anymore and uses a dedicated chat API instead */
messageEncoder?: (message: LegacyChatMessage) => Uint8Array;
/** @deprecated the new chat API doesn't rely on encoders and decoders anymore and uses a dedicated chat API instead */
messageDecoder?: (message: Uint8Array) => LegacyReceivedChatMessage;
/** @deprecated the new chat API doesn't rely on topics anymore and uses a dedicated chat API instead */
channelTopic?: string;
/** @deprecated the new chat API doesn't rely on topics anymore and uses a dedicated chat API instead */
updateChannelTopic?: string;
};

Expand All @@ -40,9 +58,10 @@ const decoder = new TextDecoder();

const topicSubjectMap: Map<Room, Map<string, Subject<RawMessage>>> = new Map();

const encode = (message: ChatMessage) => encoder.encode(JSON.stringify(message));
const encode = (message: LegacyReceivedChatMessage) => encoder.encode(JSON.stringify(message));

const decode = (message: Uint8Array) => JSON.parse(decoder.decode(message)) as ReceivedChatMessage;
const decode = (message: Uint8Array) =>
JSON.parse(decoder.decode(message)) as LegacyReceivedChatMessage | ReceivedChatMessage;

export function setupChat(room: Room, options?: ChatOptions) {
const onDestroyObservable = new Subject<void>();
Expand All @@ -67,17 +86,33 @@ export function setupChat(room: Room, options?: ChatOptions) {
const { messageObservable } = setupDataMessageHandler(room, [topic, updateTopic]);
messageObservable.pipe(takeUntil(onDestroyObservable)).subscribe(messageSubject);
}
const { chatObservable, send: sendChatMessage } = setupChatMessageHandler(room);

const finalMessageDecoder = messageDecoder ?? decode;

/** Build up the message array over time. */
const messagesObservable = messageSubject.pipe(
map((msg) => {
const parsedMessage = finalMessageDecoder(msg.payload);
const newMessage: ReceivedChatMessage = { ...parsedMessage, from: msg.from };
return newMessage;
}),
scan<ReceivedChatMessage, ReceivedChatMessage[]>((acc, value) => {
const messagesObservable = merge(
messageSubject.pipe(
map((msg) => {
const parsedMessage = finalMessageDecoder(msg.payload);
const newMessage = { ...parsedMessage, from: msg.from };
if (isIgnorableChatMessage(newMessage)) {
return undefined;
}
return newMessage;
}),
),
chatObservable.pipe(
map(([msg, participant]) => {
return { ...msg, from: participant };
}),
),
).pipe(
scan<ReceivedChatMessage | undefined, ReceivedChatMessage[]>((acc, value) => {
// ignore legacy message updates
if (!value) {
return acc;
}
// handle message updates
if (
'id' in value &&
Expand All @@ -89,7 +124,7 @@ export function setupChat(room: Room, options?: ChatOptions) {
acc[replaceIndex] = {
...value,
timestamp: originalMsg.timestamp,
editTimestamp: value.timestamp,
editTimestamp: value.editTimestamp ?? value.timestamp,
};
}

Expand All @@ -105,43 +140,35 @@ export function setupChat(room: Room, options?: ChatOptions) {
const finalMessageEncoder = messageEncoder ?? encode;

const send = async (message: string) => {
const timestamp = Date.now();
const id = crypto.randomUUID();
const chatMessage: ChatMessage = { id, message, timestamp };
const encodedMsg = finalMessageEncoder(chatMessage);
isSending$.next(true);
try {
await sendMessage(room.localParticipant, encodedMsg, {
const chatMessage = await sendChatMessage(message);
const encodedLegacyMsg = finalMessageEncoder({ ...chatMessage, ignore: true });
await sendMessage(room.localParticipant, encodedLegacyMsg, {
reliable: true,
topic,
});
messageSubject.next({
payload: encodedMsg,
topic: topic,
from: room.localParticipant,
});
return chatMessage;
} finally {
isSending$.next(false);
}
};

const update = async (message: string, messageId: string) => {
const update = async (message: string, originalMessageOrId: string | ChatMessage) => {
const timestamp = Date.now();
const chatMessage: ChatMessage = { id: messageId, message, timestamp };
const encodedMsg = finalMessageEncoder(chatMessage);
const originalMessage: ChatMessage =
typeof originalMessageOrId === 'string'
? { id: originalMessageOrId, message: '', timestamp }
: originalMessageOrId;
isSending$.next(true);
try {
await sendMessage(room.localParticipant, encodedMsg, {
const editedMessage = await room.localParticipant.editChatMessage(message, originalMessage);
const encodedLegacyMessage = finalMessageEncoder(editedMessage);
await sendMessage(room.localParticipant, encodedLegacyMessage, {
topic: updateTopic,
reliable: true,
});
messageSubject.next({
payload: encodedMsg,
topic: topic,
from: room.localParticipant,
});
return chatMessage;
return editedMessage;
} finally {
isSending$.next(false);
}
Expand All @@ -154,5 +181,16 @@ export function setupChat(room: Room, options?: ChatOptions) {
}
room.once(RoomEvent.Disconnected, destroy);

return { messageObservable: messagesObservable, isSendingObservable: isSending$, send, update };
return {
messageObservable: messagesObservable,
isSendingObservable: isSending$,
send,
update,
};
}

function isIgnorableChatMessage(
msg: ReceivedChatMessage | LegacyReceivedChatMessage,
): msg is ReceivedChatMessage {
return (msg as LegacyChatMessage).ignore == true;
}
Loading

0 comments on commit 22fa65e

Please sign in to comment.