Skip to content

Commit

Permalink
use data class
Browse files Browse the repository at this point in the history
  • Loading branch information
bcherry committed Oct 28, 2024
1 parent 10f183a commit 508dac6
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 56 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,9 @@ room.localParticipant?.registerRpcMethod(
'greet',

// method handler - will be called when the method is invoked by a RemoteParticipant
async (requestId: string, callerIdentity: string, payload: string, responseTimeout: number) => {
console.log(`Received greeting from ${callerIdentity}: ${payload}`);
return `Hello, ${callerIdentity}!`;
async (data: RpcInvocationData) => {
console.log(`Received greeting from ${data.callerIdentity}: ${data.payload}`);
return `Hello, ${data.callerIdentity}!`;
}
);
```
Expand All @@ -338,7 +338,7 @@ try {
const response = await room.localParticipant!.performRpc(
'recipient-identity',
'greet',
'Hello from RPC!'
'Hello from RPC!',
);
console.log('RPC response:', response);
} catch (error) {
Expand Down
34 changes: 9 additions & 25 deletions examples/rpc/rpc-demo.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Room, type RoomConnectOptions, RoomEvent, RpcError } from '../../src/index';
import { Room, type RoomConnectOptions, RoomEvent, RpcError, RpcInvocationData } from '../../src/index';

let startTime: number;

Expand Down Expand Up @@ -72,31 +72,21 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room)
await greetersRoom.localParticipant?.registerRpcMethod(
'arrival',
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async (
requestId: string,
callerIdentity: string,
payload: string,
responseTimeout: number,
) => {
console.log(`[Greeter] Oh ${callerIdentity} arrived and said "${payload}"`);
async (data: RpcInvocationData) => {
console.log(`[Greeter] Oh ${data.callerIdentity} arrived and said "${data.payload}"`);
await new Promise((resolve) => setTimeout(resolve, 2000));
return 'Welcome and have a wonderful day!';
},
);

await mathGeniusRoom.localParticipant?.registerRpcMethod(
'square-root',
async (
requestId: string,
callerIdentity: string,
payload: string,
responseTimeout: number,
) => {
const jsonData = JSON.parse(payload);
async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
const number = jsonData.number;

console.log(
`[Math Genius] I guess ${callerIdentity} wants the square root of ${number}. I've only got ${responseTimeout / 1000} seconds to respond but I think I can pull it off.`,
`[Math Genius] I guess ${data.callerIdentity} wants the square root of ${number}. I've only got ${data.responseTimeout / 1000} seconds to respond but I think I can pull it off.`,
);

console.log(`[Math Genius] *doing math*…`);
Expand All @@ -110,18 +100,12 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room)

await mathGeniusRoom.localParticipant?.registerRpcMethod(
'divide',
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async (
requestId: string,
callerIdentity: string,
payload: string,
responseTimeout: number,
) => {
const jsonData = JSON.parse(payload);
async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
const { numerator, denominator } = jsonData;

console.log(
`[Math Genius] ${callerIdentity} wants to divide ${numerator} by ${denominator}. Let me think...`,
`[Math Genius] ${data.callerIdentity} wants to divide ${numerator} by ${denominator}. Let me think...`,
);

await new Promise((resolve) => setTimeout(resolve, 2000));
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import {
} from './room/utils';
import { getBrowser } from './utils/browserParser';

export { RpcError } from './room/rpc';
export { RpcError, RpcInvocationData } from './room/rpc';

export * from './connectionHelper/ConnectionCheck';
export * from './connectionHelper/checks/Checker';
Expand Down
35 changes: 9 additions & 26 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
UnexpectedConnectionState,
} from '../errors';
import { EngineEvent, ParticipantEvent, TrackEvent } from '../events';
import { MAX_PAYLOAD_BYTES, RpcError, byteLength } from '../rpc';
import { MAX_PAYLOAD_BYTES, RpcError, RpcInvocationData, byteLength } from '../rpc';
import LocalAudioTrack from '../track/LocalAudioTrack';
import LocalTrack from '../track/LocalTrack';
import LocalTrackPublication from '../track/LocalTrackPublication';
Expand Down Expand Up @@ -124,15 +124,7 @@ export default class LocalParticipant extends Participant {

private enabledPublishVideoCodecs: Codec[] = [];

private rpcHandlers: Map<
string,
(
requestId: string,
callerIdentity: string,
payload: string,
responseTimeout: number,
) => Promise<string>
> = new Map();
private rpcHandlers: Map<string, (data: RpcInvocationData) => Promise<string>> = new Map();

private pendingAcks = new Map<string, { resolve: () => void; participantIdentity: string }>();

Expand Down Expand Up @@ -1564,19 +1556,13 @@ export default class LocalParticipant extends Participant {
* ```typescript
* room.localParticipant?.registerRpcMethod(
* 'greet',
* async (requestId: string, callerIdentity: string, payload: string, responseTimeout: number) => {
* console.log(`Received greeting from ${callerIdentity}: ${payload}`);
* return `Hello, ${callerIdentity}!`;
* async (data: RpcInvocationData) => {
* console.log(`Received greeting from ${data.callerIdentity}: ${data.payload}`);
* return `Hello, ${data.callerIdentity}!`;
* }
* );
* ```
*
* The handler receives the following parameters:
* - `requestId`: A unique identifier for this RPC request
* - `callerIdentity`: The identity of the RemoteParticipant who initiated the RPC call
* - `payload`: The data sent by the caller (as a string)
* - `responseTimeout`: The maximum time available to return a response (milliseconds)
*
* The handler should return a Promise that resolves to a string.
* If unable to respond within `responseTimeout`, the request will result in an error on the caller's side.
*
Expand All @@ -1586,12 +1572,7 @@ export default class LocalParticipant extends Participant {
*/
registerRpcMethod(
method: string,
handler: (
requestId: string,
callerIdentity: string,
payload: string,
responseTimeout: number,
) => Promise<string>,
handler: (data: RpcInvocationData) => Promise<string>,
) {
this.rpcHandlers.set(method, handler);
}
Expand Down Expand Up @@ -1693,7 +1674,9 @@ export default class LocalParticipant extends Participant {
let responsePayload: string | null = null;

try {
const response = await handler(requestId, callerIdentity, payload, responseTimeout);
const response = await handler(
new RpcInvocationData(requestId, callerIdentity, payload, responseTimeout),
);
if (byteLength(response) > MAX_PAYLOAD_BYTES) {
responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE');
console.warn(`RPC Response payload too large for ${method}`);
Expand Down
32 changes: 32 additions & 0 deletions src/room/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,38 @@
// SPDX-License-Identifier: Apache-2.0
import { RpcError as RpcError_Proto } from '@livekit/protocol';

/**
* Data passed to method handler for incoming RPC invocations
*/
export class RpcInvocationData {
/**
* The unique request ID. Will match at both sides of the call, useful for debugging or logging.
*/
requestId: string;

/**
* The unique participant identity of the caller.
*/
callerIdentity: string;

/**
* The payload of the request. User-definable format, typically JSON.
*/
payload: string;

/**
* The maximum time the caller will wait for a response.
*/
responseTimeout: number;

constructor(requestId: string, callerIdentity: string, payload: string, responseTimeout: number) {
this.requestId = requestId;
this.callerIdentity = callerIdentity;
this.payload = payload;
this.responseTimeout = responseTimeout;
}
}

/**
* Specialized error handling for RPC methods.
*
Expand Down

0 comments on commit 508dac6

Please sign in to comment.