Skip to content

Commit

Permalink
Merge pull request #14 from adamazad/chore/indexer-changes
Browse files Browse the repository at this point in the history
chore: indexer changes
  • Loading branch information
adamazad authored Aug 7, 2024
2 parents bc03e2c + f249927 commit 958b4c9
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 142 deletions.
1 change: 1 addition & 0 deletions apps/gnosis-pay-rewards-indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"debug": "^4.3.4",
"dotenv": "^16.0.1",
"express": "^4.18.2",
"jotai": "^2.2.1",
"mongoose": "^7.1.0",
"nanoid": "^4.0.1",
"numeral": "^2.0.6",
Expand Down
108 changes: 58 additions & 50 deletions apps/gnosis-pay-rewards-indexer/src/addHttpRoutes.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,43 @@
import {
GnosisPayTransactionFieldsType_Unpopulated,
GnosisPayTransactionFieldsType_Populated,
toWeekDataId,
} from '@karpatkey/gnosis-pay-rewards-sdk';
import { GnosisPayTransactionFieldsType_Unpopulated, toWeekDataId } from '@karpatkey/gnosis-pay-rewards-sdk';
import {
createMongooseLogger,
getGnosisPayTransactionModel,
getWeekCashbackRewardModel,
toDocumentId,
createWeekCashbackRewardDocument,
} from '@karpatkey/gnosis-pay-rewards-sdk/mongoose';
import { Response } from 'express';
import { isAddress } from 'viem';
import { buildExpressApp } from './server.js';
import dayjs from 'dayjs';
import dayjsUtc from 'dayjs/plugin/utc.js';
import { IndexerStateAtomType } from './state.js';

dayjs.extend(dayjsUtc);

