Skip to content

Commit

Permalink
fix: refactor indexers and processor classes and add env vars (#94)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexandru Matei <[email protected]>
  • Loading branch information
amateima and alexandrumatei36 authored Nov 4, 2024
1 parent b10bcac commit 99e5813
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 105 deletions.
5 changes: 4 additions & 1 deletion packages/indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ RPC_PROVIDER_URLS_10=https://optimism-mainnet.infura.io/v3/xxx
RPC_PROVIDER_URLS_137=https://polygon-mainnet.infura.io/v3/xxx
HUBPOOL_CHAIN=1
SPOKEPOOL_CHAINS_ENABLED=1,2
PROVIDER_CACHE_TTL=3600
// optional
Expand All @@ -47,4 +46,8 @@ PROVIDER_CACHE_TTL=100000
NODE_QUORUM=1
NODE_RETRIES=2
NODE_RETRY_DELAY=1000
ENABLE_HUBPOOL_INDEXER=true
ENABLE_BUNDLE_EVENTS_PROCESSOR=true
ENABLE_BUNDLE_BUILDER=true
```
123 changes: 123 additions & 0 deletions packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { Logger } from "winston";

import { DataSource } from "@repo/indexer-database";

import { Config } from "../../parseEnv";
import { HubPoolRepository } from "../../database/HubPoolRepository";
import { RedisCache } from "../../redis/redisCache";
import { RetryProvidersFactory } from "../../web3/RetryProvidersFactory";
import { SpokePoolRepository } from "../../database/SpokePoolRepository";
import { IndexerQueuesService } from "../../messaging/service";
import { SpokePoolProcessor } from "../../services/spokePoolProcessor";

import { HubPoolIndexerDataHandler } from "./HubPoolIndexerDataHandler";
import { SpokePoolIndexerDataHandler } from "./SpokePoolIndexerDataHandler";
import {
ConfigStoreClientFactory,
HubPoolClientFactory,
SpokePoolClientFactory,
} from "../../utils";
import { Indexer } from "./Indexer";
import {
getFinalisedBlockBufferDistance,
getLoopWaitTimeSeconds,
} from "./constants";

export class AcrossIndexerManager {
private hubPoolIndexer?: Indexer;
private spokePoolIndexers: Indexer[] = [];

constructor(
private logger: Logger,
private config: Config,
private postgres: DataSource,
private configStoreClientFactory: ConfigStoreClientFactory,
private hubPoolClientFactory: HubPoolClientFactory,
private spokePoolClientFactory: SpokePoolClientFactory,
private retryProvidersFactory: RetryProvidersFactory,
private hubPoolRepository: HubPoolRepository,
private spokePoolRepository: SpokePoolRepository,
private redisCache: RedisCache,
private indexerQueuesService: IndexerQueuesService,
) {}

public async start() {
return Promise.all([
this.startHubPoolIndexer(),
this.startSpokePoolIndexers(),
]);
}

public async stopGracefully() {
this.hubPoolIndexer?.stopGracefully();
this.spokePoolIndexers.map((indexer) => indexer.stopGracefully());
}

private startHubPoolIndexer() {
if (!this.config.enableHubPoolIndexer) {
this.logger.warn("Hub pool indexer is disabled");
return;
}
const hubPoolIndexerDataHandler = new HubPoolIndexerDataHandler(
this.logger,
this.config.hubChainId,
this.configStoreClientFactory,
this.hubPoolClientFactory,
this.hubPoolRepository,
);
this.hubPoolIndexer = new Indexer(
{
loopWaitTimeSeconds: getLoopWaitTimeSeconds(this.config.hubChainId),
finalisedBlockBufferDistance: getFinalisedBlockBufferDistance(
this.config.hubChainId,
),
},
hubPoolIndexerDataHandler,
this.retryProvidersFactory.getProviderForChainId(this.config.hubChainId),
this.redisCache,
this.logger,
);

return this.hubPoolIndexer.start();
}

private async startSpokePoolIndexers() {
const spokePoolIndexers = this.config.spokePoolChainsEnabled.map(
(chainId) => {
const spokePoolIndexerDataHandler = new SpokePoolIndexerDataHandler(
this.logger,
chainId,
this.config.hubChainId,
this.retryProvidersFactory.getProviderForChainId(chainId),
this.configStoreClientFactory,
this.hubPoolClientFactory,
this.spokePoolClientFactory,
this.spokePoolRepository,
new SpokePoolProcessor(this.postgres, this.logger, chainId),
this.indexerQueuesService,
);
const spokePoolIndexer = new Indexer(
{
loopWaitTimeSeconds: getLoopWaitTimeSeconds(chainId),
finalisedBlockBufferDistance:
getFinalisedBlockBufferDistance(chainId),
},
spokePoolIndexerDataHandler,
this.retryProvidersFactory.getProviderForChainId(chainId),
this.redisCache,
this.logger,
);
return spokePoolIndexer;
},
);

if (this.spokePoolIndexers.length === 0) {
this.logger.warn("No spoke pool indexers to start");
return;
}
this.spokePoolIndexers = spokePoolIndexers;
return Promise.all(
this.spokePoolIndexers.map((indexer) => indexer.start()),
);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Logger } from "winston";
import * as across from "@across-protocol/sdk";

import * as utils from "../utils";
import * as utils from "../../utils";
import {
getDeployedBlockNumber,
getDeployedAddress,
} from "@across-protocol/contracts";
import { IndexerDataHandler } from "../data-indexing/service/IndexerDataHandler";
import { BlockRange } from "../data-indexing/model";
import { HubPoolRepository } from "../database/HubPoolRepository";
import { IndexerDataHandler } from "./IndexerDataHandler";
import { BlockRange } from "../model";
import { HubPoolRepository } from "../../database/HubPoolRepository";

type FetchEventsResult = {
proposedRootBundleEvents: (across.interfaces.ProposedRootBundle & {
Expand Down
15 changes: 13 additions & 2 deletions packages/indexer/src/data-indexing/service/Indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type BlockRangeResult = {
latestBlockNumber: number;
blockRange: BlockRange | undefined;
lastFinalisedBlock: number;
isBackfilling: boolean;
};

/**
Expand Down Expand Up @@ -76,7 +77,15 @@ export class Indexer {
});
blockRangeProcessedSuccessfully = false;
} finally {
await across.utils.delay(this.config.loopWaitTimeSeconds);
if (!blockRangeResult?.isBackfilling) {
await across.utils.delay(this.config.loopWaitTimeSeconds);
} else {
this.logger.info({
at: "Indexer::start",
message: `Skip delay ${this.dataHandler.getDataIdentifier()}. Backfill in progress...`,
dataIdentifier: this.dataHandler.getDataIdentifier(),
});
}
}
}
}
Expand Down Expand Up @@ -113,6 +122,7 @@ export class Indexer {
latestBlockNumber,
blockRange: undefined,
lastFinalisedBlock: lastFinalisedBlockOnChain,
isBackfilling: false,
};
}
const fromBlock = lastBlockFinalisedStored
Expand All @@ -124,11 +134,12 @@ export class Indexer {
blockRange.to,
lastFinalisedBlockOnChain,
);

const isBackfilling = latestBlockNumber - blockRange.to > 100_000;
return {
latestBlockNumber,
blockRange,
lastFinalisedBlock: lastFinalisedBlockInBlockRange,
isBackfilling,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import {
} from "@across-protocol/contracts";
import { entities } from "@repo/indexer-database";

import { BlockRange } from "../data-indexing/model";
import { IndexerDataHandler } from "../data-indexing/service/IndexerDataHandler";
import { BlockRange } from "../model";
import { IndexerDataHandler } from "./IndexerDataHandler";

import * as utils from "../utils";
import { SpokePoolRepository } from "../database/SpokePoolRepository";
import { SpokePoolProcessor } from "./spokePoolProcessor";
import { IndexerQueues, IndexerQueuesService } from "../messaging/service";
import { IntegratorIdMessage } from "../messaging/IntegratorIdWorker";
import * as utils from "../../utils";
import { SpokePoolRepository } from "../../database/SpokePoolRepository";
import { SpokePoolProcessor } from "../../services/spokePoolProcessor";
import { IndexerQueues, IndexerQueuesService } from "../../messaging/service";
import { IntegratorIdMessage } from "../../messaging/IntegratorIdWorker";

type FetchEventsResult = {
v3FundsDepositedEvents: utils.V3FundsDepositedWithIntegradorId[];
Expand Down
Loading

0 comments on commit 99e5813

Please sign in to comment.