From 9889f40f970ccbeb24a87d6e6d9848ae320b0430 Mon Sep 17 00:00:00 2001 From: "James Morris, MS" <96435344+james-a-morris@users.noreply.github.com> Date: Tue, 22 Oct 2024 13:02:48 -0400 Subject: [PATCH] feat: hubpool balance tracking and persistence (#78) Signed-off-by: james-a-morris Co-authored-by: amateima <89395931+amateima@users.noreply.github.com> --- packages/indexer/src/main.ts | 1 + .../indexer/src/redis/bundleLeavesCache.ts | 1 - .../indexer/src/redis/hubBalancesCache.ts | 176 ++++++++++++++++++ .../src/services/BundleBuilderService.ts | 131 +++++++++++-- packages/indexer/src/utils/contractUtils.ts | 2 + 5 files changed, 296 insertions(+), 15 deletions(-) create mode 100644 packages/indexer/src/redis/hubBalancesCache.ts diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index ccb87747..10fbcfc1 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -119,6 +119,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { hubClientFactory: hubPoolClientFactory, spokePoolClientFactory, configStoreClientFactory, + hubChainId, }); const hubPoolIndexerDataHandler = new HubPoolIndexerDataHandler( diff --git a/packages/indexer/src/redis/bundleLeavesCache.ts b/packages/indexer/src/redis/bundleLeavesCache.ts index 8679830a..2bde9dc8 100644 --- a/packages/indexer/src/redis/bundleLeavesCache.ts +++ b/packages/indexer/src/redis/bundleLeavesCache.ts @@ -1,4 +1,3 @@ -import assert from "assert"; import Redis from "ioredis"; import * as s from "superstruct"; diff --git a/packages/indexer/src/redis/hubBalancesCache.ts b/packages/indexer/src/redis/hubBalancesCache.ts new file mode 100644 index 00000000..5db7dc73 --- /dev/null +++ b/packages/indexer/src/redis/hubBalancesCache.ts @@ -0,0 +1,176 @@ +import { isDefined } from "@across-protocol/sdk/dist/cjs/utils/TypeGuards"; +import Redis from "ioredis"; +import * as s from "superstruct"; + +export type Config = { + redis: Redis; + prefix: string; +}; + +export const HubPoolBalance = s.object({ + l1Token: s.string(), + currentNetSendAmounts: s.string(), + pendingNetSendAmounts: s.nullable(s.string()), + currentLiquidReserves: s.string(), + pendingLiquidReserves: s.nullable(s.string()), +}); +export type HubPoolBalance = s.Infer; +export type HubPoolBalances = HubPoolBalance[]; + +/** + * Class to interact with a Redis-backed cache for storing and retrieving hub balances. + */ +export class HubPoolBalanceCache { + /** + * @param config - The configuration object, including the Redis instance and prefix. + */ + constructor(private config: Config) {} + + /** + * Stores a HubPoolBalance object in Redis, indexed by l1Token. + * Also adds the key to separate index for its l1Token for efficient lookups. + * + * @param datum A HubPoolBalance record to store. + * @returns A promise that resolves when the datum is successfully stored. + */ + async set(datum: HubPoolBalance): Promise { + const key = this.getKey(datum.l1Token); + await Promise.all([ + this.config.redis.set(key, JSON.stringify(datum)), + // Add to indexes for quick retrieval by l1Token separately + this.config.redis.sadd(this.getL1TokenIndexKey(), key), + ]); + } + + /** + * Stores multiple HubPoolBalance objects in Redis, indexed by l1Token. + * + * @param data An array of HubPoolBalance records to store. + */ + async setAll(data: HubPoolBalance[]): Promise { + await Promise.all(data.map((datum) => this.set(datum))); + } + + /** + * Retrieves a HubPoolBalance from Redis by l1Token. + * + * @param l1Token The l1Token to query. + * @returns The retrieved HubPoolBalance or undefined if not found. + */ + async get(l1Token: string): Promise { + const key = this.getKey(l1Token); + const data = await this.config.redis.get(key); + return data ? s.create(JSON.parse(data), HubPoolBalance) : undefined; + } + + /** + * Retrieves all HubPoolBalances from Redis that exist across all recorded l1 tokens. + * + * @returns An array of HubPoolBalances. + */ + async getAllL1Tokens(): Promise { + const keys = await this.config.redis.smembers(this.getL1TokenIndexKey()); + return this.getDataByKeys(keys); + } + + /** + * Deletes a HubPoolBalance from Redis by l1Token. + * Also removes the corresponding key from the l1Token index. + * + * @param l1Token The l1Token to delete. + * @returns True if the record was deleted, false otherwise. + */ + async delete(l1Token: string): Promise { + const key = this.getKey(l1Token); + + // Remove from Redis + const result = await this.config.redis.del(key); + + // Also remove from the indexes + await this.config.redis.srem(this.getL1TokenIndexKey(), key); + + return result > 0; + } + + /** + * Checks if a specific l1Token key exists in Redis. + * + * @param l1Token The l1Token to check. + * @returns True if the record exists, false otherwise. + */ + async has(l1Token: string): Promise { + const key = this.getKey(l1Token); + const result = await this.config.redis.exists(key); + return result > 0; + } + + /** + * Clears the entire cache by deleting all keys that match the configured prefix. + * This method uses the SCAN command to safely iterate through all matching keys. + * + * @returns A promise that resolves when the cache is cleared. + */ + async clear(): Promise { + const pattern = `${this.config.prefix}:*`; + let cursor = "0"; + do { + // SCAN the keys that match the pattern in batches + const [newCursor, keys] = await this.config.redis.scan( + cursor, + "MATCH", + pattern, + "COUNT", + 100, + ); + cursor = newCursor; + + if (keys.length > 0) { + // Use pipeline to efficiently delete multiple keys at once + const pipeline = this.config.redis.pipeline(); + keys.forEach((key) => pipeline.del(key)); + await pipeline.exec(); + } + } while (cursor !== "0"); + } + + /** + * Helper function to retrieve data by a list of Redis keys. + * + * @private + * @param keys The Redis keys to retrieve. + * @returns An array of HubPoolBalances. + */ + private async getDataByKeys(keys: string[]): Promise { + const pipeline = this.config.redis.pipeline(); + keys.forEach((key) => pipeline.get(key)); + const results = (await pipeline.exec()) ?? []; + return results + .filter(([err, result]) => !err && result) + .map(([_, result]) => + result + ? s.create(JSON.parse(result as string), HubPoolBalance) + : undefined, + ) + .filter(isDefined); + } + + /** + * Helper function to generate the Redis key for a specific l1Token. + * + * @param l1Token - The l1Token to use in the key. + * @returns The Redis key for the HubPoolBalance. + */ + private getKey(l1Token: string): string { + return `${this.config.prefix}:${l1Token}`; + } + + /** + * Helper function to generate the Redis key for the l1Token index. + * + * @private + * @returns The Redis key for the l1Token set of indices. + */ + private getL1TokenIndexKey(): string { + return `${this.config.prefix}:l1TokenIndex`; + } +} diff --git a/packages/indexer/src/services/BundleBuilderService.ts b/packages/indexer/src/services/BundleBuilderService.ts index 78dd5e4f..c527e7d9 100644 --- a/packages/indexer/src/services/BundleBuilderService.ts +++ b/packages/indexer/src/services/BundleBuilderService.ts @@ -1,5 +1,4 @@ -import { CHAIN_IDs } from "@across-protocol/constants"; -import { caching, clients, utils } from "@across-protocol/sdk"; +import { caching, clients, typechain, utils } from "@across-protocol/sdk"; import { entities } from "@repo/indexer-database"; import assert from "assert"; import Redis from "ioredis"; @@ -7,7 +6,9 @@ import winston from "winston"; import { BundleRepository } from "../database/BundleRepository"; import { BaseIndexer } from "../generics"; import { BundleLeavesCache } from "../redis/bundleLeavesCache"; +import { HubPoolBalanceCache } from "../redis/hubBalancesCache"; import { + BN_ZERO, ConfigStoreClientFactory, convertProposalRangeResultToProposalRange, getBlockRangeBetweenBundles, @@ -22,6 +23,13 @@ import { RetryProvidersFactory } from "../web3/RetryProvidersFactory"; const MAX_DISTANCE_TO_MAINNET_HEAD = 10_000; +type BundleLeafType = { + chainId: number; + l1Tokens: string[]; + netSendAmounts: string[]; + runningBalances: string[]; +}; + type BundleBuilderConfig = { logger: winston.Logger; bundleRepository: BundleRepository; @@ -30,11 +38,13 @@ type BundleBuilderConfig = { hubClientFactory: HubPoolClientFactory; configStoreClientFactory: ConfigStoreClientFactory; spokePoolClientFactory: SpokePoolClientFactory; + hubChainId: number; }; export class BundleBuilderService extends BaseIndexer { private currentBundleCache: BundleLeavesCache; private proposedBundleCache: BundleLeavesCache; + private hubBalanceCache: HubPoolBalanceCache; constructor(private config: BundleBuilderConfig) { super(config.logger, "bundleBuilder"); } @@ -66,11 +76,17 @@ export class BundleBuilderService extends BaseIndexer { this.handleCurrentBundleLoop(lastExecutedBundle, lastProposedBundle), this.handleProposedBundleLoop(lastExecutedBundle, lastProposedBundle), ]); + + const [hubBalanceResult] = await Promise.allSettled([ + this.handleHubBalanceAggregation(lastExecutedBundle), + ]); + this.logger.info({ - at: "BundleBuilder#Processor#indexerLogic", + at: "BundleBuilderService#indexerLogic", message: "Bundle builder loop completed", currentLoopResult: currentLoopResult.status, proposedLoopResult: proposedLoopResult.status, + hubBalanceResult: hubBalanceResult.status, }); } @@ -83,6 +99,10 @@ export class BundleBuilderService extends BaseIndexer { redis: this.config.redis, prefix: "proposedBundleCache", }); + this.hubBalanceCache = new HubPoolBalanceCache({ + redis: this.config.redis, + prefix: "hubBalanceCache", + }); return Promise.resolve(); } @@ -97,7 +117,7 @@ export class BundleBuilderService extends BaseIndexer { lastExecutedBundle: entities.ProposedRootBundle, ) { const currentMainnetBlock = await this.config.providerFactory - .getProviderForChainId(CHAIN_IDs.MAINNET) + .getProviderForChainId(this.config.hubChainId) .getBlockNumber(); const lastExecutedMainnetBlock = lastExecutedBundle.bundleEvaluationBlockNumbers[0]!; @@ -105,6 +125,92 @@ export class BundleBuilderService extends BaseIndexer { return distanceToHead < MAX_DISTANCE_TO_MAINNET_HEAD; } + /** + * Handles the hub balance aggregation logic. + * @dev Ensure that this function is run after the proposed and current bundle + * loops have been completed. + */ + private async handleHubBalanceAggregation( + executedBundle: entities.Bundle, + ): Promise { + // Resolve a hub client and config store client + const hubClient = this.config.hubClientFactory.get(this.config.hubChainId); + const configStoreClient = hubClient.configStoreClient; + void (await configStoreClient.update()); + void (await hubClient.update()); + + // Resolve the L1 tokens to aggregate + const l1Tokens = hubClient.getL1Tokens(); + + // Iterate over all l1 tokens and resolve the liquid reserves + const hubBalances = await Promise.all( + l1Tokens.map(async ({ address: l1Token }) => { + const hubPoolContract = hubClient.hubPool as typechain.HubPool; + + // Resolve the liquid reserve for the given L1Token stored in the + // pooledTokens structure at the end of the last executed bundle + // range for mainnet + const { liquidReserves } = + await hubPoolContract.callStatic.pooledTokens(l1Token, { + blockTag: executedBundle.proposal.bundleEvaluationBlockNumbers[0]!, + }); + // Resolve the current and proposed bundle data for the given L1Token from + // redis + const [currentBundleData, proposedBundleData] = await Promise.all([ + this.currentBundleCache.getByL1Token(l1Token), + this.proposedBundleCache.getByL1Token(l1Token), + ]); + // Filter out any undefined values + const currentBundleDataFiltered = currentBundleData.filter( + utils.isDefined, + ); + const proposedBundleDataFiltered = proposedBundleData.filter( + utils.isDefined, + ); + // Confirm that our current bundle data is not empty + if (!currentBundleData || currentBundleData.length === 0) { + this.logger.error({ + at: "BundleBuilder#Processor#handleHubBalanceAggregation", + message: + "No current bundle data found. Ensure that the current bundle loop has been run.", + l1Token, + }); + return; + } + const currentNetSendAmounts = currentBundleDataFiltered.reduce( + (acc, leaf) => acc.add(leaf.netSendAmount), + BN_ZERO, + ); + const proposedNetSendAmounts = proposedBundleDataFiltered.reduce( + (acc, leaf) => acc.add(leaf.netSendAmount), + BN_ZERO, + ); + const pendingLiquidReserves = liquidReserves.add( + proposedNetSendAmounts, + ); + const currentLiquidReserves = pendingLiquidReserves.add( + currentNetSendAmounts, + ); + const hasPendingBundle = proposedBundleDataFiltered.length > 0; + return { + l1Token, + currentNetSendAmounts: currentNetSendAmounts.toString(), + pendingNetSendAmounts: hasPendingBundle + ? proposedNetSendAmounts.toString() + : null, + currentLiquidReserves: currentLiquidReserves.toString(), + pendingLiquidReserves: hasPendingBundle + ? pendingLiquidReserves.toString() + : null, + }; + }), + ); + // Remove all l1 tokens from the redis cache + await this.hubBalanceCache.clear(); + // Persist the hub balances to the redis cache + await this.hubBalanceCache.setAll(hubBalances.filter(utils.isDefined)); + } + /** * Generates, processes, and persists the pool leaves for a bundle that * spans from the latest proposed (or executed if no proposal is live) to @@ -118,7 +224,7 @@ export class BundleBuilderService extends BaseIndexer { ): Promise { // Resolve a latest config store client and update it const configStoreClient = this.config.configStoreClientFactory.get( - CHAIN_IDs.MAINNET, + this.config.hubChainId, ); void (await configStoreClient.update()); // Resolve the latest proposal @@ -180,6 +286,8 @@ export class BundleBuilderService extends BaseIndexer { at: "BundleBuilder#Processor#handleProposedBundleLoop", message: "No proposed bundles found, skipping.", }); + // Clear the cache so that we don't have any stale data + await this.proposedBundleCache.clear(); return; } // Grab the ranges between the last executed and proposed bundles @@ -223,14 +331,7 @@ export class BundleBuilderService extends BaseIndexer { async resolvePoolLeafForBundleRange( ranges: ProposalRangeResult[], bundleHead: ProposalRange, - ): Promise< - { - chainId: number; - l1Tokens: string[]; - netSendAmounts: string[]; - runningBalances: string[]; - }[] - > { + ): Promise { // Convert into array of [start, end] for each chain const bundleRangeForBundleClient = ranges.map( ({ startBlock, endBlock }) => [startBlock, endBlock], @@ -255,7 +356,9 @@ export class BundleBuilderService extends BaseIndexer { } const historicalProposedBundle = historicalProposal.proposal; // Instantiate the Hub & ConfigStore Client from genesis - const hubPoolClient = this.config.hubClientFactory.get(CHAIN_IDs.MAINNET); + const hubPoolClient = this.config.hubClientFactory.get( + this.config.hubChainId, + ); const configStoreClient = hubPoolClient.configStoreClient; // Resolve lookback range for the spoke clients const lookbackRange = getBlockRangeBetweenBundles( diff --git a/packages/indexer/src/utils/contractUtils.ts b/packages/indexer/src/utils/contractUtils.ts index 2e126082..45080445 100644 --- a/packages/indexer/src/utils/contractUtils.ts +++ b/packages/indexer/src/utils/contractUtils.ts @@ -207,3 +207,5 @@ export function getRetryProvider( params.logger, ); } + +export const BN_ZERO = across.utils.bnZero;