Skip to content

Commit

Permalink
fix: don't attempt to recover broken WebSockets when there isn't a ne…
Browse files Browse the repository at this point in the history
…twork connection (#1490)

We don't attempt to recover broken WebSockets when there isn't a network
connection. Instead, we wait for connectivity to be restored first.

This behavior created some weird bugs with `call.leave()`. As we were
creating new WebSockets, the `signalReady` never resolved (the socket
couldn't be opened) and hence, `send()` was blocking forever.
Due to this behavior, when attempting to leave a call while offline, the
`callingState` never transitioned from `OFFLINE -> LEFT`.

Ref:
https://getstream.slack.com/archives/C06H93HC60Z/p1725636744842199?thread_ts=1724334671.814709&cid=C06H93HC60Z

---------

Co-authored-by: Santhosh Vaiyapuri <[email protected]>
  • Loading branch information
oliverlaz and santhoshvai authored Sep 20, 2024
1 parent f92d00b commit d576f48
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 0 deletions.
1 change: 1 addition & 0 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ export class Call {
logTag: String(this.sfuClientTag++),
dispatcher: this.dispatcher,
credentials: this.credentials,
streamClient: this.streamClient,
// a new session_id is necessary for the REJOIN strategy.
// we use the previous session_id if available
sessionId: performingRejoin ? undefined : previousSessionId,
Expand Down
21 changes: 21 additions & 0 deletions packages/client/src/StreamSfuClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
UpdateMuteStatesRequest,
} from './gen/video/sfu/signal_rpc/signal';
import { ICETrickle, TrackType } from './gen/video/sfu/models/models';
import { StreamClient } from './coordinator/connection/client';
import { generateUUIDv4, sleep } from './coordinator/connection/utils';
import { Credentials } from './gen/coordinator';
import { Logger } from './coordinator/connection/types';
Expand Down Expand Up @@ -65,6 +66,11 @@ export type StreamSfuClientConstructor = {
* Callback for when the WebSocket connection is closed.
*/
onSignalClose?: () => void;

/**
* The StreamClient instance to use for the connection.
*/
streamClient: StreamClient;
};

/**
Expand Down Expand Up @@ -112,12 +118,14 @@ export class StreamSfuClient {
private lastMessageTimestamp?: Date;
private readonly restoreWebSocketConcurrencyTag = Symbol('recoverWebSocket');
private readonly unsubscribeIceTrickle: () => void;
private readonly unsubscribeNetworkChanged: () => void;
private readonly onSignalClose: (() => void) | undefined;
private readonly logger: Logger;
private readonly logTag: string;
private readonly credentials: Credentials;
private readonly dispatcher: Dispatcher;
private readonly joinResponseTimeout?: number;
private networkAvailableTask: PromiseWithResolvers<void> | undefined;
/**
* Promise that resolves when the JoinResponse is received.
* Rejects after a certain threshold if the response is not received.
Expand Down Expand Up @@ -163,6 +171,7 @@ export class StreamSfuClient {
logTag,
joinResponseTimeout = 5000,
onSignalClose,
streamClient,
}: StreamSfuClientConstructor) {
this.dispatcher = dispatcher;
this.sessionId = sessionId || generateUUIDv4();
Expand Down Expand Up @@ -193,6 +202,16 @@ export class StreamSfuClient {
this.iceTrickleBuffer.push(iceTrickle);
});

// listen to network changes to handle offline state
// we shouldn't attempt to recover websocket connection when offline
this.unsubscribeNetworkChanged = streamClient.on('network.changed', (e) => {
if (!e.online) {
this.networkAvailableTask = promiseWithResolvers();
} else {
this.networkAvailableTask?.resolve();
}
});

this.createWebSocket();
}

Expand Down Expand Up @@ -226,6 +245,7 @@ export class StreamSfuClient {

private restoreWebSocket = () => {
withoutConcurrency(this.restoreWebSocketConcurrencyTag, async () => {
await this.networkAvailableTask?.promise;
this.logger('debug', 'Restoring SFU WS connection');
this.cleanUpWebSocket();
await sleep(500);
Expand Down Expand Up @@ -260,6 +280,7 @@ export class StreamSfuClient {
dispose = () => {
this.logger('debug', 'Disposing SFU client');
this.unsubscribeIceTrickle();
this.unsubscribeNetworkChanged();
clearInterval(this.keepAliveInterval);
clearTimeout(this.connectionCheckTimeout);
clearTimeout(this.migrateAwayTimeout);
Expand Down
2 changes: 2 additions & 0 deletions packages/client/src/rtc/__tests__/Publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { DispatchableMessage, Dispatcher } from '../Dispatcher';
import { PeerType, TrackType } from '../../gen/video/sfu/models/models';
import { SfuEvent } from '../../gen/video/sfu/event/events';
import { IceTrickleBuffer } from '../IceTrickleBuffer';
import { StreamClient } from '../../coordinator/connection/client';

vi.mock('../../StreamSfuClient', () => {
console.log('MOCKING StreamSfuClient');
Expand Down Expand Up @@ -41,6 +42,7 @@ describe('Publisher', () => {
sfuClient = new StreamSfuClient({
dispatcher,
sessionId: 'session-id-test',
streamClient: new StreamClient('abc'),
credentials: {
server: {
url: 'https://getstream.io/',
Expand Down
2 changes: 2 additions & 0 deletions packages/client/src/rtc/__tests__/Subscriber.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { CallState } from '../../store';
import { SfuEvent } from '../../gen/video/sfu/event/events';
import { PeerType, TrackType } from '../../gen/video/sfu/models/models';
import { IceTrickleBuffer } from '../IceTrickleBuffer';
import { StreamClient } from '../../coordinator/connection/client';

vi.mock('../../StreamSfuClient', () => {
console.log('MOCKING StreamSfuClient');
Expand All @@ -27,6 +28,7 @@ describe('Subscriber', () => {
sfuClient = new StreamSfuClient({
dispatcher,
sessionId: 'sessionId',
streamClient: new StreamClient('abc'),
logTag: 'logTag',
credentials: {
server: {
Expand Down

0 comments on commit d576f48

Please sign in to comment.