Skip to content

Commit

Permalink
fix: wait for connection to restore before attempting WS recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverlaz committed Sep 20, 2024
1 parent b3c1b54 commit d77b9a4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
1 change: 1 addition & 0 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ export class Call {
logTag: String(this.sfuClientTag++),
dispatcher: this.dispatcher,
credentials: this.credentials,
streamClient: this.streamClient,
// a new session_id is necessary for the REJOIN strategy.
// we use the previous session_id if available
sessionId: performingRejoin ? undefined : previousSessionId,
Expand Down
37 changes: 24 additions & 13 deletions packages/client/src/StreamSfuClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
UpdateMuteStatesRequest,
} from './gen/video/sfu/signal_rpc/signal';
import { ICETrickle, TrackType } from './gen/video/sfu/models/models';
import { StreamClient } from './coordinator/connection/client';
import { generateUUIDv4, sleep } from './coordinator/connection/utils';
import { Credentials } from './gen/coordinator';
import { Logger } from './coordinator/connection/types';
Expand Down Expand Up @@ -65,6 +66,11 @@ export type StreamSfuClientConstructor = {
* Callback for when the WebSocket connection is closed.
*/
onSignalClose?: () => void;

/**
* The StreamClient instance to use for the connection.
*/
streamClient: StreamClient;
};

/**
Expand Down Expand Up @@ -112,12 +118,14 @@ export class StreamSfuClient {
private lastMessageTimestamp?: Date;
private readonly restoreWebSocketConcurrencyTag = Symbol('recoverWebSocket');
private readonly unsubscribeIceTrickle: () => void;
private readonly unsubscribeNetworkChanged: () => void;
private readonly onSignalClose: (() => void) | undefined;
private readonly logger: Logger;
private readonly logTag: string;
private readonly credentials: Credentials;
private readonly dispatcher: Dispatcher;
private readonly joinResponseTimeout?: number;
private networkAvailableTask: PromiseWithResolvers<void> | undefined;
/**
* Promise that resolves when the JoinResponse is received.
* Rejects after a certain threshold if the response is not received.
Expand Down Expand Up @@ -163,6 +171,7 @@ export class StreamSfuClient {
logTag,
joinResponseTimeout = 5000,
onSignalClose,
streamClient,
}: StreamSfuClientConstructor) {
this.dispatcher = dispatcher;
this.sessionId = sessionId || generateUUIDv4();
Expand Down Expand Up @@ -193,6 +202,16 @@ export class StreamSfuClient {
this.iceTrickleBuffer.push(iceTrickle);
});

// listen to network changes to handle offline state
// we shouldn't attempt to recover websocket connection when offline
this.unsubscribeNetworkChanged = streamClient.on('network.changed', (e) => {
if (!e.online) {
this.networkAvailableTask = promiseWithResolvers();
} else {
this.networkAvailableTask?.resolve();
}
});

this.createWebSocket();
}

Expand Down Expand Up @@ -226,6 +245,7 @@ export class StreamSfuClient {

private restoreWebSocket = () => {
withoutConcurrency(this.restoreWebSocketConcurrencyTag, async () => {
await this.networkAvailableTask?.promise;
this.logger('debug', 'Restoring SFU WS connection');
this.cleanUpWebSocket();
await sleep(500);
Expand Down Expand Up @@ -260,6 +280,7 @@ export class StreamSfuClient {
dispose = () => {
this.logger('debug', 'Disposing SFU client');
this.unsubscribeIceTrickle();
this.unsubscribeNetworkChanged();
clearInterval(this.keepAliveInterval);
clearTimeout(this.connectionCheckTimeout);
clearTimeout(this.migrateAwayTimeout);
Expand Down Expand Up @@ -453,33 +474,23 @@ export class StreamSfuClient {
},
},
}),
2000, // two-second timeout for leave request
);
};

/**
* Sends a message to the SFU via the WebSocket connection.
*
* @param message the message to send.
* @param timeout an optional timeout in milliseconds for sending the message.
*/
private send = async (message: SfuRequest, timeout: number = 0) => {
private send = async (message: SfuRequest) => {
// wait for the signal ws to be open
if (timeout > 0) {
await Promise.race([
this.signalReady,
new Promise((_, reject) => {
setTimeout(() => reject(new Error('Timeout sending msg')), timeout);
}),
]);
} else {
await this.signalReady;
}
await this.signalReady;
const msgJson = SfuRequest.toJson(message);
if (this.signalWs.readyState !== WebSocket.OPEN) {
this.logger('debug', 'Signal WS is not open. Skipping message', msgJson);
return;
}
this.logger('debug', `Sending message to: ${this.edgeName}`, msgJson);
this.signalWs.send(SfuRequest.toBinary(message));
};

Expand Down

0 comments on commit d77b9a4

Please sign in to comment.