Skip to content

Commit

Permalink
Use single state var to track connection state of signal client (#949)
Browse files Browse the repository at this point in the history
* use single state var to track connection state of signal client

* cleanup

* avoid enum math

* Create proud-drinks-shout.md
  • Loading branch information
lukasIO authored Nov 30, 2023
1 parent 00824de commit 77301cb
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 123 deletions.
5 changes: 5 additions & 0 deletions .changeset/proud-drinks-shout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-client": patch
---

Use enum to track connection state of signal client
260 changes: 142 additions & 118 deletions src/api/SignalClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,16 @@ function canPassThroughQueue(req: SignalMessage): boolean {
return canPass;
}

export enum SignalConnectionState {
CONNECTING,
CONNECTED,
RECONNECTING,
DISCONNECTING,
DISCONNECTED,
}

/** @internal */
export class SignalClient {
isConnected: boolean;

isReconnecting: boolean;

requestQueue: AsyncQueue;

queuedRequests: Array<() => Promise<void>>;
Expand Down Expand Up @@ -141,6 +145,17 @@ export class SignalClient {

ws?: WebSocket;

get currentState() {
return this.state;
}

get isDisconnected() {
return (
this.state === SignalConnectionState.DISCONNECTING ||
this.state === SignalConnectionState.DISCONNECTED
);
}

private options?: SignalOptions;

private pingTimeout: ReturnType<typeof setTimeout> | undefined;
Expand All @@ -153,13 +168,17 @@ export class SignalClient {

private closingLock: Mutex;

private state: SignalConnectionState = SignalConnectionState.DISCONNECTED;

private connectionLock: Mutex;

constructor(useJSON: boolean = false) {
this.isConnected = false;
this.isReconnecting = false;
this.useJSON = useJSON;
this.requestQueue = new AsyncQueue();
this.queuedRequests = [];
this.closingLock = new Mutex();
this.connectionLock = new Mutex();
this.state = SignalConnectionState.DISCONNECTED;
}

async join(
Expand All @@ -170,7 +189,7 @@ export class SignalClient {
): Promise<JoinResponse> {
// during a full reconnect, we'd want to start the sequence even if currently
// connected
this.isConnected = false;
this.state = SignalConnectionState.CONNECTING;
this.options = opts;
const res = await this.connect(url, token, opts, abortSignal);
return res as JoinResponse;
Expand All @@ -186,7 +205,7 @@ export class SignalClient {
log.warn('attempted to reconnect without signal options being set, ignoring');
return;
}
this.isReconnecting = true;
this.state = SignalConnectionState.RECONNECTING;
// clear ping interval and restart it once reconnected
this.clearPingInterval();

Expand Down Expand Up @@ -215,127 +234,132 @@ export class SignalClient {
const params = createConnectionParams(token, clientInfo, opts);

return new Promise<JoinResponse | ReconnectResponse | void>(async (resolve, reject) => {
const abortHandler = async () => {
this.close();
clearTimeout(wsTimeout);
reject(new ConnectionError('room connection has been cancelled (signal)'));
};

const wsTimeout = setTimeout(() => {
this.close();
reject(new ConnectionError('room connection has timed out (signal)'));
}, opts.websocketTimeout);

if (abortSignal?.aborted) {
abortHandler();
}
abortSignal?.addEventListener('abort', abortHandler);
log.debug(`connecting to ${url + params}`);
if (this.ws) {
await this.close();
}
this.ws = new WebSocket(url + params);
this.ws.binaryType = 'arraybuffer';
const unlock = await this.connectionLock.lock();
try {
const abortHandler = async () => {
this.close();
clearTimeout(wsTimeout);
reject(new ConnectionError('room connection has been cancelled (signal)'));
};

this.ws.onopen = () => {
clearTimeout(wsTimeout);
};
const wsTimeout = setTimeout(() => {
this.close();
reject(new ConnectionError('room connection has timed out (signal)'));
}, opts.websocketTimeout);

this.ws.onerror = async (ev: Event) => {
if (!this.isConnected) {
if (abortSignal?.aborted) {
abortHandler();
}
abortSignal?.addEventListener('abort', abortHandler);
log.debug(`connecting to ${url + params}`);
if (this.ws) {
await this.close();
}
this.ws = new WebSocket(url + params);
this.ws.binaryType = 'arraybuffer';

this.ws.onopen = () => {
clearTimeout(wsTimeout);
try {
const resp = await fetch(`http${url.substring(2)}/validate${params}`);
if (resp.status.toFixed(0).startsWith('4')) {
const msg = await resp.text();
reject(new ConnectionError(msg, ConnectionErrorReason.NotAllowed, resp.status));
} else {
};

this.ws.onerror = async (ev: Event) => {
if (this.state !== SignalConnectionState.CONNECTED) {
clearTimeout(wsTimeout);
try {
const resp = await fetch(`http${url.substring(2)}/validate${params}`);
if (resp.status.toFixed(0).startsWith('4')) {
const msg = await resp.text();
reject(new ConnectionError(msg, ConnectionErrorReason.NotAllowed, resp.status));
} else {
reject(
new ConnectionError(
'Internal error',
ConnectionErrorReason.InternalError,
resp.status,
),
);
}
} catch (e) {
reject(
new ConnectionError(
'Internal error',
ConnectionErrorReason.InternalError,
resp.status,
'server was not reachable',
ConnectionErrorReason.ServerUnreachable,
),
);
}
} catch (e) {
reject(
new ConnectionError(
'server was not reachable',
ConnectionErrorReason.ServerUnreachable,
),
);
return;
}
// other errors, handle
this.handleWSError(ev);
};

this.ws.onmessage = async (ev: MessageEvent) => {
// not considered connected until JoinResponse is received
let resp: SignalResponse;
if (typeof ev.data === 'string') {
const json = JSON.parse(ev.data);
resp = SignalResponse.fromJson(json);
} else if (ev.data instanceof ArrayBuffer) {
resp = SignalResponse.fromBinary(new Uint8Array(ev.data));
} else {
log.error(`could not decode websocket message: ${typeof ev.data}`);
return;
}
return;
}
// other errors, handle
this.handleWSError(ev);
};

this.ws.onmessage = async (ev: MessageEvent) => {
// not considered connected until JoinResponse is received
let resp: SignalResponse;
if (typeof ev.data === 'string') {
const json = JSON.parse(ev.data);
resp = SignalResponse.fromJson(json);
} else if (ev.data instanceof ArrayBuffer) {
resp = SignalResponse.fromBinary(new Uint8Array(ev.data));
} else {
log.error(`could not decode websocket message: ${typeof ev.data}`);
return;
}

if (!this.isConnected) {
let shouldProcessMessage = false;
// handle join message only
if (resp.message?.case === 'join') {
this.isConnected = true;
abortSignal?.removeEventListener('abort', abortHandler);
this.pingTimeoutDuration = resp.message.value.pingTimeout;
this.pingIntervalDuration = resp.message.value.pingInterval;

if (this.pingTimeoutDuration && this.pingTimeoutDuration > 0) {
log.debug('ping config', {
timeout: this.pingTimeoutDuration,
interval: this.pingIntervalDuration,
});
if (this.state !== SignalConnectionState.CONNECTED) {
let shouldProcessMessage = false;
// handle join message only
if (resp.message?.case === 'join') {
this.state = SignalConnectionState.CONNECTED;
abortSignal?.removeEventListener('abort', abortHandler);
this.pingTimeoutDuration = resp.message.value.pingTimeout;
this.pingIntervalDuration = resp.message.value.pingInterval;

if (this.pingTimeoutDuration && this.pingTimeoutDuration > 0) {
log.debug('ping config', {
timeout: this.pingTimeoutDuration,
interval: this.pingIntervalDuration,
});
this.startPingInterval();
}
resolve(resp.message.value);
} else if (this.state === SignalConnectionState.RECONNECTING) {
// in reconnecting, any message received means signal reconnected
this.state = SignalConnectionState.CONNECTED;
abortSignal?.removeEventListener('abort', abortHandler);
this.startPingInterval();
if (resp.message?.case === 'reconnect') {
resolve(resp.message?.value);
} else {
resolve();
shouldProcessMessage = true;
}
} else if (!opts.reconnect) {
// non-reconnect case, should receive join response first
reject(
new ConnectionError(
`did not receive join response, got ${resp.message?.case} instead`,
),
);
}
resolve(resp.message.value);
} else if (opts.reconnect) {
// in reconnecting, any message received means signal reconnected
this.isConnected = true;
abortSignal?.removeEventListener('abort', abortHandler);
this.startPingInterval();
if (resp.message?.case === 'reconnect') {
resolve(resp.message?.value);
} else {
resolve();
shouldProcessMessage = true;
if (!shouldProcessMessage) {
return;
}
} else if (!opts.reconnect) {
// non-reconnect case, should receive join response first
reject(
new ConnectionError(
`did not receive join response, got ${resp.message?.case} instead`,
),
);
}
if (!shouldProcessMessage) {
return;
}
}

if (this.signalLatency) {
await sleep(this.signalLatency);
}
this.handleSignalResponse(resp);
};

this.ws.onclose = (ev: CloseEvent) => {
log.warn(`websocket closed`, { ev });
this.handleOnClose(ev.reason);
};
if (this.signalLatency) {
await sleep(this.signalLatency);
}
this.handleSignalResponse(resp);
};

this.ws.onclose = (ev: CloseEvent) => {
log.warn(`websocket closed`, { ev });
this.handleOnClose(ev.reason);
};
} finally {
unlock();
}
});
}

Expand All @@ -357,7 +381,7 @@ export class SignalClient {
async close() {
const unlock = await this.closingLock.lock();
try {
this.isConnected = false;
this.state = SignalConnectionState.DISCONNECTING;
if (this.ws) {
this.ws.onmessage = null;
this.ws.onopen = null;
Expand All @@ -382,6 +406,7 @@ export class SignalClient {
this.ws = undefined;
}
} finally {
this.state = SignalConnectionState.DISCONNECTED;
this.clearPingInterval();
unlock();
}
Expand Down Expand Up @@ -522,7 +547,7 @@ export class SignalClient {
// capture all requests while reconnecting and put them in a queue
// unless the request originates from the queue, then don't enqueue again
const canQueue = !fromQueue && !canPassThroughQueue(message);
if (canQueue && this.isReconnecting) {
if (canQueue && this.state === SignalConnectionState.RECONNECTING) {
this.queuedRequests.push(async () => {
await this.sendRequest(message, true);
});
Expand Down Expand Up @@ -648,11 +673,10 @@ export class SignalClient {
this.requestQueue.run(req);
}
}
this.isReconnecting = false;
}

private async handleOnClose(reason: string) {
if (!this.isConnected) return;
if (this.state === SignalConnectionState.DISCONNECTED) return;
const onCloseCallback = this.onClose;
await this.close();
log.debug(`websocket connection closed: ${reason}`);
Expand Down
10 changes: 7 additions & 3 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ import { EventEmitter } from 'events';
import type { MediaAttributes } from 'sdp-transform';
import type TypedEventEmitter from 'typed-emitter';
import type { SignalOptions } from '../api/SignalClient';
import { SignalClient, toProtoSessionDescription } from '../api/SignalClient';
import {
SignalClient,
SignalConnectionState,
toProtoSessionDescription,
} from '../api/SignalClient';
import log from '../logger';
import type { InternalRoomOptions } from '../options';
import {
Expand Down Expand Up @@ -867,7 +871,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
log.info(`reconnecting, attempt: ${this.reconnectAttempts}`);
this.emit(EngineEvent.Restarting);

if (this.client.isConnected) {
if (!this.client.isDisconnected) {
await this.client.sendLeave();
}
await this.cleanupPeerConnections();
Expand Down Expand Up @@ -1266,7 +1270,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

private handleBrowserOnLine = () => {
// in case the engine is currently reconnecting, attempt a reconnect immediately after the browser state has changed to 'onLine'
if (this.client.isReconnecting) {
if (this.client.currentState === SignalConnectionState.RECONNECTING) {
this.clearReconnectTimeout();
this.attemptReconnect(ReconnectReason.RR_SIGNAL_DISCONNECTED);
}
Expand Down
Loading

0 comments on commit 77301cb

Please sign in to comment.