From a037280bb9f32fb5aba7ab48a9fc93eb48c7ca67 Mon Sep 17 00:00:00 2001 From: Oliver Lazoroski Date: Thu, 19 Sep 2024 16:24:18 +0200 Subject: [PATCH 1/6] fix: add timeout for the `notifyLeave` operation --- packages/client/src/StreamSfuClient.ts | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index df928d2747..d4da0f6672 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -453,18 +453,38 @@ export class StreamSfuClient { }, }, }), + 2000, // two-second timeout for leave request ); }; - private send = async (message: SfuRequest) => { + /** + * Sends a message to the SFU via the WebSocket connection. + * + * @param message the message to send. + * @param timeout an optional timeout in milliseconds for sending the message. + */ + private send = async (message: SfuRequest, timeout: number = 0) => { await this.signalReady; // wait for the signal ws to be open const msgJson = SfuRequest.toJson(message); if (this.signalWs.readyState !== WebSocket.OPEN) { this.logger('debug', 'Signal WS is not open. Skipping message', msgJson); return; } - this.logger('debug', `Sending message to: ${this.edgeName}`, msgJson); - this.signalWs.send(SfuRequest.toBinary(message)); + return new Promise((resolve, reject) => { + const timeoutId = + timeout > 0 + ? setTimeout(() => reject(new Error('Timeout sending msg')), timeout) + : undefined; + try { + this.logger('debug', `Sending message to: ${this.edgeName}`, msgJson); + this.signalWs.send(SfuRequest.toBinary(message)); + resolve(); + } catch (err) { + reject(err); + } finally { + clearTimeout(timeoutId); + } + }); }; private keepAlive = () => { From 9dcf1e7d5daaf342675edbc7e759c923e1cb7d98 Mon Sep 17 00:00:00 2001 From: Santhosh Vaiyapuri Date: Fri, 20 Sep 2024 13:45:44 +0200 Subject: [PATCH 2/6] fix: add timeout for promise waiting for ws open --- packages/client/src/StreamSfuClient.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index d4da0f6672..ecea5df80a 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -464,7 +464,16 @@ export class StreamSfuClient { * @param timeout an optional timeout in milliseconds for sending the message. */ private send = async (message: SfuRequest, timeout: number = 0) => { - await this.signalReady; // wait for the signal ws to be open + if (timeout > 0) { + await Promise.race([ + this.signalReady, + new Promise((_, reject) => { + setTimeout(() => reject(new Error('Timeout sending msg')), timeout); + }), + ]); + } else { + await this.signalReady; // wait for the signal ws to be open + } const msgJson = SfuRequest.toJson(message); if (this.signalWs.readyState !== WebSocket.OPEN) { this.logger('debug', 'Signal WS is not open. Skipping message', msgJson); From b3c1b54d7b01779f7b3984a959c7b5903c0070df Mon Sep 17 00:00:00 2001 From: Santhosh Vaiyapuri Date: Fri, 20 Sep 2024 13:50:30 +0200 Subject: [PATCH 3/6] remove the previous timeout --- packages/client/src/StreamSfuClient.ts | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index ecea5df80a..9a2d1fafb5 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -464,6 +464,7 @@ export class StreamSfuClient { * @param timeout an optional timeout in milliseconds for sending the message. */ private send = async (message: SfuRequest, timeout: number = 0) => { + // wait for the signal ws to be open if (timeout > 0) { await Promise.race([ this.signalReady, @@ -472,28 +473,14 @@ export class StreamSfuClient { }), ]); } else { - await this.signalReady; // wait for the signal ws to be open + await this.signalReady; } const msgJson = SfuRequest.toJson(message); if (this.signalWs.readyState !== WebSocket.OPEN) { this.logger('debug', 'Signal WS is not open. Skipping message', msgJson); return; } - return new Promise((resolve, reject) => { - const timeoutId = - timeout > 0 - ? setTimeout(() => reject(new Error('Timeout sending msg')), timeout) - : undefined; - try { - this.logger('debug', `Sending message to: ${this.edgeName}`, msgJson); - this.signalWs.send(SfuRequest.toBinary(message)); - resolve(); - } catch (err) { - reject(err); - } finally { - clearTimeout(timeoutId); - } - }); + this.signalWs.send(SfuRequest.toBinary(message)); }; private keepAlive = () => { From d77b9a4c2ce78413c04f1564c627693f4741fbff Mon Sep 17 00:00:00 2001 From: Oliver Lazoroski Date: Fri, 20 Sep 2024 14:50:01 +0200 Subject: [PATCH 4/6] fix: wait for connection to restore before attempting WS recovery --- packages/client/src/Call.ts | 1 + packages/client/src/StreamSfuClient.ts | 37 +++++++++++++++++--------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index 4721a9f3ae..d89e32fc42 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -804,6 +804,7 @@ export class Call { logTag: String(this.sfuClientTag++), dispatcher: this.dispatcher, credentials: this.credentials, + streamClient: this.streamClient, // a new session_id is necessary for the REJOIN strategy. // we use the previous session_id if available sessionId: performingRejoin ? undefined : previousSessionId, diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index 9a2d1fafb5..275a23049d 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -24,6 +24,7 @@ import { UpdateMuteStatesRequest, } from './gen/video/sfu/signal_rpc/signal'; import { ICETrickle, TrackType } from './gen/video/sfu/models/models'; +import { StreamClient } from './coordinator/connection/client'; import { generateUUIDv4, sleep } from './coordinator/connection/utils'; import { Credentials } from './gen/coordinator'; import { Logger } from './coordinator/connection/types'; @@ -65,6 +66,11 @@ export type StreamSfuClientConstructor = { * Callback for when the WebSocket connection is closed. */ onSignalClose?: () => void; + + /** + * The StreamClient instance to use for the connection. + */ + streamClient: StreamClient; }; /** @@ -112,12 +118,14 @@ export class StreamSfuClient { private lastMessageTimestamp?: Date; private readonly restoreWebSocketConcurrencyTag = Symbol('recoverWebSocket'); private readonly unsubscribeIceTrickle: () => void; + private readonly unsubscribeNetworkChanged: () => void; private readonly onSignalClose: (() => void) | undefined; private readonly logger: Logger; private readonly logTag: string; private readonly credentials: Credentials; private readonly dispatcher: Dispatcher; private readonly joinResponseTimeout?: number; + private networkAvailableTask: PromiseWithResolvers | undefined; /** * Promise that resolves when the JoinResponse is received. * Rejects after a certain threshold if the response is not received. @@ -163,6 +171,7 @@ export class StreamSfuClient { logTag, joinResponseTimeout = 5000, onSignalClose, + streamClient, }: StreamSfuClientConstructor) { this.dispatcher = dispatcher; this.sessionId = sessionId || generateUUIDv4(); @@ -193,6 +202,16 @@ export class StreamSfuClient { this.iceTrickleBuffer.push(iceTrickle); }); + // listen to network changes to handle offline state + // we shouldn't attempt to recover websocket connection when offline + this.unsubscribeNetworkChanged = streamClient.on('network.changed', (e) => { + if (!e.online) { + this.networkAvailableTask = promiseWithResolvers(); + } else { + this.networkAvailableTask?.resolve(); + } + }); + this.createWebSocket(); } @@ -226,6 +245,7 @@ export class StreamSfuClient { private restoreWebSocket = () => { withoutConcurrency(this.restoreWebSocketConcurrencyTag, async () => { + await this.networkAvailableTask?.promise; this.logger('debug', 'Restoring SFU WS connection'); this.cleanUpWebSocket(); await sleep(500); @@ -260,6 +280,7 @@ export class StreamSfuClient { dispose = () => { this.logger('debug', 'Disposing SFU client'); this.unsubscribeIceTrickle(); + this.unsubscribeNetworkChanged(); clearInterval(this.keepAliveInterval); clearTimeout(this.connectionCheckTimeout); clearTimeout(this.migrateAwayTimeout); @@ -453,7 +474,6 @@ export class StreamSfuClient { }, }, }), - 2000, // two-second timeout for leave request ); }; @@ -461,25 +481,16 @@ export class StreamSfuClient { * Sends a message to the SFU via the WebSocket connection. * * @param message the message to send. - * @param timeout an optional timeout in milliseconds for sending the message. */ - private send = async (message: SfuRequest, timeout: number = 0) => { + private send = async (message: SfuRequest) => { // wait for the signal ws to be open - if (timeout > 0) { - await Promise.race([ - this.signalReady, - new Promise((_, reject) => { - setTimeout(() => reject(new Error('Timeout sending msg')), timeout); - }), - ]); - } else { - await this.signalReady; - } + await this.signalReady; const msgJson = SfuRequest.toJson(message); if (this.signalWs.readyState !== WebSocket.OPEN) { this.logger('debug', 'Signal WS is not open. Skipping message', msgJson); return; } + this.logger('debug', `Sending message to: ${this.edgeName}`, msgJson); this.signalWs.send(SfuRequest.toBinary(message)); }; From 0271bb0704ae0af0fcbce184a8e1498058bd0e87 Mon Sep 17 00:00:00 2001 From: Oliver Lazoroski Date: Fri, 20 Sep 2024 15:08:04 +0200 Subject: [PATCH 5/6] fix: revert change --- packages/client/src/StreamSfuClient.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index 275a23049d..fb38ad571d 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -483,8 +483,7 @@ export class StreamSfuClient { * @param message the message to send. */ private send = async (message: SfuRequest) => { - // wait for the signal ws to be open - await this.signalReady; + await this.signalReady; // wait for the signal ws to be open const msgJson = SfuRequest.toJson(message); if (this.signalWs.readyState !== WebSocket.OPEN) { this.logger('debug', 'Signal WS is not open. Skipping message', msgJson); From c299dcc63574be15da8986465c3566300c9aeb82 Mon Sep 17 00:00:00 2001 From: Oliver Lazoroski Date: Fri, 20 Sep 2024 15:13:07 +0200 Subject: [PATCH 6/6] fix: adjust tests --- packages/client/src/StreamSfuClient.ts | 5 ----- packages/client/src/rtc/__tests__/Publisher.test.ts | 2 ++ packages/client/src/rtc/__tests__/Subscriber.test.ts | 2 ++ 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index fb38ad571d..eca72c36c4 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -477,11 +477,6 @@ export class StreamSfuClient { ); }; - /** - * Sends a message to the SFU via the WebSocket connection. - * - * @param message the message to send. - */ private send = async (message: SfuRequest) => { await this.signalReady; // wait for the signal ws to be open const msgJson = SfuRequest.toJson(message); diff --git a/packages/client/src/rtc/__tests__/Publisher.test.ts b/packages/client/src/rtc/__tests__/Publisher.test.ts index 7e79c0b710..1777a5e3d3 100644 --- a/packages/client/src/rtc/__tests__/Publisher.test.ts +++ b/packages/client/src/rtc/__tests__/Publisher.test.ts @@ -8,6 +8,7 @@ import { DispatchableMessage, Dispatcher } from '../Dispatcher'; import { PeerType, TrackType } from '../../gen/video/sfu/models/models'; import { SfuEvent } from '../../gen/video/sfu/event/events'; import { IceTrickleBuffer } from '../IceTrickleBuffer'; +import { StreamClient } from '../../coordinator/connection/client'; vi.mock('../../StreamSfuClient', () => { console.log('MOCKING StreamSfuClient'); @@ -41,6 +42,7 @@ describe('Publisher', () => { sfuClient = new StreamSfuClient({ dispatcher, sessionId: 'session-id-test', + streamClient: new StreamClient('abc'), credentials: { server: { url: 'https://getstream.io/', diff --git a/packages/client/src/rtc/__tests__/Subscriber.test.ts b/packages/client/src/rtc/__tests__/Subscriber.test.ts index cdd87153e1..d8b16ca886 100644 --- a/packages/client/src/rtc/__tests__/Subscriber.test.ts +++ b/packages/client/src/rtc/__tests__/Subscriber.test.ts @@ -8,6 +8,7 @@ import { CallState } from '../../store'; import { SfuEvent } from '../../gen/video/sfu/event/events'; import { PeerType, TrackType } from '../../gen/video/sfu/models/models'; import { IceTrickleBuffer } from '../IceTrickleBuffer'; +import { StreamClient } from '../../coordinator/connection/client'; vi.mock('../../StreamSfuClient', () => { console.log('MOCKING StreamSfuClient'); @@ -27,6 +28,7 @@ describe('Subscriber', () => { sfuClient = new StreamSfuClient({ dispatcher, sessionId: 'sessionId', + streamClient: new StreamClient('abc'), logTag: 'logTag', credentials: { server: {