diff --git a/packages/indexer-common/src/indexer-management/__tests__/allocations.test.ts b/packages/indexer-common/src/indexer-management/__tests__/allocations.test.ts index d9f62b4ac..cd35b9d33 100644 --- a/packages/indexer-common/src/indexer-management/__tests__/allocations.test.ts +++ b/packages/indexer-common/src/indexer-management/__tests__/allocations.test.ts @@ -15,7 +15,9 @@ import { createMetrics, Logger, Metrics, + mutable, parseGRT, + WritableEventual, } from '@graphprotocol/common-ts' import { invalidReallocateAction, @@ -166,4 +168,17 @@ describe('Allocation Manager', () => { expect(reordered[1]).toStrictEqual(reallocateAction) expect(reordered[2]).toStrictEqual(queuedAllocateAction) }) + + test('waitForEventualSequential() waits for a number to be pushed to the stream', async () => { + const latestBlockNumberStream: WritableEventual = mutable(0) + latestBlockNumberStream.push(1) + latestBlockNumberStream.push(2) + latestBlockNumberStream.push(3) + await allocationManager.waitForBlockNumberOnEventual( + logger, + 3, + latestBlockNumberStream, + ) + expect(await latestBlockNumberStream.value()).toBe(3) + }) }) diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index 1fd78e540..316d1248e 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -1,4 +1,5 @@ import { + Eventual, formatGRT, Logger, parseGRT, @@ -429,6 +430,8 @@ export class AllocationManager { ) } + await this.waitForBlockNumber(logger, receipt.blockNumber) + const createAllocationEventLogs = this.network.transactionManager.findEvent( 'AllocationCreated', this.network.contracts.staking.interface, @@ -482,6 +485,33 @@ export class AllocationManager { } } + async waitForBlockNumber(logger: Logger, blockNumber: number): Promise { + await this.waitForBlockNumberOnEventual( + logger, + blockNumber, + this.network.networkSubgraph.deployment!.status.map( + (status) => status.latestBlock?.number ?? 0, + ), + ) + } + + async waitForBlockNumberOnEventual( + logger: Logger, + number: number, + sequentialNumberStream: Eventual, + ): Promise { + for await (const latestSequentialNumber of sequentialNumberStream.values()) { + if (latestSequentialNumber >= number) { + return + } else { + logger.debug('Waiting for block to be reached', { + blockNumber: number, + latestBlock: latestSequentialNumber, + }) + } + } + } + async prepareAllocate( logger: Logger, context: TransactionPreparationContext, @@ -839,6 +869,8 @@ export class AllocationManager { ) } + await this.waitForBlockNumber(logger, receipt.blockNumber) + const closeAllocationEventLogs = this.network.transactionManager.findEvent( 'AllocationClosed', this.network.contracts.staking.interface,