Skip to content

Commit

Permalink
feat(): working public topics
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagosiebler committed May 30, 2024
1 parent 13b829f commit 0010733
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 15 deletions.
104 changes: 104 additions & 0 deletions examples/ws-public.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// eslint-disable-next-line @typescript-eslint/no-unused-vars
import { LogParams, WebsocketClient } from '../src';
import { WsTopicRequest } from '../src/lib/websocket/websocket-util';

// const customLogger = {
// // eslint-disable-next-line @typescript-eslint/no-unused-vars
// trace: (...params: LogParams): void => {
// console.log('trace', ...params);
// },
// info: (...params: LogParams): void => {
// console.log('info', ...params);
// },
// error: (...params: LogParams): void => {
// console.error('error', ...params);
// },
// };

async function start() {
const client = new WebsocketClient();

// Optional, inject a custom logger
// const client = new WebsocketClient({}, customLogger);

client.on('open', (data) => {
console.log('connected ', data?.wsKey);
});

// Data received
client.on('update', (data) => {
console.info('data received: ', JSON.stringify(data));
});

// Something happened, attempting to reconenct
client.on('reconnect', (data) => {
console.log('reconnect: ', data);
});

// Reconnect successful
client.on('reconnected', (data) => {
console.log('reconnected: ', data);
});

// Connection closed. If unexpected, expect reconnect -> reconnected.
client.on('close', (data) => {
console.error('close: ', data);
});

// Reply to a request, e.g. "subscribe"/"unsubscribe"/"authenticate"
client.on('response', (data) => {
console.info('server reply: ', JSON.stringify(data), '\n');
});

client.on('exception', (data) => {
console.error('exception: ', data);
});

client.on('authenticated', (data) => {
console.error('authenticated: ', data);
});

try {
const tickersRequestWithParams: WsTopicRequest = {
topic: 'spot.tickers',
params: ['BTC_USDT', 'ETH_USDT', 'MATIC_USDT'],
};

const rawTradesRequestWithParams: WsTopicRequest = {
topic: 'spot.trades',
params: ['BTC_USDT', 'ETH_USDT', 'MATIC_USDT'],
};

// const topicWithoutParamsAsString = 'spot.balances';

// This has the same effect as above, it's just a different way of messaging which topic this is for
// const topicWithoutParamsAsObject: WsTopicRequest = {
// topic: 'spot.balances',
// };

/**
* Either send one topic (with optional params) at a time
*/
// client.subscribe(tickersRequestWithParams, 'spotV4');

/**
* Or send multiple topics in a batch (grouped by ws connection (WsKey))
*/
client.subscribe(
[tickersRequestWithParams, rawTradesRequestWithParams],
'spotV4',
);

// /**
// * You can also use strings for topics that don't have any parameters, even if you mix multiple requests into one function call:
// */
// client.subscribe(
// [tickersRequestWithParams, rawTradesRequestWithParams, topicWithoutParamsAsString],
// 'spotV4',
// );
} catch (e) {
console.error(`Req error: `, e);
}
}

start();
102 changes: 89 additions & 13 deletions src/WebsocketClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
import {
WsOperation,
WsRequestOperationGate,
WsRequestPing,
} from './types/websockets/requests.js';

