From ea75a210858da6becf0ad3c00ba079d22ff7dc1d Mon Sep 17 00:00:00 2001 From: Adam Azad Date: Thu, 12 Dec 2024 17:47:34 +0000 Subject: [PATCH] refactor(indexer): streamline socket communication and server initialization - Updated `addSocketComms` to accept a single `mongooseModels` parameter, simplifying the function signature. - Enhanced the `startIoServers` function to initialize both HTTP and WebSocket servers with improved parameter handling. - Refactored the `startIndexing` function to utilize the new `mongooseModels` structure, promoting cleaner code and better maintainability. - Improved logging for server availability, ensuring clearer output for developers. These changes enhance the modularity and readability of the codebase, laying the groundwork for future enhancements. --- .../src/addSocketComms.ts | 15 +- apps/gnosis-pay-rewards-indexer/src/core.ts | 147 +++++++++--------- apps/gnosis-pay-rewards-indexer/src/index.ts | 10 +- 3 files changed, 94 insertions(+), 78 deletions(-) diff --git a/apps/gnosis-pay-rewards-indexer/src/addSocketComms.ts b/apps/gnosis-pay-rewards-indexer/src/addSocketComms.ts index 6cb02af..cda26bb 100644 --- a/apps/gnosis-pay-rewards-indexer/src/addSocketComms.ts +++ b/apps/gnosis-pay-rewards-indexer/src/addSocketComms.ts @@ -6,16 +6,17 @@ import { WeekMetricsSnapshotModelType, } from '@karpatkey/gnosis-pay-rewards-sdk/mongoose'; import { buildSocketIoServer } from './server.js'; +import { MongooseConfiguredModels } from './process/types.js'; export function addSocketComms({ socketIoServer, - gnosisPayTransactionModel, - weekMetricsSnapshotModel, + mongooseModels, }: { socketIoServer: ReturnType; - gnosisPayTransactionModel: GnosisPayTransactionModelType; - weekMetricsSnapshotModel: WeekMetricsSnapshotModelType; + mongooseModels: MongooseConfiguredModels; }) { + const { gnosisPayTransactionModel, weekMetricsSnapshotModel } = mongooseModels; + // Emit the 10 recent pending rewards to the UI when a client connects socketIoServer.on('connection', async (socketClient) => { socketClient.on('disconnect', () => { @@ -23,14 +24,14 @@ export function addSocketComms({ }); socketClient.on('getRecentTransactions', async (limit: number) => { - const spendTransactions = (await gnosisPayTransactionModel + const spendTransactions = ((await gnosisPayTransactionModel .find() .populate({ path: 'amountToken', }) .limit(limit) .sort({ blockNumber: -1 }) - .lean()) as unknown as GnosisPayTransactionFieldsType_Populated[]; + .lean()) as unknown) as GnosisPayTransactionFieldsType_Populated[]; socketClient.emit('recentTransactions', spendTransactions); }); @@ -51,7 +52,7 @@ export function addSocketComms({ const allWeekData = await weekMetricsSnapshotModel.find().sort({ timestamp: 1 }); socketClient.emit( 'allWeekMetricsSnapshots', - allWeekData.map((w) => w.toJSON()), + allWeekData.map((w) => w.toJSON()) ); }); }); diff --git a/apps/gnosis-pay-rewards-indexer/src/core.ts b/apps/gnosis-pay-rewards-indexer/src/core.ts index 9905f6c..46d5cf8 100644 --- a/apps/gnosis-pay-rewards-indexer/src/core.ts +++ b/apps/gnosis-pay-rewards-indexer/src/core.ts @@ -60,6 +60,77 @@ export type StartIndexingParamsType = { logger: Logger; }; +type StartServersParamsType = { + client: PublicClient; + mongooseModels: StartIndexingParamsType['mongooseModels']; + logger: Logger; +}; + +/** + * Atom for the indexer state with default values + */ +const indexerStateAtom = atom({ + startBlock: 0n, + fetchBlockSize: 12n * 5n, + latestBlockNumber: 0n, + distanceToLatestBlockNumber: 0n, + fromBlockNumber: 0n, + toBlockNumber: 0n, +}); + +/** + * Store for the indexer state + */ +const indexerStateStore = createStore(); + +/** + * Start the I/O HTTP and WebSocket servers, + * ports are defined in {@link HTTP_SERVER_PORT} and {@link SOCKET_IO_SERVER_PORT} + * @param client - the client to use for the servers + * @param mongooseModels - the mongoose models to use for the servers + * @param logger - the logger to use for the servers + * @returns the rest API server and the socket.io server + */ +export async function startIoServers({ client, mongooseModels, logger }: StartServersParamsType) { + const restApiServer = addHttpRoutes({ + expressApp: buildExpressApp(), + client, + mongooseModels, + getIndexerState() { + return indexerStateStore.get(indexerStateAtom); + }, + logger, + }); + + const socketIoServer = addSocketComms({ + socketIoServer: buildSocketIoServer(restApiServer), + mongooseModels, + }); + + restApiServer.listen(HTTP_SERVER_PORT, HTTP_SERVER_HOST); + socketIoServer.listen(SOCKET_IO_SERVER_PORT); + + const apiServerUrl = `http://${HTTP_SERVER_HOST}:${HTTP_SERVER_PORT}`; + const wsServerUrl = `ws://${HTTP_SERVER_HOST}:${SOCKET_IO_SERVER_PORT}`; + + logger.info(`WebSocket server available at ${wsServerUrl}`); + logger.info(`REST API server available at ${apiServerUrl}`); + + return { + restApiServer, + socketIoServer, + }; +} + +/** + * Start the indexing process + * @param client - the client to use for the indexing + * @param resumeIndexing - if true, the indexer will resume indexing from the latest pending reward in the database + * @param fetchBlockSize - the size of the block range to fetch + * @param mongooseConnection - the mongoose connection to use for the indexing + * @param mongooseModels - the mongoose models to use for the indexing + * @param logger - the logger to use for the indexing + */ export async function startIndexing({ client, resumeIndexing = false, @@ -68,15 +139,6 @@ export async function startIndexing({ mongooseModels, logger, }: StartIndexingParamsType) { - const { - gnosisPayRewardDistributionModel, - gnosisPaySafeAddressModel, - gnosisPayTransactionModel, - gnosisTokenBalanceSnapshotModel, - weekCashbackRewardModel, - weekMetricsSnapshotModel, - } = mongooseModels; - logger.debug('starting indexing'); // Initialize the latest block @@ -84,7 +146,7 @@ export async function startIndexing({ const fromBlockNumberInitial = gnosisPayStartBlock; const toBlockNumberInitial = clampToBlockRange(fromBlockNumberInitial, latestBlockInitial.number, fetchBlockSize); - const indexerStateAtom = atom({ + indexerStateStore.set(indexerStateAtom, { startBlock: fromBlockNumberInitial, fetchBlockSize, latestBlockNumber: latestBlockInitial.number, @@ -92,7 +154,6 @@ export async function startIndexing({ fromBlockNumber: fromBlockNumberInitial, toBlockNumber: toBlockNumberInitial, }); - const indexerStateStore = createStore(); const getIndexerState = () => indexerStateStore.get(indexerStateAtom); if (resumeIndexing === true) { @@ -143,38 +204,6 @@ export async function startIndexing({ }, }); - const restApiServer = addHttpRoutes({ - expressApp: buildExpressApp(), - client, - mongooseModels: { - gnosisTokenBalanceSnapshotModel, - gnosisPaySafeAddressModel, - gnosisPayTransactionModel, - weekCashbackRewardModel, - gnosisPayRewardDistributionModel, - weekMetricsSnapshotModel, - }, - getIndexerState() { - return indexerStateStore.get(indexerStateAtom); - }, - logger, - }); - - const socketIoServer = addSocketComms({ - socketIoServer: buildSocketIoServer(restApiServer), - gnosisPayTransactionModel, - weekMetricsSnapshotModel, - }); - - restApiServer.listen(HTTP_SERVER_PORT, HTTP_SERVER_HOST); - socketIoServer.listen(SOCKET_IO_SERVER_PORT); - - const apiServerUrl = `http://${HTTP_SERVER_HOST}:${HTTP_SERVER_PORT}`; - const wsServerUrl = `ws://${HTTP_SERVER_HOST}:${SOCKET_IO_SERVER_PORT}`; - - logger.info(`WebSocket server available at ${wsServerUrl}`); - logger.info(`REST API server available at ${apiServerUrl}`); - // Index all the logs until the latest block while (shouldFetchLogs(getIndexerState)) { const { fromBlockNumber, toBlockNumber, latestBlockNumber } = getIndexerState(); @@ -229,57 +258,35 @@ export async function startIndexing({ await handleSpendLogs({ client, - mongooseModels: { - gnosisPayTransactionModel, - weekCashbackRewardModel, - weekMetricsSnapshotModel, - gnosisPaySafeAddressModel, - gnosisTokenBalanceSnapshotModel, - }, + mongooseModels, logs: spendLogs, logger, }); await handleRefundLogs({ client, - mongooseModels: { - gnosisPayTransactionModel, - weekCashbackRewardModel, - weekMetricsSnapshotModel, - gnosisPaySafeAddressModel, - gnosisTokenBalanceSnapshotModel, - }, + mongooseModels, logs: refundLogs, logger, - socketIoServer, }); await handleGnosisTokenTransferLogs({ client, - mongooseModels: { - gnosisPaySafeAddressModel, - gnosisTokenBalanceSnapshotModel, - weekCashbackRewardModel, - }, + mongooseModels, logs: gnosisTokenTransferLogs, logger, }); await handleGnosisPayRewardsDistributionLogs({ client, - mongooseModels: { - gnosisPayRewardDistributionModel, - weekCashbackRewardModel, - }, + mongooseModels, logs: gnosisPayRewardDistributionLogs, logger, }); await handleGnosisPayOgNftTransferLogs({ client, - mongooseModels: { - gnosisPaySafeAddressModel, - }, + mongooseModels, logs: claimOgNftLogs, logger, }); diff --git a/apps/gnosis-pay-rewards-indexer/src/index.ts b/apps/gnosis-pay-rewards-indexer/src/index.ts index 376e470..1fd5d8a 100644 --- a/apps/gnosis-pay-rewards-indexer/src/index.ts +++ b/apps/gnosis-pay-rewards-indexer/src/index.ts @@ -1,7 +1,7 @@ process.env.TZ = 'UTC'; // Set the timezone to UTC import './sentry.js'; // imported first to setup sentry import { gnosisChainPublicClient as client } from './publicClient.js'; -import { startIndexing, StartIndexingParamsType } from './core.js'; +import { startIndexing, StartIndexingParamsType, startIoServers } from './core.js'; import { FETCH_BLOCK_SIZE, MONGODB_URI, RESUME_INDEXING } from './config/env.js'; import { createBlockModel, @@ -38,6 +38,14 @@ async function main(resumeIndexing: boolean = RESUME_INDEXING) { gnosisPayRewardDistributionModel: createGnosisPayRewardDistributionModel(mongooseConnection), }; + // start the I/O servers + await startIoServers({ + client, + mongooseModels, + logger, + }); + + // start the indexing process await startIndexing({ client, fetchBlockSize: FETCH_BLOCK_SIZE,