diff --git a/.changeset/proud-drinks-shout.md b/.changeset/proud-drinks-shout.md new file mode 100644 index 0000000000..72d592b607 --- /dev/null +++ b/.changeset/proud-drinks-shout.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Use enum to track connection state of signal client diff --git a/src/api/SignalClient.ts b/src/api/SignalClient.ts index e92cdf3540..0de10587ae 100644 --- a/src/api/SignalClient.ts +++ b/src/api/SignalClient.ts @@ -82,12 +82,16 @@ function canPassThroughQueue(req: SignalMessage): boolean { return canPass; } +export enum SignalConnectionState { + CONNECTING, + CONNECTED, + RECONNECTING, + DISCONNECTING, + DISCONNECTED, +} + /** @internal */ export class SignalClient { - isConnected: boolean; - - isReconnecting: boolean; - requestQueue: AsyncQueue; queuedRequests: Array<() => Promise>; @@ -141,6 +145,17 @@ export class SignalClient { ws?: WebSocket; + get currentState() { + return this.state; + } + + get isDisconnected() { + return ( + this.state === SignalConnectionState.DISCONNECTING || + this.state === SignalConnectionState.DISCONNECTED + ); + } + private options?: SignalOptions; private pingTimeout: ReturnType | undefined; @@ -153,13 +168,17 @@ export class SignalClient { private closingLock: Mutex; + private state: SignalConnectionState = SignalConnectionState.DISCONNECTED; + + private connectionLock: Mutex; + constructor(useJSON: boolean = false) { - this.isConnected = false; - this.isReconnecting = false; this.useJSON = useJSON; this.requestQueue = new AsyncQueue(); this.queuedRequests = []; this.closingLock = new Mutex(); + this.connectionLock = new Mutex(); + this.state = SignalConnectionState.DISCONNECTED; } async join( @@ -170,7 +189,7 @@ export class SignalClient { ): Promise { // during a full reconnect, we'd want to start the sequence even if currently // connected - this.isConnected = false; + this.state = SignalConnectionState.CONNECTING; this.options = opts; const res = await this.connect(url, token, opts, abortSignal); return res as JoinResponse; @@ -186,7 +205,7 @@ export class SignalClient { log.warn('attempted to reconnect without signal options being set, ignoring'); return; } - this.isReconnecting = true; + this.state = SignalConnectionState.RECONNECTING; // clear ping interval and restart it once reconnected this.clearPingInterval(); @@ -215,127 +234,132 @@ export class SignalClient { const params = createConnectionParams(token, clientInfo, opts); return new Promise(async (resolve, reject) => { - const abortHandler = async () => { - this.close(); - clearTimeout(wsTimeout); - reject(new ConnectionError('room connection has been cancelled (signal)')); - }; - - const wsTimeout = setTimeout(() => { - this.close(); - reject(new ConnectionError('room connection has timed out (signal)')); - }, opts.websocketTimeout); - - if (abortSignal?.aborted) { - abortHandler(); - } - abortSignal?.addEventListener('abort', abortHandler); - log.debug(`connecting to ${url + params}`); - if (this.ws) { - await this.close(); - } - this.ws = new WebSocket(url + params); - this.ws.binaryType = 'arraybuffer'; + const unlock = await this.connectionLock.lock(); + try { + const abortHandler = async () => { + this.close(); + clearTimeout(wsTimeout); + reject(new ConnectionError('room connection has been cancelled (signal)')); + }; - this.ws.onopen = () => { - clearTimeout(wsTimeout); - }; + const wsTimeout = setTimeout(() => { + this.close(); + reject(new ConnectionError('room connection has timed out (signal)')); + }, opts.websocketTimeout); - this.ws.onerror = async (ev: Event) => { - if (!this.isConnected) { + if (abortSignal?.aborted) { + abortHandler(); + } + abortSignal?.addEventListener('abort', abortHandler); + log.debug(`connecting to ${url + params}`); + if (this.ws) { + await this.close(); + } + this.ws = new WebSocket(url + params); + this.ws.binaryType = 'arraybuffer'; + + this.ws.onopen = () => { clearTimeout(wsTimeout); - try { - const resp = await fetch(`http${url.substring(2)}/validate${params}`); - if (resp.status.toFixed(0).startsWith('4')) { - const msg = await resp.text(); - reject(new ConnectionError(msg, ConnectionErrorReason.NotAllowed, resp.status)); - } else { + }; + + this.ws.onerror = async (ev: Event) => { + if (this.state !== SignalConnectionState.CONNECTED) { + clearTimeout(wsTimeout); + try { + const resp = await fetch(`http${url.substring(2)}/validate${params}`); + if (resp.status.toFixed(0).startsWith('4')) { + const msg = await resp.text(); + reject(new ConnectionError(msg, ConnectionErrorReason.NotAllowed, resp.status)); + } else { + reject( + new ConnectionError( + 'Internal error', + ConnectionErrorReason.InternalError, + resp.status, + ), + ); + } + } catch (e) { reject( new ConnectionError( - 'Internal error', - ConnectionErrorReason.InternalError, - resp.status, + 'server was not reachable', + ConnectionErrorReason.ServerUnreachable, ), ); } - } catch (e) { - reject( - new ConnectionError( - 'server was not reachable', - ConnectionErrorReason.ServerUnreachable, - ), - ); + return; + } + // other errors, handle + this.handleWSError(ev); + }; + + this.ws.onmessage = async (ev: MessageEvent) => { + // not considered connected until JoinResponse is received + let resp: SignalResponse; + if (typeof ev.data === 'string') { + const json = JSON.parse(ev.data); + resp = SignalResponse.fromJson(json); + } else if (ev.data instanceof ArrayBuffer) { + resp = SignalResponse.fromBinary(new Uint8Array(ev.data)); + } else { + log.error(`could not decode websocket message: ${typeof ev.data}`); + return; } - return; - } - // other errors, handle - this.handleWSError(ev); - }; - - this.ws.onmessage = async (ev: MessageEvent) => { - // not considered connected until JoinResponse is received - let resp: SignalResponse; - if (typeof ev.data === 'string') { - const json = JSON.parse(ev.data); - resp = SignalResponse.fromJson(json); - } else if (ev.data instanceof ArrayBuffer) { - resp = SignalResponse.fromBinary(new Uint8Array(ev.data)); - } else { - log.error(`could not decode websocket message: ${typeof ev.data}`); - return; - } - if (!this.isConnected) { - let shouldProcessMessage = false; - // handle join message only - if (resp.message?.case === 'join') { - this.isConnected = true; - abortSignal?.removeEventListener('abort', abortHandler); - this.pingTimeoutDuration = resp.message.value.pingTimeout; - this.pingIntervalDuration = resp.message.value.pingInterval; - - if (this.pingTimeoutDuration && this.pingTimeoutDuration > 0) { - log.debug('ping config', { - timeout: this.pingTimeoutDuration, - interval: this.pingIntervalDuration, - }); + if (this.state !== SignalConnectionState.CONNECTED) { + let shouldProcessMessage = false; + // handle join message only + if (resp.message?.case === 'join') { + this.state = SignalConnectionState.CONNECTED; + abortSignal?.removeEventListener('abort', abortHandler); + this.pingTimeoutDuration = resp.message.value.pingTimeout; + this.pingIntervalDuration = resp.message.value.pingInterval; + + if (this.pingTimeoutDuration && this.pingTimeoutDuration > 0) { + log.debug('ping config', { + timeout: this.pingTimeoutDuration, + interval: this.pingIntervalDuration, + }); + this.startPingInterval(); + } + resolve(resp.message.value); + } else if (this.state === SignalConnectionState.RECONNECTING) { + // in reconnecting, any message received means signal reconnected + this.state = SignalConnectionState.CONNECTED; + abortSignal?.removeEventListener('abort', abortHandler); this.startPingInterval(); + if (resp.message?.case === 'reconnect') { + resolve(resp.message?.value); + } else { + resolve(); + shouldProcessMessage = true; + } + } else if (!opts.reconnect) { + // non-reconnect case, should receive join response first + reject( + new ConnectionError( + `did not receive join response, got ${resp.message?.case} instead`, + ), + ); } - resolve(resp.message.value); - } else if (opts.reconnect) { - // in reconnecting, any message received means signal reconnected - this.isConnected = true; - abortSignal?.removeEventListener('abort', abortHandler); - this.startPingInterval(); - if (resp.message?.case === 'reconnect') { - resolve(resp.message?.value); - } else { - resolve(); - shouldProcessMessage = true; + if (!shouldProcessMessage) { + return; } - } else if (!opts.reconnect) { - // non-reconnect case, should receive join response first - reject( - new ConnectionError( - `did not receive join response, got ${resp.message?.case} instead`, - ), - ); - } - if (!shouldProcessMessage) { - return; } - } - if (this.signalLatency) { - await sleep(this.signalLatency); - } - this.handleSignalResponse(resp); - }; - - this.ws.onclose = (ev: CloseEvent) => { - log.warn(`websocket closed`, { ev }); - this.handleOnClose(ev.reason); - }; + if (this.signalLatency) { + await sleep(this.signalLatency); + } + this.handleSignalResponse(resp); + }; + + this.ws.onclose = (ev: CloseEvent) => { + log.warn(`websocket closed`, { ev }); + this.handleOnClose(ev.reason); + }; + } finally { + unlock(); + } }); } @@ -357,7 +381,7 @@ export class SignalClient { async close() { const unlock = await this.closingLock.lock(); try { - this.isConnected = false; + this.state = SignalConnectionState.DISCONNECTING; if (this.ws) { this.ws.onmessage = null; this.ws.onopen = null; @@ -382,6 +406,7 @@ export class SignalClient { this.ws = undefined; } } finally { + this.state = SignalConnectionState.DISCONNECTED; this.clearPingInterval(); unlock(); } @@ -522,7 +547,7 @@ export class SignalClient { // capture all requests while reconnecting and put them in a queue // unless the request originates from the queue, then don't enqueue again const canQueue = !fromQueue && !canPassThroughQueue(message); - if (canQueue && this.isReconnecting) { + if (canQueue && this.state === SignalConnectionState.RECONNECTING) { this.queuedRequests.push(async () => { await this.sendRequest(message, true); }); @@ -648,11 +673,10 @@ export class SignalClient { this.requestQueue.run(req); } } - this.isReconnecting = false; } private async handleOnClose(reason: string) { - if (!this.isConnected) return; + if (this.state === SignalConnectionState.DISCONNECTED) return; const onCloseCallback = this.onClose; await this.close(); log.debug(`websocket connection closed: ${reason}`); diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 4c323fad01..242f7ff8ee 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -2,7 +2,11 @@ import { EventEmitter } from 'events'; import type { MediaAttributes } from 'sdp-transform'; import type TypedEventEmitter from 'typed-emitter'; import type { SignalOptions } from '../api/SignalClient'; -import { SignalClient, toProtoSessionDescription } from '../api/SignalClient'; +import { + SignalClient, + SignalConnectionState, + toProtoSessionDescription, +} from '../api/SignalClient'; import log from '../logger'; import type { InternalRoomOptions } from '../options'; import { @@ -867,7 +871,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit log.info(`reconnecting, attempt: ${this.reconnectAttempts}`); this.emit(EngineEvent.Restarting); - if (this.client.isConnected) { + if (!this.client.isDisconnected) { await this.client.sendLeave(); } await this.cleanupPeerConnections(); @@ -1266,7 +1270,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private handleBrowserOnLine = () => { // in case the engine is currently reconnecting, attempt a reconnect immediately after the browser state has changed to 'onLine' - if (this.client.isReconnecting) { + if (this.client.currentState === SignalConnectionState.RECONNECTING) { this.clearReconnectTimeout(); this.attemptReconnect(ReconnectReason.RR_SIGNAL_DISCONNECTED); } diff --git a/src/room/Room.ts b/src/room/Room.ts index 5cd737f785..d600fdb58b 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -664,7 +664,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.connectFuture = undefined; } // send leave - if (this.engine?.client.isConnected) { + if (!this.engine?.client.isDisconnected) { await this.engine.client.sendLeave(); } // close engine (also closes client) diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index c889189207..938a172d01 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -1129,7 +1129,7 @@ export default class LocalParticipant extends Participant { ) { this.participantTrackPermissions = participantTrackPermissions; this.allParticipantsAllowedToSubscribe = allParticipantsAllowed; - if (this.engine.client.isConnected) { + if (!this.engine.client.isDisconnected) { this.updateTrackSubscriptionPermissions(); } }