export const WS_LOGGER_CATEGORY = { category: 'gate-ws' };
Expand Down Expand Up @@ -86,9 +87,16 @@ export class WebsocketClient extends BaseWebsocketClient<WsKey> {
* Call `unsubscribe(topics)` to remove topics
*/
public subscribe(
requests: (WsTopicRequest<WsTopic> | WsTopic)[],
requests:
| (WsTopicRequest<WsTopic> | WsTopic)
| (WsTopicRequest<WsTopic> | WsTopic)[],
wsKey: WsKey,
) {
if (!Array.isArray(requests)) {
this.subscribeTopicsForWsKey([requests], wsKey);
return;
}

if (requests.length) {
this.subscribeTopicsForWsKey(requests, wsKey);
}
Expand All @@ -101,9 +109,16 @@ export class WebsocketClient extends BaseWebsocketClient<WsKey> {
* - These topics will be removed from the topic cache, so they won't be subscribed to again.
*/
public unsubscribe(
requests: (WsTopicRequest<WsTopic> | WsTopic)[],
requests:
| (WsTopicRequest<WsTopic> | WsTopic)
| (WsTopicRequest<WsTopic> | WsTopic)[],
wsKey: WsKey,
) {
if (!Array.isArray(requests)) {
this.unsubscribeTopicsForWsKey([requests], wsKey);
return;
}

if (requests.length) {
this.unsubscribeTopicsForWsKey(requests, wsKey);
}
Expand All @@ -116,27 +131,80 @@ export class WebsocketClient extends BaseWebsocketClient<WsKey> {
*/

protected sendPingEvent(wsKey: WsKey) {
return this.tryWsSend(wsKey, '{"event":"ping"}');
let pingChannel: WsRequestPing['channel'];

switch (wsKey) {
case 'deliveryFuturesBTCV4':
case 'deliveryFuturesUSDTV4':
case 'perpFuturesBTCV4':
case 'perpFuturesUSDTV4': {
pingChannel = 'futures.ping';
break;
}
case 'announcementsV4': {
pingChannel = 'announcement.ping';
break;
}
case 'optionsV4': {
pingChannel = 'options.ping';
break;
}
case 'spotV4': {
pingChannel = 'spot.ping';
break;
}
default: {
throw neverGuard(wsKey, `Unhandled WsKey "${wsKey}"`);
}
}

const timeInMs = Date.now();
const timeInS = (timeInMs / 1000).toFixed(0);
return this.tryWsSend(
wsKey,
`{ "time": ${timeInS}, "channel": "${pingChannel}" }`,
);
}

protected sendPongEvent(wsKey: WsKey) {
return this.tryWsSend(wsKey, '{"event":"pong"}');
}
try {
this.logger.trace(`Sending upstream ws PONG: `, {
...WS_LOGGER_CATEGORY,
wsMessage: 'PONG',
wsKey,
});
if (!wsKey) {
throw new Error(
'Cannot send PONG due to no known websocket for this wsKey',
);
}
const wsState = this.getWsStore().get(wsKey);
if (!wsState || !wsState?.ws) {
throw new Error(
`${wsKey} socket not connected yet, call "connectAll()" first then try again when the "open" event arrives`,
);
}

protected isWsPing(msg: any): boolean {
if (typeof msg?.data === 'string' && msg.data.includes('"event":"ping"')) {
return true;
// Send a protocol layer pong
wsState.ws.pong();
} catch (e) {
this.logger.error(`Failed to send WS PONG`, {
...WS_LOGGER_CATEGORY,
wsMessage: 'PONG',
wsKey,
exception: e,
});
}
// this.logger.info(`Not a ping: `, {
// data: msg?.data,
// type: typeof msg?.data,
// });
}

// Unused, pings for gate are protocol layer pings
// eslint-disable-next-line @typescript-eslint/no-unused-vars
protected isWsPing(_msg: any): boolean {
return false;
}

protected isWsPong(msg: any): boolean {
if (typeof msg?.data === 'string' && msg.data.includes('"event":"pong"')) {
if (typeof msg?.data === 'string' && msg.data.includes('.pong"')) {
return true;
}

Expand Down Expand Up @@ -165,6 +233,14 @@ export class WebsocketClient extends BaseWebsocketClient<WsKey> {
return results;
}

if (eventAction === 'update') {
results.push({
eventType: 'update',
event: parsed,
});
return results;
}

// These are request/reply pattern events (e.g. after subscribing to topics or authenticating)
if (responseEvents.includes(eventAction)) {
results.push({
Expand Down
7 changes: 6 additions & 1 deletion src/lib/BaseWSClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -626,14 +626,19 @@ export abstract class BaseWebsocketClient<
this.clearPongTimer(wsKey);

if (this.isWsPong(event)) {
this.logger.trace('Received pong', { ...WS_LOGGER_CATEGORY, wsKey });
this.logger.trace('Received pong', {
...WS_LOGGER_CATEGORY,
wsKey,
event,
});
return;
}

if (this.isWsPing(event)) {
this.logger.trace('Received ping', {
...WS_LOGGER_CATEGORY,
wsKey,
event,
});
this.sendPongEvent(wsKey, ws);
return;
Expand Down
5 changes: 4 additions & 1 deletion src/lib/websocket/websocket-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ export type WsMarket = 'all';
* - Topic: the topic this event is for
* - Payload: the parameters to include, optional. E.g. auth requires key + sign. Some topics allow configurable parameters.
*/
export interface WsTopicRequest<TWSTopic extends string, TWSPayload = any> {
export interface WsTopicRequest<
TWSTopic extends string = string,
TWSPayload = any,
> {
topic: TWSTopic;
params?: TWSPayload;
}
Expand Down

0 comments on commit 0010733

Please sign in to comment.