Skip to content

Commit

Permalink
Introduce GristSocketServer, GristServerSocket and GristClientSocket
Browse files Browse the repository at this point in the history
These classes, used as an alternative to native WebSockets,
provide an automatic fallback to HTTP long polling (implemented
using Engine.IO) when a WebSocket connection fails.
  • Loading branch information
jonathanperret committed Mar 21, 2024
1 parent a354b9b commit ea08b49
Show file tree
Hide file tree
Showing 12 changed files with 755 additions and 79 deletions.
150 changes: 150 additions & 0 deletions app/client/components/GristClientSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import WS from 'ws';
import {Socket as EIOSocket} from 'engine.io-client';

export interface GristClientSocketOptions {
headers?: Record<string, string>;
}

export class GristClientSocket {
private _wsSocket: WS.WebSocket | WebSocket | undefined;
private _eioSocket: EIOSocket | undefined;

// Set to true when the connection process is complete, either succesfully or
// after the WebSocket and polling transports have both failed. Events from
// the underlying socket are not forwarded to the client until that point.
private _openDone: boolean = false;

private _messageHandler: null | ((data: string) => void);
private _openHandler: null | (() => void);
private _errorHandler: null | ((err: Error) => void);
private _closeHandler: null | (() => void);

constructor(private _url: string, private _options?: GristClientSocketOptions) {
this._createWSSocket();
}

public set onmessage(cb: null | ((data: string) => void)) {
this._messageHandler = cb;
}

public set onopen(cb: null | (() => void)) {
this._openHandler = cb;
}

public set onerror(cb: null | ((err: Error) => void)) {
this._errorHandler = cb;
}

public set onclose(cb: null | (() => void)) {
this._closeHandler = cb;
}

public close() {
if (this._wsSocket) {
this._wsSocket.close();
} else {
this._eioSocket!.close();
}
}

public send(data: string) {
if (this._wsSocket) {
this._wsSocket.send(data);
} else {
this._eioSocket!.send(data);
}
}

// pause() and resume() are used for tests and assume a WS.WebSocket transport
public pause() {
(this._wsSocket as WS.WebSocket)?.pause();
}

public resume() {
(this._wsSocket as WS.WebSocket)?.resume();
}

private _createWSSocket() {
if(typeof WebSocket !== 'undefined') {
this._wsSocket = new WebSocket(this._url);
} else {
this._wsSocket = new WS(this._url, undefined, this._options);
}
this._wsSocket.onmessage = this._onWSMessage.bind(this);
this._wsSocket.onopen = this._onWSOpen.bind(this);
this._wsSocket.onerror = this._onWSError.bind(this);
this._wsSocket.onclose = this._onWSClose.bind(this);
}

private _destroyWSSocket() {
if (this._wsSocket) {
this._wsSocket.onmessage = null;
this._wsSocket.onopen = null;
this._wsSocket.onerror = null;
this._wsSocket.onclose = null;
this._wsSocket = undefined;
}
}

private _onWSMessage(event: WS.MessageEvent | MessageEvent<any>) {
if (this._openDone) {
// event.data is guaranteed to be a string here because we only send text frames.
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event#event_properties
this._messageHandler?.(event.data);
}
}

private _onWSOpen() {
// The connection was established successfully. Any future events can now
// be forwarded to the client.
this._openDone = true;
this._openHandler?.();
}

private _onWSError(ev: Event) {
// The first connection attempt failed. Trigger an attempt with another transport.
this._destroyWSSocket();
this._createEIOSocket();
}

private _onWSClose() {
if (this._openDone) {
this._closeHandler?.();
}
}

private _createEIOSocket() {
this._eioSocket = new EIOSocket(this._url, {
path: new URL(this._url).pathname,
addTrailingSlash: false,
transports: ['polling'],
upgrade: false,
extraHeaders: this._options?.headers,
withCredentials: true,
});

this._eioSocket.on('message', this._onEIOMessage.bind(this));
this._eioSocket.on('open', this._onEIOOpen.bind(this));
this._eioSocket.on('error', this._onEIOError.bind(this));
this._eioSocket.on('close', this._onEIOClose.bind(this));
}

private _onEIOMessage(data: string) {
this._messageHandler?.(data);
}

private _onEIOOpen() {
this._openHandler?.();
}

private _onEIOError(ev: any) {
// We will make no further attempt to connect. Any future events can now
// be forwarded to the client.
this._openDone = true;
this._errorHandler?.(ev);
}

private _onEIOClose() {
this._closeHandler?.();
}
}
18 changes: 9 additions & 9 deletions app/client/components/GristWSConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {addOrgToPath, docUrl, getGristConfig} from 'app/common/urlUtils';
import {UserAPI} from 'app/common/UserAPI';
import {Events as BackboneEvents} from 'backbone';
import {Disposable} from 'grainjs';
import {GristClientSocket} from './GristClientSocket';

