diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index 4721a9f3ae..d89e32fc42 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -804,6 +804,7 @@ export class Call { logTag: String(this.sfuClientTag++), dispatcher: this.dispatcher, credentials: this.credentials, + streamClient: this.streamClient, // a new session_id is necessary for the REJOIN strategy. // we use the previous session_id if available sessionId: performingRejoin ? undefined : previousSessionId, diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index df928d2747..eca72c36c4 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -24,6 +24,7 @@ import { UpdateMuteStatesRequest, } from './gen/video/sfu/signal_rpc/signal'; import { ICETrickle, TrackType } from './gen/video/sfu/models/models'; +import { StreamClient } from './coordinator/connection/client'; import { generateUUIDv4, sleep } from './coordinator/connection/utils'; import { Credentials } from './gen/coordinator'; import { Logger } from './coordinator/connection/types'; @@ -65,6 +66,11 @@ export type StreamSfuClientConstructor = { * Callback for when the WebSocket connection is closed. */ onSignalClose?: () => void; + + /** + * The StreamClient instance to use for the connection. + */ + streamClient: StreamClient; }; /** @@ -112,12 +118,14 @@ export class StreamSfuClient { private lastMessageTimestamp?: Date; private readonly restoreWebSocketConcurrencyTag = Symbol('recoverWebSocket'); private readonly unsubscribeIceTrickle: () => void; + private readonly unsubscribeNetworkChanged: () => void; private readonly onSignalClose: (() => void) | undefined; private readonly logger: Logger; private readonly logTag: string; private readonly credentials: Credentials; private readonly dispatcher: Dispatcher; private readonly joinResponseTimeout?: number; + private networkAvailableTask: PromiseWithResolvers | undefined; /** * Promise that resolves when the JoinResponse is received. * Rejects after a certain threshold if the response is not received. @@ -163,6 +171,7 @@ export class StreamSfuClient { logTag, joinResponseTimeout = 5000, onSignalClose, + streamClient, }: StreamSfuClientConstructor) { this.dispatcher = dispatcher; this.sessionId = sessionId || generateUUIDv4(); @@ -193,6 +202,16 @@ export class StreamSfuClient { this.iceTrickleBuffer.push(iceTrickle); }); + // listen to network changes to handle offline state + // we shouldn't attempt to recover websocket connection when offline + this.unsubscribeNetworkChanged = streamClient.on('network.changed', (e) => { + if (!e.online) { + this.networkAvailableTask = promiseWithResolvers(); + } else { + this.networkAvailableTask?.resolve(); + } + }); + this.createWebSocket(); } @@ -226,6 +245,7 @@ export class StreamSfuClient { private restoreWebSocket = () => { withoutConcurrency(this.restoreWebSocketConcurrencyTag, async () => { + await this.networkAvailableTask?.promise; this.logger('debug', 'Restoring SFU WS connection'); this.cleanUpWebSocket(); await sleep(500); @@ -260,6 +280,7 @@ export class StreamSfuClient { dispose = () => { this.logger('debug', 'Disposing SFU client'); this.unsubscribeIceTrickle(); + this.unsubscribeNetworkChanged(); clearInterval(this.keepAliveInterval); clearTimeout(this.connectionCheckTimeout); clearTimeout(this.migrateAwayTimeout); diff --git a/packages/client/src/rtc/__tests__/Publisher.test.ts b/packages/client/src/rtc/__tests__/Publisher.test.ts index 7e79c0b710..1777a5e3d3 100644 --- a/packages/client/src/rtc/__tests__/Publisher.test.ts +++ b/packages/client/src/rtc/__tests__/Publisher.test.ts @@ -8,6 +8,7 @@ import { DispatchableMessage, Dispatcher } from '../Dispatcher'; import { PeerType, TrackType } from '../../gen/video/sfu/models/models'; import { SfuEvent } from '../../gen/video/sfu/event/events'; import { IceTrickleBuffer } from '../IceTrickleBuffer'; +import { StreamClient } from '../../coordinator/connection/client'; vi.mock('../../StreamSfuClient', () => { console.log('MOCKING StreamSfuClient'); @@ -41,6 +42,7 @@ describe('Publisher', () => { sfuClient = new StreamSfuClient({ dispatcher, sessionId: 'session-id-test', + streamClient: new StreamClient('abc'), credentials: { server: { url: 'https://getstream.io/', diff --git a/packages/client/src/rtc/__tests__/Subscriber.test.ts b/packages/client/src/rtc/__tests__/Subscriber.test.ts index cdd87153e1..d8b16ca886 100644 --- a/packages/client/src/rtc/__tests__/Subscriber.test.ts +++ b/packages/client/src/rtc/__tests__/Subscriber.test.ts @@ -8,6 +8,7 @@ import { CallState } from '../../store'; import { SfuEvent } from '../../gen/video/sfu/event/events'; import { PeerType, TrackType } from '../../gen/video/sfu/models/models'; import { IceTrickleBuffer } from '../IceTrickleBuffer'; +import { StreamClient } from '../../coordinator/connection/client'; vi.mock('../../StreamSfuClient', () => { console.log('MOCKING StreamSfuClient'); @@ -27,6 +28,7 @@ describe('Subscriber', () => { sfuClient = new StreamSfuClient({ dispatcher, sessionId: 'sessionId', + streamClient: new StreamClient('abc'), logTag: 'logTag', credentials: { server: {