diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index 4721a9f3ae..d89e32fc42 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -804,6 +804,7 @@ export class Call { logTag: String(this.sfuClientTag++), dispatcher: this.dispatcher, credentials: this.credentials, + streamClient: this.streamClient, // a new session_id is necessary for the REJOIN strategy. // we use the previous session_id if available sessionId: performingRejoin ? undefined : previousSessionId, diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index 9a2d1fafb5..275a23049d 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -24,6 +24,7 @@ import { UpdateMuteStatesRequest, } from './gen/video/sfu/signal_rpc/signal'; import { ICETrickle, TrackType } from './gen/video/sfu/models/models'; +import { StreamClient } from './coordinator/connection/client'; import { generateUUIDv4, sleep } from './coordinator/connection/utils'; import { Credentials } from './gen/coordinator'; import { Logger } from './coordinator/connection/types'; @@ -65,6 +66,11 @@ export type StreamSfuClientConstructor = { * Callback for when the WebSocket connection is closed. */ onSignalClose?: () => void; + + /** + * The StreamClient instance to use for the connection. + */ + streamClient: StreamClient; }; /** @@ -112,12 +118,14 @@ export class StreamSfuClient { private lastMessageTimestamp?: Date; private readonly restoreWebSocketConcurrencyTag = Symbol('recoverWebSocket'); private readonly unsubscribeIceTrickle: () => void; + private readonly unsubscribeNetworkChanged: () => void; private readonly onSignalClose: (() => void) | undefined; private readonly logger: Logger; private readonly logTag: string; private readonly credentials: Credentials; private readonly dispatcher: Dispatcher; private readonly joinResponseTimeout?: number; + private networkAvailableTask: PromiseWithResolvers | undefined; /** * Promise that resolves when the JoinResponse is received. * Rejects after a certain threshold if the response is not received. @@ -163,6 +171,7 @@ export class StreamSfuClient { logTag, joinResponseTimeout = 5000, onSignalClose, + streamClient, }: StreamSfuClientConstructor) { this.dispatcher = dispatcher; this.sessionId = sessionId || generateUUIDv4(); @@ -193,6 +202,16 @@ export class StreamSfuClient { this.iceTrickleBuffer.push(iceTrickle); }); + // listen to network changes to handle offline state + // we shouldn't attempt to recover websocket connection when offline + this.unsubscribeNetworkChanged = streamClient.on('network.changed', (e) => { + if (!e.online) { + this.networkAvailableTask = promiseWithResolvers(); + } else { + this.networkAvailableTask?.resolve(); + } + }); + this.createWebSocket(); } @@ -226,6 +245,7 @@ export class StreamSfuClient { private restoreWebSocket = () => { withoutConcurrency(this.restoreWebSocketConcurrencyTag, async () => { + await this.networkAvailableTask?.promise; this.logger('debug', 'Restoring SFU WS connection'); this.cleanUpWebSocket(); await sleep(500); @@ -260,6 +280,7 @@ export class StreamSfuClient { dispose = () => { this.logger('debug', 'Disposing SFU client'); this.unsubscribeIceTrickle(); + this.unsubscribeNetworkChanged(); clearInterval(this.keepAliveInterval); clearTimeout(this.connectionCheckTimeout); clearTimeout(this.migrateAwayTimeout); @@ -453,7 +474,6 @@ export class StreamSfuClient { }, }, }), - 2000, // two-second timeout for leave request ); }; @@ -461,25 +481,16 @@ export class StreamSfuClient { * Sends a message to the SFU via the WebSocket connection. * * @param message the message to send. - * @param timeout an optional timeout in milliseconds for sending the message. */ - private send = async (message: SfuRequest, timeout: number = 0) => { + private send = async (message: SfuRequest) => { // wait for the signal ws to be open - if (timeout > 0) { - await Promise.race([ - this.signalReady, - new Promise((_, reject) => { - setTimeout(() => reject(new Error('Timeout sending msg')), timeout); - }), - ]); - } else { - await this.signalReady; - } + await this.signalReady; const msgJson = SfuRequest.toJson(message); if (this.signalWs.readyState !== WebSocket.OPEN) { this.logger('debug', 'Signal WS is not open. Skipping message', msgJson); return; } + this.logger('debug', `Sending message to: ${this.edgeName}`, msgJson); this.signalWs.send(SfuRequest.toBinary(message)); };