From 27cea6bd30db72f91b6fe720135c78517f7d8acd Mon Sep 17 00:00:00 2001 From: nicholaspai Date: Fri, 22 Mar 2024 18:10:55 -0400 Subject: [PATCH 1/7] improve(dataworker): Warm BundleData loadData cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I’ve noticed that the executor is very slow and seems to stall for a long time when evaluating L2 leaves. I believe the problem is that the executor tries to execute leaves for all chains in parallel. For example, the executor tries to execute leaves from the latest 2 root bundles for 7 chains in parallel. That means that this [function](https://github.com/across-protocol/relayer-v2/blob/2a986a267c72af02390124ffee545840f14f7b0a/src/dataworker/Dataworker.ts#L1094) which calls `BundleDataClient.loadData` is running 7 times in parallel. The `loadData` function is designed to cache the bundle data in-memory (not in Redis!) but we can’t take advantage of this if we call it many times in parallel. Therefore, I propose warming this cache for each executor run, which ensures we call `loadData` only once per executor run. To add confluence to this observation about the source of slowdown, the execution of the relayer refund roots is very fast compared to the slow roots, and they `loadData` over the exact same block ranges. In this case, the refund root execution logic runs AFTER the loadData cache has been warmed. --- src/dataworker/Dataworker.ts | 63 ++++++++++++++++++++++++++++++++++++ src/dataworker/index.ts | 2 ++ 2 files changed, 65 insertions(+) diff --git a/src/dataworker/Dataworker.ts b/src/dataworker/Dataworker.ts index 44b243078..6c3448243 100644 --- a/src/dataworker/Dataworker.ts +++ b/src/dataworker/Dataworker.ts @@ -989,6 +989,69 @@ export class Dataworker { }; } + // Designed to be called before executing leaves in root bundle to ensure that the BundleDataClient.loadData + // output is cached. This is useful because `_proposeRootBundle` can be very slow if called in parallel + // for every spoke pool client. Instead, call it sequentially for the max number of bundles to inspect before + // trying to execute leaves in parallel. + async warmBundleDataCache( + spokePoolClients: { [chainId: number]: SpokePoolClient }, + earliestBlocksInSpokePoolClients: { [chainId: number]: number } = {} + ): Promise { + this.logger.debug({ + at: "Dataworker#warmBundleDataCache", + message: `Warming bundle data for the latest ${this.spokeRootsLookbackCount} root bundles`, + }); + + const timerStart = Date.now(); + let latestRootBundles = sortEventsDescending(this.clients.hubPoolClient.getValidatedRootBundles()); + if (this.spokeRootsLookbackCount !== 0) { + latestRootBundles = latestRootBundles.slice(0, this.spokeRootsLookbackCount); + } + + await Promise.all( + latestRootBundles.map(async (rootBundle) => { + const blockNumberRanges = getImpliedBundleBlockRanges( + this.clients.hubPoolClient, + this.clients.configStoreClient, + rootBundle + ); + const mainnetBlockRange = blockNumberRanges[0]; + const chainIds = this.clients.configStoreClient.getChainIdIndicesForBlock(mainnetBlockRange[0]); + if ( + Object.keys(earliestBlocksInSpokePoolClients).length > 0 && + (await blockRangesAreInvalidForSpokeClients( + spokePoolClients, + blockNumberRanges, + chainIds, + earliestBlocksInSpokePoolClients, + this.isV3(mainnetBlockRange[0]) + )) + ) { + // Log this as a debug level and let the executeX function log it at the warn level. + this.logger.debug({ + at: "Dataworker#warmBundleDataCache", + message: "Cannot validate bundle with insufficient event data. Set a larger DATAWORKER_FAST_LOOKBACK_COUNT", + rootBundleRanges: blockNumberRanges, + availableSpokePoolClients: Object.keys(spokePoolClients), + earliestBlocksInSpokePoolClients, + spokeClientsEventSearchConfigs: Object.fromEntries( + Object.entries(spokePoolClients).map(([chainId, client]) => [chainId, client.eventSearchConfig]) + ), + }); + return; + } + await this._proposeRootBundle(blockNumberRanges, spokePoolClients, rootBundle.blockNumber); + }) + ); + this.logger.debug({ + at: "Dataworker#warmBundleDataCache", + message: `Warmed bundle data cache for the latest ${this.spokeRootsLookbackCount} root bundles in ${ + Date.now() - timerStart + }ms`, + latestRootBundles, + }); + } + // TODO: this method and executeRelayerRefundLeaves have a lot of similarities, but they have some key differences // in both the events they search for and the comparisons they make. We should try to generalize this in the future, // but keeping them separate is probably the simplest for the initial implementation. diff --git a/src/dataworker/index.ts b/src/dataworker/index.ts index ae24e8d15..2e370f373 100644 --- a/src/dataworker/index.ts +++ b/src/dataworker/index.ts @@ -137,6 +137,8 @@ export async function runDataworker(_logger: winston.Logger, baseSigner: Signer) fromBlocks ); + // Warm cache before executing slow and refund leaves. + await dataworker.warmBundleDataCache(spokePoolClients, fromBlocks); // Execute slow relays before relayer refunds to give them priority for any L2 funds. await dataworker.executeSlowRelayLeaves( spokePoolClients, From 15e23b852c4fafb951da2a053249ba000bdd9e86 Mon Sep 17 00:00:00 2001 From: Matt Rice Date: Fri, 22 Mar 2024 18:41:50 -0400 Subject: [PATCH 2/7] feat: restructure cache to limit parallelism Signed-off-by: Matt Rice --- src/clients/BundleDataClient.ts | 22 ++++++++++++++-------- src/dataworker/Dataworker.ts | 2 +- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/clients/BundleDataClient.ts b/src/clients/BundleDataClient.ts index 13bafafa9..79f9cc5b8 100644 --- a/src/clients/BundleDataClient.ts +++ b/src/clients/BundleDataClient.ts @@ -53,7 +53,7 @@ import { LoadDataReturnValue, } from "../interfaces/BundleData"; -type DataCache = Record; +type DataCache = Record>; // V3 dictionary helper functions function updateExpiredDepositsV3(dict: ExpiredDepositsToRefundV3, deposit: V3DepositWithBlock): void { @@ -158,10 +158,10 @@ export class BundleDataClient { this.loadDataCache = {}; } - loadDataFromCache(key: string): LoadDataReturnValue { + async loadDataFromCache(key: string): Promise { // Always return a deep cloned copy of object stored in cache. Since JS passes by reference instead of value, we // want to minimize the risk that the programmer accidentally mutates data in the cache. - return _.cloneDeep(this.loadDataCache[key]); + return _.cloneDeep(await this.loadDataCache[key]); } getBundleTimestampsFromCache(key: string): undefined | { [chainId: number]: number[] } { @@ -299,10 +299,18 @@ export class BundleDataClient { ): Promise { const key = JSON.stringify(blockRangesForChains); - if (this.loadDataCache[key]) { - return this.loadDataFromCache(key); + if (!this.loadDataCache[key]) { + this.loadDataCache[key] = this._loadData(blockRangesForChains, spokePoolClients, logData); } + return this.loadDataFromCache(key); + } + + async _loadData( + blockRangesForChains: number[][], + spokePoolClients: SpokePoolClientsByChain, + logData = true + ): Promise { if (!this.clients.configStoreClient.isUpdated) { throw new Error("ConfigStoreClient not updated"); } else if (!this.clients.hubPoolClient.isUpdated) { @@ -1061,7 +1069,7 @@ export class BundleDataClient { }); } - this.loadDataCache[key] = { + return { fillsToRefund, deposits, unfilledDeposits, @@ -1073,8 +1081,6 @@ export class BundleDataClient { unexecutableSlowFills, bundleSlowFillsV3, }; - - return this.loadDataFromCache(key); } async getBundleBlockTimestamps( diff --git a/src/dataworker/Dataworker.ts b/src/dataworker/Dataworker.ts index 6c3448243..7ee8d1462 100644 --- a/src/dataworker/Dataworker.ts +++ b/src/dataworker/Dataworker.ts @@ -97,7 +97,7 @@ export type PoolRebalanceRoot = { tree: MerkleTree; }; -type PoolRebalanceRootCache = Record; +type PoolRebalanceRootCache = Record>; // @notice Constructs roots to submit to HubPool on L1. Fetches all data synchronously from SpokePool/HubPool clients // so this class assumes that those upstream clients are already updated and have fetched on-chain data from RPC's. From 0f5ba2b01e9ea2f7eb75ad8f7d28047410e603bb Mon Sep 17 00:00:00 2001 From: Matt Rice Date: Fri, 22 Mar 2024 18:41:58 -0400 Subject: [PATCH 3/7] Revert "improve(dataworker): Warm BundleData loadData cache" This reverts commit 27cea6bd30db72f91b6fe720135c78517f7d8acd. --- src/dataworker/Dataworker.ts | 63 ------------------------------------ src/dataworker/index.ts | 2 -- 2 files changed, 65 deletions(-) diff --git a/src/dataworker/Dataworker.ts b/src/dataworker/Dataworker.ts index 7ee8d1462..29a411f4d 100644 --- a/src/dataworker/Dataworker.ts +++ b/src/dataworker/Dataworker.ts @@ -989,69 +989,6 @@ export class Dataworker { }; } - // Designed to be called before executing leaves in root bundle to ensure that the BundleDataClient.loadData - // output is cached. This is useful because `_proposeRootBundle` can be very slow if called in parallel - // for every spoke pool client. Instead, call it sequentially for the max number of bundles to inspect before - // trying to execute leaves in parallel. - async warmBundleDataCache( - spokePoolClients: { [chainId: number]: SpokePoolClient }, - earliestBlocksInSpokePoolClients: { [chainId: number]: number } = {} - ): Promise { - this.logger.debug({ - at: "Dataworker#warmBundleDataCache", - message: `Warming bundle data for the latest ${this.spokeRootsLookbackCount} root bundles`, - }); - - const timerStart = Date.now(); - let latestRootBundles = sortEventsDescending(this.clients.hubPoolClient.getValidatedRootBundles()); - if (this.spokeRootsLookbackCount !== 0) { - latestRootBundles = latestRootBundles.slice(0, this.spokeRootsLookbackCount); - } - - await Promise.all( - latestRootBundles.map(async (rootBundle) => { - const blockNumberRanges = getImpliedBundleBlockRanges( - this.clients.hubPoolClient, - this.clients.configStoreClient, - rootBundle - ); - const mainnetBlockRange = blockNumberRanges[0]; - const chainIds = this.clients.configStoreClient.getChainIdIndicesForBlock(mainnetBlockRange[0]); - if ( - Object.keys(earliestBlocksInSpokePoolClients).length > 0 && - (await blockRangesAreInvalidForSpokeClients( - spokePoolClients, - blockNumberRanges, - chainIds, - earliestBlocksInSpokePoolClients, - this.isV3(mainnetBlockRange[0]) - )) - ) { - // Log this as a debug level and let the executeX function log it at the warn level. - this.logger.debug({ - at: "Dataworker#warmBundleDataCache", - message: "Cannot validate bundle with insufficient event data. Set a larger DATAWORKER_FAST_LOOKBACK_COUNT", - rootBundleRanges: blockNumberRanges, - availableSpokePoolClients: Object.keys(spokePoolClients), - earliestBlocksInSpokePoolClients, - spokeClientsEventSearchConfigs: Object.fromEntries( - Object.entries(spokePoolClients).map(([chainId, client]) => [chainId, client.eventSearchConfig]) - ), - }); - return; - } - await this._proposeRootBundle(blockNumberRanges, spokePoolClients, rootBundle.blockNumber); - }) - ); - this.logger.debug({ - at: "Dataworker#warmBundleDataCache", - message: `Warmed bundle data cache for the latest ${this.spokeRootsLookbackCount} root bundles in ${ - Date.now() - timerStart - }ms`, - latestRootBundles, - }); - } - // TODO: this method and executeRelayerRefundLeaves have a lot of similarities, but they have some key differences // in both the events they search for and the comparisons they make. We should try to generalize this in the future, // but keeping them separate is probably the simplest for the initial implementation. diff --git a/src/dataworker/index.ts b/src/dataworker/index.ts index 2e370f373..ae24e8d15 100644 --- a/src/dataworker/index.ts +++ b/src/dataworker/index.ts @@ -137,8 +137,6 @@ export async function runDataworker(_logger: winston.Logger, baseSigner: Signer) fromBlocks ); - // Warm cache before executing slow and refund leaves. - await dataworker.warmBundleDataCache(spokePoolClients, fromBlocks); // Execute slow relays before relayer refunds to give them priority for any L2 funds. await dataworker.executeSlowRelayLeaves( spokePoolClients, From d4bc61cdb4ab9626701bafd63323c6655af716b8 Mon Sep 17 00:00:00 2001 From: Matt Rice Date: Fri, 22 Mar 2024 18:48:01 -0400 Subject: [PATCH 4/7] WIP Signed-off-by: Matt Rice --- src/dataworker/Dataworker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dataworker/Dataworker.ts b/src/dataworker/Dataworker.ts index 29a411f4d..44b243078 100644 --- a/src/dataworker/Dataworker.ts +++ b/src/dataworker/Dataworker.ts @@ -97,7 +97,7 @@ export type PoolRebalanceRoot = { tree: MerkleTree; }; -type PoolRebalanceRootCache = Record>; +type PoolRebalanceRootCache = Record; // @notice Constructs roots to submit to HubPool on L1. Fetches all data synchronously from SpokePool/HubPool clients // so this class assumes that those upstream clients are already updated and have fetched on-chain data from RPC's. From a42239bb2751b0f40659b6c19a369030192596d1 Mon Sep 17 00:00:00 2001 From: nicholaspai <9457025+nicholaspai@users.noreply.github.com> Date: Fri, 22 Mar 2024 18:49:46 -0400 Subject: [PATCH 5/7] Update src/clients/BundleDataClient.ts --- src/clients/BundleDataClient.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clients/BundleDataClient.ts b/src/clients/BundleDataClient.ts index 79f9cc5b8..6a68b203b 100644 --- a/src/clients/BundleDataClient.ts +++ b/src/clients/BundleDataClient.ts @@ -161,7 +161,7 @@ export class BundleDataClient { async loadDataFromCache(key: string): Promise { // Always return a deep cloned copy of object stored in cache. Since JS passes by reference instead of value, we // want to minimize the risk that the programmer accidentally mutates data in the cache. - return _.cloneDeep(await this.loadDataCache[key]); + return _.cloneDeep(await this._loadData[key]); } getBundleTimestampsFromCache(key: string): undefined | { [chainId: number]: number[] } { From a20e637b96a367f681738f15cf4f363279034531 Mon Sep 17 00:00:00 2001 From: Matt Rice Date: Fri, 22 Mar 2024 18:54:05 -0400 Subject: [PATCH 6/7] WIP Signed-off-by: Matt Rice --- src/clients/BundleDataClient.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/clients/BundleDataClient.ts b/src/clients/BundleDataClient.ts index 6a68b203b..69e2b39ae 100644 --- a/src/clients/BundleDataClient.ts +++ b/src/clients/BundleDataClient.ts @@ -161,7 +161,7 @@ export class BundleDataClient { async loadDataFromCache(key: string): Promise { // Always return a deep cloned copy of object stored in cache. Since JS passes by reference instead of value, we // want to minimize the risk that the programmer accidentally mutates data in the cache. - return _.cloneDeep(await this._loadData[key]); + return _.cloneDeep(await this.loadDataCache[key]); } getBundleTimestampsFromCache(key: string): undefined | { [chainId: number]: number[] } { @@ -311,6 +311,8 @@ export class BundleDataClient { spokePoolClients: SpokePoolClientsByChain, logData = true ): Promise { + const key = JSON.stringify(blockRangesForChains); + if (!this.clients.configStoreClient.isUpdated) { throw new Error("ConfigStoreClient not updated"); } else if (!this.clients.hubPoolClient.isUpdated) { From f8392787880c3d06c034ef4e8cd30045a301b16d Mon Sep 17 00:00:00 2001 From: Matt Rice Date: Fri, 22 Mar 2024 18:59:37 -0400 Subject: [PATCH 7/7] add same structure for root cache Signed-off-by: Matt Rice --- src/dataworker/Dataworker.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dataworker/Dataworker.ts b/src/dataworker/Dataworker.ts index 82473c4c6..7d53c8c31 100644 --- a/src/dataworker/Dataworker.ts +++ b/src/dataworker/Dataworker.ts @@ -97,7 +97,7 @@ export type PoolRebalanceRoot = { tree: MerkleTree; }; -type PoolRebalanceRootCache = Record; +type PoolRebalanceRootCache = Record>; // @notice Constructs roots to submit to HubPool on L1. Fetches all data synchronously from SpokePool/HubPool clients // so this class assumes that those upstream clients are already updated and have fetched on-chain data from RPC's. @@ -2275,7 +2275,7 @@ export class Dataworker { // executor running for tonight (2023-08-28) until we can fix the // root cache rebalancing bug. if (!this.rootCache[key] || process.env.DATAWORKER_DISABLE_REBALANCE_ROOT_CACHE === "true") { - this.rootCache[key] = await _buildPoolRebalanceRoot( + this.rootCache[key] = _buildPoolRebalanceRoot( latestMainnetBlock, mainnetBundleEndBlock, fillsToRefund, @@ -2296,7 +2296,7 @@ export class Dataworker { ); } - return _.cloneDeep(this.rootCache[key]); + return _.cloneDeep(await this.rootCache[key]); } _getRequiredEthForArbitrumPoolRebalanceLeaf(leaf: PoolRebalanceLeaf): BigNumber {