Skip to content

Commit

Permalink
improve(factories): contract client factories (#56)
Browse files Browse the repository at this point in the history
Signed-off-by: james-a-morris <[email protected]>
  • Loading branch information
james-a-morris authored Oct 7, 2024
1 parent 5310e2c commit 92c966b
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 174 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ npm-debug.log*
yarn-debug.log*
yarn-error.log*
*.log
*.rdb

# Misc
.DS_Store
Expand Down
63 changes: 45 additions & 18 deletions packages/indexer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ import winston from "winston";
import Redis from "ioredis";
import * as across from "@across-protocol/sdk";
import * as acrossConstants from "@across-protocol/constants";
import { providers } from "ethers";

import { connectToDatabase } from "./database/database.provider";
import * as parseEnv from "./parseEnv";
import { RetryProvidersFactory } from "./web3/RetryProvidersFactory";
import { RedisCache } from "./redis/redisCache";
import { DatabaseConfig } from "@repo/indexer-database";
import { HubPoolIndexerDataHandler } from "./services/HubPoolIndexerDataHandler";
import * as utils from "./utils";
import {
getFinalisedBlockBufferDistance,
getLoopWaitTimeSeconds,
Indexer,
} from "./data-indexing/service";
import { HubPoolRepository } from "./database/HubPoolRepository";
import {
ConfigStoreClientFactory,
HubPoolClientFactory,
SpokePoolClientFactory,
} from "./utils";

async function initializeRedis(
config: parseEnv.RedisConfig,
Expand All @@ -41,32 +43,57 @@ async function initializeRedis(
}

export async function Main(config: parseEnv.Config, logger: winston.Logger) {
const { redisConfig, postgresConfig, spokeConfigs } = config;
const { redisConfig, postgresConfig, spokePoolChainsEnabled, hubChainId } =
config;
const redis = await initializeRedis(redisConfig, logger);
const redisCache = new RedisCache(redis);
const retryProvidersFactory = new RetryProvidersFactory(redisCache, logger);
retryProvidersFactory.initializeProviders();
const postgres = await connectToDatabase(postgresConfig, logger);
const retryProvidersFactory = new RetryProvidersFactory(
redisCache,
logger,
).initializeProviders();

const configStoreClientFactory = new ConfigStoreClientFactory(
retryProvidersFactory,
logger,
undefined,
);
const hubPoolClientFactory = new HubPoolClientFactory(
retryProvidersFactory,
logger,
{ configStoreClientFactory },
);
const spokePoolClientFactory = new SpokePoolClientFactory(
retryProvidersFactory,
logger,
{ hubPoolClientFactory },
);

const bundleProcessor = new services.bundles.Processor({
logger,
redis,
postgres,
});
const spokePoolIndexers = spokeConfigs.map((spokeConfig) => {
return new services.spokePoolIndexer.Indexer({
logger,
redis,
postgres,
...spokeConfig,
});
});
const spokePoolIndexers = spokePoolChainsEnabled.map(
(spokePoolChainId) =>
new services.spokePoolIndexer.Indexer({
logger,
redis,
postgres,
spokePoolChainId,
configStoreFactory: configStoreClientFactory,
hubPoolFactory: hubPoolClientFactory,
spokePoolClientFactory,
hubChainId,
retryProviderFactory: retryProvidersFactory,
}),
);

const hubPoolIndexerDataHandler = new HubPoolIndexerDataHandler(
logger,
acrossConstants.CHAIN_IDs.MAINNET,
retryProvidersFactory.getProviderForChainId(
acrossConstants.CHAIN_IDs.MAINNET,
),
hubChainId,
configStoreClientFactory,
hubPoolClientFactory,
new HubPoolRepository(postgres, logger),
);
const hubPoolIndexer = new Indexer(
Expand Down
80 changes: 7 additions & 73 deletions packages/indexer/src/parseEnv.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
import assert from "assert";
import * as s from "superstruct";
import { DatabaseConfig } from "@repo/indexer-database";
import * as services from "./services";
import { DEFAULT_NO_TTL_DISTANCE } from "./web3/constants";
import { RetryProviderConfig } from "./utils";

export type Config = {
redisConfig: RedisConfig;
postgresConfig: DatabaseConfig;
spokeConfigs: Omit<
services.spokePoolIndexer.Config,
"logger" | "redis" | "postgres"
>[];
hubChainId: number;
spokePoolChainsEnabled: number[];
};
export type RedisConfig = {
host: string;
Expand Down Expand Up @@ -81,38 +77,6 @@ function parseProviderConfigs(env: Env): ProviderConfig[] {
return results;
}

function parseRetryProviderConfig(
env: Record<string, string | undefined>,
): Omit<RetryProviderConfig, "providerConfigs" | "chainId"> {
assert(env.PROVIDER_CACHE_NAMESPACE, "requires PROVIDER_CACHE_NAMESPACE");
assert(env.MAX_CONCURRENCY, "requires MAX_CONCURRENCY");
assert(env.PCT_RPC_CALLS_LOGGED, "requires PCT_RPC_CALLS_LOGGED");
assert(
env.STANDARD_TTL_BLOCK_DISTANCE,
"requires STANDARD_TTL_BLOCK_DISTANCE",
);
assert(env.NO_TTL_BLOCK_DISTANCE, "requires NO_TTL_BLOCK_DISTANCE");
assert(env.PROVIDER_CACHE_TTL, "requires PROVIDER_CACHE_TTL");
assert(env.NODE_QUORUM_THRESHOLD, "requires NODE_QUORUM_THRESHOLD");
assert(env.RETRIES, "requires RETRIES");
assert(env.DELAY, "requires DELAY");

return {
providerCacheNamespace: env.PROVIDER_CACHE_NAMESPACE,
maxConcurrency: s.create(env.MAX_CONCURRENCY, stringToInt),
pctRpcCallsLogged: s.create(env.PCT_RPC_CALLS_LOGGED, stringToInt),
standardTtlBlockDistance: s.create(
env.STANDARD_TTL_BLOCK_DISTANCE,
stringToInt,
),
noTtlBlockDistance: s.create(env.NO_TTL_BLOCK_DISTANCE, stringToInt),
providerCacheTtl: s.create(env.PROVIDER_CACHE_TTL, stringToInt),
nodeQuorumThreshold: s.create(env.NODE_QUORUM_THRESHOLD, stringToInt),
retries: s.create(env.RETRIES, stringToInt),
delay: s.create(env.DELAY, stringToInt),
};
}

export function parseProvidersUrls() {
const results: Map<number, string[]> = new Map();
for (const [key, value] of Object.entries(process.env)) {
Expand Down Expand Up @@ -180,58 +144,28 @@ export function envToConfig(env: Env): Config {
const redisConfig = parseRedisConfig(env);
const postgresConfig = parsePostgresConfig(env);
const allProviderConfigs = parseProviderConfigs(env);
const retryProviderConfig = parseRetryProviderConfig(env);
const hubPoolChain = parseNumber(env.HUBPOOL_CHAIN);
const spokePoolChainsEnabled = parseArray(env.SPOKEPOOL_CHAINS_ENABLED).map(
parseNumber,
);
const providerConfigs = allProviderConfigs.filter(
(provider) => provider[1] === hubPoolChain,
);
assert(
allProviderConfigs.length > 0,
`Requires at least one RPC_PROVIDER_URLS_CHAINID`,
`Requires at least one RPC_PROVIDER_URLS_CHAIN_ID`,
);

const hubConfig = {
retryProviderConfig: {
...retryProviderConfig,
chainId: hubPoolChain,
providerConfigs,
},
hubConfig: {
chainId: hubPoolChain,
maxBlockLookBack: 10000,
},
redisKeyPrefix: `hubPoolIndexer:${hubPoolChain}`,
};

const spokeConfigs = spokePoolChainsEnabled.map((chainId) => {
const hubChainId = hubPoolChain;
spokePoolChainsEnabled.forEach((chainId) => {
const providerConfigs = allProviderConfigs.filter(
(provider) => provider[1] == chainId,
);
assert(
providerConfigs.length > 0,
`SPOKEPOOL_CHAINS_ENABLED=${chainId} but did not find any corresponding RPC_PROVIDER_URLS_${chainId}`,
);
return {
retryProviderConfig: {
...retryProviderConfig,
chainId,
providerConfigs,
},
spokeConfig: {
chainId,
maxBlockLookBack: 10000,
},
hubConfig: hubConfig.hubConfig,
redisKeyPrefix: `spokePoolIndexer:${chainId}`,
};
});

return {
redisConfig,
postgresConfig,
spokeConfigs,
hubChainId,
spokePoolChainsEnabled,
};
}
26 changes: 12 additions & 14 deletions packages/indexer/src/services/HubPoolIndexerDataHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { IndexerDataHandler } from "../data-indexing/service/IndexerDataHandler"
import { BlockRange } from "../data-indexing/model";
import { HubPoolRepository } from "../database/HubPoolRepository";
import { getMaxBlockLookBack } from "../web3/constants";
import { RetryProvidersFactory } from "../web3/RetryProvidersFactory";

type FetchEventsResult = {
proposedRootBundleEvents: (across.interfaces.ProposedRootBundle & {
Expand All @@ -30,7 +31,8 @@ export class HubPoolIndexerDataHandler implements IndexerDataHandler {
constructor(
private logger: Logger,
private chainId: number,
private provider: across.providers.RetryProvider,
private configStoreFactory: utils.ConfigStoreClientFactory,
private hubPoolFactory: utils.HubPoolClientFactory,
private hubPoolRepository: HubPoolRepository,
) {
this.isInitialized = false;
Expand Down Expand Up @@ -73,19 +75,15 @@ export class HubPoolIndexerDataHandler implements IndexerDataHandler {
}

private async initialize() {
this.configStoreClient = await utils.getConfigStoreClient({
logger: this.logger,
provider: this.provider,
maxBlockLookBack: getMaxBlockLookBack(this.chainId),
chainId: this.chainId,
});
this.hubPoolClient = await utils.getHubPoolClient({
configStoreClient: this.configStoreClient,
provider: this.provider,
logger: this.logger,
maxBlockLookBack: getMaxBlockLookBack(this.chainId),
chainId: this.chainId,
});
this.configStoreClient = this.configStoreFactory.get(this.chainId);
this.hubPoolClient = this.hubPoolFactory.get(
this.chainId,
undefined,
undefined,
{
configStoreClient: this.configStoreClient,
},
);
}

private async fetchEventsByRange(
Expand Down
Loading

0 comments on commit 92c966b

Please sign in to comment.