From e1d239c9a945478cd0cb4ac93aaefa5d20de8851 Mon Sep 17 00:00:00 2001 From: Tiago Siebler Date: Mon, 3 Jun 2024 16:32:59 +0100 Subject: [PATCH] feat(): private ws support for spot --- examples/ws-private.ts | 76 +++++++++--- examples/ws-public.ts | 4 +- src/WebsocketClient.ts | 172 +++++++++++++++++++++++----- src/lib/BaseWSClient.ts | 112 ++++++++++++++---- src/lib/websocket/websocket-util.ts | 3 +- src/types/websockets/client.ts | 2 + 6 files changed, 300 insertions(+), 69 deletions(-) diff --git a/examples/ws-private.ts b/examples/ws-private.ts index d83b757..fa98bdb 100644 --- a/examples/ws-private.ts +++ b/examples/ws-private.ts @@ -1,21 +1,23 @@ -import { LogParams, WebsocketClient, WsTopic } from '../src'; +/* eslint-disable @typescript-eslint/no-unused-vars */ +import { LogParams, WebsocketClient } from '../src'; +import { WsTopicRequest } from '../src/lib/websocket/websocket-util'; +// eslint-disable-next-line @typescript-eslint/no-unused-vars const account = { key: process.env.API_KEY || 'apiKeyHere', secret: process.env.API_SECRET || 'apiSecretHere', - apiApplicationId: process.env.API_APPLICATION_ID || 'apiMemoHere', }; const customLogger = { // eslint-disable-next-line @typescript-eslint/no-unused-vars trace: (...params: LogParams): void => { - // console.log('trace', ...params); + // console.log(new Date(), 'trace', ...params); }, info: (...params: LogParams): void => { - console.log('info', ...params); + console.log(new Date(), 'info', ...params); }, error: (...params: LogParams): void => { - console.error('error', ...params); + console.error(new Date(), 'error', ...params); }, }; @@ -24,7 +26,6 @@ async function start() { { apiKey: account.key, apiSecret: account.secret, - apiApplicationId: account.apiApplicationId, }, customLogger, ); @@ -56,7 +57,7 @@ async function start() { // Reply to a request, e.g. "subscribe"/"unsubscribe"/"authenticate" client.on('response', (data) => { - console.info('response: ', JSON.stringify(data)); + console.info('server reply: ', JSON.stringify(data), '\n'); }); client.on('exception', (data) => { @@ -68,15 +69,58 @@ async function start() { }); try { - const topics: WsTopic[] = [ - 'balance', - 'algoexecutionreportv2', - 'executionreport', - 'marginassignment', - 'position', - ]; - - client.subscribe(topics); + // client.subscribe(topics, 'spotV4'); + + const topicWithoutParamsAsString = 'spot.balances'; + + // This has the same effect as above, it's just a different way of messaging which topic this is for + // const topicWithoutParamsAsObject: WsTopicRequest = { + // topic: 'spot.balances', + // }; + + const userOrders: WsTopicRequest = { + topic: 'spot.orders', + payload: ['BTC_USDT', 'ETH_USDT', 'MATIC_USDT'], + }; + + const userTrades: WsTopicRequest = { + topic: 'spot.usertrades', + payload: ['BTC_USDT', 'ETH_USDT', 'MATIC_USDT'], + }; + + const userPriceOrders: WsTopicRequest = { + topic: 'spot.priceorders', + payload: ['!all'], + }; + + /** + * Either send one topic (with optional params) at a time + */ + // client.subscribe(topicWithoutParamsAsObject, 'spotV4'); + + /** + * Or send multiple topics in a batch (grouped by ws connection (WsKey)) + * You can also use strings for topics that don't have any parameters, even if you mix multiple requests into one function call: + */ + client.subscribe( + [topicWithoutParamsAsString, userOrders, userTrades, userPriceOrders], + 'spotV4', + ); + + /** + * You can also subscribe in separate function calls as you wish. + * + * Any duplicate requests should get filtered out (e.g. here we subscribed to "spot.balances" twice, but the WS client will filter this out) + */ + client.subscribe( + [ + 'spot.balances', + 'spot.margin_balances', + 'spot.funding_balances', + 'spot.cross_balances', + ], + 'spotV4', + ); } catch (e) { console.error(`Req error: `, e); } diff --git a/examples/ws-public.ts b/examples/ws-public.ts index 9662789..203284f 100644 --- a/examples/ws-public.ts +++ b/examples/ws-public.ts @@ -61,12 +61,12 @@ async function start() { try { const tickersRequestWithParams: WsTopicRequest = { topic: 'spot.tickers', - params: ['BTC_USDT', 'ETH_USDT', 'MATIC_USDT'], + payload: ['BTC_USDT', 'ETH_USDT', 'MATIC_USDT'], }; const rawTradesRequestWithParams: WsTopicRequest = { topic: 'spot.trades', - params: ['BTC_USDT', 'ETH_USDT', 'MATIC_USDT'], + payload: ['BTC_USDT', 'ETH_USDT', 'MATIC_USDT'], }; // const topicWithoutParamsAsString = 'spot.balances'; diff --git a/src/WebsocketClient.ts b/src/WebsocketClient.ts index 13b02e1..cfc4f8f 100644 --- a/src/WebsocketClient.ts +++ b/src/WebsocketClient.ts @@ -3,7 +3,11 @@ import WebSocket from 'isomorphic-ws'; import { BaseWebsocketClient, EmittableEvent } from './lib/BaseWSClient.js'; import { neverGuard } from './lib/misc-util.js'; import { MessageEventLike } from './lib/requestUtils.js'; -import { signMessage } from './lib/webCryptoAPI.js'; +import { + SignAlgorithm, + SignEncodeMethod, + signMessage, +} from './lib/webCryptoAPI.js'; import { WS_BASE_URL_MAP, WS_KEY_MAP, @@ -30,6 +34,32 @@ export const PUBLIC_WS_KEYS: WsKey[] = []; */ export type WsTopic = string; +function getPrivateSpotTopics(): string[] { + // Consumeable channels for spot + const privateSpotTopics = [ + 'spot.orders', + 'spot.usertrades', + 'spot.balances', + 'spot.margin_balances', + 'spot.funding_balances', + 'spot.cross_balances', + 'spot.priceorders', + ]; + + // WebSocket API for spot + const privateSpotWSAPITopics = [ + 'spot.login', + 'spot.order_place', + 'spot.order_cancel', + 'spot.order_cancel_ids', + 'spot.order_cancel_cp', + 'spot.order_amend', + 'spot.order_status', + ]; + + return [...privateSpotTopics, ...privateSpotWSAPITopics]; +} + // /** // * Used to split sub/unsub logic by websocket connection. Groups & dedupes requests into per-WsKey arrays // */ @@ -300,24 +330,30 @@ export class WebsocketClient extends BaseWebsocketClient { /** * Determines if a topic is for a private channel, using a hardcoded list of strings */ - protected isPrivateTopicRequest(request: WsTopicRequest): boolean { + protected isPrivateTopicRequest( + request: WsTopicRequest, + wsKey: WsKey, + ): boolean { const topicName = request?.topic?.toLowerCase(); if (!topicName) { return false; } - const privateTopics = [ - 'todo', - 'todo', - 'todo', - 'todo', - 'todo', - 'todo', - 'todo', - ]; + switch (wsKey) { + case 'spotV4': + return getPrivateSpotTopics().includes(topicName); - if (topicName && privateTopics.includes(topicName)) { - return true; + // TODO: + case 'announcementsV4': + case 'deliveryFuturesBTCV4': + case 'deliveryFuturesUSDTV4': + case 'optionsV4': + case 'perpFuturesBTCV4': + case 'perpFuturesUSDTV4': + return getPrivateSpotTopics().includes(topicName); + + default: + throw neverGuard(wsKey, `Unhandled WsKey "${wsKey}"`); } return false; @@ -373,11 +409,11 @@ export class WebsocketClient extends BaseWebsocketClient { /** * Map one or more topics into fully prepared "subscribe request" events (already stringified and ready to send) */ - protected getWsOperationEventsForTopics( + protected async getWsOperationEventsForTopics( topics: WsTopicRequest[], wsKey: WsKey, operation: WsOperation, - ): string[] { + ): Promise { // console.log(new Date(), `called getWsSubscribeEventsForTopics()`, topics); // console.trace(); if (!topics.length) { @@ -396,10 +432,11 @@ export class WebsocketClient extends BaseWebsocketClient { ) { for (let i = 0; i < topics.length; i += maxTopicsPerEvent) { const batch = topics.slice(i, i + maxTopicsPerEvent); - const subscribeRequestEvents = this.getWsRequestEvent( + const subscribeRequestEvents = await this.getWsRequestEvents( market, operation, batch, + wsKey, ); for (const event of subscribeRequestEvents) { @@ -410,10 +447,11 @@ export class WebsocketClient extends BaseWebsocketClient { return jsonStringEvents; } - const subscribeRequestEvents = this.getWsRequestEvent( + const subscribeRequestEvents = await this.getWsRequestEvents( market, operation, topics, + wsKey, ); for (const event of subscribeRequestEvents) { @@ -425,33 +463,113 @@ export class WebsocketClient extends BaseWebsocketClient { /** * @returns a correctly structured events for performing an operation over WS. This can vary per exchange spec. */ - private getWsRequestEvent( + private async getWsRequestEvents( market: WsMarket, operation: WsOperation, requests: WsTopicRequest[], - ): WsRequestOperationGate[] { - const timeInSeconds = +(Date.now() / 1000).toFixed(0); + wsKey: WsKey, + ): Promise[]> { + const wsRequestEvents: WsRequestOperationGate[] = []; + const wsRequestBuildingErrors: unknown[] = []; + switch (market) { case 'all': { - return requests.map((request) => { - const wsRequestEvent: WsRequestOperationGate = { + for (const request of requests) { + const timeInSeconds = +(Date.now() / 1000).toFixed(0); + + const wsEvent: WsRequestOperationGate = { time: timeInSeconds, channel: request.topic, event: operation, - // payload: 'todo', }; - if (request.params) { - wsRequestEvent.payload = request.params; + if (request.payload) { + wsEvent.payload = request.payload; } - return wsRequestEvent; - }); + if (!this.isPrivateTopicRequest(request, wsKey)) { + wsRequestEvents.push(wsEvent); + continue; + } + + // If private topic request, build auth part for request + + // No key or secret, push event as failed + if (!this.options.apiKey || !this.options.apiSecret) { + wsRequestBuildingErrors.push({ + error: `apiKey or apiSecret missing from config`, + operation, + event: wsEvent, + }); + continue; + } + + const signAlgoritm: SignAlgorithm = 'SHA-512'; + const signEncoding: SignEncodeMethod = 'hex'; + + const signInput = `channel=${wsEvent.channel}&event=${wsEvent.event}&time=${timeInSeconds}`; + + try { + const sign = await this.signMessage( + signInput, + this.options.apiSecret, + signEncoding, + signAlgoritm, + ); + + wsEvent.auth = { + method: 'api_key', + KEY: this.options.apiKey, + SIGN: sign, + }; + + wsRequestEvents.push(wsEvent); + } catch (e) { + wsRequestBuildingErrors.push({ + error: `exception during sign`, + errorTrace: e, + operation, + event: wsEvent, + }); + } + } + break; } default: { throw neverGuard(market, `Unhandled market "${market}"`); } } + + if (wsRequestBuildingErrors.length) { + const label = + wsRequestBuildingErrors.length === requests.length ? 'all' : 'some'; + this.logger.error( + `Failed to build/send ${wsRequestBuildingErrors.length} event(s) for ${label} WS requests due to exceptions`, + { + ...WS_LOGGER_CATEGORY, + wsRequestBuildingErrors, + wsRequestBuildingErrorsStringified: JSON.stringify( + wsRequestBuildingErrors, + null, + 2, + ), + }, + ); + } + + return wsRequestEvents; + } + + private async signMessage( + paramsStr: string, + secret: string, + method: 'hex' | 'base64', + algorithm: SignAlgorithm, + ): Promise { + if (typeof this.options.customSignMessageFn === 'function') { + return this.options.customSignMessageFn(paramsStr, secret); + } + return await signMessage(paramsStr, secret, method, algorithm); } protected async getWsAuthRequestEvent(wsKey: WsKey): Promise { diff --git a/src/lib/BaseWSClient.ts b/src/lib/BaseWSClient.ts index 2d5d736..3f071cf 100644 --- a/src/lib/BaseWSClient.ts +++ b/src/lib/BaseWSClient.ts @@ -31,6 +31,7 @@ interface WSClientEventMap { update: (response: any & { wsKey: WsKey }) => void; /** Exception from ws client OR custom listeners (e.g. if you throw inside your event handler) */ exception: (response: any & { wsKey: WsKey }) => void; + error: (response: any & { wsKey: WsKey }) => void; /** Confirmation that a connection successfully authenticated */ authenticated: (event: { wsKey: WsKey; event: any }) => void; } @@ -68,7 +69,7 @@ function getNormalisedTopicRequests( if (typeof wsTopicRequest === 'string') { const topicRequest: WsTopicRequest = { topic: wsTopicRequest, - params: undefined, + payload: undefined, }; normalisedTopicRequests.push(topicRequest); continue; @@ -108,10 +109,15 @@ export abstract class BaseWebsocketClient< this.wsStore = new WsStore(this.logger); this.options = { - pongTimeout: 1000, + // Some defaults: + pongTimeout: 1500, pingInterval: 10000, reconnectTimeout: 500, recvWindow: 0, + // Gate.io only has one connection (for both public & private). Auth works with every sub, not on connect, so this is turned off. + authPrivateConnectionsOnConnect: false, + // Gate.io requires auth to be added to every request, when subscribing to private topics. This is handled automatically. + authPrivateRequests: true, ...options, }; } @@ -126,6 +132,7 @@ export abstract class BaseWebsocketClient< protected abstract isPrivateTopicRequest( request: WsTopicRequest, + wsKey: TWSKey, ): boolean; protected abstract getPrivateWSKeys(): TWSKey[]; @@ -142,7 +149,7 @@ export abstract class BaseWebsocketClient< topics: WsTopicRequest[], wsKey: TWSKey, operation: WsOperation, - ): string[]; + ): Promise; /** * Abstraction called to sort ws events into emittable event types (response to a request, data update, etc) @@ -201,6 +208,22 @@ export abstract class BaseWebsocketClient< return this.connect(wsKey); } + // Subscribe should happen automatically once connected, nothing to do here after topics are added to wsStore. + if (!isConnected) { + /** + * Are we in the process of connection? Nothing to send yet. + */ + this.logger.trace( + `WS not connected - requests queued for retry once connected.`, + { + ...WS_LOGGER_CATEGORY, + wsKey, + wsTopicRequests, + }, + ); + return; + } + // We're connected. Check if auth is needed and if already authenticated const isPrivateConnection = this.isPrivateWsKey(wsKey); const isAuthenticated = this.wsStore.get(wsKey)?.isAuthenticated; @@ -254,6 +277,33 @@ export abstract class BaseWebsocketClient< this.requestUnsubscribeTopics(wsKey, normalisedTopicRequests); } + /** + * Splits topic requests into two groups, public & private topic requests + */ + private sortTopicRequestsIntoPublicPrivate( + wsTopicRequests: WsTopicRequest[], + wsKey: TWSKey, + ): { + publicReqs: WsTopicRequest[]; + privateReqs: WsTopicRequest[]; + } { + const publicTopicRequests: WsTopicRequest[] = []; + const privateTopicRequests: WsTopicRequest[] = []; + + for (const topic of wsTopicRequests) { + if (this.isPrivateTopicRequest(topic, wsKey)) { + privateTopicRequests.push(topic); + } else { + publicTopicRequests.push(topic); + } + } + + return { + publicReqs: publicTopicRequests, + privateReqs: privateTopicRequests, + }; + } + /** Get the WsStore that tracks websockets & topics */ public getWsStore(): WsStore> { return this.wsStore; @@ -414,8 +464,9 @@ export abstract class BaseWebsocketClient< ...WS_LOGGER_CATEGORY, wsKey, }); + this.clearPongTimer(wsKey); + this.getWs(wsKey)?.terminate(); - delete this.wsStore.get(wsKey, true).activePongTimer; }, this.options.pongTimeout); } @@ -443,6 +494,9 @@ export abstract class BaseWebsocketClient< if (wsState?.activePongTimer) { clearTimeout(wsState.activePongTimer); wsState.activePongTimer = undefined; + // this.logger.trace(`Cleared pong timeout for "${wsKey}"`); + } else { + // this.logger.trace(`No active pong timer for "${wsKey}"`); } } @@ -451,7 +505,7 @@ export abstract class BaseWebsocketClient< * * @private Use the `subscribe(topics)` or `subscribeTopicsForWsKey(topics, wsKey)` method to subscribe to topics. Send WS message to subscribe to topics. */ - private requestSubscribeTopics( + private async requestSubscribeTopics( wsKey: TWSKey, topics: WsTopicRequest[], ) { @@ -459,7 +513,8 @@ export abstract class BaseWebsocketClient< return; } - const subscribeWsMessages = this.getWsOperationEventsForTopics( + // Automatically splits requests into smaller batches, if needed + const subscribeWsMessages = await this.getWsOperationEventsForTopics( topics, wsKey, 'subscribe', @@ -486,7 +541,7 @@ export abstract class BaseWebsocketClient< * * @private Use the `unsubscribe(topics)` method to unsubscribe from topics. Send WS message to unsubscribe from topics. */ - private requestUnsubscribeTopics( + private async requestUnsubscribeTopics( wsKey: TWSKey, wsTopicRequests: WsTopicRequest[], ) { @@ -494,7 +549,7 @@ export abstract class BaseWebsocketClient< return; } - const subscribeWsMessages = this.getWsOperationEventsForTopics( + const subscribeWsMessages = await this.getWsOperationEventsForTopics( wsTopicRequests, wsKey, 'unsubscribe', @@ -584,35 +639,48 @@ export abstract class BaseWebsocketClient< this.setWsState(wsKey, WsConnectionStateEnum.CONNECTED); + this.logger.trace(`Enabled ping timer`, { ...WS_LOGGER_CATEGORY, wsKey }); + this.wsStore.get(wsKey, true)!.activePingTimer = setInterval( + () => this.ping(wsKey), + this.options.pingInterval, + ); + // Some websockets require an auth packet to be sent after opening the connection - if (this.isPrivateWsKey(wsKey)) { + if ( + this.isPrivateWsKey(wsKey) && + this.options.authPrivateConnectionsOnConnect + ) { await this.sendAuthRequest(wsKey); } // Reconnect to topics known before it connected - // Private topics will be resubscribed to once reconnected - const topics = [...this.wsStore.getTopics(wsKey)]; - const publicTopics = topics.filter( - (topic) => !this.isPrivateTopicRequest(topic), + const { privateReqs, publicReqs } = this.sortTopicRequestsIntoPublicPrivate( + [...this.wsStore.getTopics(wsKey)], + wsKey, ); - this.requestSubscribeTopics(wsKey, publicTopics); + // Request sub to public topics, if any + this.requestSubscribeTopics(wsKey, publicReqs); - this.logger.trace(`Enabled ping timer`, { ...WS_LOGGER_CATEGORY, wsKey }); - this.wsStore.get(wsKey, true)!.activePingTimer = setInterval( - () => this.ping(wsKey), - this.options.pingInterval, - ); + // Request sub to private topics, if auth on connect isn't needed + if (!this.options.authPrivateConnectionsOnConnect) { + // TODO: check for API keys and throw if missing + this.requestSubscribeTopics(wsKey, privateReqs); + } } - /** Handle subscription to private topics _after_ authentication successfully completes asynchronously */ + /** + * Handle subscription to private topics _after_ authentication successfully completes asynchronously. + * + * Only used for exchanges that require auth before sending private topic subscription requests + */ private onWsAuthenticated(wsKey: TWSKey) { const wsState = this.wsStore.get(wsKey, true); wsState.isAuthenticated = true; const topics = [...this.wsStore.getTopics(wsKey)]; const privateTopics = topics.filter((topic) => - this.isPrivateTopicRequest(topic), + this.isPrivateTopicRequest(topic, wsKey), ); if (privateTopics.length) { @@ -629,7 +697,7 @@ export abstract class BaseWebsocketClient< this.logger.trace('Received pong', { ...WS_LOGGER_CATEGORY, wsKey, - event, + event: (event as any)?.data, }); return; } diff --git a/src/lib/websocket/websocket-util.ts b/src/lib/websocket/websocket-util.ts index 82b18cd..28a50b1 100644 --- a/src/lib/websocket/websocket-util.ts +++ b/src/lib/websocket/websocket-util.ts @@ -46,7 +46,6 @@ export type WsMarket = 'all'; /** * Normalised internal format for a request (subscribe/unsubscribe/etc) on a topic, with optional parameters. * - * - WsKey: the WS connection this event is for * - Topic: the topic this event is for * - Payload: the parameters to include, optional. E.g. auth requires key + sign. Some topics allow configurable parameters. */ @@ -55,7 +54,7 @@ export interface WsTopicRequest< TWSPayload = any, > { topic: TWSTopic; - params?: TWSPayload; + payload?: TWSPayload; } /** diff --git a/src/types/websockets/client.ts b/src/types/websockets/client.ts index ab861df..f1f0e7e 100644 --- a/src/types/websockets/client.ts +++ b/src/types/websockets/client.ts @@ -53,4 +53,6 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions { pongTimeout: number; reconnectTimeout: number; recvWindow: number; + authPrivateConnectionsOnConnect: boolean; + authPrivateRequests: boolean; }