Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support HTTP long polling as an alternative to WebSockets #859

Merged
merged 12 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions app/client/components/GristClientSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import WS from 'ws';
import {Socket as EIOSocket} from 'engine.io-client';

export interface GristClientSocketOptions {
headers?: Record<string, string>;
}

export class GristClientSocket {
private _wsSocket: WS.WebSocket | WebSocket | undefined;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpful to have a comment that exactly one of _wsSocket, _eioSocket is set.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private _eioSocket: EIOSocket | undefined;

// Set to true when the connection process is complete, either succesfully or
// after the WebSocket and polling transports have both failed. Events from
// the underlying socket are not forwarded to the client until that point.
private _openDone: boolean = false;

private _messageHandler: null | ((data: string) => void);
private _openHandler: null | (() => void);
private _errorHandler: null | ((err: Error) => void);
private _closeHandler: null | (() => void);

constructor(private _url: string, private _options?: GristClientSocketOptions) {
this._createWSSocket();
}

public set onmessage(cb: null | ((data: string) => void)) {
this._messageHandler = cb;
}

public set onopen(cb: null | (() => void)) {
this._openHandler = cb;
}

public set onerror(cb: null | ((err: Error) => void)) {
this._errorHandler = cb;
}

public set onclose(cb: null | (() => void)) {
this._closeHandler = cb;
}

public close() {
if (this._wsSocket) {
this._wsSocket.close();
} else {
this._eioSocket!.close();
}
}

public send(data: string) {
if (this._wsSocket) {
this._wsSocket.send(data);
} else {
this._eioSocket!.send(data);
}
}

// pause() and resume() are used for tests and assume a WS.WebSocket transport
public pause() {
(this._wsSocket as WS.WebSocket)?.pause();
}

public resume() {
(this._wsSocket as WS.WebSocket)?.resume();
}

private _createWSSocket() {
if(typeof WebSocket !== 'undefined') {
this._wsSocket = new WebSocket(this._url);
} else {
this._wsSocket = new WS(this._url, undefined, this._options);
}
this._wsSocket.onmessage = this._onWSMessage.bind(this);
this._wsSocket.onopen = this._onWSOpen.bind(this);
this._wsSocket.onerror = this._onWSError.bind(this);
this._wsSocket.onclose = this._onWSClose.bind(this);
}

private _destroyWSSocket() {
if (this._wsSocket) {
this._wsSocket.onmessage = null;
this._wsSocket.onopen = null;
this._wsSocket.onerror = null;
this._wsSocket.onclose = null;
this._wsSocket = undefined;
}
}

private _onWSMessage(event: WS.MessageEvent | MessageEvent<any>) {
if (this._openDone) {
// event.data is guaranteed to be a string here because we only send text frames.
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event#event_properties
this._messageHandler?.(event.data);
}
}

private _onWSOpen() {
// The connection was established successfully. Any future events can now
// be forwarded to the client.
this._openDone = true;
this._openHandler?.();
}

private _onWSError(ev: Event) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure it's impossible to receive the error event for a socket that's already open? If possible, should we forward it to _errorHandler rather than reconnect using polling at this point?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I ended up simplifying the logic, replacing _openDone with a _wsConnected bool that only tells us whether the next WebSocket error event means we should retry with Engine.IO or emit the error.

// The first connection attempt failed. Trigger an attempt with another transport.
this._destroyWSSocket();
this._createEIOSocket();
}

private _onWSClose() {
if (this._openDone) {
this._closeHandler?.();
}
}

private _createEIOSocket() {
this._eioSocket = new EIOSocket(this._url, {
path: new URL(this._url).pathname,
addTrailingSlash: false,
transports: ['polling'],
upgrade: false,
extraHeaders: this._options?.headers,
withCredentials: true,
});

this._eioSocket.on('message', this._onEIOMessage.bind(this));
this._eioSocket.on('open', this._onEIOOpen.bind(this));
this._eioSocket.on('error', this._onEIOError.bind(this));
this._eioSocket.on('close', this._onEIOClose.bind(this));
}

private _onEIOMessage(data: string) {
this._messageHandler?.(data);
}

private _onEIOOpen() {
this._openHandler?.();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this need to set _openDone? And doesn't _onEIOMessage need to check that? (If not, a comment to explain would be good, since that's a sign of a difference with WS implementation)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned below, I reworked the logic and there's only one state check left, in _onWSError since that's the only point where we need to decide whether to emit an error or downgrade the socket.
_onWSMessage doesn't need to check the state (now called _wsConnected) because it can only occur after a successful call to _onWSOpen. None of the _onEIO handlers need to check the state either, since the very fact they are called implies the socket was downgraded to Engine.IO.

}

private _onEIOError(ev: any) {
// We will make no further attempt to connect. Any future events can now
// be forwarded to the client.
this._openDone = true;
this._errorHandler?.(ev);
}

