From 960c4e633abcae79be01b9e1f87cfe45ec47a5aa Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Thu, 14 Mar 2024 13:47:32 -0300 Subject: [PATCH] parallel evm funnel: increase concurrency in readData --- .../src/funnels/parallelEvm/funnel.ts | 110 ++++++++++++++---- 1 file changed, 85 insertions(+), 25 deletions(-) diff --git a/packages/engine/paima-funnel/src/funnels/parallelEvm/funnel.ts b/packages/engine/paima-funnel/src/funnels/parallelEvm/funnel.ts index ed75d2d1..4a334c0a 100644 --- a/packages/engine/paima-funnel/src/funnels/parallelEvm/funnel.ts +++ b/packages/engine/paima-funnel/src/funnels/parallelEvm/funnel.ts @@ -38,21 +38,75 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel { public override async readData(blockHeight: number): Promise { const cachedState = this.getState(); + let latestParallelChainTimestamp: number | undefined; + // if in the previous round we couldn't return some blocks because the // parallel chain didn't get far enough, we first process those. if (cachedState.bufferedChainData.length === 0) { - const baseData = await this.baseFunnel.readData(blockHeight); - cachedState.bufferedChainData.push(...baseData); - } + await Promise.all([ + (async (): Promise => { + const baseData = await this.baseFunnel.readData(blockHeight); + cachedState.bufferedChainData.push(...baseData); + return; + })(), + , + (async (): Promise => { + const latestBlock = await this.updateLatestBlock(); + + latestParallelChainTimestamp = Number( + (await this.web3.eth.getBlock(latestBlock)).timestamp + ); + + // It may be possible that if we always optimistically fetch blocks we + // may end up with some sort of 'memory leak' here. + // + // For example, if on average the base funnel spans a range of 10 + // seconds in a single round, and this funnel spans a range of 20 + // seconds, the cached data would grow unbounded. In practice we would + // eventually reach the chain's tip, and that would solve that + // problem, but we could be syncing from a really old block. + // + // The following is to prevent that situation. If we still need more + // blocks we still fetch those later, so not doing anything here is + // not a problem. We need to do this here instead of the range + // finalization logic, since we don't have a block from the base chain + // yet. + if (cachedState.bufferedChainData.length > this.config.funnelBlockGroupSize) return; + + const from = Math.max( + cachedState.startBlockHeight, + cachedState.lastBlock ? cachedState.lastBlock + 1 : 0 + ); + + const to = Math.min(from + this.config.funnelBlockGroupSize, latestBlock); + + doLog(`ParallelEvm funnel ${this.config.chainId}: #${from}-${to}`); + + // note: we could potentially do multiple rounds here in the time it + // takes the wrapped funnel to return, but this may require tracking + // some stats, since there is no easy way to figure out just from the + // config how many rounds we could do. + const parallelEvmBlocks = await getMultipleBlockData(this.web3, from, to, this.chainName); + + for (const parallelChainBlock of parallelEvmBlocks) { + cachedState.timestampToBlockNumber.push([ + parallelChainBlock.timestamp, + parallelChainBlock.blockNumber, + ]); + + cachedState.lastBlock = parallelChainBlock.blockNumber; + } - const latestBlockQueryState = this.latestBlock(); - const latestBlock = await this.web3.eth.getBlock(latestBlockQueryState); + return; + })(), + ]); + } const chainData: ChainData[] = []; // filter the data so that we are sure we can get all the blocks in the range for (const data of cachedState.bufferedChainData) { - if (data.timestamp <= Number(latestBlock.timestamp)) { + if (data.timestamp <= latestParallelChainTimestamp!) { chainData.push(data); } } @@ -99,9 +153,11 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel { const maxTimestamp = chainData[chainData.length - 1].timestamp; - const blocks = []; - - while (true) { + // if we already have enough blocks, either because we fetched those + // concurrently, or because in the previous round we fetched too many + // (funnelBlockGroupSize for this funnel covers much more range than for the + // wrapped funnel), we don't pull anything. + while (!this.canFinalizeBlockRangeWith(maxTimestamp)) { const latestBlock = this.latestBlock(); const to = Math.min(latestBlock, cachedState.lastBlock + this.config.funnelBlockGroupSize); @@ -117,16 +173,17 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel { this.chainName ); - blocks.push(...parallelEvmBlocks); + for (const parallelChainBlock of parallelEvmBlocks) { + cachedState.timestampToBlockNumber.push([ + parallelChainBlock.timestamp, + parallelChainBlock.blockNumber, + ]); - // this has to be > instead of >= - // because there can be multiple blocks with the same timestamp (e.g. Arbitrum) - if (blocks.length > 0 && blocks[blocks.length - 1].timestamp > maxTimestamp) { - break; + cachedState.lastBlock = parallelChainBlock.blockNumber; } - if (blocks.length > 0) { - cachedState.lastBlock = blocks[blocks.length - 1].blockNumber; + if (this.canFinalizeBlockRangeWith(maxTimestamp)) { + break; } // We reach this part of the code if after we fetch blocks we still aren't done syncing. @@ -143,15 +200,6 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel { } } - for (const parallelChainBlock of blocks) { - cachedState.timestampToBlockNumber.push([ - parallelChainBlock.timestamp, - parallelChainBlock.blockNumber, - ]); - - cachedState.lastBlock = parallelChainBlock.blockNumber; - } - // remove old entries from the timestamp to block mapping, so that it // doesn't grow forever, since it's cached. while (true) { @@ -532,6 +580,18 @@ export class ParallelEvmFunnel extends BaseFunnel implements ChainFunnel { return bufferedState.result; } + + private canFinalizeBlockRangeWith(maxTimestamp: number): boolean { + const cachedState = this.getState(); + + // this has to be > instead of >= + // because there can be multiple blocks with the same timestamp (e.g. Arbitrum) + return ( + cachedState.timestampToBlockNumber.length > 0 && + cachedState.timestampToBlockNumber[cachedState.timestampToBlockNumber.length - 1][0] > + maxTimestamp + ); + } } export async function wrapToParallelEvmFunnel(