Skip to content

Commit

Permalink
just one
Browse files Browse the repository at this point in the history
  • Loading branch information
tyleroooo committed Jan 13, 2025
1 parent fcb8990 commit 47250cf
Showing 1 changed file with 30 additions and 23 deletions.
53 changes: 30 additions & 23 deletions src/abacus-ts/websocket/lib/indexerWebsocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { isTruthy } from '@/lib/isTruthy';
import { ReconnectingWebSocket } from './reconnectingWebsocket';

const NO_ID_SPECIAL_STRING_ID = '______EMPTY_ID______';
const CHANNEL_ID_SAFE_DIVIDER = '//////////';
const CHANNEL_RETRY_COOLDOWN_MS = timeUnits.minute;

export class IndexerWebsocket {
Expand All @@ -27,7 +28,7 @@ export class IndexerWebsocket {
};
} = {};

private lastRetryTimeMsByChannel: { [channel: string]: number } = {};
private lastRetryTimeMsByChannelAndId: { [channelAndId: string]: number } = {};

constructor(url: string) {
this.socket = new ReconnectingWebSocket({
Expand Down Expand Up @@ -135,36 +136,36 @@ export class IndexerWebsocket {
delete this.subscriptions[channel][id ?? NO_ID_SPECIAL_STRING_ID];
};

private _refreshChannelSubs = (channel: string) => {
const allSubs = Object.values(this.subscriptions[channel] ?? {});
allSubs.forEach((sub) => {
this._performUnsub(sub);
this._addSub(sub);
});
private _refreshSub = (channel: string, id: string) => {
const sub = this.subscriptions[channel]?.[id];
if (sub == null) {
return;
}
this._performUnsub(sub);
this._addSub(sub);
};

// if we get a "could not fetch data" error, we retry once as long as this channel is not on cooldown
// TODO: when backend adds the channel and id to the error message, use that to retry only one subscription
// if we get a "could not fetch data" error, we retry once as long as this channel+id is not on cooldown
// TODO: remove this entirely when backend is more reliable
private _handleErrorReceived = (message: string) => {
if (message.startsWith('Internal error, could not fetch data for subscription: ')) {
const maybeChannel = message
.trim()
.split(/[\s,.]/)
.at(-2);
if (maybeChannel != null && maybeChannel.startsWith('v4_')) {
const lastRefresh = this.lastRetryTimeMsByChannel[maybeChannel] ?? 0;
private _handleErrorReceived = (message: IndexerWebsocketErrorMessage) => {
if (message.message.startsWith('Internal error, could not fetch data for subscription: ')) {
const maybeChannel = message.channel;
const maybeId = message.id;
if (maybeChannel != null && maybeId != null && maybeChannel.startsWith('v4_')) {
const channelAndId = `${maybeChannel}${CHANNEL_ID_SAFE_DIVIDER}${maybeId}`;
const lastRefresh = this.lastRetryTimeMsByChannelAndId[channelAndId] ?? 0;
if (Date.now() - lastRefresh > CHANNEL_RETRY_COOLDOWN_MS) {
this.lastRetryTimeMsByChannel[maybeChannel] = Date.now();
this._refreshChannelSubs(maybeChannel);
this.lastRetryTimeMsByChannelAndId[channelAndId] = Date.now();
this._refreshSub(maybeChannel, maybeId);
logAbacusTsError(
'IndexerWebsocket',
'error fetching data for channel, refetching',
maybeChannel
maybeChannel,
maybeId
);
return;
}
logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', maybeChannel);
logAbacusTsError('IndexerWebsocket', 'hit max retries for channel:', maybeChannel, maybeId);
return;
}
}
Expand All @@ -175,7 +176,7 @@ export class IndexerWebsocket {
try {
const message = isWsMessage(messagePre);
if (message.type === 'error') {
this._handleErrorReceived(message.message);
this._handleErrorReceived(message);
} else if (message.type === 'connected' || message.type === 'unsubscribed') {
// do nothing
} else if (
Expand Down Expand Up @@ -262,8 +263,14 @@ export class IndexerWebsocket {
};
}

type IndexerWebsocketErrorMessage = {
type: 'error';
message: string;
channel?: string;
id?: string;
};
type IndexerWebsocketMessageType =
| { type: 'error'; message: string }
| IndexerWebsocketErrorMessage
| { type: 'connected' }
| {
type: 'channel_batch_data';
Expand Down

0 comments on commit 47250cf

Please sign in to comment.