Skip to content

Commit

Permalink
common: issue-878 - wait for block number in confirmation
Browse files Browse the repository at this point in the history
  • Loading branch information
dwerner committed Apr 30, 2024
1 parent e65224e commit 5cfb4ae
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 0 deletions.
2 changes: 2 additions & 0 deletions packages/indexer-common/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ export enum IndexerErrorCode {
IE073 = 'IE073',
IE074 = 'IE074',
IE075 = 'IE075',
IE076 = 'IE076',
}

export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
Expand Down Expand Up @@ -165,6 +166,7 @@ export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
IE073: 'Failed to query subgraph features from indexing statuses endpoint',
IE074: 'Failed to deploy subgraph: network not supported',
IE075: 'Failed to connect to network contracts',
IE076: 'Failed to allocate/reallocate: timed out waiting for block with deployment',
}

export type IndexerErrorCause = unknown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {
defineIndexerManagementModels,
defineQueryFeeModels,
GraphNode,
indexerError,
IndexerErrorCode,
IndexerManagementModels,
Network,
QueryFeeModels,
Expand All @@ -15,7 +17,9 @@ import {
createMetrics,
Logger,
Metrics,
mutable,
parseGRT,
WritableEventual,
} from '@graphprotocol/common-ts'
import {
invalidReallocateAction,
Expand Down Expand Up @@ -166,4 +170,47 @@ describe('Allocation Manager', () => {
expect(reordered[1]).toStrictEqual(reallocateAction)
expect(reordered[2]).toStrictEqual(queuedAllocateAction)
})

test('waitForEventualSequential waits for a number to be pushed to the stream', async () => {
const latestBlockNumberStream: WritableEventual<number> = mutable(0)
latestBlockNumberStream.push(1)
latestBlockNumberStream.push(2)
latestBlockNumberStream.push(3)
await allocationManager.waitForBlockNumberOnEventual(
logger,
3,
latestBlockNumberStream,
)
expect(await latestBlockNumberStream.value()).toBe(3)
})

test('waitForEventualSequential throws an error if MAX_BLOCK_WAIT_ATTEMPTS is exceeded', async () => {
const latestBlockNumberStream: WritableEventual<number> = mutable(0)
async function *pushValue(latestBlockNumberStream: WritableEventual<number>) {
latestBlockNumberStream.push(await latestBlockNumberStream.value() + 1)
yield;
}

const operations: Promise<IteratorResult<undefined, void>>[] = []
for (let i = 1; i <= 100; i++) {
operations.push(pushValue(latestBlockNumberStream).next())
}

await expect(Promise.all([async () => {
// values need to be produced in the stream while waiting
for (const operation of operations) {
await operation
}
}, async () => {
await expect(async () => {
await allocationManager.waitForBlockNumberOnEventual(
logger,
100,
latestBlockNumberStream,
)
}).toThrowError(indexerError(IndexerErrorCode.IE076))
}])).resolves

})
})

38 changes: 38 additions & 0 deletions packages/indexer-common/src/indexer-management/allocations.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
Eventual,
formatGRT,
Logger,
parseGRT,
Expand Down Expand Up @@ -429,6 +430,8 @@ export class AllocationManager {
)
}

await this.waitForBlockNumber(logger, receipt.blockNumber)

const createAllocationEventLogs = this.network.transactionManager.findEvent(
'AllocationCreated',
this.network.contracts.staking.interface,
Expand Down Expand Up @@ -482,6 +485,39 @@ export class AllocationManager {
}
}

async waitForBlockNumber(logger: Logger, blockNumber: number): Promise<void> {
await this.waitForBlockNumberOnEventual(
logger,
blockNumber,
this.network.networkSubgraph.deployment!.status.map(
(status) => status.latestBlock?.number ?? 0,
),
)
}

async waitForBlockNumberOnEventual(
logger: Logger,
number: number,
sequentialNumberStream: Eventual<number>,
): Promise<void> {
let attempts = 0
const MAX_BLOCK_WAIT_ATTEMPTS = 10
for await (const latestSequentialNumber of sequentialNumberStream.values()) {
if (attempts > MAX_BLOCK_WAIT_ATTEMPTS) {
throw indexerError(IndexerErrorCode.IE076)
}
attempts += 1
if (latestSequentialNumber >= number) {
return
} else {
logger.debug('Waiting for block to be reached', {
blockNumber: number,
latestBlock: latestSequentialNumber,
})
}
}
}

async prepareAllocate(
logger: Logger,
context: TransactionPreparationContext,
Expand Down Expand Up @@ -839,6 +875,8 @@ export class AllocationManager {
)
}

await this.waitForBlockNumber(logger, receipt.blockNumber)

const closeAllocationEventLogs = this.network.transactionManager.findEvent(
'AllocationClosed',
this.network.contracts.staking.interface,
Expand Down

0 comments on commit 5cfb4ae

Please sign in to comment.