private _onEIOClose() {
this._closeHandler?.();
}
}
20 changes: 10 additions & 10 deletions app/client/components/GristWSConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {addOrgToPath, docUrl, getGristConfig} from 'app/common/urlUtils';
import {UserAPI} from 'app/common/UserAPI';
import {Events as BackboneEvents} from 'backbone';
import {Disposable} from 'grainjs';
import {GristClientSocket} from './GristClientSocket';

const G = getBrowserGlobals('window');
const reconnectInterval = [1000, 1000, 2000, 5000, 10000];
Expand Down Expand Up @@ -37,7 +38,7 @@ async function getDocWorkerUrl(assignmentId: string|null): Promise<string|null>
export interface GristWSSettings {
// A factory function for creating the WebSocket so that we can use from node
// or browser.
makeWebSocket(url: string): WebSocket;
makeWebSocket(url: string): GristClientSocket;

// A function for getting the timezone, so the code can be used outside webpack -
// currently a timezone library is lazy loaded in a way that doesn't quite work
Expand Down Expand Up @@ -74,7 +75,7 @@ export interface GristWSSettings {
export class GristWSSettingsBrowser implements GristWSSettings {
private _sessionStorage = getSessionStorage();

public makeWebSocket(url: string) { return new WebSocket(url); }
public makeWebSocket(url: string) { return new GristClientSocket(url); }
public getTimezone() { return guessTimezone(); }
public getPageUrl() { return G.window.location.href; }
public async getDocWorkerUrl(assignmentId: string|null) {
Expand Down Expand Up @@ -125,7 +126,7 @@ export class GristWSConnection extends Disposable {
private _reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
private _reconnectAttempts: number = 0;
private _wantReconnect: boolean = true;
private _ws: WebSocket|null = null;
private _ws: GristClientSocket|null = null;

// The server sends incremental seqId numbers with each message on the connection, starting with
// 0. We keep track of them to allow for seamless reconnects.
Expand Down Expand Up @@ -211,17 +212,16 @@ export class GristWSConnection extends Disposable {
}

/**
* @event serverMessage Triggered when a message arrives from the server. Callbacks receive
* the raw message data as an additional argument.
* Triggered when a message arrives from the server.
*/
public onmessage(ev: MessageEvent) {
public onmessage(data: string) {
if (!this._ws) {
// It's possible to receive a message after we disconnect, at least in tests (where
// WebSocket is a node library). Ignoring is easier than unsubscribing properly.
return;
}
this._scheduleHeartbeat();
this._processReceivedMessage(ev.data, true);
this._processReceivedMessage(data, true);
}

public send(message: any) {
Expand Down Expand Up @@ -317,7 +317,7 @@ export class GristWSConnection extends Disposable {
private _sendHeartbeat() {
this.send(JSON.stringify({
beat: 'alive',
url: G.window.location.href,
url: this._settings.getPageUrl(),
docId: this._assignmentId,
}));
}
Expand Down Expand Up @@ -352,8 +352,8 @@ export class GristWSConnection extends Disposable {

this._ws.onmessage = this.onmessage.bind(this);

this._ws.onerror = (ev: Event|ErrorEvent) => {
this._log('GristWSConnection: onerror', 'error' in ev ? String(ev.error) : ev);
this._ws.onerror = (err: Error) => {
this._log('GristWSConnection: onerror', String(err));
};

this._ws.onclose = () => {
Expand Down
36 changes: 10 additions & 26 deletions app/server/lib/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {fromCallback} from 'app/server/lib/serverUtils';
import {i18n} from 'i18next';
import * as crypto from 'crypto';
import moment from 'moment';
import * as WebSocket from 'ws';
import {GristServerSocket} from 'app/server/lib/GristServerSocket';

// How many messages and bytes to accumulate for a disconnected client before booting it.
// The benefit is that a client who temporarily disconnects and reconnects without missing much,
Expand Down Expand Up @@ -97,8 +97,7 @@ export class Client {
private _missedMessagesTotalLength: number = 0;
private _destroyTimer: NodeJS.Timer|null = null;
private _destroyed: boolean = false;
private _websocket: WebSocket|null;
private _websocketEventHandlers: Array<{event: string, handler: (...args: any[]) => void}> = [];
private _websocket: GristServerSocket|null;
private _org: string|null = null;
private _profile: UserProfile|null = null;
private _user: FullUser|undefined = undefined;
Expand Down Expand Up @@ -131,18 +130,14 @@ export class Client {
return this._locale;
}

public setConnection(websocket: WebSocket, counter: string|null, browserSettings: BrowserSettings) {
public setConnection(websocket: GristServerSocket, counter: string|null, browserSettings: BrowserSettings) {
this._websocket = websocket;
this._counter = counter;
this.browserSettings = browserSettings;

const addHandler = (event: string, handler: (...args: any[]) => void) => {
websocket.on(event, handler);
this._websocketEventHandlers.push({event, handler});
};
addHandler('error', (err: unknown) => this._onError(err));
addHandler('close', () => this._onClose());
addHandler('message', (msg: string) => this._onMessage(msg));
websocket.onerror = (err: Error) => this._onError(err);
websocket.onclose = () => this._onClose();
websocket.onmessage = (msg: string) => this._onMessage(msg);
}

/**
Expand Down Expand Up @@ -189,7 +184,7 @@ export class Client {

public interruptConnection() {
if (this._websocket) {
this._removeWebsocketListeners();
this._websocket?.removeAllListeners();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question mark unneeded here (given the conditional)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, removed.

this._websocket.terminate(); // close() is inadequate when ws routed via loadbalancer
this._websocket = null;
}
Expand Down Expand Up @@ -359,7 +354,7 @@ export class Client {
// See also my report at https://stackoverflow.com/a/48411315/328565
await delay(250);

if (!this._destroyed && this._websocket?.readyState === WebSocket.OPEN) {
if (!this._destroyed && this._websocket?.isOpen) {
await this._sendToWebsocket(JSON.stringify({...clientConnectMsg, dup: true}));
}
} catch (err) {
Expand Down Expand Up @@ -604,7 +599,7 @@ export class Client {
/**
* Processes an error on the websocket.
*/
private _onError(err: unknown) {
private _onError(err: Error) {
this._log.warn(null, "onError", err);
// TODO Make sure that this is followed by onClose when the connection is lost.
}
Expand All @@ -613,7 +608,7 @@ export class Client {
* Processes the closing of a websocket.
*/
private _onClose() {
this._removeWebsocketListeners();
this._websocket?.removeAllListeners();

// Remove all references to the websocket.
this._websocket = null;
Expand All @@ -629,15 +624,4 @@ export class Client {
this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs);
}
}

private _removeWebsocketListeners() {
if (this._websocket) {
// Avoiding websocket.removeAllListeners() because WebSocket.Server registers listeners
// internally for websockets it keeps track of, and we should not accidentally remove those.
for (const {event, handler} of this._websocketEventHandlers) {
this._websocket.off(event, handler);
}
this._websocketEventHandlers = [];
}
}
}
22 changes: 11 additions & 11 deletions app/server/lib/Comm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
import {EventEmitter} from 'events';
import * as http from 'http';
import * as https from 'https';
import * as WebSocket from 'ws';
import {GristSocketServer} from 'app/server/lib/GristSocketServer';
import {GristServerSocket} from 'app/server/lib/GristServerSocket';

import {parseFirstUrlPart} from 'app/common/gristUrls';
import {firstDefined, safeJsonParse} from 'app/common/gutil';
Expand All @@ -50,6 +51,7 @@ import {localeFromRequest} from 'app/server/lib/ServerLocale';
import {fromCallback} from 'app/server/lib/serverUtils';
import {Sessions} from 'app/server/lib/Sessions';
import {i18n} from 'i18next';
import { trustOrigin } from './requestUtils';

export interface CommOptions {
sessions: Sessions; // A collection of all sessions for this instance of Grist
Expand All @@ -74,7 +76,7 @@ export interface CommOptions {
export class Comm extends EventEmitter {
// Collection of all sessions; maps sessionIds to ScopedSession objects.
public readonly sessions: Sessions = this._options.sessions;
private _wss: WebSocket.Server[]|null = null;
private _wss: GristSocketServer[]|null = null;

private _clients = new Map<string, Client>(); // Maps clientIds to Client objects.

Expand Down Expand Up @@ -146,11 +148,6 @@ export class Comm extends EventEmitter {
public async testServerShutdown() {
if (this._wss) {
for (const wssi of this._wss) {
// Terminate all clients. WebSocket.Server used to do it automatically in close() but no
// longer does (see https://github.com/websockets/ws/pull/1904#discussion_r668844565).
for (const ws of wssi.clients) {
ws.terminate();
}
await fromCallback((cb) => wssi.close(cb));
}
this._wss = null;
Expand Down Expand Up @@ -204,7 +201,7 @@ export class Comm extends EventEmitter {
/**
* Processes a new websocket connection, and associates the websocket and a Client object.
*/
private async _onWebSocketConnection(websocket: WebSocket, req: http.IncomingMessage) {
private async _onWebSocketConnection(websocket: GristServerSocket, req: http.IncomingMessage) {
if (this._options.hosts) {
// DocWorker ID (/dw/) and version tag (/v/) may be present in this request but are not
// needed. addOrgInfo assumes req.url starts with /o/ if present.
Expand Down Expand Up @@ -255,15 +252,18 @@ export class Comm extends EventEmitter {
if (this._options.httpsServer) { servers.push(this._options.httpsServer); }
const wss = [];
for (const server of servers) {
const wssi = new WebSocket.Server({server});
wssi.on('connection', async (websocket: WebSocket, req) => {
const wssi = new GristSocketServer(server, {
verifyClient: (req) => trustOrigin(req)
});

wssi.onconnection = async (websocket: GristServerSocket, req) => {
try {
await this._onWebSocketConnection(websocket, req);
} catch (e) {
log.error("Comm connection for %s threw exception: %s", req.url, e.message);
websocket.terminate(); // close() is inadequate when ws routed via loadbalancer
}
});
};
wss.push(wssi);
}
return wss;
Expand Down
Loading
Loading