From a3283e98c36a21bd87906e4df7470bba9082968b Mon Sep 17 00:00:00 2001 From: david Date: Tue, 24 Dec 2024 08:41:31 -0500 Subject: [PATCH] refactor price lookups to use workers Signed-off-by: david --- .../src/entities/RelayHashInfo.ts | 3 + .../src/entities/evm/V3FundsDeposited.ts | 1 + .../service/SpokePoolIndexerDataHandler.ts | 17 ++ packages/indexer/src/main.ts | 11 +- packages/indexer/src/messaging/priceWorker.ts | 175 ++++++++++++++++++ packages/indexer/src/messaging/service.ts | 1 + packages/indexer/src/services/index.ts | 1 - .../indexer/src/services/priceProcessor.ts | 87 --------- packages/indexer/src/utils/coingeckoClient.ts | 40 ++-- packages/indexer/src/utils/currencyUtils.ts | 49 +++++ packages/indexer/src/utils/index.ts | 2 + 11 files changed, 280 insertions(+), 107 deletions(-) create mode 100644 packages/indexer/src/messaging/priceWorker.ts delete mode 100644 packages/indexer/src/services/priceProcessor.ts create mode 100644 packages/indexer/src/utils/currencyUtils.ts diff --git a/packages/indexer-database/src/entities/RelayHashInfo.ts b/packages/indexer-database/src/entities/RelayHashInfo.ts index ff4272ad..37247eb4 100644 --- a/packages/indexer-database/src/entities/RelayHashInfo.ts +++ b/packages/indexer-database/src/entities/RelayHashInfo.ts @@ -91,6 +91,9 @@ export class RelayHashInfo { @CreateDateColumn() createdAt: Date; + @Column() + bridgeFeeUsd: string; + @UpdateDateColumn() updatedAt: Date; } diff --git a/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts b/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts index 0718b9cf..9c29b040 100644 --- a/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts +++ b/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts @@ -90,6 +90,7 @@ export class V3FundsDeposited { @CreateDateColumn() createdAt: Date; + // this has been converted from block time seconds @Column({ nullable: true }) blockTimestamp?: Date; } diff --git a/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts b/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts index 11d333eb..6260cc84 100644 --- a/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts +++ b/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts @@ -20,6 +20,8 @@ import { SpokePoolRepository } from "../../database/SpokePoolRepository"; import { SpokePoolProcessor } from "../../services/spokePoolProcessor"; import { IndexerQueues, IndexerQueuesService } from "../../messaging/service"; import { IntegratorIdMessage } from "../../messaging/IntegratorIdWorker"; +import { PriceMessage } from "../../messaging/priceWorker"; +import { FillWithBlock } from "@across-protocol/sdk/dist/cjs/interfaces/SpokePool"; export type FetchEventsResult = { v3FundsDepositedEvents: utils.V3FundsDepositedWithIntegradorId[]; @@ -114,6 +116,8 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { ); await this.updateNewDepositsWithIntegratorId(newInsertedDeposits); await this.spokePoolProcessor.process(storedEvents); + // publish new relays to workers to fill in prices + await this.publishNewRelays(events.filledV3RelayEvents); } private async getBlockTime(blockNumber: number): Promise { const block = await this.provider.getBlock(blockNumber); @@ -275,6 +279,19 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { }); } + private async publishNewRelays(relays: FillWithBlock[]) { + const messages: PriceMessage[] = relays.map((relay) => { + return { + depositId: relay.depositId, + originChainId: relay.originChainId, + }; + }); + await this.indexerQueuesService.publishMessagesBulk( + IndexerQueues.PriceQuery, + IndexerQueues.PriceQuery, // Use queue name as job name + messages, + ); + } private async publishIntegratorIdMessages( deposits: entities.V3FundsDeposited[], ) { diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index 2c8b3ddf..6cc443e9 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -17,9 +17,9 @@ import { import { BundleRepository } from "./database/BundleRepository"; import { IndexerQueuesService } from "./messaging/service"; import { IntegratorIdWorker } from "./messaging/IntegratorIdWorker"; +import { PriceWorker } from "./messaging/priceWorker"; import { AcrossIndexerManager } from "./data-indexing/service/AcrossIndexerManager"; import { BundleServicesManager } from "./services/BundleServicesManager"; -import { CoingeckoPriceProcessor } from "./services"; async function initializeRedis( config: parseEnv.RedisConfig, @@ -56,10 +56,6 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { const redis = await initializeRedis(redisConfig, logger); const redisCache = new RedisCache(redis); const postgres = await connectToDatabase(postgresConfig, logger); - const priceProcessor = new CoingeckoPriceProcessor( - { symbols: config.coingeckoSymbols }, - { logger, postgres }, - ); // Call write to kick off webhook calls const { write } = await WebhookFactory(config.webhookConfig, { postgres, @@ -121,6 +117,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { logger, retryProvidersFactory, ); + const priceWorker = new PriceWorker(redis, postgres, logger); let exitRequested = false; process.on("SIGINT", () => { @@ -130,9 +127,9 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { message: "Wait for shutdown, or press Ctrl+C again to forcefully exit.", }); integratorIdWorker.close(); + priceWorker.close(); acrossIndexerManager.stopGracefully(); bundleServicesManager.stop(); - priceProcessor.stop(); } else { integratorIdWorker.close(); logger.info({ at: "Indexer#Main", message: "Forcing exit..." }); @@ -152,8 +149,6 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { await Promise.allSettled([ bundleServicesManager.start(), acrossIndexerManager.start(), - // run prices call to check every minute or so. it will only cache once a day - priceProcessor.start(60), ]); logger.info({ diff --git a/packages/indexer/src/messaging/priceWorker.ts b/packages/indexer/src/messaging/priceWorker.ts new file mode 100644 index 00000000..9b2aba6f --- /dev/null +++ b/packages/indexer/src/messaging/priceWorker.ts @@ -0,0 +1,175 @@ +import Redis from "ioredis"; +import winston from "winston"; +import { Job, Worker } from "bullmq"; +import { DataSource, entities } from "@repo/indexer-database"; +import { IndexerQueues } from "./service"; +import { + getIntegratorId, + yesterday, + CoingeckoClient, + findTokenByAddress, +} from "../utils"; +import { RetryProvidersFactory } from "../web3/RetryProvidersFactory"; + +export type PriceMessage = { + depositId: number; + originChainId: number; +}; + +/** + * This worker listens to the `PriceQuery` queue and processes each job by: + * - Retrieving the deposit and relay hash information from the database using the deposit ID and origin chain ID. + * - Verifying the existence of the relay hash info and deposit records. + * - Determining the block time from the relay hash info and calculating the price time as the previous day's timestamp. + * - Identifying the base currency using the output token and destination chain ID. + * - Checking if a historic price for the base currency and quote currency (USD) already exists in the database. + * - If not, fetching the historic price from Coingecko and inserting it into the database. + * - Logging errors and information at various stages of the process. + */ +export class PriceWorker { + public worker: Worker; + private coingeckoClient: CoingeckoClient; + + constructor( + private redis: Redis, + private postgres: DataSource, + private logger: winston.Logger, + ) { + this.coingeckoClient = new CoingeckoClient(); + this.setWorker(); + } + + public setWorker() { + this.worker = new Worker( + IndexerQueues.PriceQuery, + async (job: Job) => { + try { + await this.run(job.data); + } catch (error) { + this.logger.error({ + at: "PriceWorker", + message: `Error getting price for deposit ${job.data.depositId} on chain ${job.data.originChainId}`, + error, + }); + throw error; + } + }, + { connection: this.redis, concurrency: 10 }, + ); + } + private async run(params: PriceMessage) { + const { depositId, originChainId } = params; + const relayHashInfoRepository = this.postgres.getRepository( + entities.RelayHashInfo, + ); + const depositRepository = this.postgres.getRepository( + entities.V3FundsDeposited, + ); + const historicPriceRepository = this.postgres.getRepository( + entities.HistoricPrice, + ); + + const relayHashInfo = await relayHashInfoRepository.findOne({ + where: { depositId, originChainId }, + }); + const deposit = await depositRepository.findOne({ + where: { depositId, originChainId }, + }); + + if (!relayHashInfo || !deposit) { + this.logger.error({ + at: "PriceWorker", + message: "Relay hash info not found", + ...params, + }); + return; + } + + const blockTime = relayHashInfo?.depositEvent?.blockTimestamp; + if (!blockTime) { + this.logger.error({ + at: "PriceWorker", + message: "Deposit block time not found for relay hash info", + ...params, + }); + return; + } + const priceTime = yesterday(blockTime); + const quoteCurrency = "usd"; + const baseTokenInfo = findTokenByAddress( + relayHashInfo.fillEvent.outputToken, + relayHashInfo.destinationChainId, + ); + const baseCurrency = baseTokenInfo?.coingeckoId; + let price: undefined | number; + + if (!baseCurrency) { + this.logger.error({ + at: "PriceWorker", + message: "Unable to find base currency to quote", + ...params, + outputToken: relayHashInfo.fillEvent.outputToken, + destinationChainId: relayHashInfo.destinationChainId, + }); + return; + } + const existingPrice = await historicPriceRepository.findOne({ + where: { + date: priceTime, + baseCurrency, + quoteCurrency, + }, + }); + // fetch price if one hasnt been saved + if (!existingPrice) { + try { + const historicPriceData = + await this.coingeckoClient.getHistoricDailyPrice( + priceTime.getTime(), + baseCurrency, + ); + price = historicPriceData.market_data?.current_price[quoteCurrency]; + // wasnt able to get a price + if (price === undefined) { + this.logger.error( + `Unable to find ${quoteCurrency} for ${baseCurrency} at time ${priceTime}`, + ); + return; + } + await historicPriceRepository.insert({ + date: priceTime, + baseCurrency, + quoteCurrency, + price: price.toString(), + }); + this.logger.info({ + at: "PriceWorker", + ...params, + message: `Fetched and inserted historic price for ${baseCurrency} on ${priceTime}`, + }); + } catch (error) { + this.logger.error({ + at: "PriceWorker", + ...params, + message: `Failed to fetch or insert historic price for ${baseCurrency} on ${priceTime}`, + error: (error as Error).message, + }); + } + } else { + price = Number(existingPrice.price); + } + + if (price === undefined) { + this.logger.error({ + at: "PriceWorker", + ...params, + message: "Failed to get a valid price from cache or coingecko", + }); + return; + } + // TODO: Compute bridge fee + } + public async close() { + return this.worker.close(); + } +} diff --git a/packages/indexer/src/messaging/service.ts b/packages/indexer/src/messaging/service.ts index 9f23abfe..a32fc272 100644 --- a/packages/indexer/src/messaging/service.ts +++ b/packages/indexer/src/messaging/service.ts @@ -3,6 +3,7 @@ import { Queue, JobsOptions, BulkJobOptions } from "bullmq"; export enum IndexerQueues { IntegratorId = "IntegratorId", + PriceQuery = "PriceQuery", } export class IndexerQueuesService { diff --git a/packages/indexer/src/services/index.ts b/packages/indexer/src/services/index.ts index 553ec4c2..0a111299 100644 --- a/packages/indexer/src/services/index.ts +++ b/packages/indexer/src/services/index.ts @@ -1,4 +1,3 @@ export * from "./bundles"; export * from "./spokePoolProcessor"; export * from "./BundleBuilderService"; -export * from "./priceProcessor"; diff --git a/packages/indexer/src/services/priceProcessor.ts b/packages/indexer/src/services/priceProcessor.ts deleted file mode 100644 index 8e392d58..00000000 --- a/packages/indexer/src/services/priceProcessor.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { CoingeckoSymbol, CoingeckoClient } from "../utils/coingeckoClient"; -import { Logger } from "winston"; -import { DataSource, entities } from "@repo/indexer-database"; -import { BaseIndexer } from "../generics"; -import { DateTime } from "luxon"; - -type Config = { - symbols: CoingeckoSymbol[]; - // not used currently - quoteCurrency?: string; -}; - -type Deps = { - logger: Logger; - postgres: DataSource; -}; - -export class CoingeckoPriceProcessor extends BaseIndexer { - private coingeckoClient: CoingeckoClient; - constructor( - private config: Config, - private deps: Deps, - ) { - super(deps.logger, "CoingeckoPriceProcessor"); - this.coingeckoClient = new CoingeckoClient(); - } - - protected async indexerLogic(): Promise { - const now = Date.now(); - // we are always checking if we have the price for previous day, not the current day, so we go back - // to right before midnight of last day. - const previousDay = DateTime.fromMillis(now) - .minus({ days: 1 }) - .set({ hour: 23, minute: 59, second: 0, millisecond: 0 }); - const dbFormattedDate = previousDay.toJSDate(); - const quoteCurrency = this.config.quoteCurrency ?? "usd"; - const historicPriceRepository = this.deps.postgres.getRepository( - entities.HistoricPrice, - ); - - for (const symbol of this.config.symbols) { - const existingPrice = await historicPriceRepository.findOne({ - where: { - date: dbFormattedDate, - baseCurrency: symbol, - quoteCurrency, - }, - }); - // do nothing, we have a price for this day - if (existingPrice) return; - - try { - const historicPriceData = - await this.coingeckoClient.getHistoricDailyPrice(now, symbol); - const price = - historicPriceData.market_data?.current_price[quoteCurrency]; - // wasnt able to get a price - if (price === undefined) { - this.deps.logger.error( - `Unable to find ${quoteCurrency} for ${symbol}`, - ); - return; - } - await historicPriceRepository.insert({ - date: dbFormattedDate, - baseCurrency: symbol, - quoteCurrency, - price: price.toString(), - }); - this.logger.info({ - at: "CoingeckoPriceProcessor#indexerLogic", - message: `Inserted historic price for ${symbol} on ${dbFormattedDate}`, - }); - } catch (error) { - this.logger.error({ - at: "CoingeckoPriceProcessor#indexerLogic", - message: `Failed to fetch or insert historic price for ${symbol} on ${dbFormattedDate}`, - error: (error as Error).message, - }); - } - } - } - - protected async initialize(): Promise { - // Initialization logic if needed - } -} diff --git a/packages/indexer/src/utils/coingeckoClient.ts b/packages/indexer/src/utils/coingeckoClient.ts index 9c292fbc..7233b02d 100644 --- a/packages/indexer/src/utils/coingeckoClient.ts +++ b/packages/indexer/src/utils/coingeckoClient.ts @@ -1,24 +1,31 @@ import * as s from "superstruct"; import { DateTime } from "luxon"; +// tken from scraper and adapted from https://github.com/across-protocol/constants/blob/master/src/tokens.ts export const CoingeckoSymbol = s.enums([ - "ethereum", - "matic-network", - "wrapped-bitcoin", - "usd-coin", - "uma", + "across-protocol", + "aleph-zero", + "arbitrum", "badger-dao", - "weth", + "balancer", "boba-network", + "bridged-usd-coin-base", "dai", - "balancer", - "tether", - "across-protocol", + "ethereum", + "gho", "havven", - "pooltogether", - "bridged-usd-coin-base", + "lisk", + "matic-network", "optimism", + "pooltogether", + "tether", + "uma", + "usd-coin", "usd-coin-ethereum-bridged", + "usdb", + "weth", + "wmatic", + "wrapped-bitcoin", ]); export type CoingeckoSymbol = s.Infer; export const CGHistoricPriceBase = s.object({ @@ -31,8 +38,19 @@ export const CGHistoricPriceBase = s.object({ }), ), }); +export const isCoingeckoSymbol = (symbol: string) => + s.is(symbol, CoingeckoSymbol); + export type CGHistoricPriceBase = s.Infer; +// Convert now to a consistent price timestamp yesterday for lookup purposes +export function yesterday(now: Date) { + return DateTime.fromJSDate(now) + .minus({ days: 1 }) + .set({ hour: 23, minute: 59, second: 0, millisecond: 0 }) + .toJSDate(); +} + export class CoingeckoClient { constructor(private baseUrl: string = "https://api.coingecko.com/api/v3") {} diff --git a/packages/indexer/src/utils/currencyUtils.ts b/packages/indexer/src/utils/currencyUtils.ts new file mode 100644 index 00000000..d6bdeeee --- /dev/null +++ b/packages/indexer/src/utils/currencyUtils.ts @@ -0,0 +1,49 @@ +import * as constants from "@across-protocol/constants"; +import { isCoingeckoSymbol, CoingeckoSymbol } from "./coingeckoClient"; + +export type TokenInfo = { + name: string; + symbol: string; + decimals: number; + addresses: Record; + coingeckoId: string; +}; +export type Token = { + name: string; + symbol: string; + decimals: number; + address: string; + chainId: number; + coingeckoId: CoingeckoSymbol; +}; +// mapping the token constants to something easier to search +export const tokenSymbolsMap = [ + ...Object.values(constants.TOKEN_SYMBOLS_MAP), +] as TokenInfo[]; +// map to just a flat list +export const tokensList = tokenSymbolsMap.reduce((result, token) => { + Object.entries(token.addresses).forEach(([chainId, address]) => { + if (!isCoingeckoSymbol(token.coingeckoId)) return result; + result.push({ + name: token.name, + symbol: token.symbol, + decimals: token.decimals, + chainId: Number(chainId), + address: address, + coingeckoId: token.coingeckoId, + }); + }); + return result; +}, [] as Token[]); + +// given an address and chain id, return the token data +export function findTokenByAddress( + address: string, + chainId: number, +): Token | undefined { + return tokensList.find( + (token) => + token.address.toLowerCase() === address.toLowerCase() && + token.chainId === chainId, + ); +} diff --git a/packages/indexer/src/utils/index.ts b/packages/indexer/src/utils/index.ts index 1faf5dc2..0a4f36a7 100644 --- a/packages/indexer/src/utils/index.ts +++ b/packages/indexer/src/utils/index.ts @@ -2,3 +2,5 @@ export * from "./contractUtils"; export * from "./contractFactoryUtils"; export * from "./bundleBuilderUtils"; export * from "./spokePoolUtils"; +export * from "./coingeckoClient"; +export * from "./currencyUtils";