Skip to content

Commit

Permalink
fix-race-condition
Browse files Browse the repository at this point in the history
  • Loading branch information
tyleroooo committed Jan 9, 2025
1 parent dfd2686 commit f2b669e
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 76 deletions.
24 changes: 20 additions & 4 deletions src/abacus-ts/lib/resourceCacheManager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { logAbacusTsError } from '../logs';

type CacheEntry<T> = {
resource: T;
count: number;
Expand All @@ -14,7 +16,7 @@ export class ResourceCacheManager<T, U> {
constructor(
private options: {
constructor: (key: U) => T;
destroyer: (resource: NoInfer<T>) => void;
destroyer: (resource: NoInfer<T>, key: NoInfer<U>) => void;
keySerializer: (key: NoInfer<U>) => string;
destroyDelayMs?: number;
}
Expand Down Expand Up @@ -42,7 +44,14 @@ export class ResourceCacheManager<T, U> {
markDone(key: U): void {
const serializedKey = this.options.keySerializer(key);
const entry = this.cache[serializedKey];
if (!entry) return;
if (!entry) {
logAbacusTsError('ResourceCacheManager', 'tried to mark done unknown key', key);
return;
}
if (entry.count <= 0) {
logAbacusTsError('ResourceCacheManager', 'tried to mark done key with no users', key);
entry.count = 1;
}

entry.count -= 1;

Expand All @@ -55,9 +64,16 @@ export class ResourceCacheManager<T, U> {
const delay = this.options.destroyDelayMs ?? 1000;
entry.destroyTimeout = setTimeout(() => {
const latestVal = this.cache[serializedKey];
if (!latestVal) return;
if (!latestVal) {
logAbacusTsError(
'ResourceCacheManager',
'unexpectedly couldnt find resource to destroy',
key
);
return;
}

this.options.destroyer(latestVal.resource);
this.options.destroyer(latestVal.resource, key);
delete this.cache[serializedKey];
}, delay);
}
Expand Down
2 changes: 2 additions & 0 deletions src/abacus-ts/websocket/lib/reconnectingWebsocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ class WebSocketConnection {
1001,
// normal but no code
1005,
// supposedly abnormal tcp failure but super super common
1006,
]);
if (!allowedCodes.has(close.code)) {
logAbacusTsError('WebSocketConnection', `socket ${this.id} closed abnormally`, {
Expand Down
7 changes: 1 addition & 6 deletions src/abacus-ts/websocket/lib/websocketDerivedValue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,10 @@ export class WebsocketDerivedValue<T> {
handleBaseData: (data: any, value: T, fullMessage: any) => T;
handleUpdates: (updates: any[], value: T, fullMessage: any) => T;
},
value: T,
changeHandler: ((val: T) => void) | undefined
value: T
) {
this.value = value;

if (changeHandler) {
this.subscribe(changeHandler);
}

this.unsubFromWs = websocket.addChannelSubscription({
channel: sub.channel,
id: sub.id,
Expand Down
24 changes: 11 additions & 13 deletions src/abacus-ts/websocket/markets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ import { setAllMarketsRaw } from '@/state/raw';

import { createStoreEffect } from '../lib/createStoreEffect';
import { Loadable, loadableLoaded, loadablePending } from '../lib/loadable';
import { logAbacusTsError } from '../logs';
import { MarketsData } from '../rawTypes';
import { selectWebsocketUrl } from '../socketSelectors';
import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { IndexerWebsocketManager } from './lib/indexerWebsocketManager';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';

function marketsWebsocketValue(
websocket: IndexerWebsocket,
onChange: (val: Loadable<MarketsData>) => void
) {
function marketsWebsocketValueCreator(websocket: IndexerWebsocket) {
return new WebsocketDerivedValue<Loadable<MarketsData>>(
websocket,
{
Expand All @@ -34,8 +32,7 @@ function marketsWebsocketValue(
const updates = isWsMarketUpdateResponses(baseUpdates);
let startingValue = value.data;
if (startingValue == null) {
// eslint-disable-next-line no-console
console.log('MarketsTracker found unexpectedly null base data in update');
logAbacusTsError('MarketsTracker', 'found unexpectedly null base data in update');
return value;
}
startingValue = { ...startingValue };
Expand Down Expand Up @@ -65,24 +62,25 @@ function marketsWebsocketValue(
return loadableLoaded(startingValue);
},
},
loadablePending(),
onChange
loadablePending()
);
}

const MarketsValueManager = makeWsValueManager(marketsWebsocketValueCreator);

export function setUpMarkets(store: RootStore) {
const throttledSetMarkets = throttle((val: Loadable<MarketsData>) => {
store.dispatch(setAllMarketsRaw(val));
}, 2 * timeUnits.second);

return createStoreEffect(store, selectWebsocketUrl, (wsUrl) => {
const thisTracker = marketsWebsocketValue(IndexerWebsocketManager.use(wsUrl), (val) =>
const unsub = subscribeToWsValue(MarketsValueManager, { wsUrl }, (val) =>
throttledSetMarkets(val)
);

return () => {
thisTracker.teardown();
IndexerWebsocketManager.markDone(wsUrl);
unsub();
throttledSetMarkets.cancel();
store.dispatch(setAllMarketsRaw(loadablePending()));
};
});
}
25 changes: 12 additions & 13 deletions src/abacus-ts/websocket/orderbook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ import { isTruthy } from '@/lib/isTruthy';

import { createStoreEffect } from '../lib/createStoreEffect';
import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable';
import { logAbacusTsError } from '../logs';
import { OrderbookData } from '../rawTypes';
import { selectWebsocketUrl } from '../socketSelectors';
import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { IndexerWebsocketManager } from './lib/indexerWebsocketManager';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';

function orderbookWebsocketValue(
function orderbookWebsocketValueCreator(
websocket: IndexerWebsocket,
marketId: string,
onChange: (val: Loadable<OrderbookData>) => void
{ marketId }: { marketId: string }
) {
return new WebsocketDerivedValue<Loadable<OrderbookData>>(
websocket,
Expand All @@ -45,8 +45,7 @@ function orderbookWebsocketValue(
const updates = isWsOrderbookUpdateResponses(baseUpdates);
let startingValue = value.data;
if (startingValue == null) {
// eslint-disable-next-line no-console
console.log('MarketsTracker found unexpectedly null base data in update');
logAbacusTsError('OrderbookTracker', 'found unexpectedly null base data in update');
return value;
}
startingValue = { asks: { ...startingValue.asks }, bids: { ...startingValue.bids } };
Expand All @@ -61,11 +60,12 @@ function orderbookWebsocketValue(
return loadableLoaded(startingValue);
},
},
loadablePending(),
onChange
loadablePending()
);
}

const OrderbookValueManager = makeWsValueManager(orderbookWebsocketValueCreator);

const selectMarketAndWsInfo = createAppSelector(
[selectWebsocketUrl, getCurrentMarketIdIfTradeable],
(wsUrl, currentMarketId) => ({ wsUrl, currentMarketId })
Expand All @@ -80,15 +80,14 @@ export function setUpOrderbook(store: RootStore) {
store.dispatch(setOrderbookRaw({ marketId: currentMarketId, data }));
}, timeUnits.second / 2);

const thisTracker = orderbookWebsocketValue(
IndexerWebsocketManager.use(wsUrl),
currentMarketId,
const unsub = subscribeToWsValue(
OrderbookValueManager,
{ wsUrl, marketId: currentMarketId },
(data) => throttledSetOrderbook(data)
);

return () => {
thisTracker.teardown();
IndexerWebsocketManager.markDone(wsUrl);
unsub();
throttledSetOrderbook.cancel();
store.dispatch(setOrderbookRaw({ marketId: currentMarketId, data: loadableIdle() }));
};
Expand Down
29 changes: 16 additions & 13 deletions src/abacus-ts/websocket/parentSubaccount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import { createStoreEffect } from '../lib/createStoreEffect';
import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable';
import { ChildSubaccountData, ParentSubaccountData } from '../rawTypes';
import { selectParentSubaccountInfo, selectWebsocketUrl } from '../socketSelectors';
import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { IndexerWebsocketManager } from './lib/indexerWebsocketManager';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';

function isValidSubaccount(childSubaccount: IndexerSubaccountResponseObject) {
Expand Down Expand Up @@ -64,11 +64,14 @@ function freshChildSubaccount({
};
}

function accountWebsocketValue(
interface AccountValueArgsBase {
address: string;
parentSubaccountNumber: string;
}

function accountWebsocketValueCreator(
websocket: IndexerWebsocket,
address: string,
parentSubaccountNumber: string,
onChange: (val: Loadable<ParentSubaccountData>) => void
{ address, parentSubaccountNumber }: AccountValueArgsBase
) {
return new WebsocketDerivedValue<Loadable<ParentSubaccountData>>(
websocket,
Expand Down Expand Up @@ -214,11 +217,12 @@ function accountWebsocketValue(
return { ...value, data: resultData };
},
},
loadablePending(),
onChange
loadablePending()
);
}

const AccountValueManager = makeWsValueManager(accountWebsocketValueCreator);

const selectParentSubaccount = createAppSelector(
[selectWebsocketUrl, selectParentSubaccountInfo],
(wsUrl, { wallet, subaccount }) => ({ wsUrl, wallet, subaccount })
Expand All @@ -229,16 +233,15 @@ export function setUpParentSubaccount(store: RootStore) {
if (!isTruthy(wallet) || subaccount == null) {
return undefined;
}
const thisTracker = accountWebsocketValue(
IndexerWebsocketManager.use(wsUrl),
wallet,
subaccount.toString(),

const unsub = subscribeToWsValue(
AccountValueManager,
{ wsUrl, address: wallet, parentSubaccountNumber: subaccount.toString() },
(val) => store.dispatch(setParentSubaccountRaw(val))
);

return () => {
thisTracker.teardown();
IndexerWebsocketManager.markDone(wsUrl);
unsub();
store.dispatch(setParentSubaccountRaw(loadableIdle()));
};
});
Expand Down
42 changes: 15 additions & 27 deletions src/abacus-ts/websocket/trades.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,23 @@ import { useEffect, useState } from 'react';
import { isWsTradesResponse, isWsTradesUpdateResponses } from '@/types/indexer/indexerChecks';
import { orderBy } from 'lodash';

import { DydxNetwork } from '@/constants/networks';

import { getSelectedNetwork } from '@/state/appSelectors';
import { useAppSelector } from '@/state/appTypes';
import { getCurrentMarketIdIfTradeable } from '@/state/perpetualsSelectors';

import { mergeById } from '@/lib/mergeById';

import { Loadable, loadableIdle, loadableLoaded, loadablePending } from '../lib/loadable';
import { ResourceCacheManager } from '../lib/resourceCacheManager';
import { TradesData } from '../rawTypes';
import { getWebsocketUrlForNetwork } from '../socketSelectors';
import { selectWebsocketUrl } from '../socketSelectors';
import { makeWsValueManager, subscribeToWsValue } from './lib/indexerValueManagerHelpers';
import { IndexerWebsocket } from './lib/indexerWebsocket';
import { IndexerWebsocketManager } from './lib/indexerWebsocketManager';
import { WebsocketDerivedValue } from './lib/websocketDerivedValue';

const POST_LIMIT = 250;

function tradesWebsocketValue(
function tradesWebsocketValueCreator(
websocket: IndexerWebsocket,
marketId: string,
onChange?: (val: Loadable<TradesData>) => void
{ marketId }: { marketId: string }
) {
return new WebsocketDerivedValue<Loadable<TradesData>>(
websocket,
Expand All @@ -51,41 +46,34 @@ function tradesWebsocketValue(
return loadableLoaded({ trades: sortedMerged.slice(0, POST_LIMIT) });
},
},
loadablePending(),
onChange
loadablePending()
);
}

export const TradeValuesManager = new ResourceCacheManager({
constructor: ({ marketId, network }: { network: DydxNetwork; marketId: string }) =>
tradesWebsocketValue(IndexerWebsocketManager.use(getWebsocketUrlForNetwork(network)), marketId),
destroyer: (instance) => {
instance.teardown();
},
keySerializer: ({ network, marketId }) => `${network}//////${marketId}`,
});
export const TradeValuesManager = makeWsValueManager(tradesWebsocketValueCreator);

export function useCurrentMarketTradesValue() {
const selectedNetwork = useAppSelector(getSelectedNetwork);
const wsUrl = useAppSelector(selectWebsocketUrl);
const currentMarketId = useAppSelector(getCurrentMarketIdIfTradeable);

// useSyncExternalStore is better but the API doesn't fit this use case very well
const [trades, setTrades] = useState<Loadable<TradesData>>(loadableIdle());

useEffect(() => {
if (currentMarketId == null) {
return () => null;
}
const tradesManager = TradeValuesManager.use({
marketId: currentMarketId,
network: selectedNetwork,
});
const unsubListener = tradesManager.subscribe((val) => setTrades(val));

const unsubListener = subscribeToWsValue(
TradeValuesManager,
{ wsUrl, marketId: currentMarketId },
(val) => setTrades(val)
);

return () => {
setTrades(loadableIdle());
unsubListener();
TradeValuesManager.markDone({ marketId: currentMarketId, network: selectedNetwork });
};
}, [selectedNetwork, currentMarketId]);
}, [currentMarketId, wsUrl]);
return trades;
}

0 comments on commit f2b669e

Please sign in to comment.