const G = getBrowserGlobals('window');
const reconnectInterval = [1000, 1000, 2000, 5000, 10000];
Expand Down Expand Up @@ -37,7 +38,7 @@ async function getDocWorkerUrl(assignmentId: string|null): Promise<string|null>
export interface GristWSSettings {
// A factory function for creating the WebSocket so that we can use from node
// or browser.
makeWebSocket(url: string): WebSocket;
makeWebSocket(url: string): GristClientSocket;

// A function for getting the timezone, so the code can be used outside webpack -
// currently a timezone library is lazy loaded in a way that doesn't quite work
Expand Down Expand Up @@ -74,7 +75,7 @@ export interface GristWSSettings {
export class GristWSSettingsBrowser implements GristWSSettings {
private _sessionStorage = getSessionStorage();

public makeWebSocket(url: string) { return new WebSocket(url); }
public makeWebSocket(url: string) { return new GristClientSocket(url); }
public getTimezone() { return guessTimezone(); }
public getPageUrl() { return G.window.location.href; }
public async getDocWorkerUrl(assignmentId: string|null) {
Expand Down Expand Up @@ -125,7 +126,7 @@ export class GristWSConnection extends Disposable {
private _reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
private _reconnectAttempts: number = 0;
private _wantReconnect: boolean = true;
private _ws: WebSocket|null = null;
private _ws: GristClientSocket|null = null;

// The server sends incremental seqId numbers with each message on the connection, starting with
// 0. We keep track of them to allow for seamless reconnects.
Expand Down Expand Up @@ -211,17 +212,16 @@ export class GristWSConnection extends Disposable {
}

/**
* @event serverMessage Triggered when a message arrives from the server. Callbacks receive
* the raw message data as an additional argument.
* Triggered when a message arrives from the server.
*/
public onmessage(ev: MessageEvent) {
public onmessage(data: string) {
if (!this._ws) {
// It's possible to receive a message after we disconnect, at least in tests (where
// WebSocket is a node library). Ignoring is easier than unsubscribing properly.
return;
}
this._scheduleHeartbeat();
this._processReceivedMessage(ev.data, true);
this._processReceivedMessage(data, true);
}

public send(message: any) {
Expand Down Expand Up @@ -352,8 +352,8 @@ export class GristWSConnection extends Disposable {

this._ws.onmessage = this.onmessage.bind(this);

this._ws.onerror = (ev: Event|ErrorEvent) => {
this._log('GristWSConnection: onerror', 'error' in ev ? String(ev.error) : ev);
this._ws.onerror = (err: Error) => {
this._log('GristWSConnection: onerror', String(err));
};

this._ws.onclose = () => {
Expand Down
36 changes: 10 additions & 26 deletions app/server/lib/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {fromCallback} from 'app/server/lib/serverUtils';
import {i18n} from 'i18next';
import * as crypto from 'crypto';
import moment from 'moment';
import * as WebSocket from 'ws';
import {GristServerSocket} from 'app/server/lib/GristServerSocket';

// How many messages and bytes to accumulate for a disconnected client before booting it.
// The benefit is that a client who temporarily disconnects and reconnects without missing much,
Expand Down Expand Up @@ -97,8 +97,7 @@ export class Client {
private _missedMessagesTotalLength: number = 0;
private _destroyTimer: NodeJS.Timer|null = null;
private _destroyed: boolean = false;
private _websocket: WebSocket|null;
private _websocketEventHandlers: Array<{event: string, handler: (...args: any[]) => void}> = [];
private _websocket: GristServerSocket|null;
private _org: string|null = null;
private _profile: UserProfile|null = null;
private _user: FullUser|undefined = undefined;
Expand Down Expand Up @@ -131,18 +130,14 @@ export class Client {
return this._locale;
}

public setConnection(websocket: WebSocket, counter: string|null, browserSettings: BrowserSettings) {
public setConnection(websocket: GristServerSocket, counter: string|null, browserSettings: BrowserSettings) {
this._websocket = websocket;
this._counter = counter;
this.browserSettings = browserSettings;

const addHandler = (event: string, handler: (...args: any[]) => void) => {
websocket.on(event, handler);
this._websocketEventHandlers.push({event, handler});
};
addHandler('error', (err: unknown) => this._onError(err));
addHandler('close', () => this._onClose());
addHandler('message', (msg: string) => this._onMessage(msg));
websocket.onerror = (err: Error) => this._onError(err);
websocket.onclose = () => this._onClose();
websocket.onmessage = (msg: string) => this._onMessage(msg);
}

/**
Expand Down Expand Up @@ -189,7 +184,7 @@ export class Client {

public interruptConnection() {
if (this._websocket) {
this._removeWebsocketListeners();
this._websocket?.removeAllListeners();
this._websocket.terminate(); // close() is inadequate when ws routed via loadbalancer
this._websocket = null;
}
Expand Down Expand Up @@ -359,7 +354,7 @@ export class Client {
// See also my report at https://stackoverflow.com/a/48411315/328565
await delay(250);

if (!this._destroyed && this._websocket?.readyState === WebSocket.OPEN) {
if (!this._destroyed && this._websocket?.isOpen) {
await this._sendToWebsocket(JSON.stringify({...clientConnectMsg, dup: true}));
}
} catch (err) {
Expand Down Expand Up @@ -604,7 +599,7 @@ export class Client {
/**
* Processes an error on the websocket.
*/
private _onError(err: unknown) {
private _onError(err: Error) {
this._log.warn(null, "onError", err);
// TODO Make sure that this is followed by onClose when the connection is lost.
}
Expand All @@ -613,7 +608,7 @@ export class Client {
* Processes the closing of a websocket.
*/
private _onClose() {
this._removeWebsocketListeners();
this._websocket?.removeAllListeners();

// Remove all references to the websocket.
this._websocket = null;
Expand All @@ -629,15 +624,4 @@ export class Client {
this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs);
}
}

private _removeWebsocketListeners() {
if (this._websocket) {
// Avoiding websocket.removeAllListeners() because WebSocket.Server registers listeners
// internally for websockets it keeps track of, and we should not accidentally remove those.
for (const {event, handler} of this._websocketEventHandlers) {
this._websocket.off(event, handler);
}
this._websocketEventHandlers = [];
}
}
}
22 changes: 11 additions & 11 deletions app/server/lib/Comm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import {EventEmitter} from 'events';
import * as http from 'http';
import * as https from 'https';
import * as WebSocket from 'ws';
import {GristSocketServer} from 'app/server/lib/GristSocketServer';
import {GristServerSocket} from 'app/server/lib/GristServerSocket';

import {parseFirstUrlPart} from 'app/common/gristUrls';
import {firstDefined, safeJsonParse} from 'app/common/gutil';
Expand All @@ -50,6 +51,7 @@ import {localeFromRequest} from 'app/server/lib/ServerLocale';
import {fromCallback} from 'app/server/lib/serverUtils';
import {Sessions} from 'app/server/lib/Sessions';
import {i18n} from 'i18next';
import { trustOrigin } from './requestUtils';

export interface CommOptions {
sessions: Sessions; // A collection of all sessions for this instance of Grist
Expand All @@ -74,7 +76,7 @@ export interface CommOptions {
export class Comm extends EventEmitter {
// Collection of all sessions; maps sessionIds to ScopedSession objects.
public readonly sessions: Sessions = this._options.sessions;
private _wss: WebSocket.Server[]|null = null;
private _wss: GristSocketServer[]|null = null;

private _clients = new Map<string, Client>(); // Maps clientIds to Client objects.

Expand Down Expand Up @@ -146,11 +148,6 @@ export class Comm extends EventEmitter {
public async testServerShutdown() {
if (this._wss) {
for (const wssi of this._wss) {
// Terminate all clients. WebSocket.Server used to do it automatically in close() but no
// longer does (see https://github.com/websockets/ws/pull/1904#discussion_r668844565).
for (const ws of wssi.clients) {
ws.terminate();
}
await fromCallback((cb) => wssi.close(cb));
}
this._wss = null;
Expand Down Expand Up @@ -204,7 +201,7 @@ export class Comm extends EventEmitter {
/**
* Processes a new websocket connection, and associates the websocket and a Client object.
*/
private async _onWebSocketConnection(websocket: WebSocket, req: http.IncomingMessage) {
private async _onWebSocketConnection(websocket: GristServerSocket, req: http.IncomingMessage) {
if (this._options.hosts) {
// DocWorker ID (/dw/) and version tag (/v/) may be present in this request but are not
// needed. addOrgInfo assumes req.url starts with /o/ if present.
Expand Down Expand Up @@ -255,15 +252,18 @@ export class Comm extends EventEmitter {
if (this._options.httpsServer) { servers.push(this._options.httpsServer); }
const wss = [];
for (const server of servers) {
const wssi = new WebSocket.Server({server});
wssi.on('connection', async (websocket: WebSocket, req) => {
const wssi = new GristSocketServer(server, {
verifyClient: (req) => trustOrigin(req)
});

wssi.onconnection = async (websocket: GristServerSocket, req) => {
try {
await this._onWebSocketConnection(websocket, req);
} catch (e) {
log.error("Comm connection for %s threw exception: %s", req.url, e.message);
websocket.terminate(); // close() is inadequate when ws routed via loadbalancer
}
});
};
wss.push(wssi);
}
return wss;
Expand Down
Loading

0 comments on commit ea08b49

Please sign in to comment.