From 99e5813746f477c060db3b2e0079926970d2fd46 Mon Sep 17 00:00:00 2001 From: amateima <89395931+amateima@users.noreply.github.com> Date: Mon, 4 Nov 2024 18:33:27 +0200 Subject: [PATCH] fix: refactor indexers and processor classes and add env vars (#94) Co-authored-by: Alexandru Matei --- packages/indexer/README.md | 5 +- .../service/AcrossIndexerManager.ts | 123 ++++++++++++++++++ .../service}/HubPoolIndexerDataHandler.ts | 8 +- .../src/data-indexing/service/Indexer.ts | 15 ++- .../service}/SpokePoolIndexerDataHandler.ts | 14 +- packages/indexer/src/main.ts | 121 +++++------------ packages/indexer/src/parseEnv.ts | 15 +++ .../src/services/BundleServicesManager.ts | 75 +++++++++++ packages/indexer/src/services/bundles.ts | 2 +- packages/indexer/src/services/index.ts | 6 +- 10 files changed, 279 insertions(+), 105 deletions(-) create mode 100644 packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts rename packages/indexer/src/{services => data-indexing/service}/HubPoolIndexerDataHandler.ts (96%) rename packages/indexer/src/{services => data-indexing/service}/SpokePoolIndexerDataHandler.ts (94%) create mode 100644 packages/indexer/src/services/BundleServicesManager.ts diff --git a/packages/indexer/README.md b/packages/indexer/README.md index dba901d8..5e4c5f9c 100644 --- a/packages/indexer/README.md +++ b/packages/indexer/README.md @@ -34,7 +34,6 @@ RPC_PROVIDER_URLS_10=https://optimism-mainnet.infura.io/v3/xxx RPC_PROVIDER_URLS_137=https://polygon-mainnet.infura.io/v3/xxx HUBPOOL_CHAIN=1 SPOKEPOOL_CHAINS_ENABLED=1,2 - PROVIDER_CACHE_TTL=3600 // optional @@ -47,4 +46,8 @@ PROVIDER_CACHE_TTL=100000 NODE_QUORUM=1 NODE_RETRIES=2 NODE_RETRY_DELAY=1000 + +ENABLE_HUBPOOL_INDEXER=true +ENABLE_BUNDLE_EVENTS_PROCESSOR=true +ENABLE_BUNDLE_BUILDER=true ``` diff --git a/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts b/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts new file mode 100644 index 00000000..38d9aa0c --- /dev/null +++ b/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts @@ -0,0 +1,123 @@ +import { Logger } from "winston"; + +import { DataSource } from "@repo/indexer-database"; + +import { Config } from "../../parseEnv"; +import { HubPoolRepository } from "../../database/HubPoolRepository"; +import { RedisCache } from "../../redis/redisCache"; +import { RetryProvidersFactory } from "../../web3/RetryProvidersFactory"; +import { SpokePoolRepository } from "../../database/SpokePoolRepository"; +import { IndexerQueuesService } from "../../messaging/service"; +import { SpokePoolProcessor } from "../../services/spokePoolProcessor"; + +import { HubPoolIndexerDataHandler } from "./HubPoolIndexerDataHandler"; +import { SpokePoolIndexerDataHandler } from "./SpokePoolIndexerDataHandler"; +import { + ConfigStoreClientFactory, + HubPoolClientFactory, + SpokePoolClientFactory, +} from "../../utils"; +import { Indexer } from "./Indexer"; +import { + getFinalisedBlockBufferDistance, + getLoopWaitTimeSeconds, +} from "./constants"; + +export class AcrossIndexerManager { + private hubPoolIndexer?: Indexer; + private spokePoolIndexers: Indexer[] = []; + + constructor( + private logger: Logger, + private config: Config, + private postgres: DataSource, + private configStoreClientFactory: ConfigStoreClientFactory, + private hubPoolClientFactory: HubPoolClientFactory, + private spokePoolClientFactory: SpokePoolClientFactory, + private retryProvidersFactory: RetryProvidersFactory, + private hubPoolRepository: HubPoolRepository, + private spokePoolRepository: SpokePoolRepository, + private redisCache: RedisCache, + private indexerQueuesService: IndexerQueuesService, + ) {} + + public async start() { + return Promise.all([ + this.startHubPoolIndexer(), + this.startSpokePoolIndexers(), + ]); + } + + public async stopGracefully() { + this.hubPoolIndexer?.stopGracefully(); + this.spokePoolIndexers.map((indexer) => indexer.stopGracefully()); + } + + private startHubPoolIndexer() { + if (!this.config.enableHubPoolIndexer) { + this.logger.warn("Hub pool indexer is disabled"); + return; + } + const hubPoolIndexerDataHandler = new HubPoolIndexerDataHandler( + this.logger, + this.config.hubChainId, + this.configStoreClientFactory, + this.hubPoolClientFactory, + this.hubPoolRepository, + ); + this.hubPoolIndexer = new Indexer( + { + loopWaitTimeSeconds: getLoopWaitTimeSeconds(this.config.hubChainId), + finalisedBlockBufferDistance: getFinalisedBlockBufferDistance( + this.config.hubChainId, + ), + }, + hubPoolIndexerDataHandler, + this.retryProvidersFactory.getProviderForChainId(this.config.hubChainId), + this.redisCache, + this.logger, + ); + + return this.hubPoolIndexer.start(); + } + + private async startSpokePoolIndexers() { + const spokePoolIndexers = this.config.spokePoolChainsEnabled.map( + (chainId) => { + const spokePoolIndexerDataHandler = new SpokePoolIndexerDataHandler( + this.logger, + chainId, + this.config.hubChainId, + this.retryProvidersFactory.getProviderForChainId(chainId), + this.configStoreClientFactory, + this.hubPoolClientFactory, + this.spokePoolClientFactory, + this.spokePoolRepository, + new SpokePoolProcessor(this.postgres, this.logger, chainId), + this.indexerQueuesService, + ); + const spokePoolIndexer = new Indexer( + { + loopWaitTimeSeconds: getLoopWaitTimeSeconds(chainId), + finalisedBlockBufferDistance: + getFinalisedBlockBufferDistance(chainId), + }, + spokePoolIndexerDataHandler, + this.retryProvidersFactory.getProviderForChainId(chainId), + this.redisCache, + this.logger, + ); + return spokePoolIndexer; + }, + ); + + if (this.spokePoolIndexers.length === 0) { + this.logger.warn("No spoke pool indexers to start"); + return; + } + this.spokePoolIndexers = spokePoolIndexers; + return Promise.all( + this.spokePoolIndexers.map((indexer) => indexer.start()), + ); + } +} diff --git a/packages/indexer/src/services/HubPoolIndexerDataHandler.ts b/packages/indexer/src/data-indexing/service/HubPoolIndexerDataHandler.ts similarity index 96% rename from packages/indexer/src/services/HubPoolIndexerDataHandler.ts rename to packages/indexer/src/data-indexing/service/HubPoolIndexerDataHandler.ts index 99d1c458..11b171d0 100644 --- a/packages/indexer/src/services/HubPoolIndexerDataHandler.ts +++ b/packages/indexer/src/data-indexing/service/HubPoolIndexerDataHandler.ts @@ -1,14 +1,14 @@ import { Logger } from "winston"; import * as across from "@across-protocol/sdk"; -import * as utils from "../utils"; +import * as utils from "../../utils"; import { getDeployedBlockNumber, getDeployedAddress, } from "@across-protocol/contracts"; -import { IndexerDataHandler } from "../data-indexing/service/IndexerDataHandler"; -import { BlockRange } from "../data-indexing/model"; -import { HubPoolRepository } from "../database/HubPoolRepository"; +import { IndexerDataHandler } from "./IndexerDataHandler"; +import { BlockRange } from "../model"; +import { HubPoolRepository } from "../../database/HubPoolRepository"; type FetchEventsResult = { proposedRootBundleEvents: (across.interfaces.ProposedRootBundle & { diff --git a/packages/indexer/src/data-indexing/service/Indexer.ts b/packages/indexer/src/data-indexing/service/Indexer.ts index 238c8796..0be279f0 100644 --- a/packages/indexer/src/data-indexing/service/Indexer.ts +++ b/packages/indexer/src/data-indexing/service/Indexer.ts @@ -17,6 +17,7 @@ type BlockRangeResult = { latestBlockNumber: number; blockRange: BlockRange | undefined; lastFinalisedBlock: number; + isBackfilling: boolean; }; /** @@ -76,7 +77,15 @@ export class Indexer { }); blockRangeProcessedSuccessfully = false; } finally { - await across.utils.delay(this.config.loopWaitTimeSeconds); + if (!blockRangeResult?.isBackfilling) { + await across.utils.delay(this.config.loopWaitTimeSeconds); + } else { + this.logger.info({ + at: "Indexer::start", + message: `Skip delay ${this.dataHandler.getDataIdentifier()}. Backfill in progress...`, + dataIdentifier: this.dataHandler.getDataIdentifier(), + }); + } } } } @@ -113,6 +122,7 @@ export class Indexer { latestBlockNumber, blockRange: undefined, lastFinalisedBlock: lastFinalisedBlockOnChain, + isBackfilling: false, }; } const fromBlock = lastBlockFinalisedStored @@ -124,11 +134,12 @@ export class Indexer { blockRange.to, lastFinalisedBlockOnChain, ); - + const isBackfilling = latestBlockNumber - blockRange.to > 100_000; return { latestBlockNumber, blockRange, lastFinalisedBlock: lastFinalisedBlockInBlockRange, + isBackfilling, }; } diff --git a/packages/indexer/src/services/SpokePoolIndexerDataHandler.ts b/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts similarity index 94% rename from packages/indexer/src/services/SpokePoolIndexerDataHandler.ts rename to packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts index 75ecf8e1..e17cd59d 100644 --- a/packages/indexer/src/services/SpokePoolIndexerDataHandler.ts +++ b/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts @@ -6,14 +6,14 @@ import { } from "@across-protocol/contracts"; import { entities } from "@repo/indexer-database"; -import { BlockRange } from "../data-indexing/model"; -import { IndexerDataHandler } from "../data-indexing/service/IndexerDataHandler"; +import { BlockRange } from "../model"; +import { IndexerDataHandler } from "./IndexerDataHandler"; -import * as utils from "../utils"; -import { SpokePoolRepository } from "../database/SpokePoolRepository"; -import { SpokePoolProcessor } from "./spokePoolProcessor"; -import { IndexerQueues, IndexerQueuesService } from "../messaging/service"; -import { IntegratorIdMessage } from "../messaging/IntegratorIdWorker"; +import * as utils from "../../utils"; +import { SpokePoolRepository } from "../../database/SpokePoolRepository"; +import { SpokePoolProcessor } from "../../services/spokePoolProcessor"; +import { IndexerQueues, IndexerQueuesService } from "../../messaging/service"; +import { IntegratorIdMessage } from "../../messaging/IntegratorIdWorker"; type FetchEventsResult = { v3FundsDepositedEvents: utils.V3FundsDepositedWithIntegradorId[]; diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index 8bae7459..0d9edf53 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -1,4 +1,3 @@ -import * as services from "./services"; import winston from "winston"; import Redis from "ioredis"; import * as across from "@across-protocol/sdk"; @@ -7,12 +6,6 @@ import { connectToDatabase } from "./database/database.provider"; import * as parseEnv from "./parseEnv"; import { RetryProvidersFactory } from "./web3/RetryProvidersFactory"; import { RedisCache } from "./redis/redisCache"; -import { HubPoolIndexerDataHandler } from "./services/HubPoolIndexerDataHandler"; -import { - getFinalisedBlockBufferDistance, - getLoopWaitTimeSeconds, - Indexer, -} from "./data-indexing/service"; import { HubPoolRepository } from "./database/HubPoolRepository"; import { SpokePoolRepository } from "./database/SpokePoolRepository"; import { @@ -20,11 +13,11 @@ import { HubPoolClientFactory, SpokePoolClientFactory, } from "./utils/contractFactoryUtils"; -import { SpokePoolIndexerDataHandler } from "./services/SpokePoolIndexerDataHandler"; -import { SpokePoolProcessor } from "./services/spokePoolProcessor"; import { BundleRepository } from "./database/BundleRepository"; import { IndexerQueuesService } from "./messaging/service"; import { IntegratorIdWorker } from "./messaging/IntegratorIdWorker"; +import { AcrossIndexerManager } from "./data-indexing/service/AcrossIndexerManager"; +import { BundleServicesManager } from "./services/BundleServicesManager"; async function initializeRedis( config: parseEnv.RedisConfig, @@ -56,15 +49,16 @@ async function initializeRedis( } export async function Main(config: parseEnv.Config, logger: winston.Logger) { - const { redisConfig, postgresConfig, spokePoolChainsEnabled, hubChainId } = - config; + const { redisConfig, postgresConfig, hubChainId } = config; const redis = await initializeRedis(redisConfig, logger); const redisCache = new RedisCache(redis); const postgres = await connectToDatabase(postgresConfig, logger); + // Retry providers factory const retryProvidersFactory = new RetryProvidersFactory( redisCache, logger, ).initializeProviders(); + // SDK clients factories const configStoreClientFactory = new ConfigStoreClientFactory( retryProvidersFactory, logger, @@ -80,16 +74,32 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { logger, { hubPoolClientFactory }, ); - - const bundleProcessor = new services.bundles.Processor({ + const indexerQueuesService = new IndexerQueuesService(redis); + const acrossIndexerManager = new AcrossIndexerManager( + logger, + config, + postgres, + configStoreClientFactory, + hubPoolClientFactory, + spokePoolClientFactory, + retryProvidersFactory, + new HubPoolRepository(postgres, logger), + new SpokePoolRepository(postgres, logger), + redisCache, + indexerQueuesService, + ); + const bundleServicesManager = new BundleServicesManager( + config, logger, redis, postgres, hubPoolClientFactory, spokePoolClientFactory, - }); + configStoreClientFactory, + retryProvidersFactory, + new BundleRepository(postgres, logger, true), + ); - const indexerQueuesService = new IndexerQueuesService(redis); // Set up message workers const integratorIdWorker = new IntegratorIdWorker( redis, @@ -98,62 +108,6 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { retryProvidersFactory, ); - const spokePoolIndexers = spokePoolChainsEnabled.map((chainId) => { - const spokePoolIndexerDataHandler = new SpokePoolIndexerDataHandler( - logger, - chainId, - hubChainId, - retryProvidersFactory.getProviderForChainId(chainId), - configStoreClientFactory, - hubPoolClientFactory, - spokePoolClientFactory, - new SpokePoolRepository(postgres, logger), - new SpokePoolProcessor(postgres, logger, chainId), - indexerQueuesService, - ); - const spokePoolIndexer = new Indexer( - { - loopWaitTimeSeconds: getLoopWaitTimeSeconds(chainId), - finalisedBlockBufferDistance: getFinalisedBlockBufferDistance(chainId), - }, - spokePoolIndexerDataHandler, - retryProvidersFactory.getProviderForChainId(chainId), - redisCache, - logger, - ); - return spokePoolIndexer; - }); - - const bundleBuilderProcessor = - new services.bundleBuilder.BundleBuilderService({ - logger, - redis, - bundleRepository: new BundleRepository(postgres, logger), - providerFactory: retryProvidersFactory, - hubClientFactory: hubPoolClientFactory, - spokePoolClientFactory, - configStoreClientFactory, - hubChainId, - }); - - const hubPoolIndexerDataHandler = new HubPoolIndexerDataHandler( - logger, - hubChainId, - configStoreClientFactory, - hubPoolClientFactory, - new HubPoolRepository(postgres, logger), - ); - const hubPoolIndexer = new Indexer( - { - loopWaitTimeSeconds: getLoopWaitTimeSeconds(hubChainId), - finalisedBlockBufferDistance: getFinalisedBlockBufferDistance(hubChainId), - }, - hubPoolIndexerDataHandler, - retryProvidersFactory.getProviderForChainId(hubChainId), - new RedisCache(redis), - logger, - ); - let exitRequested = false; process.on("SIGINT", () => { if (!exitRequested) { @@ -162,10 +116,8 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { message: "Wait for shutdown, or press Ctrl+C again to forcefully exit.", }); integratorIdWorker.close(); - spokePoolIndexers.map((s) => s.stopGracefully()); - hubPoolIndexer.stopGracefully(); - bundleProcessor.stop(); - bundleBuilderProcessor.stop(); + acrossIndexerManager.stopGracefully(); + bundleServicesManager.stop(); } else { integratorIdWorker.close(); logger.info({ at: "Indexer#Main", message: "Forcing exit..." }); @@ -181,25 +133,20 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { at: "Indexer#Main", }); // start all indexers in parallel, will wait for them to complete, but they all loop independently - const [bundleResults, hubPoolResult, bundleBuilderResult, ...spokeResults] = + const [bundleServicesManagerResults, acrossIndexerManagerResult] = await Promise.allSettled([ - bundleProcessor.start(10), - hubPoolIndexer.start(), - bundleBuilderProcessor.start(10), - ...spokePoolIndexers.map((s) => s.start()), + bundleServicesManager.start(), + acrossIndexerManager.start(), ]); logger.info({ at: "Indexer#Main", message: "Indexer loop completed", results: { - spokeIndexerRunSuccess: [...spokeResults].every( - (r) => r.status === "fulfilled", - ), - bundleProcessorRunSuccess: bundleResults.status === "fulfilled", - hubPoolIndexerRunSuccess: hubPoolResult.status === "fulfilled", - bundleBuilderProcessorRunSuccess: - bundleBuilderResult.status === "fulfilled", + bundleServicesManagerRunSuccess: + bundleServicesManagerResults.status === "fulfilled", + acrossIndexerManagerRunSuccess: + acrossIndexerManagerResult.status === "fulfilled", }, }); await integratorIdWorker.close(); diff --git a/packages/indexer/src/parseEnv.ts b/packages/indexer/src/parseEnv.ts index 80e4dfa6..0ba792bd 100644 --- a/packages/indexer/src/parseEnv.ts +++ b/packages/indexer/src/parseEnv.ts @@ -8,6 +8,9 @@ export type Config = { postgresConfig: DatabaseConfig; hubChainId: number; spokePoolChainsEnabled: number[]; + enableHubPoolIndexer: boolean; + enableBundleEventsProcessor: boolean; + enableBundleBuilder: boolean; }; export type RedisConfig = { host: string; @@ -156,6 +159,15 @@ export function envToConfig(env: Env): Config { `Requires at least one RPC_PROVIDER_URLS_CHAIN_ID`, ); const hubChainId = hubPoolChain; + const enableHubPoolIndexer = env.ENABLE_HUBPOOL_INDEXER + ? env.ENABLE_HUBPOOL_INDEXER === "true" + : true; + const enableBundleEventsProcessor = env.ENABLE_BUNDLE_EVENTS_PROCESSOR + ? env.ENABLE_BUNDLE_EVENTS_PROCESSOR === "true" + : true; + const enableBundleBuilder = env.ENABLE_BUNDLE_BUILDER + ? env.ENABLE_BUNDLE_BUILDER === "true" + : true; spokePoolChainsEnabled.forEach((chainId) => { const providerConfigs = allProviderConfigs.filter( (provider) => provider[1] == chainId, @@ -170,5 +182,8 @@ export function envToConfig(env: Env): Config { postgresConfig, hubChainId, spokePoolChainsEnabled, + enableHubPoolIndexer, + enableBundleEventsProcessor, + enableBundleBuilder, }; } diff --git a/packages/indexer/src/services/BundleServicesManager.ts b/packages/indexer/src/services/BundleServicesManager.ts new file mode 100644 index 00000000..5d8eb7c3 --- /dev/null +++ b/packages/indexer/src/services/BundleServicesManager.ts @@ -0,0 +1,75 @@ +import { Logger } from "winston"; +import { Config } from "../parseEnv"; +import { BundleBuilderService } from "./BundleBuilderService"; +import { BundleEventsProcessor } from "./bundles"; +import { Redis } from "ioredis"; +import { DataSource } from "@repo/indexer-database"; +import { + ConfigStoreClientFactory, + HubPoolClientFactory, + SpokePoolClientFactory, +} from "../utils"; +import { RetryProvidersFactory } from "../web3/RetryProvidersFactory"; +import { BundleRepository } from "../database/BundleRepository"; + +export class BundleServicesManager { + private bundleEventsProcessor?: BundleEventsProcessor; + private bundleBuilderService?: BundleBuilderService; + + public constructor( + private config: Config, + private logger: Logger, + private redis: Redis, + private postgres: DataSource, + private hubPoolClientFactory: HubPoolClientFactory, + private spokePoolClientFactory: SpokePoolClientFactory, + private configStoreClientFactory: ConfigStoreClientFactory, + private retryProvidersFactory: RetryProvidersFactory, + private bundleRepository: BundleRepository, + ) {} + public start() { + return Promise.all([ + this.startBundleEventsProcessor(), + this.startBundleBuilderService(), + ]); + } + + public stop() { + this.bundleEventsProcessor?.stop(); + this.bundleBuilderService?.stop(); + } + + private startBundleEventsProcessor() { + if (!this.config.enableBundleEventsProcessor) { + this.logger.warn("Bundle events processor is disabled"); + return; + } + this.bundleEventsProcessor = new BundleEventsProcessor({ + logger: this.logger, + redis: this.redis, + postgres: this.postgres, + hubPoolClientFactory: this.hubPoolClientFactory, + spokePoolClientFactory: this.spokePoolClientFactory, + }); + return this.bundleEventsProcessor.start(10); + } + + private startBundleBuilderService() { + if (!this.config.enableBundleBuilder) { + this.logger.warn("Bundle builder service is disabled"); + return; + } + + this.bundleBuilderService = new BundleBuilderService({ + logger: this.logger, + redis: this.redis, + bundleRepository: this.bundleRepository, + providerFactory: this.retryProvidersFactory, + hubClientFactory: this.hubPoolClientFactory, + spokePoolClientFactory: this.spokePoolClientFactory, + configStoreClientFactory: this.configStoreClientFactory, + hubChainId: this.config.hubChainId, + }); + return this.bundleBuilderService.start(10); + } +} diff --git a/packages/indexer/src/services/bundles.ts b/packages/indexer/src/services/bundles.ts index 7e76e306..5883a2f7 100644 --- a/packages/indexer/src/services/bundles.ts +++ b/packages/indexer/src/services/bundles.ts @@ -40,7 +40,7 @@ class ConfigurationMalformedError extends Error { } } -export class Processor extends BaseIndexer { +export class BundleEventsProcessor extends BaseIndexer { private bundleRepository: BundleRepository; constructor(private readonly config: BundleConfig) { super(config.logger, "bundle"); diff --git a/packages/indexer/src/services/index.ts b/packages/indexer/src/services/index.ts index 62b7007e..0a111299 100644 --- a/packages/indexer/src/services/index.ts +++ b/packages/indexer/src/services/index.ts @@ -1,3 +1,3 @@ -export * as bundles from "./bundles"; -export * as spokeProcessor from "./spokePoolProcessor"; -export * as bundleBuilder from "./BundleBuilderService"; +export * from "./bundles"; +export * from "./spokePoolProcessor"; +export * from "./BundleBuilderService";