diff --git a/apps/main-chain/src/amplifyClient.ts b/apps/main-chain/src/amplifyClient.ts index 1f0ccc89..d7de826f 100644 --- a/apps/main-chain/src/amplifyClient.ts +++ b/apps/main-chain/src/amplifyClient.ts @@ -5,5 +5,7 @@ const amplifyClient = new AmplifyClient( process.env.AWS_APPSYNC_KEY || '', ); +// @TODO stop exporting mutate and query like this because "this.methodName" loses context const { mutate, query } = amplifyClient; export { mutate, query }; +export default amplifyClient; diff --git a/apps/main-chain/src/blockListener.ts b/apps/main-chain/src/blockListener.ts deleted file mode 100644 index 51c0ac92..00000000 --- a/apps/main-chain/src/blockListener.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { getLastBlockNumber } from '~utils'; -import { Block, BlockWithTransactions, EthersObserverEvents } from '~types'; -import rpcProvider from '~provider'; - -import { processNextBlock } from '~blockProcessor'; -import { output } from '@joincolony/utils'; - -/** - * Map storing blocks that have been either picked up by the block listener - * or missed blocks tracking - * Blocks are removed once processed by a call to .delete in the blockProcessor - */ -export const blocksMap = new Map(); -let latestSeenBlockNumber = 0; - -export const getLatestSeenBlockNumber = (): number => latestSeenBlockNumber; -// export const blocksMap = new Map(); - -export const startBlockListener = (): void => { - rpcProvider - .getProviderInstance() - .on(EthersObserverEvents.Block, async (blockNumber: number) => { - try { - // For now, we just track that this block exists. - latestSeenBlockNumber = Math.max(latestSeenBlockNumber, blockNumber); - - output(`Block ${blockNumber} added to the queue`); - - processNextBlock(); - } catch (error) { - throw new Error( - `Observed block ${blockNumber} but failed to get its data: ${error}`, - ); - } - }); - - output('Block listener started'); - - trackMissedBlocks(); -}; - -/** - * Function fetching all the blocks between the last processed block and the current block - * that happened when ingestor was not actively listening - */ -const trackMissedBlocks = async (): Promise => { - const lastBlockNumber = getLastBlockNumber(); - const currentBlockNumber = await rpcProvider - .getProviderInstance() - .getBlockNumber(); - - if (lastBlockNumber >= currentBlockNumber) { - return; - } - - output( - `Will need to process blocks from block ${ - lastBlockNumber + 1 - } to ${currentBlockNumber}`, - ); - - latestSeenBlockNumber = Math.max(latestSeenBlockNumber, currentBlockNumber); - - processNextBlock(); -}; diff --git a/apps/main-chain/src/blockManager.ts b/apps/main-chain/src/blockManager.ts new file mode 100644 index 00000000..bbaac0e0 --- /dev/null +++ b/apps/main-chain/src/blockManager.ts @@ -0,0 +1,8 @@ +import { BlockManager } from '@joincolony/blocks'; +import eventManager from '~eventManager'; +import rpcProvider from '~provider'; +import statsManager from '~statsManager'; + +const blockManager = new BlockManager(eventManager, rpcProvider, statsManager); + +export default blockManager; diff --git a/apps/main-chain/src/blockProcessor.ts b/apps/main-chain/src/blockProcessor.ts deleted file mode 100644 index dd663f0b..00000000 --- a/apps/main-chain/src/blockProcessor.ts +++ /dev/null @@ -1,225 +0,0 @@ -import { Log } from '@ethersproject/abstract-provider'; -import { blocksMap, getLatestSeenBlockNumber } from '~blockListener'; -import eventManager from '~eventManager'; -import { getInterfaceByListener } from '~interfaces'; -import rpcProvider from '~provider'; - -import { - getLastBlockNumber, - mapLogToContractEvent, - setLastBlockNumber, -} from '~utils'; -import { output, verbose } from '@joincolony/utils'; -import { BLOCK_PAGING_SIZE } from '~constants'; - -let isProcessing = false; -const blockLogs = new Map(); -let timeNow = Date.now(); -let timePrev = 0; - -export const processNextBlock = async (): Promise => { - if (isProcessing) { - return; - } - - // Only allow one instance of the function to run at any given time - isProcessing = true; - - let lastBlockNumber = getLastBlockNumber(); - - // Process as many blocks as are available sequentially - while (lastBlockNumber < getLatestSeenBlockNumber()) { - const currentBlockNumber = lastBlockNumber + 1; - if (currentBlockNumber % BLOCK_PAGING_SIZE === 0) { - if (timePrev > 0) { - timePrev = timeNow; - timeNow = Date.now(); - output( - `Time taken for last ${BLOCK_PAGING_SIZE} blocks: ${ - timeNow - timePrev - }ms`, - ); - output( - `Estimated time to sync: ${ - ((timeNow - timePrev) * - (getLatestSeenBlockNumber() - getLastBlockNumber())) / - 1000 - }ms`, - ); - output( - `Overall progress: ${currentBlockNumber} / ${getLatestSeenBlockNumber()}`, - ); - } else { - timePrev = timeNow; - } - } - - if (!blockLogs.get(currentBlockNumber)) { - // BLOCK_PAGING_SIZE - 1 thanks to fenceposts - const nMoreBlocks = Math.min( - getLatestSeenBlockNumber() - currentBlockNumber, - BLOCK_PAGING_SIZE - 1, - ); - - verbose( - 'Querying for logs', - currentBlockNumber, - 'to', - currentBlockNumber + nMoreBlocks, - ); - - const logs = await rpcProvider.getProviderInstance().getLogs({ - fromBlock: currentBlockNumber, - toBlock: currentBlockNumber + nMoreBlocks, - }); - - verbose( - `Fetched ${logs.length} logs`, - currentBlockNumber, - 'to', - currentBlockNumber + nMoreBlocks, - ); - - // initialize blockLogs - for ( - let i = currentBlockNumber; - i <= currentBlockNumber + nMoreBlocks; - i += 1 - ) { - blockLogs.set(i, []); - } - - let logIndex = 0; - let pushingBlock = 0; - let pushingLogs: Log[] = []; - - logs.forEach((log) => { - // As we push logs in to blockLogs, check they're in order - // (They should be...) - if (log.blockNumber !== pushingBlock) { - if (pushingBlock > log.blockNumber) { - output( - `Blocks (that logs from query are in) are not monotonically increasing`, - ); - process.exit(1); - } - blockLogs.set(pushingBlock, [...pushingLogs]); - pushingBlock = log.blockNumber; - pushingLogs = []; - logIndex = 0; - } - if (log.logIndex !== logIndex) { - output(`Logs are out of order for block ${log.blockNumber}`); - process.exit(1); - } - pushingLogs.push(log); - logIndex += 1; - }); - // Push the logs in the last block - blockLogs.set(pushingBlock, [...pushingLogs]); - } - - // Get logs contained in the current block - const logs = blockLogs.get(currentBlockNumber); - if (!logs) { - throw new Error( - `Could not find logs for block ${currentBlockNumber}, but should have been fetched`, - ); - } - - /* - * Logic needed to account for blocks that get emmited, but which don't have the logs indexed yet - * This happens in networks with very fast block times, like arbitrum (<=250ms block times) - * See: https://github.com/ethers-io/ethers.js/issues/3486 - * - * Basically, the change that @area implemented here is to try and detect if a block actually has - * logs, but which don't get retrived using the `getLogs` call. - * If that happens, it means the block was emitted, but the logs weren't indexed yet, at which point - * we just short-circuit and re-process the block. - * We do this enough times, until the logs are actually indexed. - */ - if (logs.length === 0) { - verbose('No logs seen in block', currentBlockNumber); - // Check whether block actually has no logs - let block = blocksMap.get(currentBlockNumber); - if ( - !block || - (block.transactions as string[]).every((tx) => typeof tx === 'string') - ) { - block = await rpcProvider - .getProviderInstance() - .getBlockWithTransactions(currentBlockNumber); - // May as well save this block in the blocksMap in case it turns out we need it in mapLogToContractEvent - blocksMap.set(currentBlockNumber, block); - } - - let mustReindex = false; - for (const tx of block.transactions) { - if (typeof tx === 'string') { - throw Error('tx was a string, but should have been a TxResponse'); - } - const txReceipt = await rpcProvider - .getProviderInstance() - .getTransactionReceipt(tx.hash); - if (txReceipt.logs.length > 0) { - verbose( - `Proved ${currentBlockNumber} has logs, but weren't given any, will reindex`, - ); - mustReindex = true; - // Then the block has events, and they've not been indexed yet. - // We exit out of this handler, and wait until they've been indexed. - // We remove the empty array from blockLogs to cause the getLogs call to be made again - blockLogs.delete(currentBlockNumber); - // Now we've proved we're missing events, don't need to look at any other transactions in - // this block. - break; - } - } - if (mustReindex) { - continue; - } - } - - for (const log of logs) { - // Find listeners that match the log - const listeners = eventManager.getMatchingListeners( - log.topics, - log.address, - ); - if (!listeners.length) { - continue; - } - - for (const listener of listeners) { - // In order to parse the log, we need an ethers interface - const iface = getInterfaceByListener(listener); - if (!iface) { - output( - `Failed to get an interface for a log with listener type ${listener.type}`, - ); - continue; - } - - const event = await mapLogToContractEvent(log, iface); - if (!event) { - output( - `Failed to map log describing event ${listener.eventSignature} in transaction ${log.transactionHash} `, - ); - continue; - } - - // Call the handler in a blocking way to ensure events get processed sequentially - await listener.handler(event, listener); - } - } - - verbose('processed block', currentBlockNumber); - - lastBlockNumber = currentBlockNumber; - setLastBlockNumber(currentBlockNumber); - blockLogs.delete(currentBlockNumber); - blocksMap.delete(currentBlockNumber); - } - - isProcessing = false; -}; diff --git a/apps/main-chain/src/eventManager.ts b/apps/main-chain/src/eventManager.ts index 7b258a2f..b57b8481 100644 --- a/apps/main-chain/src/eventManager.ts +++ b/apps/main-chain/src/eventManager.ts @@ -1,5 +1,6 @@ import { EventManager } from '@joincolony/blocks'; +import rpcProvider from '~provider'; -const eventManager = new EventManager(); +const eventManager = new EventManager(rpcProvider); export default eventManager; diff --git a/apps/main-chain/src/handlers/colonies/colonyAdded.ts b/apps/main-chain/src/handlers/colonies/colonyAdded.ts index 2d089b7e..10ee43c4 100644 --- a/apps/main-chain/src/handlers/colonies/colonyAdded.ts +++ b/apps/main-chain/src/handlers/colonies/colonyAdded.ts @@ -11,10 +11,10 @@ import { import { coloniesSet } from '~stats'; import { ContractEvent, ContractEventsSignatures } from '~types'; import { - updateStats, createColonyFounderInitialRoleEntry, getAllRoleEventsFromTransaction, } from '~utils'; +import statsManager from '~statsManager'; import { getColonyContributorId } from '~utils/contributors'; import { tryFetchGraphqlQuery } from '~utils/graphql'; import { createUniqueColony } from './helpers/createUniqueColony'; @@ -50,8 +50,9 @@ export default async (event: ContractEvent): Promise => { /* * Add it to the Set */ + // @NOTE seems to not be working, is it borken on master too? coloniesSet.add(JSON.stringify({ colonyAddress, tokenAddress })); - await updateStats({ trackedColonies: coloniesSet.size }); + await statsManager.updateStats({ trackedColonies: coloniesSet.size }); output( 'Found new Colony:', diff --git a/apps/main-chain/src/handlers/expenditures/stagedPaymentReleased.ts b/apps/main-chain/src/handlers/expenditures/stagedPaymentReleased.ts index 507dc4b3..9f4724a5 100644 --- a/apps/main-chain/src/handlers/expenditures/stagedPaymentReleased.ts +++ b/apps/main-chain/src/handlers/expenditures/stagedPaymentReleased.ts @@ -1,7 +1,6 @@ import { utils } from 'ethers'; import { ExtensionEventListener } from '~eventListeners'; import { ColonyActionType } from '@joincolony/graphql'; -import { getInterfaceByListener } from '~interfaces'; import rpcProvider from '~provider'; import { ContractEventsSignatures, EventHandler } from '~types'; import { @@ -12,6 +11,7 @@ import { toNumber, writeActionFromEvent, } from '~utils'; +import eventManager from '~eventManager'; export const handleStagedPaymentReleased: EventHandler = async ( event, @@ -47,7 +47,7 @@ export const handleStagedPaymentReleased: EventHandler = async ( topics: [utils.id(ContractEventsSignatures.StagedPaymentReleased)], }); - const iface = getInterfaceByListener(listener); + const iface = eventManager.getInterfaceByListener(listener); if (!iface) { return; } diff --git a/apps/main-chain/src/index.ts b/apps/main-chain/src/index.ts index 011a0ebe..f5364d6b 100644 --- a/apps/main-chain/src/index.ts +++ b/apps/main-chain/src/index.ts @@ -1,9 +1,10 @@ import 'cross-fetch/polyfill'; import { utils } from 'ethers'; -import { startBlockListener } from '~blockListener'; import '~amplifyClient'; +import '~statsManager'; import '~eventManager'; +import blockManager from '~blockManager'; import { startStatsServer } from '~stats'; import { setupListenersForColonies, @@ -16,6 +17,7 @@ import { setupNotificationsClient } from '~utils/notifications'; utils.Logger.setLogLevel(utils.Logger.levels.ERROR); const start = async (): Promise => { + await rpcProvider.initialiseProvider(); /** * Setup the notifications provider so that notifications can be sent when needed */ @@ -36,9 +38,7 @@ const start = async (): Promise => { /** * Start the main block listener */ - startBlockListener(); - - await rpcProvider.initialiseProvider(); + blockManager.startBlockListener(); /** * In development, where both the chain and the DB gets reset everytime, diff --git a/apps/main-chain/src/interfaces.ts b/apps/main-chain/src/interfaces.ts deleted file mode 100644 index d42c68df..00000000 --- a/apps/main-chain/src/interfaces.ts +++ /dev/null @@ -1,102 +0,0 @@ -import { utils, constants } from 'ethers'; -import { - IColonyEvents__factory as ColonyEventsFactory, - VotingReputationEvents__factory as VotingReputationEventsFactory, - StakedExpenditureEvents__factory as StakedExpenditureEventsFactory, - TokenEvents__factory as TokenEventsFactory, - StagedExpenditureEvents__factory as StagedExpenditureEventsFactory, - OneTxPaymentEvents__factory as OneTxPaymentEventsFactory, - StreamingPaymentsEvents__factory as StreamingPaymentsEventsFactory, - MultisigPermissionsEvents__factory as MultisigPermissionsEventsFactory, -} from '@colony/events'; -import { Extension, getExtensionHash } from '@colony/colony-js'; - -import networkClient from '~networkClient'; -import rpcProvider from '~provider'; - -import { EventListener, EventListenerType } from '~eventListeners'; - -// @TODO @chmanie is gonna make this better, for now let's just hardcode the proxy colony events -const ProxyColonyEvents = new utils.Interface([ - 'event ProxyColonyRequested(uint256 destinationChainId, bytes32 salt)', -]); - -/** - * Function returning ABI-derived interface for a given event listener type, - * which is later used for parsing event logs - */ -export const getInterfaceByListener = ( - listener: EventListener, -): utils.Interface | null => { - const provider = rpcProvider.getProviderInstance(); - const { type: listenerType } = listener; - - switch (listenerType) { - case EventListenerType.Network: { - return networkClient.interface; - } - case EventListenerType.ProxyColonies: { - return ProxyColonyEvents; - } - case EventListenerType.Colony: { - return ColonyEventsFactory.connect(constants.AddressZero, provider) - .interface; - } - case EventListenerType.Extension: { - return getInterfaceByExtensionHash(listener.extensionHash); - } - case EventListenerType.Token: { - return TokenEventsFactory.connect(constants.AddressZero, provider) - .interface; - } - default: { - return null; - } - } -}; - -const getInterfaceByExtensionHash = ( - extensionHash: string, -): utils.Interface | null => { - const provider = rpcProvider.getProviderInstance(); - - switch (extensionHash) { - case getExtensionHash(Extension.OneTxPayment): { - return OneTxPaymentEventsFactory.connect(constants.AddressZero, provider) - .interface; - } - case getExtensionHash(Extension.VotingReputation): { - return VotingReputationEventsFactory.connect( - constants.AddressZero, - provider, - ).interface; - } - case getExtensionHash(Extension.MultisigPermissions): { - return MultisigPermissionsEventsFactory.connect( - constants.AddressZero, - provider, - ).interface; - } - case getExtensionHash(Extension.StakedExpenditure): { - return StakedExpenditureEventsFactory.connect( - constants.AddressZero, - provider, - ).interface; - } - case getExtensionHash(Extension.StagedExpenditure): { - return StagedExpenditureEventsFactory.connect( - constants.AddressZero, - provider, - ).interface; - } - case getExtensionHash(Extension.StreamingPayments): { - return StreamingPaymentsEventsFactory.connect( - constants.AddressZero, - provider, - ).interface; - } - default: { - return null; - } - } -}; diff --git a/apps/main-chain/src/stats.ts b/apps/main-chain/src/stats.ts index ce40ae79..44fbca07 100644 --- a/apps/main-chain/src/stats.ts +++ b/apps/main-chain/src/stats.ts @@ -1,12 +1,14 @@ import express from 'express'; -import { getLastBlockNumber, getStats, initStats } from '~utils'; import eventManager from '~eventManager'; import rpcProvider from '~provider'; import { output } from '@joincolony/utils'; +import statsManager from '~statsManager'; +// @NOTE this can probably be moved to the colonyAdded handler export const coloniesSet = new Set(); +// @NOTE just copy this entire file later on for now const app = express(); const port = process.env.STATS_PORT; @@ -25,7 +27,7 @@ app.get('/liveness', (_, res) => res.sendStatus(200)); * Use to check various service stats */ app.get('/stats', async (_, res) => { - const stats = getStats(); + const stats = statsManager.getStats(); res.type('json').send(stats); }); @@ -41,8 +43,8 @@ export const startStatsServer = async (): Promise => { return; } - await initStats(); - const lastBlockNumber = getLastBlockNumber(); + await statsManager.initStats(); + const lastBlockNumber = statsManager.getLastBlockNumber(); app.listen(port, async () => { output('Block Ingestor started on chain', rpcProvider.getChainId()); diff --git a/apps/main-chain/src/statsManager.ts b/apps/main-chain/src/statsManager.ts new file mode 100644 index 00000000..58b06f98 --- /dev/null +++ b/apps/main-chain/src/statsManager.ts @@ -0,0 +1,6 @@ +import { StatsManager } from '@joincolony/blocks'; +import amplifyClient from './amplifyClient'; + +const statsManager = new StatsManager(amplifyClient); + +export default statsManager; diff --git a/apps/main-chain/src/utils/events.ts b/apps/main-chain/src/utils/events.ts index 99dd316a..fd60b8bf 100644 --- a/apps/main-chain/src/utils/events.ts +++ b/apps/main-chain/src/utils/events.ts @@ -14,7 +14,7 @@ import { GetContractEventQueryVariables, ChainMetadata, } from '@joincolony/graphql'; -import { blocksMap } from '~blockListener'; +import blockManager from '~blockManager'; import { verbose } from '@joincolony/utils'; export const mapLogToContractEvent = async ( @@ -31,10 +31,10 @@ export const mapLogToContractEvent = async ( try { // Attempt to first get a block from the map as we might have already fetched its info - let block = blocksMap.get(blockNumber); + let block = blockManager.getBlock(blockNumber); if (!block) { block = await provider.getBlock(blockNumber); - blocksMap.set(blockNumber, block); + blockManager.updateBlocksMap(blockNumber, block); } const { hash: blockHash, timestamp } = block; diff --git a/apps/main-chain/src/utils/index.ts b/apps/main-chain/src/utils/index.ts index e49a1be6..599520e6 100644 --- a/apps/main-chain/src/utils/index.ts +++ b/apps/main-chain/src/utils/index.ts @@ -1,5 +1,4 @@ export * from './events'; -export * from './stats'; export * from './extensions'; export * from './numbers'; export * from './tokens'; diff --git a/apps/main-chain/src/utils/stats.ts b/apps/main-chain/src/utils/stats.ts deleted file mode 100644 index bf4b3b63..00000000 --- a/apps/main-chain/src/utils/stats.ts +++ /dev/null @@ -1,102 +0,0 @@ -import { mutate, query } from '~amplifyClient'; -import { - CreateStatsDocument, - CreateStatsMutation, - CreateStatsMutationVariables, - GetStatsDocument, - GetStatsQuery, - GetStatsQueryVariables, - UpdateStatsDocument, - UpdateStatsMutation, - UpdateStatsMutationVariables, -} from '@joincolony/graphql'; -import { output, verbose } from '@joincolony/utils'; - -let stats: Record = {}; - -type ObjectOrFunction = - | Record - | ((jsonFile: Record) => Record); - -/* - * Update stats with a given argument - * It accepts either a object fragment (or full object) that will get appended to the stats, - * or a callback (which receives the current stats) and needs to return the new object - * that will be written back - */ -export const updateStats = async ( - objectOrFunction: ObjectOrFunction, -): Promise => { - if (typeof objectOrFunction === 'function') { - stats = { - ...stats, - ...objectOrFunction(stats), - }; - } else { - stats = { - ...stats, - ...objectOrFunction, - }; - } - - await mutate( - UpdateStatsDocument, - { - value: JSON.stringify(stats), - }, - ); - - verbose('Stats file updated'); -}; - -// This exists as a function to prevent accidental overwriting of the `stats` variable -export const getStats = (): typeof stats => ({ ...stats }); - -export const getLastBlockNumber = (): number => { - if (Number.isInteger(stats.lastBlockNumber)) { - return Number(stats.lastBlockNumber); - } - /* - * @NOTE This prevents accidental database stats overwriting if the API / GraphQL - * endpoint is not accessible - * - * It will throw the block ingestor (the pod that it's running on) into an restart - * loop until the API is accessible again - */ - throw new Error('Could not get last block number from stats. Aborting.'); -}; - -export const setLastBlockNumber = (lastBlockNumber: number): void => { - updateStats({ lastBlockNumber }); -}; - -/** - * Function fetching the last stored stats from the DB - * If no stats entry is found, it will create one - */ -export const initStats = async (): Promise => { - const { value: jsonStats } = - (await query(GetStatsDocument, {})) - ?.data?.getIngestorStats ?? {}; - - if (!jsonStats) { - stats = { - lastBlockNumber: 0, - }; - - await mutate( - CreateStatsDocument, - { - value: JSON.stringify(stats), - }, - ); - } else { - try { - stats = JSON.parse(jsonStats); - } catch { - output( - 'Could not parse stats from the DB. The value is not a valid JSON.', - ); - } - } -}; diff --git a/packages/blocks/package.json b/packages/blocks/package.json index d2cb61c9..1df7ae5d 100644 --- a/packages/blocks/package.json +++ b/packages/blocks/package.json @@ -2,7 +2,9 @@ "name": "@joincolony/blocks", "main": "src/index.ts", "version": "1.0.0", - "dependencies": { + "dependencies": { + "@joincolony/clients": "workspace:*", + "@joincolony/graphql": "workspace:*", "@joincolony/utils": "workspace:*" } } diff --git a/packages/blocks/src/blockManager.ts b/packages/blocks/src/blockManager.ts new file mode 100644 index 00000000..bb57c347 --- /dev/null +++ b/packages/blocks/src/blockManager.ts @@ -0,0 +1,343 @@ +import { Log } from '@ethersproject/abstract-provider'; +import { output, verbose } from '@joincolony/utils'; +import { EventManager } from './eventManager'; +import { + Block, + BlockWithTransactions, + ContractEvent, + EthersObserverEvents, +} from './types'; +import { RpcProvider } from '@joincolony/clients'; +import { utils } from 'ethers'; +import { StatsManager } from './statsManager'; + +export const BLOCK_PAGING_SIZE = process.env.BLOCK_PAGING_SIZE + ? parseInt(process.env.BLOCK_PAGING_SIZE, 10) + : 1000; + +export class BlockManager { + private blocksMap = new Map(); + private blockLogs = new Map(); + private latestSeenBlockNumber = 0; + private isProcessing = false; + private eventManager: EventManager; + private rpcProvider: RpcProvider; + private statsManager: StatsManager; + private timeNow = Date.now(); + private timePrev = 0; + + constructor( + eventManager: EventManager, + rpcProvider: RpcProvider, + statsManager: StatsManager, + ) { + this.eventManager = eventManager; + this.rpcProvider = rpcProvider; + this.statsManager = statsManager; + } + public getBlock(blockNumber: number) { + return this.blocksMap.get(blockNumber); + } + public updateBlocksMap( + blockNumber: number, + block: Block | BlockWithTransactions, + ) { + this.blocksMap.set(blockNumber, block); + } + public getLatestSeenBlockNumber(): number { + return this.latestSeenBlockNumber; + } + + public startBlockListener(): void { + this.rpcProvider + .getProviderInstance() + .on(EthersObserverEvents.Block, async (blockNumber: number) => { + try { + this.latestSeenBlockNumber = Math.max( + this.latestSeenBlockNumber, + blockNumber, + ); + output(`Block ${blockNumber} added to the queue`); + await this.processNextBlock(); + } catch (error) { + throw new Error( + `Observed block ${blockNumber} but failed to get its data: ${error}`, + ); + } + }); + + output('Block listener started'); + this.trackMissedBlocks(); + } + + private async trackMissedBlocks(): Promise { + const lastBlockNumber = this.statsManager.getLastBlockNumber(); + const currentBlockNumber = await this.rpcProvider + .getProviderInstance() + .getBlockNumber(); + + if (lastBlockNumber >= currentBlockNumber) return; + + output( + `Processing blocks from ${lastBlockNumber + 1} to ${currentBlockNumber}`, + ); + this.latestSeenBlockNumber = Math.max( + this.latestSeenBlockNumber, + currentBlockNumber, + ); + await this.processNextBlock(); + } + + private async processNextBlock(): Promise { + if (this.isProcessing) { + return; + } + + // Only allow one instance of the function to run at any given time + this.isProcessing = true; + + let lastBlockNumber = this.statsManager.getLastBlockNumber(); + + // Process as many blocks as are available sequentially + while (lastBlockNumber < this.getLatestSeenBlockNumber()) { + const currentBlockNumber = lastBlockNumber + 1; + if (currentBlockNumber % BLOCK_PAGING_SIZE === 0) { + if (this.timePrev > 0) { + this.timePrev = this.timeNow; + this.timeNow = Date.now(); + output( + `Time taken for last ${BLOCK_PAGING_SIZE} blocks: ${ + this.timeNow - this.timePrev + }ms`, + ); + output( + `Estimated time to sync: ${ + ((this.timeNow - this.timePrev) * + (this.getLatestSeenBlockNumber() - + this.statsManager.getLastBlockNumber())) / + 1000 + }ms`, + ); + output( + `Overall progress: ${currentBlockNumber} / ${this.getLatestSeenBlockNumber()}`, + ); + } else { + this.timePrev = this.timeNow; + } + } + + if (!this.blockLogs.get(currentBlockNumber)) { + // BLOCK_PAGING_SIZE - 1 thanks to fenceposts + const nMoreBlocks = Math.min( + this.getLatestSeenBlockNumber() - currentBlockNumber, + BLOCK_PAGING_SIZE - 1, + ); + + verbose( + 'Querying for logs', + currentBlockNumber, + 'to', + currentBlockNumber + nMoreBlocks, + ); + + const logs = await this.rpcProvider.getProviderInstance().getLogs({ + fromBlock: currentBlockNumber, + toBlock: currentBlockNumber + nMoreBlocks, + }); + + verbose( + `Fetched ${logs.length} logs`, + currentBlockNumber, + 'to', + currentBlockNumber + nMoreBlocks, + ); + + // initialize blockLogs + for ( + let i = currentBlockNumber; + i <= currentBlockNumber + nMoreBlocks; + i += 1 + ) { + this.blockLogs.set(i, []); + } + + let logIndex = 0; + let pushingBlock = 0; + let pushingLogs: Log[] = []; + + logs.forEach((log) => { + // As we push logs in to blockLogs, check they're in order + // (They should be...) + if (log.blockNumber !== pushingBlock) { + if (pushingBlock > log.blockNumber) { + output( + `Blocks (that logs from query are in) are not monotonically increasing`, + ); + process.exit(1); + } + this.blockLogs.set(pushingBlock, [...pushingLogs]); + pushingBlock = log.blockNumber; + pushingLogs = []; + logIndex = 0; + } + if (log.logIndex !== logIndex) { + output(`Logs are out of order for block ${log.blockNumber}`); + process.exit(1); + } + pushingLogs.push(log); + logIndex += 1; + }); + // Push the logs in the last block + this.blockLogs.set(pushingBlock, [...pushingLogs]); + } + + // Get logs contained in the current block + const logs = this.blockLogs.get(currentBlockNumber); + if (!logs) { + throw new Error( + `Could not find logs for block ${currentBlockNumber}, but should have been fetched`, + ); + } + + /* + * Logic needed to account for blocks that get emmited, but which don't have the logs indexed yet + * This happens in networks with very fast block times, like arbitrum (<=250ms block times) + * See: https://github.com/ethers-io/ethers.js/issues/3486 + * + * Basically, the change that @area implemented here is to try and detect if a block actually has + * logs, but which don't get retrived using the `getLogs` call. + * If that happens, it means the block was emitted, but the logs weren't indexed yet, at which point + * we just short-circuit and re-process the block. + * We do this enough times, until the logs are actually indexed. + */ + if (logs.length === 0) { + verbose('No logs seen in block', currentBlockNumber); + // Check whether block actually has no logs + let block = this.blocksMap.get(currentBlockNumber); + if ( + !block || + (block.transactions as string[]).every((tx) => typeof tx === 'string') + ) { + block = await this.rpcProvider + .getProviderInstance() + .getBlockWithTransactions(currentBlockNumber); + // May as well save this block in the blocksMap in case it turns out we need it in mapLogToContractEvent + this.blocksMap.set(currentBlockNumber, block); + } + + let mustReindex = false; + for (const tx of block.transactions) { + if (typeof tx === 'string') { + throw Error('tx was a string, but should have been a TxResponse'); + } + const txReceipt = await this.rpcProvider + .getProviderInstance() + .getTransactionReceipt(tx.hash); + if (txReceipt.logs.length > 0) { + verbose( + `Proved ${currentBlockNumber} has logs, but weren't given any, will reindex`, + ); + mustReindex = true; + // Then the block has events, and they've not been indexed yet. + // We exit out of this handler, and wait until they've been indexed. + // We remove the empty array from blockLogs to cause the getLogs call to be made again + this.blockLogs.delete(currentBlockNumber); + // Now we've proved we're missing events, don't need to look at any other transactions in + // this block. + break; + } + } + if (mustReindex) { + continue; + } + } + + for (const log of logs) { + // Find listeners that match the log + const listeners = this.eventManager.getMatchingListeners( + log.topics, + log.address, + ); + if (!listeners.length) { + continue; + } + + for (const listener of listeners) { + // In order to parse the log, we need an ethers interface + const iface = this.eventManager.getInterfaceByListener(listener); + if (!iface) { + output( + `Failed to get an interface for a log with listener type ${listener.type}`, + ); + continue; + } + + const event = await this.mapLogToContractEvent(log, iface); + if (!event) { + output( + `Failed to map log describing event ${listener.eventSignature} in transaction ${log.transactionHash} `, + ); + continue; + } + + // Call the handler in a blocking way to ensure events get processed sequentially + await listener.handler(event, listener); + } + } + + verbose('processed block', currentBlockNumber); + + lastBlockNumber = currentBlockNumber; + this.statsManager.setLastBlockNumber(currentBlockNumber); + this.blockLogs.delete(currentBlockNumber); + this.blocksMap.delete(currentBlockNumber); + } + + this.isProcessing = false; + } + + public mapLogToContractEvent = async ( + log: Log, + iface: utils.Interface, + ): Promise => { + const { + transactionHash, + logIndex, + blockNumber, + address: eventContractAddress, + } = log; + + try { + // Attempt to first get a block from the map as we might have already fetched its info + let block = this.blocksMap.get(blockNumber); + if (!block) { + block = await this.rpcProvider + .getProviderInstance() + .getBlock(blockNumber); + this.blocksMap.set(blockNumber, block); + } + + const { hash: blockHash, timestamp } = block; + const parsedLog = iface.parseLog(log); + + return { + ...parsedLog, + blockNumber, + transactionHash, + logIndex, + contractAddress: eventContractAddress, + blockHash, + timestamp, + }; + } catch (error) { + /* + * Silent Error + * + * This does not need to be loud since, at times, you'll map through a whole + * lot of events which might not know how to interface with since they were + * generated by other contracts + */ + return null; + } + }; +} diff --git a/packages/blocks/src/eventManager.ts b/packages/blocks/src/eventManager.ts index 28ae3672..8a44eb21 100644 --- a/packages/blocks/src/eventManager.ts +++ b/packages/blocks/src/eventManager.ts @@ -1,8 +1,32 @@ import { verbose } from '@joincolony/utils'; -import { EventListener } from './types'; +import { EventListener, EventListenerType } from './types'; +import { RpcProvider } from '@joincolony/clients'; +import { utils, constants } from 'ethers'; +import { + IColonyEvents__factory as ColonyEventsFactory, + VotingReputationEvents__factory as VotingReputationEventsFactory, + StakedExpenditureEvents__factory as StakedExpenditureEventsFactory, + TokenEvents__factory as TokenEventsFactory, + StagedExpenditureEvents__factory as StagedExpenditureEventsFactory, + OneTxPaymentEvents__factory as OneTxPaymentEventsFactory, + StreamingPaymentsEvents__factory as StreamingPaymentsEventsFactory, + MultisigPermissionsEvents__factory as MultisigPermissionsEventsFactory, + IColonyNetworkEvents__factory as ColonyNetworkEventsFactory, +} from '@colony/events'; +import { Extension, getExtensionHash } from '@colony/colony-js'; + +// @TODO @chmanie is gonna make this better, for now let's just hardcode the proxy colony events +const ProxyColonyEvents = new utils.Interface([ + 'event ProxyColonyRequested(uint256 destinationChainId, bytes32 salt)', +]); export class EventManager { private listeners: EventListener[] = []; + private rpcProvider: RpcProvider; + + constructor(rpcProvider: RpcProvider) { + this.rpcProvider = rpcProvider; + } public getEventListeners(): EventListener[] { return this.listeners; @@ -45,4 +69,87 @@ export class EventManager { public getListenersStats(): string { return JSON.stringify(this.listeners); } + /** + * Function returning ABI-derived interface for a given event listener type, + * which is later used for parsing event logs + */ + public getInterfaceByListener( + listener: EventListener, + ): utils.Interface | null { + const provider = this.rpcProvider.getProviderInstance(); + const { type: listenerType } = listener; + + switch (listenerType) { + case EventListenerType.Network: { + return ColonyNetworkEventsFactory.connect( + constants.AddressZero, + provider, + ).interface; + } + case EventListenerType.ProxyColonies: { + return ProxyColonyEvents; + } + case EventListenerType.Colony: { + return ColonyEventsFactory.connect(constants.AddressZero, provider) + .interface; + } + case EventListenerType.Extension: { + return this.getInterfaceByExtensionHash(listener.extensionHash); + } + case EventListenerType.Token: { + return TokenEventsFactory.connect(constants.AddressZero, provider) + .interface; + } + default: { + return null; + } + } + } + private getInterfaceByExtensionHash( + extensionHash: string, + ): utils.Interface | null { + const provider = this.rpcProvider.getProviderInstance(); + + switch (extensionHash) { + case getExtensionHash(Extension.OneTxPayment): { + return OneTxPaymentEventsFactory.connect( + constants.AddressZero, + provider, + ).interface; + } + case getExtensionHash(Extension.VotingReputation): { + return VotingReputationEventsFactory.connect( + constants.AddressZero, + provider, + ).interface; + } + case getExtensionHash(Extension.MultisigPermissions): { + return MultisigPermissionsEventsFactory.connect( + constants.AddressZero, + provider, + ).interface; + } + case getExtensionHash(Extension.StakedExpenditure): { + return StakedExpenditureEventsFactory.connect( + constants.AddressZero, + provider, + ).interface; + } + case getExtensionHash(Extension.StagedExpenditure): { + return StagedExpenditureEventsFactory.connect( + constants.AddressZero, + provider, + ).interface; + } + case getExtensionHash(Extension.StreamingPayments): { + return StreamingPaymentsEventsFactory.connect( + constants.AddressZero, + provider, + ).interface; + } + default: { + return null; + } + } + } } diff --git a/packages/blocks/src/index.ts b/packages/blocks/src/index.ts index 9473fe15..64e45e82 100644 --- a/packages/blocks/src/index.ts +++ b/packages/blocks/src/index.ts @@ -1,2 +1,4 @@ export * from './eventManager'; +export * from './statsManager'; +export * from './blockManager'; export * from './types'; diff --git a/packages/blocks/src/statsManager.ts b/packages/blocks/src/statsManager.ts new file mode 100644 index 00000000..ecf24819 --- /dev/null +++ b/packages/blocks/src/statsManager.ts @@ -0,0 +1,104 @@ +import { + CreateStatsDocument, + CreateStatsMutation, + CreateStatsMutationVariables, + GetStatsDocument, + GetStatsQuery, + GetStatsQueryVariables, + UpdateStatsDocument, + UpdateStatsMutation, + UpdateStatsMutationVariables, +} from '@joincolony/graphql'; +import { output, verbose } from '@joincolony/utils'; +import { AmplifyClient } from '@joincolony/clients'; + +type ObjectOrFunction = + | Record + | ((jsonFile: Record) => Record); + +export class StatsManager { + private stats: Record = {}; + private amplifyClient: AmplifyClient; + + constructor(amplifyClient: AmplifyClient) { + this.amplifyClient = amplifyClient; + } + + /** + * Update stats with a given argument. + * Accepts either an object fragment (or full object) to append to stats, + * or a callback (receives current stats) that returns the new object to write back. + */ + public async updateStats(objectOrFunction: ObjectOrFunction): Promise { + if (typeof objectOrFunction === 'function') { + this.stats = { + ...this.stats, + ...objectOrFunction(this.stats), + }; + } else { + this.stats = { + ...this.stats, + ...objectOrFunction, + }; + } + + await this.amplifyClient.mutate< + UpdateStatsMutation, + UpdateStatsMutationVariables + >(UpdateStatsDocument, { + value: JSON.stringify(this.stats), + }); + + verbose('Stats file updated'); + } + + // Return a copy of the current stats to avoid accidental overwrites + public getStats(): typeof this.stats { + return { ...this.stats }; + } + + public getLastBlockNumber(): number { + if (Number.isInteger(this.stats.lastBlockNumber)) { + return Number(this.stats.lastBlockNumber); + } + throw new Error('Could not get last block number from stats. Aborting.'); + } + + public setLastBlockNumber(lastBlockNumber: number): void { + this.updateStats({ lastBlockNumber }); + } + + /** + * Fetch the last stored stats from the DB. + * If no stats entry is found, it will create one. + */ + // @TODO make stats work with chainId + public async initStats(): Promise { + const { value: jsonStats } = + ( + await this.amplifyClient.query( + GetStatsDocument, + {}, + ) + )?.data?.getIngestorStats ?? {}; + + if (!jsonStats) { + this.stats = { lastBlockNumber: 0 }; + + await this.amplifyClient.mutate< + CreateStatsMutation, + CreateStatsMutationVariables + >(CreateStatsDocument, { + value: JSON.stringify(this.stats), + }); + } else { + try { + this.stats = JSON.parse(jsonStats); + } catch { + output( + 'Could not parse stats from the DB. The value is not a valid JSON.', + ); + } + } + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5151725d..93f42b6e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -132,6 +132,12 @@ importers: packages/blocks: dependencies: + '@joincolony/clients': + specifier: workspace:* + version: link:../clients + '@joincolony/graphql': + specifier: workspace:* + version: link:../graphql '@joincolony/utils': specifier: workspace:* version: link:../utils