From 8b4a382e6d768fae1e38e35726ef890dfd738c7f Mon Sep 17 00:00:00 2001 From: forbesus Date: Mon, 9 Dec 2024 04:40:36 -0800 Subject: [PATCH] feat(observability): utilise new logger in indexer ref #434 --- apps/indexer/mvm.lock | 3 ++- apps/indexer/package.json | 1 + apps/indexer/src/chain/chainSync.ts | 27 ++++++++++--------- apps/indexer/src/chain/dataStore.ts | 6 +++-- apps/indexer/src/chain/genesisImporter.ts | 7 +++-- apps/indexer/src/chain/nodeAccessor.ts | 9 ++++--- apps/indexer/src/chain/statsProcessor.ts | 15 ++++++----- apps/indexer/src/db/buildDatabase.ts | 9 ++++--- apps/indexer/src/db/keybaseProvider.ts | 9 ++++--- apps/indexer/src/db/priceHistoryProvider.ts | 9 ++++--- apps/indexer/src/index.ts | 11 +++++--- .../indexer/src/indexers/akashStatsIndexer.ts | 7 +++-- .../src/indexers/messageAddressesIndexer.ts | 7 +++-- apps/indexer/src/indexers/validatorIndexer.ts | 11 +++++--- .../src/monitors/addressBalanceMonitor.ts | 5 +++- .../src/monitors/deploymentBalanceMonitor.ts | 7 +++-- .../src/providers/ipLocationProvider.ts | 12 +++++---- .../src/providers/providerStatusProvider.ts | 4 ++- apps/indexer/src/scheduler.ts | 21 ++++++++------- apps/indexer/src/shared/utils/benchmark.ts | 4 ++- apps/indexer/src/shared/utils/download.ts | 6 +++-- apps/indexer/src/shared/utils/files.ts | 8 ++++-- apps/indexer/src/shared/utils/query.ts | 8 ++++-- .../src/tasks/providerUptimeTracker.ts | 5 +++- apps/indexer/src/tasks/usdSpendingTracker.ts | 11 +++++--- 25 files changed, 144 insertions(+), 78 deletions(-) diff --git a/apps/indexer/mvm.lock b/apps/indexer/mvm.lock index 912d4a247..83d8609f9 100644 --- a/apps/indexer/mvm.lock +++ b/apps/indexer/mvm.lock @@ -1,7 +1,8 @@ { "dependencies": { "@akashnetwork/database": "1.0.0", - "@akashnetwork/env-loader": "1.0.1" + "@akashnetwork/env-loader": "1.0.1", + "@akashnetwork/logging": "2.0.2" }, "devDependencies": { "@akashnetwork/dev-config": "1.0.0" diff --git a/apps/indexer/package.json b/apps/indexer/package.json index 9ea3114e3..d7f6ebdd4 100644 --- a/apps/indexer/package.json +++ b/apps/indexer/package.json @@ -27,6 +27,7 @@ "@akashnetwork/akash-api": "^1.3.0", "@akashnetwork/database": "*", "@akashnetwork/env-loader": "*", + "@akashnetwork/logging": "*", "@cosmjs/crypto": "^0.32.4", "@cosmjs/encoding": "^0.32.4", "@cosmjs/math": "^0.32.4", diff --git a/apps/indexer/src/chain/chainSync.ts b/apps/indexer/src/chain/chainSync.ts index 3216afee3..15205f141 100644 --- a/apps/indexer/src/chain/chainSync.ts +++ b/apps/indexer/src/chain/chainSync.ts @@ -1,6 +1,7 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions"; import { Block, Message } from "@akashnetwork/database/dbSchemas"; import { Day, Transaction } from "@akashnetwork/database/dbSchemas/base"; +import { LoggerService } from "@akashnetwork/logging"; import { fromBase64 } from "@cosmjs/encoding"; import { decodeTxRaw } from "@cosmjs/proto-signing"; import { asyncify, eachLimit } from "async"; @@ -25,6 +26,8 @@ import { import { nodeAccessor } from "./nodeAccessor"; import { statsProcessor } from "./statsProcessor"; +const logger = LoggerService.forContext("ChainSync"); + export const setMissingBlock = (height: number) => (missingBlock = height); let missingBlock: number; @@ -83,13 +86,13 @@ export async function syncBlocks() { const latestHeightInCache = await getLatestHeightInCache(); if (latestHeightInCache >= latestBlockToDownload) { - console.log("No blocks to download"); + logger.info("No blocks to download"); } else { let startHeight = !env.KEEP_CACHE ? latestInsertedHeight + 1 : Math.max(latestHeightInCache, 1); // If database is empty if (latestInsertedHeight === 0) { - console.log("Starting from scratch"); + logger.info("Starting from scratch"); startHeight = activeChain.startHeight || 1; } @@ -101,13 +104,13 @@ export async function syncBlocks() { const maxDownloadGroupSize = 1_000; if (latestBlockToDownload - startHeight > maxDownloadGroupSize) { - console.log("Limiting download to " + maxDownloadGroupSize + " blocks"); + logger.info("Limiting download to " + maxDownloadGroupSize + " blocks"); latestBlockToDownload = startHeight + maxDownloadGroupSize; } - console.log("Starting download at block #" + startHeight); - console.log("Will end download at block #" + latestBlockToDownload); - console.log(latestBlockToDownload - startHeight + 1 + " blocks to download"); + logger.info("Starting download at block #" + startHeight); + logger.info("Will end download at block #" + latestBlockToDownload); + logger.info(latestBlockToDownload - startHeight + 1 + " blocks to download"); await benchmark.measureAsync("downloadBlocks", async () => { await downloadBlocks(startHeight, latestBlockToDownload); @@ -151,7 +154,7 @@ export async function syncBlocks() { async function insertBlocks(startHeight: number, endHeight: number) { const blockCount = endHeight - startHeight + 1; - console.log("Inserting " + blockCount + " blocks into database"); + logger.info("Inserting " + blockCount + " blocks into database"); let lastInsertedBlock = (await Block.findOne({ include: [ @@ -242,7 +245,7 @@ async function insertBlocks(startHeight: number, endHeight: number) { const blockDate = new Date(Date.UTC(blockDatetime.getUTCFullYear(), blockDatetime.getUTCMonth(), blockDatetime.getUTCDate())); if (!lastInsertedBlock || !isEqual(blockDate, lastInsertedBlock.day.date)) { - console.log("Creating day: ", blockDate, i); + logger.info(`Creating day: ${blockDate} ${i}`); const [newDay, created] = await Day.findOrCreate({ where: { date: blockDate @@ -256,7 +259,7 @@ async function insertBlocks(startHeight: number, endHeight: number) { }); if (!created) { - console.warn(`Day ${blockDate} already exists in database`); + logger.warn(`Day ${blockDate} already exists in database`); } blockEntry.dayId = newDay.id; @@ -287,7 +290,7 @@ async function insertBlocks(startHeight: number, endHeight: number) { blocksToAdd = []; txsToAdd = []; msgsToAdd = []; - console.log(`Blocks added to db: ${i - startHeight + 1} / ${blockCount} (${(((i - startHeight + 1) * 100) / blockCount).toFixed(2)}%)`); + logger.info(`Blocks added to db: ${i - startHeight + 1} / ${blockCount} (${(((i - startHeight + 1) * 100) / blockCount).toFixed(2)}%)`); if (lastInsertedBlock) { lastInsertedBlock.day.lastBlockHeightYet = lastInsertedBlock.height; @@ -295,7 +298,7 @@ async function insertBlocks(startHeight: number, endHeight: number) { } }); } catch (error) { - console.log(error, txsToAdd); + logger.info(`${error}, ${txsToAdd}`); } } } @@ -319,7 +322,7 @@ async function downloadBlocks(startHeight: number, endHeight: number) { if (Date.now() - lastLogDate > 500) { lastLogDate = Date.now(); console.clear(); - console.log("Progress: " + ((downloadedCount * 100) / missingBlockCount).toFixed(2) + "%"); + logger.info("Progress: " + ((downloadedCount * 100) / missingBlockCount).toFixed(2) + "%"); if (!isProd) { nodeAccessor.displayTable(); diff --git a/apps/indexer/src/chain/dataStore.ts b/apps/indexer/src/chain/dataStore.ts index 47d819d6d..21c286516 100644 --- a/apps/indexer/src/chain/dataStore.ts +++ b/apps/indexer/src/chain/dataStore.ts @@ -1,3 +1,4 @@ +import { LoggerService } from "@akashnetwork/logging"; import fs from "fs"; import { Level } from "level"; import path from "path"; @@ -6,6 +7,7 @@ import { dataFolderPath } from "@src/shared/constants"; import { bytesToHumanReadableSize } from "@src/shared/utils/files"; const LevelNotFoundCode = "LEVEL_NOT_FOUND"; +const logger = LoggerService.forContext("dataStore"); if (!fs.existsSync(dataFolderPath)) { fs.mkdirSync(dataFolderPath, { recursive: true }); @@ -37,10 +39,10 @@ export const getCacheSize = async function () { }; export const deleteCache = async function () { - console.log("Deleting cache..."); + logger.info("Deleting cache..."); await blocksDb.clear(); await blockResultsDb.clear(); - console.log("Deleted"); + logger.info("Deleted"); }; export async function getCachedBlockByHeight(height: number) { diff --git a/apps/indexer/src/chain/genesisImporter.ts b/apps/indexer/src/chain/genesisImporter.ts index 5317595bc..c7ee72ccc 100644 --- a/apps/indexer/src/chain/genesisImporter.ts +++ b/apps/indexer/src/chain/genesisImporter.ts @@ -1,4 +1,5 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions"; +import { LoggerService } from "@akashnetwork/logging"; import fs from "fs"; import { ungzip } from "node-gzip"; import path from "path"; @@ -7,6 +8,8 @@ import { dataFolderPath } from "@src/shared/constants"; import { download } from "@src/shared/utils/download"; import { IGenesis } from "./genesisTypes"; +const logger = LoggerService.forContext("genesisImports"); + export async function getGenesis(): Promise { const ext = path.extname(activeChain.genesisFileUrl); const filename = path.basename(activeChain.genesisFileUrl); @@ -14,12 +17,12 @@ export async function getGenesis(): Promise { let genesisLocalPath = dataFolderPath + "/" + filename; if (!fs.existsSync(genesisLocalPath)) { - console.log("Downloading genesis file: " + activeChain.genesisFileUrl); + logger.info("Downloading genesis file: " + activeChain.genesisFileUrl); await download(activeChain.genesisFileUrl, genesisLocalPath); } if (ext === ".gz") { - console.log("Extracting genesis file..."); + logger.info("Extracting genesis file..."); const decompressed = await ungzip(fs.readFileSync(genesisLocalPath).buffer); genesisLocalPath = genesisLocalPath.replace(".gz", ""); fs.writeFileSync(genesisLocalPath, decompressed); diff --git a/apps/indexer/src/chain/nodeAccessor.ts b/apps/indexer/src/chain/nodeAccessor.ts index b56fc0fab..e43692184 100644 --- a/apps/indexer/src/chain/nodeAccessor.ts +++ b/apps/indexer/src/chain/nodeAccessor.ts @@ -1,4 +1,5 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions"; +import { LoggerService } from "@akashnetwork/logging"; import fs from "fs"; import { concurrentNodeQuery, dataFolderPath } from "@src/shared/constants"; @@ -10,7 +11,7 @@ interface NodeAccessorSettings { } const savedNodeInfoPath = dataFolderPath + "/nodeStatus.json"; - +const logger = LoggerService.forContext("nodeAccessor"); class NodeAccessor { private nodes: NodeInfo[]; private settings: NodeAccessorSettings; @@ -21,7 +22,7 @@ class NodeAccessor { } private async saveNodeStatus() { - console.log("Saving node status..."); + logger.info("Saving node status..."); const statuses = this.nodes.map(x => x.getSavedNodeInfo()); await fs.promises.writeFile(savedNodeInfoPath, JSON.stringify(statuses, null, 2)); @@ -35,13 +36,13 @@ class NodeAccessor { public async loadNodeStatus() { if (!fs.existsSync(savedNodeInfoPath)) { - console.log("No saved node status found"); + logger.info("No saved node status found"); await this.refetchNodeStatus(); await this.saveNodeStatus(); return; } - console.log("Loading saved node status..."); + logger.info("Loading saved node status..."); const file = await fs.promises.readFile(savedNodeInfoPath, "utf-8"); const savedNodes = JSON.parse(file) as SavedNodeInfo[]; diff --git a/apps/indexer/src/chain/statsProcessor.ts b/apps/indexer/src/chain/statsProcessor.ts index 25f30b0c2..a157109f1 100644 --- a/apps/indexer/src/chain/statsProcessor.ts +++ b/apps/indexer/src/chain/statsProcessor.ts @@ -2,6 +2,7 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions"; import { Block, Message } from "@akashnetwork/database/dbSchemas"; import { AkashMessage } from "@akashnetwork/database/dbSchemas/akash"; import { Transaction } from "@akashnetwork/database/dbSchemas/base"; +import { LoggerService } from "@akashnetwork/logging"; import { fromBase64 } from "@cosmjs/encoding"; import { decodeTxRaw } from "@cosmjs/proto-signing"; import { sha256 } from "js-sha256"; @@ -16,11 +17,13 @@ import { decodeMsg } from "@src/shared/utils/protobuf"; import { setMissingBlock } from "./chainSync"; import { getGenesis } from "./genesisImporter"; +const logger = LoggerService.forContext("statsProcessor"); + class StatsProcessor { private cacheInitialized: boolean = false; public async rebuildStatsTables() { - console.log('Setting "isProcessed" to false'); + logger.info('Setting "isProcessed" to false'); await Message.update( { isProcessed: false, @@ -41,7 +44,7 @@ class StatsProcessor { { where: { isProcessed: true } } ); - console.log("Rebuilding stats tables..."); + logger.info("Rebuilding stats tables..."); for (const indexer of activeIndexers) { await indexer.recreateTables(); @@ -58,7 +61,7 @@ class StatsProcessor { } public async processMessages() { - console.log("Querying unprocessed messages..."); + logger.info("Querying unprocessed messages..."); const shouldProcessEveryBlocks = activeIndexers.some(indexer => indexer.runForEveryBlocks); @@ -78,7 +81,7 @@ class StatsProcessor { const hasNewBlocks = !previousProcessedBlock || maxDbHeight > previousProcessedBlock.height; if (!hasNewBlocks) { - console.log("No new blocks to process"); + logger.info("No new blocks to process"); return; } @@ -94,7 +97,7 @@ class StatsProcessor { let firstBlockToProcess = firstUnprocessedHeight; let lastBlockToProcess = Math.min(maxDbHeight, firstBlockToProcess + groupSize, lastBlockToSync); while (firstBlockToProcess <= Math.min(maxDbHeight, lastBlockToSync)) { - console.log(`Loading blocks ${firstBlockToProcess} to ${lastBlockToProcess}`); + logger.info(`Loading blocks ${firstBlockToProcess} to ${lastBlockToProcess}`); const getBlocksTimer = benchmark.startTimer("getBlocks"); const blocks = await Block.findAll({ @@ -150,7 +153,7 @@ class StatsProcessor { decodeTimer.end(); for (const msg of transaction.messages) { - console.log(`Processing message ${msg.type} - Block #${block.height}`); + logger.info(`Processing message ${msg.type} - Block #${block.height}`); const encodedMessage = decodedTx.body.messages[msg.index].value; diff --git a/apps/indexer/src/db/buildDatabase.ts b/apps/indexer/src/db/buildDatabase.ts index fc252d7f9..579561b75 100644 --- a/apps/indexer/src/db/buildDatabase.ts +++ b/apps/indexer/src/db/buildDatabase.ts @@ -2,19 +2,22 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions"; import { Block, Message } from "@akashnetwork/database/dbSchemas"; import { Day, Transaction } from "@akashnetwork/database/dbSchemas/base"; import { MonitoredValue } from "@akashnetwork/database/dbSchemas/base/monitoredValue"; +import { LoggerService } from "@akashnetwork/logging"; import { getGenesis } from "@src/chain/genesisImporter"; import { indexers } from "@src/indexers"; import { ExecutionMode, executionMode } from "@src/shared/constants"; import { sequelize } from "./dbConnection"; +const logger = LoggerService.forContext("buildDatabase"); + /** * Initiate database schema */ export const initDatabase = async () => { - console.log(`Connecting to db (${sequelize.config.host}/${sequelize.config.database})...`); + logger.info(`Connecting to db (${sequelize.config.host}/${sequelize.config.database})...`); await sequelize.authenticate(); - console.log("Connection has been established successfully."); + logger.info("Connection has been established successfully."); if (executionMode === ExecutionMode.RebuildAll) { await Day.drop({ cascade: true }); @@ -41,7 +44,7 @@ export const initDatabase = async () => { if (!activeChain.startHeight) { const firstBlock = await Block.findOne(); if (!firstBlock) { - console.log("First time syncing, seeding from genesis file..."); + logger.info("First time syncing, seeding from genesis file..."); const genesis = await getGenesis(); for (const indexer of indexers) { diff --git a/apps/indexer/src/db/keybaseProvider.ts b/apps/indexer/src/db/keybaseProvider.ts index 4d74e7963..b4aa98f0d 100644 --- a/apps/indexer/src/db/keybaseProvider.ts +++ b/apps/indexer/src/db/keybaseProvider.ts @@ -1,7 +1,10 @@ import { Validator } from "@akashnetwork/database/dbSchemas/base"; +import { LoggerService } from "@akashnetwork/logging"; import fetch from "node-fetch"; import { Op } from "sequelize"; +const logger = LoggerService.forContext("keybaseProvider"); + export async function fetchValidatorKeybaseInfos() { const validators = await Validator.findAll({ where: { @@ -12,11 +15,11 @@ export async function fetchValidatorKeybaseInfos() { const requests = validators.map(async validator => { try { if (!/^[A-F0-9]{16}$/.test(validator.identity)) { - console.warn("Invalid identity " + validator.identity + " for validator " + validator.operatorAddress); + logger.warn("Invalid identity " + validator.identity + " for validator " + validator.operatorAddress); return Promise.resolve(); } - console.log("Fetching keybase info for " + validator.operatorAddress); + logger.info("Fetching keybase info for " + validator.operatorAddress); const response = await fetch(`https://keybase.io/_/api/1.0/user/lookup.json?key_suffix=${validator.identity}`); if (response.status === 200) { @@ -31,7 +34,7 @@ export async function fetchValidatorKeybaseInfos() { await validator.save(); } catch (err) { - console.error("Error while fetching keybase info for " + validator.operatorAddress); + logger.error("Error while fetching keybase info for " + validator.operatorAddress); throw err; } }); diff --git a/apps/indexer/src/db/priceHistoryProvider.ts b/apps/indexer/src/db/priceHistoryProvider.ts index 8ae5422e7..5b38f071b 100644 --- a/apps/indexer/src/db/priceHistoryProvider.ts +++ b/apps/indexer/src/db/priceHistoryProvider.ts @@ -1,5 +1,6 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions"; import { Day } from "@akashnetwork/database/dbSchemas/base"; +import { LoggerService } from "@akashnetwork/logging"; import { isSameDay } from "date-fns"; import fetch from "node-fetch"; @@ -9,15 +10,17 @@ interface PriceHistoryResponse { total_volumes: Array>; } +const logger = LoggerService.forContext("priceHistoryProvider"); + export const syncPriceHistory = async () => { if (!activeChain.coinGeckoId) { - console.log("No coin gecko id defined for this chain. Skipping price history sync."); + logger.info("No coin gecko id defined for this chain. Skipping price history sync."); return; } const endpointUrl = `https://api.coingecko.com/api/v3/coins/${activeChain.coinGeckoId}/market_chart?vs_currency=usd&days=360`; - console.log("Fetching latest market data from " + endpointUrl); + logger.info("Fetching latest market data from " + endpointUrl); const response = await fetch(endpointUrl); const data: PriceHistoryResponse = await response.json(); @@ -26,7 +29,7 @@ export const syncPriceHistory = async () => { price: pDate[1] })); - console.log(`There are ${apiPrices.length} prices to update.`); + logger.info(`There are ${apiPrices.length} prices to update.`); const days = await Day.findAll(); diff --git a/apps/indexer/src/index.ts b/apps/indexer/src/index.ts index e6963fa69..7146cfb64 100644 --- a/apps/indexer/src/index.ts +++ b/apps/indexer/src/index.ts @@ -1,6 +1,7 @@ import "@akashnetwork/env-loader"; import { activeChain, chainDefinitions } from "@akashnetwork/database/chainDefinitions"; +import { LoggerService } from "@akashnetwork/logging"; import * as Sentry from "@sentry/node"; import express from "express"; @@ -27,6 +28,8 @@ const app = express(); const { PORT = 3079 } = process.env; +const logger = LoggerService.forContext("indexer-app"); + Sentry.init({ dsn: env.SENTRY_DSN, environment: env.NODE_ENV, @@ -47,7 +50,7 @@ Sentry.setTag("chain", env.ACTIVE_CHAIN); const scheduler = new Scheduler({ healthchecksEnabled: env.HEALTH_CHECKS_ENABLED === "true", errorHandler: (task, error) => { - console.error(`Task "${task.name}" failed: `, error); + logger.error(`Task "${task.name}" failed: ${error}`); Sentry.captureException(error, { tags: { task: task.name } }); } }); @@ -125,7 +128,7 @@ function startScheduler() { async function initApp() { try { if (env.STANDBY) { - console.log("STANDBY mode enabled. Doing nothing."); + logger.debug("STANDBY mode enabled. Doing nothing."); // eslint-disable-next-line no-constant-condition while (true) { await sleep(5_000); @@ -152,10 +155,10 @@ async function initApp() { } app.listen(PORT, () => { - console.log("server started at http://localhost:" + PORT); + logger.debug("server started at http://localhost:" + PORT); }); } catch (err) { - console.error("Error while initializing app", err); + logger.debug(`Error while initializing app ${err}`); Sentry.captureException(err); } diff --git a/apps/indexer/src/indexers/akashStatsIndexer.ts b/apps/indexer/src/indexers/akashStatsIndexer.ts index ec0715b7a..b8ac25873 100644 --- a/apps/indexer/src/indexers/akashStatsIndexer.ts +++ b/apps/indexer/src/indexers/akashStatsIndexer.ts @@ -17,6 +17,7 @@ import { ProviderSnapshotNodeGPU } from "@akashnetwork/database/dbSchemas/akash"; import { AkashBlock as Block, AkashMessage as Message } from "@akashnetwork/database/dbSchemas/akash"; +import { LoggerService } from "@akashnetwork/logging"; import { Op, Transaction as DbTransaction } from "sequelize"; import * as uuid from "uuid"; @@ -43,6 +44,8 @@ const denomMapping = { "ibc/170C677610AC31DF0904FFE09CD3B5C657492170E7E52372E48756B71E56F2F1": "uusdc" // USDC on Mainnet }; +const logger = LoggerService.forContext("akashStatsIndexer"); + export class AkashStatsIndexer extends Indexer { private totalLeaseCount = 0; private activeProviderCount = 0; @@ -156,7 +159,7 @@ export class AkashStatsIndexer extends Indexer { this.totalResources = await this.getTotalResources(null, firstBlockHeight); this.predictedClosedHeights = await this.getFuturePredictedCloseHeights(firstBlockHeight, null); - console.log("Fetching deployment id cache..."); + logger.info("Fetching deployment id cache..."); const existingDeployments = await Deployment.findAll({ attributes: ["id", "owner", "dseq"] @@ -899,7 +902,7 @@ export class AkashStatsIndexer extends Indexer { const provider = await Provider.findOne({ where: { owner: decodedMessage.owner }, transaction: blockGroupTransaction }); if (!provider) { - console.warn(`Provider ${decodedMessage.owner} not found`); + logger.warn(`Provider ${decodedMessage.owner} not found`); return; } diff --git a/apps/indexer/src/indexers/messageAddressesIndexer.ts b/apps/indexer/src/indexers/messageAddressesIndexer.ts index 01be8ab37..8a2f6db0f 100644 --- a/apps/indexer/src/indexers/messageAddressesIndexer.ts +++ b/apps/indexer/src/indexers/messageAddressesIndexer.ts @@ -1,5 +1,6 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions"; import { AddressReference, Message, Transaction } from "@akashnetwork/database/dbSchemas/base"; +import { LoggerService } from "@akashnetwork/logging"; import { toBech32 } from "@cosmjs/encoding"; import { DecodedTxRaw, decodePubkey } from "@cosmjs/proto-signing"; import { MsgMultiSend, MsgSend } from "cosmjs-types/cosmos/bank/v1beta1/tx"; @@ -13,6 +14,8 @@ import { getAmountFromCoin, getAmountFromCoinArray } from "@src/shared/utils/coi import * as benchmark from "../shared/utils/benchmark"; import { Indexer } from "./indexer"; +const logger = LoggerService.forContext("messageAddressesIndexer"); + export class MessageAddressesIndexer extends Indexer { constructor() { super(); @@ -117,7 +120,7 @@ export class MessageAddressesIndexer extends Indexer { const signerInfos = tx.authInfo.signerInfos; if (signerInfos.length !== 1) { - console.warn("More than one signer in tx: " + hash); + logger.warn("More than one signer in tx: " + hash); } let multisigThreshold: number | null = null; @@ -146,7 +149,7 @@ export class MessageAddressesIndexer extends Indexer { } catch (e) { // TEMPORARY FIX FOR TX 63CBF2B5C23E30B774F5072F625E3400603C95B993F0428E375F8078EAC95B17 if (signerInfo.publicKey.typeUrl === "/cosmos.crypto.multisig.LegacyAminoPubKey") { - console.log("FAILED TO DECODE MULTISIG PUBKEY: ", hash); + logger.info(`FAILED TO DECODE MULTISIG PUBKEY: ${hash}`); return { multisigThreshold: null, addresses: [] }; } diff --git a/apps/indexer/src/indexers/validatorIndexer.ts b/apps/indexer/src/indexers/validatorIndexer.ts index 6537169da..c30092b69 100644 --- a/apps/indexer/src/indexers/validatorIndexer.ts +++ b/apps/indexer/src/indexers/validatorIndexer.ts @@ -1,5 +1,6 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions"; import { Message, Validator } from "@akashnetwork/database/dbSchemas/base"; +import { LoggerService } from "@akashnetwork/logging"; import { fromBase64, fromBech32, toBech32, toHex } from "@cosmjs/encoding"; import { MsgCreateValidator, MsgEditValidator } from "cosmjs-types/cosmos/staking/v1beta1/tx"; import { Transaction as DbTransaction } from "sequelize"; @@ -10,6 +11,8 @@ import { pubkeyToRawAddress } from "@src/shared/utils/addresses"; import * as benchmark from "../shared/utils/benchmark"; import { Indexer } from "./indexer"; +const logger = LoggerService.forContext("validatorIndexers"); + export class ValidatorIndexer extends Indexer { msgHandlers: { [key: string]: (msgSubmitProposal: any, height: number, blockGroupTransaction: DbTransaction, msg: Message) => Promise }; @@ -37,7 +40,7 @@ export class ValidatorIndexer extends Indexer { await sequelize.transaction(async dbTransaction => { for (const validator of validators) { - console.log("Creating validator :" + validator.operator_address); + logger.info("Creating validator :" + validator.operator_address); await this.createValidatorFromGenesis(validator, dbTransaction); } @@ -48,7 +51,7 @@ export class ValidatorIndexer extends Indexer { .filter(x => x["@type"] === "/cosmos.staking.v1beta1.MsgCreateValidator") as IGentxCreateValidator[]; for (const msg of msgs) { - console.log("Creating validator :" + msg.validator_address); + logger.info("Creating validator :" + msg.validator_address); await this.createValidatorFromGentx(msg, dbTransaction); } @@ -114,10 +117,10 @@ export class ValidatorIndexer extends Indexer { const existingValidator = await Validator.findOne({ where: { operatorAddress: decodedMessage.validatorAddress }, transaction: dbTransaction }); if (!existingValidator) { - console.log(`Creating validator ${decodedMessage.validatorAddress}`); + logger.info(`Creating validator ${decodedMessage.validatorAddress}`); await Validator.create(validatorInfo, { transaction: dbTransaction }); } else { - console.log(`Updating validator ${decodedMessage.validatorAddress}`); + logger.info(`Updating validator ${decodedMessage.validatorAddress}`); await Validator.update(validatorInfo, { where: { operatorAddress: decodedMessage.validatorAddress }, transaction: dbTransaction }); } } diff --git a/apps/indexer/src/monitors/addressBalanceMonitor.ts b/apps/indexer/src/monitors/addressBalanceMonitor.ts index f0fa9c599..6cb397354 100644 --- a/apps/indexer/src/monitors/addressBalanceMonitor.ts +++ b/apps/indexer/src/monitors/addressBalanceMonitor.ts @@ -1,7 +1,10 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions"; import { MonitoredValue } from "@akashnetwork/database/dbSchemas/base/monitoredValue"; +import { LoggerService } from "@akashnetwork/logging"; import axios from "axios"; +const logger = LoggerService.forContext("addressBalanceMonitor"); + export class AddressBalanceMonitor { async run() { const monitoredValues = await MonitoredValue.findAll({ @@ -12,7 +15,7 @@ export class AddressBalanceMonitor { await Promise.allSettled(monitoredValues.map(x => this.updateValue(x))); - console.log("Refreshed balances for " + monitoredValues.length + " addresses."); + logger.info("Refreshed balances for " + monitoredValues.length + " addresses."); } async updateValue(monitoredValue: MonitoredValue) { diff --git a/apps/indexer/src/monitors/deploymentBalanceMonitor.ts b/apps/indexer/src/monitors/deploymentBalanceMonitor.ts index 95bde473f..25bac2d1d 100644 --- a/apps/indexer/src/monitors/deploymentBalanceMonitor.ts +++ b/apps/indexer/src/monitors/deploymentBalanceMonitor.ts @@ -1,8 +1,11 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions"; import { MonitoredValue } from "@akashnetwork/database/dbSchemas/base/monitoredValue"; +import { LoggerService } from "@akashnetwork/logging"; import * as Sentry from "@sentry/node"; import axios from "axios"; +const logger = LoggerService.forContext("deploymentBalanceMonitor"); + export class DeploymentBalanceMonitor { async run() { const monitoredValues = await MonitoredValue.findAll({ @@ -13,7 +16,7 @@ export class DeploymentBalanceMonitor { await Promise.allSettled(monitoredValues.map(x => this.updateValue(x))); - console.log("Refreshed balances for " + monitoredValues.length + " deployments."); + logger.info("Refreshed balances for " + monitoredValues.length + " deployments."); } async updateValue(monitoredValue: MonitoredValue) { @@ -28,7 +31,7 @@ export class DeploymentBalanceMonitor { monitoredValue.lastUpdateDate = new Date(); await monitoredValue.save(); } catch (err) { - console.error(err); + logger.error(err); Sentry.captureException(err, { tags: { target: monitoredValue.target } }); } diff --git a/apps/indexer/src/providers/ipLocationProvider.ts b/apps/indexer/src/providers/ipLocationProvider.ts index f353f6777..cfbc1d715 100644 --- a/apps/indexer/src/providers/ipLocationProvider.ts +++ b/apps/indexer/src/providers/ipLocationProvider.ts @@ -1,10 +1,12 @@ import { Provider } from "@akashnetwork/database/dbSchemas/akash"; +import { LoggerService } from "@akashnetwork/logging"; import axios from "axios"; import dns from "dns/promises"; import { sleep } from "@src/shared/utils/delay"; const IpLookupDelay = 2_000; +const logger = LoggerService.forContext("ipLocationProvider"); async function getIpLocation(ip: string) { const response = await axios.get(`http://ip-api.com/json/${ip}`); @@ -30,7 +32,7 @@ export async function updateProvidersLocation() { } }); - console.log(`${providers.length} providers to lookup`); + logger.info(`${providers.length} providers to lookup`); for (const provider of providers) { try { @@ -38,20 +40,20 @@ export async function updateProvidersLocation() { const ips = await dns.resolve4(parsedUri.hostname); if (ips.length === 0) { - console.log(`Could not resolve ip for ${provider.hostUri}`); + logger.info(`Could not resolve ip for ${provider.hostUri}`); continue; } const ip = ips.sort()[0]; // Always use the first ip if (provider.ip === ip) { - console.log(`Ip for ${provider.hostUri} is the same`); + logger.info(`Ip for ${provider.hostUri} is the same`); continue; } const location = await getIpLocation(ip); - console.log(`${provider.hostUri} ip lookup: ${location.region}, ${location.country}`); + logger.info(`${provider.hostUri} ip lookup: ${location.region}, ${location.country}`); if (location) { await provider.update({ @@ -67,7 +69,7 @@ export async function updateProvidersLocation() { await sleep(IpLookupDelay); } catch (e) { - console.error(e); + logger.error(e); } } } diff --git a/apps/indexer/src/providers/providerStatusProvider.ts b/apps/indexer/src/providers/providerStatusProvider.ts index 65f9dfd6f..f9ea83acb 100644 --- a/apps/indexer/src/providers/providerStatusProvider.ts +++ b/apps/indexer/src/providers/providerStatusProvider.ts @@ -7,6 +7,7 @@ import { ProviderSnapshotNodeGPU, ProviderSnapshotStorage } from "@akashnetwork/database/dbSchemas/akash"; +import { LoggerService } from "@akashnetwork/logging"; import { asyncify, eachLimit } from "async"; import axios from "axios"; import { add, differenceInDays, differenceInHours, differenceInMinutes, isSameDay } from "date-fns"; @@ -23,6 +24,7 @@ import { ProviderStatusInfo, ProviderVersionEndpointResponseType } from "./statu const ConcurrentStatusCall = 10; const StatusCallTimeout = 10_000; // 10 seconds const UptimeCheckIntervalSeconds = 15 * 60; // 15 minutes +const logger = LoggerService.forContext("providerStatusProvider"); export async function syncProvidersInfo() { const providers = await Provider.findAll({ @@ -75,7 +77,7 @@ export async function syncProvidersInfo() { await saveProviderStatus(provider, providerStatus, akashVersion, cosmosVersion, errorMessage); doneCount++; - console.log("Fetched provider info: " + doneCount + " / " + providers.length); + logger.info("Fetched provider info: " + doneCount + " / " + providers.length); }) ); } diff --git a/apps/indexer/src/scheduler.ts b/apps/indexer/src/scheduler.ts index 9cbf45384..e88ab1697 100644 --- a/apps/indexer/src/scheduler.ts +++ b/apps/indexer/src/scheduler.ts @@ -1,8 +1,11 @@ +import { LoggerService } from "@akashnetwork/logging"; import humanInterval from "human-interval"; import fetch from "node-fetch"; import { getPrettyTime } from "./shared/utils/date"; +const logger = LoggerService.forContext("scheduler"); + class TaskDef { name: string; function: () => Promise; @@ -20,7 +23,7 @@ class TaskDef { constructor(name: string, fn: () => Promise, interval: number, runAtStart?: boolean, healthchecksConfig?: HealthchecksConfig) { if (healthchecksConfig && !healthchecksConfig.id) { - console.warn("Healthchecks config provided without an id."); + logger.warn("Healthchecks config provided without an id."); } this.name = name; @@ -48,7 +51,7 @@ export class Scheduler { constructor(config?: SchedulerConfig) { this.config = { ...config, - errorHandler: config?.errorHandler || ((task, err) => console.error(`Task "${task.name}" failed: ${err}`)) + errorHandler: config?.errorHandler || ((task, err) => logger.error(`Task "${task.name}" failed: ${err}`)) }; } @@ -68,7 +71,7 @@ export class Scheduler { } const intervalMs = typeof interval === "string" ? humanInterval(interval) : interval; - console.log(`Registered task "${name}" to run every ${getPrettyTime(intervalMs)}`); + logger.info(`Registered task "${name}" to run every ${getPrettyTime(intervalMs)}`); this.tasks.set(name, new TaskDef(name, fn, intervalMs, runAtStart, healthchecksConfig)); } @@ -82,11 +85,11 @@ export class Scheduler { setInterval(() => { const runningTask = this.tasks.get(task.name); if (runningTask.runningPromise) { - console.log(`Skipping task "${task.name}" because it is already running`); + logger.info(`Skipping task "${task.name}" because it is already running`); return; } - console.log(`Starting task "${task.name}"`); + logger.info(`Starting task "${task.name}"`); this.runTask(runningTask); }, task.interval); } @@ -101,7 +104,7 @@ export class Scheduler { runningTask.runningPromise = runningTask .function() .then(() => { - console.log(`Task "${runningTask.name}" completed successfully`); + logger.info(`Task "${runningTask.name}" completed successfully`); runningTask.successfulRunCount++; if (this.config.healthchecksEnabled && runningTask.healthchecksConfig) { @@ -126,7 +129,7 @@ export class Scheduler { try { await fetch(`https://hc-ping.com/${runningTask.healthchecksConfig.id}/start`); } catch (err) { - console.error(err); + logger.error(err); } } @@ -134,7 +137,7 @@ export class Scheduler { try { await fetch(`https://hc-ping.com/${runningTask.healthchecksConfig.id}`); } catch (err) { - console.error(err); + logger.error(err); } } @@ -142,7 +145,7 @@ export class Scheduler { try { await fetch(`https://hc-ping.com/${runningTask.healthchecksConfig.id}/fail`); } catch (err) { - console.error(err); + logger.error(err); } } diff --git a/apps/indexer/src/shared/utils/benchmark.ts b/apps/indexer/src/shared/utils/benchmark.ts index da8ca5214..7a0a7fa02 100644 --- a/apps/indexer/src/shared/utils/benchmark.ts +++ b/apps/indexer/src/shared/utils/benchmark.ts @@ -1,3 +1,4 @@ +import { LoggerService } from "@akashnetwork/logging"; import { performance } from "perf_hooks"; import { getPrettyTime } from "./date"; @@ -13,6 +14,7 @@ type BenchmarkDetails = { }; const benchmarkTimes: { [key: string]: BenchmarkDetails } = {}; +const logger = LoggerService.forContext("benchmark"); let firstTime = null; let lastTime = null; let activeTimer = null; @@ -107,7 +109,7 @@ export function displayTimes(): void { } export function displayTimesForGroup(group: string) { - console.log("Group: " + (group || "ROOT")); + logger.info("Group: " + (group || "ROOT")); const fullTime = group ? Object.values(benchmarkTimes) diff --git a/apps/indexer/src/shared/utils/download.ts b/apps/indexer/src/shared/utils/download.ts index fcd5c8e92..3bacf05eb 100644 --- a/apps/indexer/src/shared/utils/download.ts +++ b/apps/indexer/src/shared/utils/download.ts @@ -1,3 +1,4 @@ +import { LoggerService } from "@akashnetwork/logging"; import fs from "fs"; import http from "https"; import { basename } from "path"; @@ -5,6 +6,7 @@ import { basename } from "path"; import { bytesToHumanReadableSize } from "./files"; const progressLogThrottle = 1000; +const logger = LoggerService.forContext("download"); export async function download(url: string, path: string) { const uri = new URL(url); @@ -24,13 +26,13 @@ export async function download(url: string, path: string) { downloaded += chunk.length; const percent = ((100.0 * downloaded) / len).toFixed(2); if (Date.now() - lastProgressLog > progressLogThrottle) { - console.log(`${uri.pathname} - Downloading ${percent}% ${bytesToHumanReadableSize(downloaded)}`); + logger.info(`${uri.pathname} - Downloading ${percent}% ${bytesToHumanReadableSize(downloaded)}`); lastProgressLog = Date.now(); } }) .on("end", function () { file.end(); - console.log(`${uri.pathname} downloaded to: ${path}`); + logger.info(`${uri.pathname} downloaded to: ${path}`); resolve(); }) .on("error", function (err) { diff --git a/apps/indexer/src/shared/utils/files.ts b/apps/indexer/src/shared/utils/files.ts index c853cd41c..c0bdb0f78 100644 --- a/apps/indexer/src/shared/utils/files.ts +++ b/apps/indexer/src/shared/utils/files.ts @@ -1,3 +1,7 @@ +import { LoggerService } from "@akashnetwork/logging"; + +const logger = LoggerService.forContext("files"); + export const bytesToHumanReadableSize = function (bytes) { const sizes = ["Bytes", "KB", "MB", "GB", "TB"]; @@ -47,7 +51,7 @@ export function parseSizeStr(str: string) { return parseFloat(str); } } catch (err) { - console.error(err); + logger.error(err); throw new Error("Error while parsing size: " + str); } } @@ -74,7 +78,7 @@ export function parseDecimalKubernetesString(str: string) { return parseFloat(str); } } catch (err) { - console.error(err); + logger.error(err); throw new Error("Error while parsing size: " + str); } } diff --git a/apps/indexer/src/shared/utils/query.ts b/apps/indexer/src/shared/utils/query.ts index 815b5d6d8..35bea54dc 100644 --- a/apps/indexer/src/shared/utils/query.ts +++ b/apps/indexer/src/shared/utils/query.ts @@ -1,3 +1,7 @@ +import { LoggerService } from "@akashnetwork/logging"; + +const logger = LoggerService.forContext("query"); + export async function loadWithPagination(baseUrl: string, dataKey: string, limit: number) { let items = []; let nextKey = null; @@ -9,7 +13,7 @@ export async function loadWithPagination(baseUrl: string, dataKey: string, limit if (nextKey) { queryUrl += "&pagination.key=" + encodeURIComponent(nextKey); } - console.log(`Querying ${dataKey} [${callCount}] from : ${queryUrl}`); + logger.info(`Querying ${dataKey} [${callCount}] from : ${queryUrl}`); const response = await fetch(queryUrl); const data = await response.json(); @@ -21,7 +25,7 @@ export async function loadWithPagination(baseUrl: string, dataKey: string, limit nextKey = data.pagination.next_key; callCount++; - console.log(`Got ${items.length} of ${totalCount}`); + logger.info(`Got ${items.length} of ${totalCount}`); } while (nextKey); return items.filter(item => item); diff --git a/apps/indexer/src/tasks/providerUptimeTracker.ts b/apps/indexer/src/tasks/providerUptimeTracker.ts index 5d677ba41..5dc927c78 100644 --- a/apps/indexer/src/tasks/providerUptimeTracker.ts +++ b/apps/indexer/src/tasks/providerUptimeTracker.ts @@ -1,11 +1,14 @@ import { Provider } from "@akashnetwork/database/dbSchemas/akash"; +import { LoggerService } from "@akashnetwork/logging"; import { secondsInDay } from "date-fns"; import { QueryTypes } from "sequelize"; import { sequelize } from "@src/db/dbConnection"; +const logger = LoggerService.forContext("providerUptimeTracker"); + export async function updateProviderUptime() { - console.log("Updating provider uptimes."); + logger.info("Updating provider uptimes."); console.time("updateProviderUptimes"); const providers = await Provider.findAll(); diff --git a/apps/indexer/src/tasks/usdSpendingTracker.ts b/apps/indexer/src/tasks/usdSpendingTracker.ts index cffaa5e7b..c67d99fd6 100644 --- a/apps/indexer/src/tasks/usdSpendingTracker.ts +++ b/apps/indexer/src/tasks/usdSpendingTracker.ts @@ -1,9 +1,12 @@ import { AkashBlock } from "@akashnetwork/database/dbSchemas/akash"; import { Day } from "@akashnetwork/database/dbSchemas/base"; +import { LoggerService } from "@akashnetwork/logging"; import { Op } from "sequelize"; import { sequelize } from "@src/db/dbConnection"; +const logger = LoggerService.forContext("usdSpendingTracker"); + export async function updateUsdSpending() { // Check if there is a day flagged for update (akt price changed) let firstDayToRefresh = await Day.findOne({ @@ -28,7 +31,7 @@ export async function updateUsdSpending() { } if (!firstDayToRefresh) { - console.log("No days to update usd spending."); + logger.info("No days to update usd spending."); return; } @@ -40,10 +43,10 @@ export async function updateUsdSpending() { order: [["date", "ASC"]] }); - console.log(`There are ${days.length} days to update USD spending.`); + logger.info(`There are ${days.length} days to update USD spending.`); for (const day of days) { - console.log(`Updating usd spending for blocks of day ${day.date.toISOString().substring(0, 10)}... `); + logger.info(`Updating usd spending for blocks of day ${day.date.toISOString().substring(0, 10)}... `); let lastBlockOfPreviousDay: AkashBlock | null = null; if (day.firstBlockHeight > 1) { @@ -72,7 +75,7 @@ export async function updateUsdSpending() { } ); - console.log("Updated " + affectedCount + " blocks."); + logger.info("Updated " + affectedCount + " blocks."); if (day.aktPriceChanged) { day.aktPriceChanged = false;