Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(indexer): split io server and indexer #49

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions apps/gnosis-pay-rewards-indexer/src/addSocketComms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,32 @@ import {
WeekMetricsSnapshotModelType,
} from '@karpatkey/gnosis-pay-rewards-sdk/mongoose';
import { buildSocketIoServer } from './server.js';
import { MongooseConfiguredModels } from './process/types.js';

export function addSocketComms({
socketIoServer,
gnosisPayTransactionModel,
weekMetricsSnapshotModel,
mongooseModels,
}: {
socketIoServer: ReturnType<typeof buildSocketIoServer>;
gnosisPayTransactionModel: GnosisPayTransactionModelType;
weekMetricsSnapshotModel: WeekMetricsSnapshotModelType;
mongooseModels: MongooseConfiguredModels;
}) {
const { gnosisPayTransactionModel, weekMetricsSnapshotModel } = mongooseModels;

// Emit the 10 recent pending rewards to the UI when a client connects
socketIoServer.on('connection', async (socketClient) => {
socketClient.on('disconnect', () => {
console.log('Client disconnected');
});

socketClient.on('getRecentTransactions', async (limit: number) => {
const spendTransactions = (await gnosisPayTransactionModel
const spendTransactions = ((await gnosisPayTransactionModel
.find()
.populate({
path: 'amountToken',
})
.limit(limit)
.sort({ blockNumber: -1 })
.lean()) as unknown as GnosisPayTransactionFieldsType_Populated[];
.lean()) as unknown) as GnosisPayTransactionFieldsType_Populated[];
socketClient.emit('recentTransactions', spendTransactions);
});

Expand All @@ -51,7 +52,7 @@ export function addSocketComms({
const allWeekData = await weekMetricsSnapshotModel.find().sort({ timestamp: 1 });
socketClient.emit(
'allWeekMetricsSnapshots',
allWeekData.map((w) => w.toJSON()),
allWeekData.map((w) => w.toJSON())
);
});
});
Expand Down
147 changes: 77 additions & 70 deletions apps/gnosis-pay-rewards-indexer/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,77 @@ export type StartIndexingParamsType = {
logger: Logger;
};

type StartServersParamsType = {
client: PublicClient<Transport, typeof gnosis>;
mongooseModels: StartIndexingParamsType['mongooseModels'];
logger: Logger;
};

/**
* Atom for the indexer state with default values
*/
const indexerStateAtom = atom<IndexerStateAtomType>({
startBlock: 0n,
fetchBlockSize: 12n * 5n,
latestBlockNumber: 0n,
distanceToLatestBlockNumber: 0n,
fromBlockNumber: 0n,
toBlockNumber: 0n,
});

/**
* Store for the indexer state
*/
const indexerStateStore = createStore();

/**
* Start the I/O HTTP and WebSocket servers,
* ports are defined in {@link HTTP_SERVER_PORT} and {@link SOCKET_IO_SERVER_PORT}
* @param client - the client to use for the servers
* @param mongooseModels - the mongoose models to use for the servers
* @param logger - the logger to use for the servers
* @returns the rest API server and the socket.io server
*/
export async function startIoServers({ client, mongooseModels, logger }: StartServersParamsType) {
const restApiServer = addHttpRoutes({
expressApp: buildExpressApp(),
client,
mongooseModels,
getIndexerState() {
return indexerStateStore.get(indexerStateAtom);
},
logger,
});

const socketIoServer = addSocketComms({
socketIoServer: buildSocketIoServer(restApiServer),
mongooseModels,
});

restApiServer.listen(HTTP_SERVER_PORT, HTTP_SERVER_HOST);
socketIoServer.listen(SOCKET_IO_SERVER_PORT);

const apiServerUrl = `http://${HTTP_SERVER_HOST}:${HTTP_SERVER_PORT}`;
const wsServerUrl = `ws://${HTTP_SERVER_HOST}:${SOCKET_IO_SERVER_PORT}`;

logger.info(`WebSocket server available at ${wsServerUrl}`);
logger.info(`REST API server available at ${apiServerUrl}`);

return {
restApiServer,
socketIoServer,
};
}

/**
* Start the indexing process
* @param client - the client to use for the indexing
* @param resumeIndexing - if true, the indexer will resume indexing from the latest pending reward in the database
* @param fetchBlockSize - the size of the block range to fetch
* @param mongooseConnection - the mongoose connection to use for the indexing
* @param mongooseModels - the mongoose models to use for the indexing
* @param logger - the logger to use for the indexing
*/
export async function startIndexing({
client,
resumeIndexing = false,
Expand All @@ -68,31 +139,21 @@ export async function startIndexing({
mongooseModels,
logger,
}: StartIndexingParamsType) {
const {
gnosisPayRewardDistributionModel,
gnosisPaySafeAddressModel,
gnosisPayTransactionModel,
gnosisTokenBalanceSnapshotModel,
weekCashbackRewardModel,
weekMetricsSnapshotModel,
} = mongooseModels;

logger.debug('starting indexing');

// Initialize the latest block
const latestBlockInitial = await client.getBlock({ includeTransactions: false });
const fromBlockNumberInitial = gnosisPayStartBlock;
const toBlockNumberInitial = clampToBlockRange(fromBlockNumberInitial, latestBlockInitial.number, fetchBlockSize);

const indexerStateAtom = atom<IndexerStateAtomType>({
indexerStateStore.set(indexerStateAtom, {
startBlock: fromBlockNumberInitial,
fetchBlockSize,
latestBlockNumber: latestBlockInitial.number,
distanceToLatestBlockNumber: bigMath.abs(latestBlockInitial.number - fromBlockNumberInitial),
fromBlockNumber: fromBlockNumberInitial,
toBlockNumber: toBlockNumberInitial,
});
const indexerStateStore = createStore();
const getIndexerState = () => indexerStateStore.get(indexerStateAtom);

if (resumeIndexing === true) {
Expand Down Expand Up @@ -143,38 +204,6 @@ export async function startIndexing({
},
});

const restApiServer = addHttpRoutes({
expressApp: buildExpressApp(),
client,
mongooseModels: {
gnosisTokenBalanceSnapshotModel,
gnosisPaySafeAddressModel,
gnosisPayTransactionModel,
weekCashbackRewardModel,
gnosisPayRewardDistributionModel,
weekMetricsSnapshotModel,
},
getIndexerState() {
return indexerStateStore.get(indexerStateAtom);
},
logger,
});

const socketIoServer = addSocketComms({
socketIoServer: buildSocketIoServer(restApiServer),
gnosisPayTransactionModel,
weekMetricsSnapshotModel,
});

restApiServer.listen(HTTP_SERVER_PORT, HTTP_SERVER_HOST);
socketIoServer.listen(SOCKET_IO_SERVER_PORT);

const apiServerUrl = `http://${HTTP_SERVER_HOST}:${HTTP_SERVER_PORT}`;
const wsServerUrl = `ws://${HTTP_SERVER_HOST}:${SOCKET_IO_SERVER_PORT}`;

logger.info(`WebSocket server available at ${wsServerUrl}`);
logger.info(`REST API server available at ${apiServerUrl}`);

// Index all the logs until the latest block
while (shouldFetchLogs(getIndexerState)) {
const { fromBlockNumber, toBlockNumber, latestBlockNumber } = getIndexerState();
Expand Down Expand Up @@ -229,57 +258,35 @@ export async function startIndexing({

await handleSpendLogs({
client,
mongooseModels: {
gnosisPayTransactionModel,
weekCashbackRewardModel,
weekMetricsSnapshotModel,
gnosisPaySafeAddressModel,
gnosisTokenBalanceSnapshotModel,
},
mongooseModels,
logs: spendLogs,
logger,
});

await handleRefundLogs({
client,
mongooseModels: {
gnosisPayTransactionModel,
weekCashbackRewardModel,
weekMetricsSnapshotModel,
gnosisPaySafeAddressModel,
gnosisTokenBalanceSnapshotModel,
},
mongooseModels,
logs: refundLogs,
logger,
socketIoServer,
});

await handleGnosisTokenTransferLogs({
client,
mongooseModels: {
gnosisPaySafeAddressModel,
gnosisTokenBalanceSnapshotModel,
weekCashbackRewardModel,
},
mongooseModels,
logs: gnosisTokenTransferLogs,
logger,
});

await handleGnosisPayRewardsDistributionLogs({
client,
mongooseModels: {
gnosisPayRewardDistributionModel,
weekCashbackRewardModel,
},
mongooseModels,
logs: gnosisPayRewardDistributionLogs,
logger,
});

await handleGnosisPayOgNftTransferLogs({
client,
mongooseModels: {
gnosisPaySafeAddressModel,
},
mongooseModels,
logs: claimOgNftLogs,
logger,
});
Expand Down
10 changes: 9 additions & 1 deletion apps/gnosis-pay-rewards-indexer/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
process.env.TZ = 'UTC'; // Set the timezone to UTC
import './sentry.js'; // imported first to setup sentry
import { gnosisChainPublicClient as client } from './publicClient.js';
import { startIndexing, StartIndexingParamsType } from './core.js';
import { startIndexing, StartIndexingParamsType, startIoServers } from './core.js';
import { FETCH_BLOCK_SIZE, MONGODB_URI, RESUME_INDEXING } from './config/env.js';
import {
createBlockModel,
Expand Down Expand Up @@ -38,6 +38,14 @@ async function main(resumeIndexing: boolean = RESUME_INDEXING) {
gnosisPayRewardDistributionModel: createGnosisPayRewardDistributionModel(mongooseConnection),
};

// start the I/O servers
await startIoServers({
client,
mongooseModels,
logger,
});

// start the indexing process
await startIndexing({
client,
fetchBlockSize: FETCH_BLOCK_SIZE,
Expand Down
Loading