export function addHttpRoutes({
expressApp,
gnosisPayTransactionModel,
weekCashbackRewardModel,
getIndexerState,
}: {
expressApp: ReturnType<typeof buildExpressApp>;
gnosisPayTransactionModel: ReturnType<typeof getGnosisPayTransactionModel>;
weekCashbackRewardModel: ReturnType<typeof getWeekCashbackRewardModel>;
logger: ReturnType<typeof createMongooseLogger>;
getIndexerState: () => IndexerStateAtomType;
}) {
expressApp.get<'/status'>('/status', (_, res) => {
const { fromBlockNumber, toBlockNumber, latestBlockNumber } = getIndexerState();

return res.send({
data: {
indexerState: {
fromBlockNumber: Number(fromBlockNumber),
toBlockNumber: Number(toBlockNumber),
latestBlockNumber: Number(latestBlockNumber),
},
},
status: 'ok',
statusCode: 200,
});
Expand All @@ -37,19 +50,6 @@ export function addHttpRoutes({
});
});

expressApp.get<'/pending-rewards'>('/pending-rewards', async (_, res) => {
try {
const spendTransactions = await gnosisPayTransactionModel.find({}).lean();
return res.json({
data: spendTransactions,
status: 'ok',
statusCode: 200,
});
} catch (error) {
return returnInternalServerError(res, error as Error);
}
});

expressApp.get<'/cashbacks/:safeAddress'>('/cashbacks/:safeAddress', async (req, res) => {
try {
const safeAddress = req.params.safeAddress;
Expand All @@ -62,40 +62,48 @@ export function addHttpRoutes({
});
}

const allCashbacks = await weekCashbackRewardModel
.find({
address: new RegExp(safeAddress, 'i'),
})
.populate<{
transactions: GnosisPayTransactionFieldsType_Populated;
}>({
path: 'transactions',
select: {
_id: 0,
blockNumber: 1,
blockTimestamp: 1,
transactionHash: 1,
spentAmount: 1,
spentAmountUsd: 1,
gnoBalance: 1,
},
populate: {
path: 'amountToken',
select: {
symbol: 1,
decimals: 1,
name: 1,
},
transform: (doc, id) => ({
...doc,
address: id,
}),
},
})
.lean();
const week = toWeekDataId(dayjs.utc().unix());
const weekCashbackRewardDocument = await createWeekCashbackRewardDocument({
address: safeAddress,
populateTransactions: true,
weekCashbackRewardModel,
week,
});

// const allCashbacks = await weekCashbackRewardModel
// .find({
// address: new RegExp(safeAddress, 'i'),
// })
// .populate<{
// transactions: GnosisPayTransactionFieldsType_Populated;
// }>({
// path: 'transactions',
// select: {
// _id: 0,
// blockNumber: 1,
// blockTimestamp: 1,
// transactionHash: 1,
// spentAmount: 1,
// spentAmountUsd: 1,
// gnoBalance: 1,
// },
// populate: {
// path: 'amountToken',
// select: {
// symbol: 1,
// decimals: 1,
// name: 1,
// },
// transform: (doc, id) => ({
// ...doc,
// address: id,
// }),
// },
// })
// .lean();

return res.json({
data: allCashbacks,
data: weekCashbackRewardDocument,
status: 'ok',
statusCode: 200,
_query: {
Expand Down
149 changes: 84 additions & 65 deletions apps/gnosis-pay-rewards-indexer/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
getLoggerModel,
getWeekMetricsSnapshotModel,
getBlockModel,
saveBlock,
getGnosisPaySafeAddressModel,
getWeekCashbackRewardModel,
LogLevel,
Expand All @@ -26,8 +25,10 @@ import { addHttpRoutes } from './addHttpRoutes.js';
import { addSocketComms } from './addSocketComms.js';
import { processRefundLog, processSpendLog } from './processSpendLog.js';
import { getGnosisPayRefundLogs } from './gp/getGnosisPayRefundLogs.js';
import { atom, createStore } from 'jotai';
import { IndexerStateAtomType } from './state.js';

const indexBlockSize = 12n; // 12 blocks is roughly 60 seconds of data
const indexBlockSize = 120n; // 12 blocks is roughly 60 seconds of data

export async function startIndexing({
client,
Expand All @@ -44,97 +45,102 @@ export async function startIndexing({
// Connect to the database
const mongooseConnection = await createConnection(MONGODB_URI);

console.log('Migrating Gnosis Pay tokens to database');

const gnosisPaySafeAddressModel = getGnosisPaySafeAddressModel(mongooseConnection);
const gnosisPayTransactionModel = getGnosisPayTransactionModel(mongooseConnection);
const weekCashbackRewardModel = getWeekCashbackRewardModel(mongooseConnection);
const weekMetricsSnapshotModel = getWeekMetricsSnapshotModel(mongooseConnection);
const gnosisPayTokenModel = getTokenModel(mongooseConnection);
const loggerModel = getLoggerModel(mongooseConnection);
const blockModel = getBlockModel(mongooseConnection);
const logger = createMongooseLogger(loggerModel);
console.log('Connected to mongodb at', mongooseConnection.connection.host);

const restApiServer = addHttpRoutes({
expressApp: buildExpressApp(),
gnosisPayTransactionModel,
weekCashbackRewardModel,
logger,
});
console.log('Migrating Gnosis Pay tokens to database');

const socketIoServer = addSocketComms({
socketIoServer: buildSocketIoServer(restApiServer),
gnosisPayTransactionModel,
weekMetricsSnapshotModel,
});
const mongooseModels = {
gnosisPaySafeAddressModel: getGnosisPaySafeAddressModel(mongooseConnection),
gnosisPayTransactionModel: getGnosisPayTransactionModel(mongooseConnection),
weekCashbackRewardModel: getWeekCashbackRewardModel(mongooseConnection),
weekMetricsSnapshotModel: getWeekMetricsSnapshotModel(mongooseConnection),
gnosisPayTokenModel: getTokenModel(mongooseConnection),
loggerModel: getLoggerModel(mongooseConnection),
blockModel: getBlockModel(mongooseConnection),
};

restApiServer.listen(HTTP_SERVER_PORT, HTTP_SERVER_HOST);
socketIoServer.listen(SOCKET_IO_SERVER_PORT);
const logger = createMongooseLogger(mongooseModels.loggerModel);

console.log('Starting indexing');

// Initialize the latest block
let latestBlock = await client.getBlock({ includeTransactions: false });

const latestBlockInitial = await client.getBlock({ includeTransactions: false });
// default value is June 29th, 2024. Otherwise, we fetch the latest block from the indexed pending rewards
let fromBlockNumber = gnosisPayStartBlock;
const fromBlockNumberInitial = gnosisPayStartBlock;
const toBlockNumberInitial = clampToBlockRange(fromBlockNumberInitial, latestBlockInitial.number, indexBlockSize);

const indexerStateAtom = atom<IndexerStateAtomType>({
latestBlockNumber: latestBlockInitial.number,
fromBlockNumber: fromBlockNumberInitial,
toBlockNumber: toBlockNumberInitial,
});
const indexerStateStore = createStore();
const getIndexerState = () => indexerStateStore.get(indexerStateAtom);

if (resumeIndexing === true) {
const [latestGnosisPayTransaction] = await gnosisPayTransactionModel.find().sort({ blockNumber: -1 }).limit(1);
const [latestGnosisPayTransaction] = await mongooseModels.gnosisPayTransactionModel
.find()
.sort({ blockNumber: -1 })
.limit(1);

if (latestGnosisPayTransaction !== undefined) {
fromBlockNumber = BigInt(latestGnosisPayTransaction.blockNumber) - indexBlockSize;
const fromBlockNumber = BigInt(latestGnosisPayTransaction.blockNumber) - indexBlockSize;
indexerStateStore.set(indexerStateAtom, (prev) => ({
...prev,
fromBlockNumber,
}));
console.log(`Resuming indexing from #${fromBlockNumber}`);
} else {
console.warn(`No pending rewards found, starting from the beginning at #${gnosisPayStartBlock}`);
console.warn(`No pending rewards found, starting from the beginning at #${fromBlockNumberInitial}`);
}
} else {
const session = await mongooseConnection.startSession();

// Clean up the database
await session.withTransaction(async () => {
await gnosisPaySafeAddressModel.deleteMany();
await gnosisPayTransactionModel.deleteMany();
await blockModel.deleteMany();
await weekCashbackRewardModel.deleteMany();
await weekMetricsSnapshotModel.deleteMany();
await loggerModel.deleteMany();
await gnosisPayTokenModel.deleteMany();
for (const modelName of mongooseConnection.modelNames()) {
await mongooseConnection.model(modelName).deleteMany();
}
});

await session.commitTransaction();
await session.endSession();

// Save the Gnosis Pay tokens to the database
await saveGnosisPayTokensToDatabase(gnosisPayTokenModel, gnosisPayTokens);
await saveGnosisPayTokensToDatabase(mongooseModels.gnosisPayTokenModel, gnosisPayTokens);
}

let toBlockNumber = clampToBlockRange(fromBlockNumber, latestBlock.number, indexBlockSize);

// Watch for new blocks
client.watchBlocks({
includeTransactions: false,
onBlock(block) {
latestBlock = block;
indexerStateStore.set(indexerStateAtom, (prev) => ({
...prev,
latestBlockNumber: block.number,
}));
},
});

saveBlock(
{
number: Number(block.number),
hash: block.hash,
timestamp: Number(block.timestamp),
},
blockModel
).catch((e) => {
console.error('Error creating block', e);
});
const restApiServer = addHttpRoutes({
expressApp: buildExpressApp(),
gnosisPayTransactionModel: mongooseModels.gnosisPayTransactionModel,
weekCashbackRewardModel: mongooseModels.weekCashbackRewardModel,
logger,
getIndexerState() {
return indexerStateStore.get(indexerStateAtom);
},
});

const shouldFetchLogs = toBlockNumber <= latestBlock.number;
const socketIoServer = addSocketComms({
socketIoServer: buildSocketIoServer(restApiServer),
gnosisPayTransactionModel: mongooseModels.gnosisPayTransactionModel,
weekMetricsSnapshotModel: mongooseModels.weekMetricsSnapshotModel,
});

console.log({ fromBlockNumber, toBlockNumber, shouldFetchLogs });
restApiServer.listen(HTTP_SERVER_PORT, HTTP_SERVER_HOST);
socketIoServer.listen(SOCKET_IO_SERVER_PORT);

// Index all the logs until the latest block
while (toBlockNumber <= latestBlock.number) {
while (shouldFetchLogs(getIndexerState)) {
const { fromBlockNumber, toBlockNumber, latestBlockNumber } = getIndexerState();

try {
await logger.logDebug({
message: `Fetching logs from #${fromBlockNumber} to #${toBlockNumber}`,
Expand All @@ -159,23 +165,30 @@ export async function startIndexing({
await handleBatchLogs({
client,
mongooseModels: {
gnosisPayTransactionModel,
weekCashbackRewardModel,
weekMetricsSnapshotModel,
gnosisPaySafeAddressModel,
gnosisPayTransactionModel: mongooseModels.gnosisPayTransactionModel,
weekCashbackRewardModel: mongooseModels.weekCashbackRewardModel,
weekMetricsSnapshotModel: mongooseModels.weekMetricsSnapshotModel,
gnosisPaySafeAddressModel: mongooseModels.gnosisPaySafeAddressModel,
},
logs: [...spendLogs, ...refundLogs],
logger,
socketIoServer,
});

// Move to the next block range
fromBlockNumber += indexBlockSize;
toBlockNumber = clampToBlockRange(fromBlockNumber, latestBlock.number, indexBlockSize);
const nextFromBlockNumber = fromBlockNumber + indexBlockSize;
const nextToBlockNumber = clampToBlockRange(nextFromBlockNumber, latestBlockNumber, indexBlockSize);

indexerStateStore.set(indexerStateAtom, (prev) => ({
...prev,
fromBlockNumber: nextFromBlockNumber,
toBlockNumber: nextToBlockNumber,
}));

// Sanity check to make sure we're not going too fast
const distanceToLatestBlock = bigMath.abs(toBlockNumber - latestBlock.number);
const distanceToLatestBlock = bigMath.abs(nextToBlockNumber - latestBlockNumber);
console.log({ distanceToLatestBlock });

// Cooldown for 20 seconds if we're within a distance of 10 blocks
if (distanceToLatestBlock < 10n) {
const targetBlockNumber = toBlockNumber + indexBlockSize + 3n;
Expand All @@ -194,6 +207,12 @@ export async function startIndexing({
}
}

function shouldFetchLogs(getIndexerState: () => IndexerStateAtomType) {
const { toBlockNumber, latestBlockNumber } = getIndexerState();

return toBlockNumber <= latestBlockNumber;
}

async function handleBatchLogs({
client,
mongooseModels,
Expand Down
5 changes: 5 additions & 0 deletions apps/gnosis-pay-rewards-indexer/src/state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export type IndexerStateAtomType = {
latestBlockNumber: bigint;
fromBlockNumber: bigint;
toBlockNumber: bigint;
};
Loading

0 comments on commit 958b4c9

Please sign in to comment.