Skip to content

Commit

Permalink
fix(bonsai-core): id can be null (#1430)
Browse files Browse the repository at this point in the history
  • Loading branch information
tyleroooo authored Jan 14, 2025
1 parent 025de60 commit 30cb312
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 32 deletions.
79 changes: 50 additions & 29 deletions src/abacus-ts/websocket/lib/indexerWebsocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const CHANNEL_RETRY_COOLDOWN_MS = timeUnits.minute;
export class IndexerWebsocket {
private socket: ReconnectingWebSocket | null = null;

// for logging purposes, to differentiate when user has many tabs open
private indexerWsId = crypto.randomUUID();

private subscriptions: {
[channel: string]: {
[id: string]: {
Expand Down Expand Up @@ -82,14 +85,18 @@ export class IndexerWebsocket {
}) => {
this.subscriptions[channel] ??= {};
if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] != null) {
logAbacusTsError('IndexerWebsocket', 'this subscription already exists', `${channel}/${id}`);
logAbacusTsError('IndexerWebsocket', 'this subscription already exists', {
id: `${channel}/${id}`,
wsId: this.indexerWsId,
});
throw new Error(`IndexerWebsocket error: this subscription already exists. ${channel}/${id}`);
}
logAbacusTsInfo('IndexerWebsocket', 'adding subscription', {
channel,
id,
socketNonNull: this.socket != null,
socketActive: this.socket?.isActive(),
wsId: this.indexerWsId,
});
this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] = {
channel,
Expand All @@ -115,16 +122,15 @@ export class IndexerWebsocket {
logAbacusTsError(
'IndexerWebsocket',
'unsubbing from nonexistent or already unsubbed channel',
channel
{ channel, wsId: this.indexerWsId }
);
return;
}
if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) {
logAbacusTsError(
'IndexerWebsocket',
'unsubbing from nonexistent or already unsubbed channel',
channel,
id
{ channel, id, wsId: this.indexerWsId }
);
return;
}
Expand All @@ -133,6 +139,7 @@ export class IndexerWebsocket {
id,
socketNonNull: this.socket != null,
socketActive: this.socket?.isActive(),
wsId: this.indexerWsId,
});
if (
this.socket != null &&
Expand All @@ -148,8 +155,8 @@ export class IndexerWebsocket {
delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID];
};

private _refreshSub = (channel: string, id: string) => {
const sub = this.subscriptions[channel]?.[id];
private _refreshSub = (channel: string, id: string | undefined) => {
const sub = this.subscriptions[channel]?.[id ?? NO_ID_SPECIAL_STRING_ID];
if (sub == null) {
return;
}
Expand All @@ -163,25 +170,31 @@ export class IndexerWebsocket {
if (message.message.startsWith('Internal error, could not fetch data for subscription: ')) {
const maybeChannel = message.channel;
const maybeId = message.id;
if (maybeChannel != null && maybeId != null && maybeChannel.startsWith('v4_')) {
const channelAndId = `${maybeChannel}${CHANNEL_ID_SAFE_DIVIDER}${maybeId}`;
if (maybeChannel != null && maybeChannel.startsWith('v4_')) {
const channelAndId = `${maybeChannel}${CHANNEL_ID_SAFE_DIVIDER}${maybeId ?? NO_ID_SPECIAL_STRING_ID}`;
const lastRefresh = this.lastRetryTimeMsByChannelAndId[channelAndId] ?? 0;
if (Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) {
this.lastRetryTimeMsByChannelAndId[channelAndId] = Date.now();
this._refreshSub(maybeChannel, maybeId);
logAbacusTsInfo(
'IndexerWebsocket',
'error fetching data for channel, refetching',
logAbacusTsInfo('IndexerWebsocket', 'error fetching data for channel, refetching', {
maybeChannel,
maybeId
);
maybeId,
wsId: this.indexerWsId,
});
return;
}
logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', maybeChannel, maybeId);
logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', {
maybeChannel,
maybeId,
wsId: this.indexerWsId,
});
return;
}
}
logAbacusTsError('IndexerWebsocket', 'encountered server side error:', message);
logAbacusTsError('IndexerWebsocket', 'encountered server side error:', {
message,
wsId: this.indexerWsId,
});
};

private _handleMessage = (messagePre: any) => {
Expand All @@ -195,6 +208,7 @@ export class IndexerWebsocket {
logAbacusTsInfo('IndexerWebsocket', `unsubscribe confirmed`, {
channel: message.channel,
id: message.id,
wsId: this.indexerWsId,
});
} else if (
message.type === 'subscribed' ||
Expand All @@ -208,31 +222,30 @@ export class IndexerWebsocket {
if (this.subscriptions[channel] == null) {
// hide error for channel we expect to see it on
if (channel !== 'v4_orderbook') {
logAbacusTsError(
'IndexerWebsocket',
'encountered message with unknown target',
logAbacusTsError('IndexerWebsocket', 'encountered message with unknown target', {
channel,
id
);
id,
wsId: this.indexerWsId,
});
}
return;
}
if (this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID] == null) {
// hide error for channel we expect to see it on
if (channel !== 'v4_orderbook') {
logAbacusTsError(
'IndexerWebsocket',
'encountered message with unknown target',
logAbacusTsError('IndexerWebsocket', 'encountered message with unknown target', {
channel,
id
);
id,
wsId: this.indexerWsId,
});
}
return;
}
if (message.type === 'subscribed') {
logAbacusTsInfo('IndexerWebsocket', `subscription confirmed`, {
channel,
id,
wsId: this.indexerWsId,
});
this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID]!.handleBaseData(
message.contents,
Expand All @@ -256,18 +269,25 @@ export class IndexerWebsocket {
assertNever(message);
}
} catch (e) {
logAbacusTsError('IndexerWebsocket', 'Error handling websocket message', messagePre, e);
logAbacusTsError('IndexerWebsocket', 'Error handling websocket message', {
messagePre,
wsId: this.indexerWsId,
error: e,
});
}
};

// when websocket churns, reconnect all known subscribers
private _handleFreshConnect = () => {
logAbacusTsInfo('IndexerWebsocket', 'freshly connected', {
socketUrl: this.socket?.url,
wsId: this.indexerWsId,
socketNonNull: this.socket != null,
socketActive: this.socket?.isActive(),
numSubs: Object.values(this.subscriptions)
subs: Object.values(this.subscriptions)
.flatMap((o) => Object.values(o))
.filter(isTruthy).length,
.filter(isTruthy)
.map((o) => `${o.channel}///${o.id}`),
});
if (this.socket != null && this.socket.isActive()) {
Object.values(this.subscriptions)
Expand All @@ -286,7 +306,8 @@ export class IndexerWebsocket {
} else {
logAbacusTsError(
'IndexerWebsocket',
"handle fresh connect called when websocket isn't ready."
"handle fresh connect called when websocket isn't ready.",
{ wsId: this.indexerWsId }
);
}
};
Expand Down
10 changes: 8 additions & 2 deletions src/abacus-ts/websocket/lib/reconnectingWebsocket.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable max-classes-per-file */
import { logAbacusTsError } from '@/abacus-ts/logs';
import { logAbacusTsError, logAbacusTsInfo } from '@/abacus-ts/logs';

interface ReconnectingWebSocketConfig {
url: string;
Expand All @@ -11,7 +11,7 @@ interface ReconnectingWebSocketConfig {
}

export class ReconnectingWebSocket {
private readonly url: string;
public readonly url: string;

private readonly handleMessage: (data: any) => void;

Expand Down Expand Up @@ -205,6 +205,12 @@ class WebSocketConnection {
reason: close.reason,
clean: close.wasClean,
});
} else {
logAbacusTsInfo('WebSocketConnection', `socket ${this.id} closed`, {
code: close.code,
reason: close.reason,
clean: close.wasClean,
});
}
this.close();
};
Expand Down
4 changes: 4 additions & 0 deletions src/hooks/useInitializePage.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { useEffect, useRef } from 'react';

// eslint-disable-next-line no-restricted-imports
import { logAbacusTsInfo } from '@/abacus-ts/logs';
// eslint-disable-next-line no-restricted-imports
import { IndexerWebsocketManager } from '@/abacus-ts/websocket/lib/indexerWebsocketManager';

Expand Down Expand Up @@ -50,6 +52,7 @@ export const useInitializePage = () => {
// reconnect abacus (reestablish connections to indexer, validator etc.) if app was hidden for more than 10 seconds
abacusStateManager.restart({ network: localStorageNetwork });
IndexerWebsocketManager.getActiveResources().forEach((r) => r.restart());
logAbacusTsInfo('useInitializePage', 'restarting because visibility change');
}
hiddenTimeRef.current = null;
}
Expand All @@ -66,6 +69,7 @@ export const useInitializePage = () => {
const handleOnline = () => {
abacusStateManager.restart({ network: localStorageNetwork });
IndexerWebsocketManager.getActiveResources().forEach((r) => r.restart());
logAbacusTsInfo('useInitializePage', 'restarting because network status change');
};
document.addEventListener('online', handleOnline);
return () => {
Expand Down
5 changes: 4 additions & 1 deletion src/lib/analytics/datadog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const PROXY_URL = import.meta.env.VITE_DATADOG_PROXY_URL;
const SERVICE_NAME = 'v4-web';
const LOGGER_NAME = 'v4-web';
const SITE_NAME = 'datadoghq.com';
const instanceId = crypto.randomUUID();

const LOG_ENDPOINT_PATH = (PROXY_URL ?? '').endsWith('/') ? 'api/v2/logs' : '/api/v2/logs';

Expand All @@ -19,13 +20,15 @@ if (CLIENT_TOKEN) {
sessionSampleRate: 100,
env: CURRENT_MODE,
proxy: PROXY_URL ? `${PROXY_URL}${LOG_ENDPOINT_PATH}` : undefined,
sendLogsAfterSessionExpiration: true,
});
}

datadogLogs.createLogger(LOGGER_NAME);

const datadogLogger = datadogLogs.getLogger(LOGGER_NAME)!!;
const datadogLogger = datadogLogs.getLogger(LOGGER_NAME)!;
datadogLogger.setContextProperty('dd-client-token', CLIENT_TOKEN);
datadogLogger.setContextProperty('instance-id', instanceId);

/**
* TODO: make a logger wrapper that enables us also log to the console
Expand Down

0 comments on commit 30cb312

Please sign in to comment.