diff --git a/packages/engine/paima-funnel/src/cde/cardanoProjectedNFT.ts b/packages/engine/paima-funnel/src/cde/cardanoProjectedNFT.ts index 678be70c8..a84d1ebe3 100644 --- a/packages/engine/paima-funnel/src/cde/cardanoProjectedNFT.ts +++ b/packages/engine/paima-funnel/src/cde/cardanoProjectedNFT.ts @@ -7,41 +7,78 @@ import { ChainDataExtensionDatumType, DEFAULT_FUNNEL_TIMEOUT, timeout } from '@p import { Routes, query } from '@dcspark/carp-client/client/src'; import { ProjectedNftStatus } from '@dcspark/carp-client/shared/models/ProjectedNftRange'; import type { ProjectedNftRangeResponse } from '@dcspark/carp-client/shared/models/ProjectedNftRange'; +import { BlockTxPair } from '@dcspark/carp-client/shared/models/common'; export default async function getCdeProjectedNFTData( url: string, extension: ChainDataExtensionCardanoProjectedNFT, fromAbsoluteSlot: number, toAbsoluteSlot: number, - getBlockNumber: (slot: number) => number + getBlockNumber: (slot: number) => number, + isPresync: boolean, + untilBlock: string, + fromTx: BlockTxPair | undefined, + paginationLimit: number ): Promise { - const events = await timeout( - query(url, Routes.projectedNftEventsRange, { - range: { minSlot: fromAbsoluteSlot, maxSlot: toAbsoluteSlot }, - address: undefined, - }), - DEFAULT_FUNNEL_TIMEOUT - ); + let result = [] as ChainDataExtensionDatum[]; - return events - .map(e => eventToCdeDatum(e, extension, getBlockNumber(e.actionSlot))) - .filter(e => e != null) - .map(e => e!); + while (true) { + const events = await timeout( + query(url, Routes.projectedNftEventsRange, { + address: undefined, + slotLimits: { + from: fromAbsoluteSlot, + to: toAbsoluteSlot, + }, + limit: paginationLimit, + untilBlock, + after: fromTx, + }), + DEFAULT_FUNNEL_TIMEOUT + ); + + if (events.length > 0) { + const last = events[events.length - 1]; + + fromTx = { + tx: last.txId, + block: last.block, + }; + } + + events + .flatMap(event => + event.payload.map(payload => ({ txId: event.txId, block: event.block, ...payload })) + ) + .map(e => eventToCdeDatum(e, extension, getBlockNumber(e.actionSlot))) + .filter(e => e != null) + .map(e => e!) + .forEach(element => { + result.push(element); + }); + + if (events.length === 0 || isPresync) { + break; + } + } + + return result; } function eventToCdeDatum( - event: ProjectedNftRangeResponse[0], + event: { txId: string; block: string } & ProjectedNftRangeResponse[0]['payload'][0], extension: ChainDataExtensionCardanoProjectedNFT, blockNumber: number ): CdeCardanoProjectedNFTDatum | null { - if ( - event.actionTxId === null || - event.actionTxId == '' || - event.status === ProjectedNftStatus.Invalid - ) { + if (event.txId === null || event.txId == '' || event.status === ProjectedNftStatus.Invalid) { return null; } + const cursor: BlockTxPair = { + block: event.block, + tx: event.txId, + }; + return { cdeId: extension.cdeId, cdeDatumType: ChainDataExtensionDatumType.CardanoProjectedNFT, @@ -49,7 +86,7 @@ function eventToCdeDatum( payload: { ownerAddress: event.ownerAddress != null ? event.ownerAddress : '', - actionTxId: event.actionTxId, + actionTxId: event.txId, actionOutputIndex: event.actionOutputIndex != null ? event.actionOutputIndex : undefined, previousTxHash: event.previousTxHash != null ? event.previousTxHash : undefined, @@ -65,5 +102,6 @@ function eventToCdeDatum( forHowLong: event.forHowLong != null ? event.forHowLong : undefined, }, scheduledPrefix: extension.scheduledPrefix, + paginationCursor: { cursor: JSON.stringify(cursor), finished: false }, }; } diff --git a/packages/engine/paima-funnel/src/funnels/FunnelCache.ts b/packages/engine/paima-funnel/src/funnels/FunnelCache.ts index 30e29ab7f..161ebdf22 100644 --- a/packages/engine/paima-funnel/src/funnels/FunnelCache.ts +++ b/packages/engine/paima-funnel/src/funnels/FunnelCache.ts @@ -80,9 +80,7 @@ export type CarpFunnelCacheEntryState = { epoch: number | undefined; cursors: | { - [cdeId: number]: - | { kind: 'paginationCursor'; cursor: string; finished: boolean } - | { kind: 'slot'; slot: number; finished: boolean }; + [cdeId: number]: { cursor: string; finished: boolean }; } | undefined; }; @@ -112,12 +110,7 @@ export class CarpFunnelCacheEntry implements FunnelCacheEntry { } } - public updateCursor( - cdeId: number, - presyncCursor: - | { kind: 'paginationCursor'; cursor: string; finished: boolean } - | { kind: 'slot'; slot: number; finished: boolean } - ): void { + public updateCursor(cdeId: number, presyncCursor: { cursor: string; finished: boolean }): void { if (this.state) { if (!this.state.cursors) { this.state.cursors = {}; diff --git a/packages/engine/paima-funnel/src/funnels/carp/funnel.ts b/packages/engine/paima-funnel/src/funnels/carp/funnel.ts index 6588f2d8e..1b03f8f95 100644 --- a/packages/engine/paima-funnel/src/funnels/carp/funnel.ts +++ b/packages/engine/paima-funnel/src/funnels/carp/funnel.ts @@ -30,10 +30,9 @@ import { Routes } from '@dcspark/carp-client/shared/routes'; import { FUNNEL_PRESYNC_FINISHED, InternalEventType } from '@paima/utils'; import { CarpFunnelCacheEntry } from '../FunnelCache.js'; import { getCardanoEpoch, getCarpCursors } from '@paima/db'; -import type { BlockTxPair } from 'tmp-carp-client/shared/models/common'; +import { BlockTxPair } from '@dcspark/carp-client/shared/models/common'; const delayForWaitingForFinalityLoop = 1000; -const DEFAULT_PRESYNC_SLOT_RANGE = 10000; type Era = { firstSlot: number; @@ -223,31 +222,6 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel { return data; } - const getSlotRange = ({ - cdeId, - startSlot, - }: { - cdeId: number; - startSlot: number; - }): { from: number; to: number } => { - const cursors = this.cache.getState().cursors; - const from: number = (cursors && (cursors[cdeId] as { slot: number }).slot) || startSlot; - const to = from + DEFAULT_PRESYNC_SLOT_RANGE; - - // the cache gets invalidated on error, so we can update the cursor before - // even returning the event without risk. - this.cache.updateCursor(cdeId, { - kind: 'slot', - slot: to, - finished: to >= this.cache.getState().startingSlot, - }); - - return { - from, - to, - }; - }; - const cache = this.cache; const mapCursorPaginatedData = (cdeId: number) => (datums: any) => { // we are providing the entire indexed range, so if carp @@ -256,7 +230,6 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel { const finished = datums.length === 0 || datums.length < this.config.paginationLimit; cache.updateCursor(cdeId, { - kind: 'paginationCursor', cursor: datums[datums.length - 1] ? datums[datums.length - 1].paginationCursor.cursor : '', finished, }); @@ -296,8 +269,6 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel { const cursor = cursors && cursors[extension.cdeId]; - console.log('min', Math.min(startingSlot, extension.stopSlot || startingSlot)); - const data = getCdePoolData( this.carpUrl, extension, @@ -307,9 +278,7 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel { slot => absoluteSlotToEpoch(this.era, slot), true, stableBlock.block.hash, - cursor && cursor.kind === 'paginationCursor' - ? (JSON.parse(cursor.cursor) as BlockTxPair) - : undefined, + cursor ? (JSON.parse(cursor.cursor) as BlockTxPair) : undefined, this.config.paginationLimit ).then(mapCursorPaginatedData(extension.cdeId)); @@ -320,15 +289,23 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel { })); } case ChainDataExtensionType.CardanoProjectedNFT: { - const { from, to } = getSlotRange(extension); + const cursors = this.cache.getState().cursors; + const startingSlot = this.cache.getState().startingSlot - 1; + + const cursor = cursors && cursors[extension.cdeId]; const data = getCdeProjectedNFTData( this.carpUrl, extension, - from, - Math.min(to, this.cache.getState().startingSlot - 1), - slot => slot - ); + + extension.startSlot, + Math.min(startingSlot, extension.stopSlot || startingSlot), + slot => slot, + true, + stableBlock.block.hash, + cursor ? (JSON.parse(cursor.cursor) as BlockTxPair) : undefined, + this.config.paginationLimit + ).then(mapCursorPaginatedData(extension.cdeId)); return data.then(data => ({ cdeId: extension.cdeId, @@ -350,9 +327,7 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel { slot => slot, true, stableBlock.block.hash, - cursor && cursor.kind === 'paginationCursor' - ? (JSON.parse(cursor.cursor) as BlockTxPair) - : undefined, + cursor ? (JSON.parse(cursor.cursor) as BlockTxPair) : undefined, this.config.paginationLimit ).then(mapCursorPaginatedData(extension.cdeId)); @@ -376,9 +351,7 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel { slot => slot, true, stableBlock.block.hash, - cursor && cursor.kind === 'paginationCursor' - ? (JSON.parse(cursor.cursor) as BlockTxPair) - : undefined, + cursor ? (JSON.parse(cursor.cursor) as BlockTxPair) : undefined, this.config.paginationLimit ).then(mapCursorPaginatedData(extension.cdeId)); @@ -399,75 +372,17 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel { const list: CardanoPresyncChainData[] = []; for (const events of carpEvents) { - if ( - events.cdeType === ChainDataExtensionType.CardanoTransfer || - events.cdeType === ChainDataExtensionType.CardanoPool - ) { - for (const event of events.data || []) { - list.push({ - extensionDatums: [event], - network: this.chainName, - networkType: ConfigNetworkType.CARDANO, - carpCursor: { - kind: 'paginationCursor', - cdeId: event.cdeId, - cursor: event.paginationCursor.cursor, - finished: event.paginationCursor.finished, - }, - }); - } - } else { - // handle the cde's that are still on slot range based 'pagination' (not - // really pagination, but emulated) - const cursor = cursors && cursors[events.cdeId]; - const finished = (cursor && cursor.kind === 'slot' && cursor.finished) || false; - - // add an empty event so that the slot range gets updated even if there - // are no events, since it's not a real pagination cursor - if (events.data.length === 0) { - const slot = (cursor && cursor.kind === 'slot' && cursor.slot) || 0; - list.push({ - extensionDatums: [], - networkType: ConfigNetworkType.CARDANO, - network: this.chainName, - carpCursor: { - kind: 'slot', - cdeId: events.cdeId, - slot, - finished, - }, - }); - } - - // group by slot by traversing in order, if two consecutive entries have - // the same slot then add them to the last entry. - // - // it's important to group by slot since that's how the 'cursor' is updated. - // this code can be removed when we implement pagination by (tx,block). - let slot; - for (let i = 0; i < events.data.length; i++) { - const event = events.data[i]; - const isLastAndFinished = (i == events.data.length - 1 && finished) || false; - - if (slot && slot === event.blockNumber) { - list[list.length - 1].extensionDatums.push(event); - list[list.length - 1].carpCursor.finished = isLastAndFinished; - } else { - slot = event.blockNumber; - - list.push({ - extensionDatums: [event], - networkType: ConfigNetworkType.CARDANO, - network: this.chainName, - carpCursor: { - kind: 'slot', - cdeId: event.cdeId, - slot: event.blockNumber, - finished: isLastAndFinished, - }, - }); - } - } + for (const event of events.data || []) { + list.push({ + extensionDatums: [event], + network: this.chainName, + networkType: ConfigNetworkType.CARDANO, + carpCursor: { + cdeId: event.cdeId, + cursor: event.paginationCursor.cursor, + finished: event.paginationCursor.finished, + }, + }); } } @@ -514,26 +429,10 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel { const cursors = await getCarpCursors.run(undefined, dbTx); for (const cursor of cursors) { - const kind = sharedData.extensions.find(extension => extension.cdeId === cursor.cde_id); - - const slotBased = - kind?.cdeType !== ChainDataExtensionType.CardanoTransfer && - kind?.cdeType !== ChainDataExtensionType.CardanoPool && - kind?.cdeType !== ChainDataExtensionType.CardanoAssetUtxo; - - if (slotBased) { - newEntry.updateCursor(cursor.cde_id, { - kind: 'slot', - slot: Number(cursor.cursor), - finished: cursor.finished, - }); - } else { - newEntry.updateCursor(cursor.cde_id, { - kind: 'paginationCursor', - cursor: cursor.cursor, - finished: cursor.finished, - }); - } + newEntry.updateCursor(cursor.cde_id, { + cursor: cursor.cursor, + finished: cursor.finished, + }); } return newEntry; @@ -640,7 +539,11 @@ async function readDataInternal( extension, min, Math.min(max, extension.stopSlot || max), - mapSlotToBlockNumber + mapSlotToBlockNumber, + false, // not presync + stableBlockId, + undefined, // we want everything in the range, so no starting point for the pagination + paginationLimit ); return projectedNFTData; case ChainDataExtensionType.CardanoAssetUtxo: diff --git a/packages/engine/paima-sm/src/index.ts b/packages/engine/paima-sm/src/index.ts index 3ecae1ab4..94420641e 100644 --- a/packages/engine/paima-sm/src/index.ts +++ b/packages/engine/paima-sm/src/index.ts @@ -123,11 +123,7 @@ const SM: GameStateMachineInitializer = { ); if (cdeDataLength > 0) { doLog( - `[${latestCdeData.network}] Processed ${cdeDataLength} CDE events in ${ - latestCdeData.carpCursor.kind === 'paginationCursor' - ? latestCdeData.carpCursor.cursor - : `slot #${latestCdeData.carpCursor.slot}` - }` + `[${latestCdeData.network}] Processed ${cdeDataLength} CDE events in ${latestCdeData.carpCursor.cursor}` ); } } diff --git a/packages/engine/paima-sm/src/types.ts b/packages/engine/paima-sm/src/types.ts index 2598f83ab..3e4c0c6e2 100644 --- a/packages/engine/paima-sm/src/types.ts +++ b/packages/engine/paima-sm/src/types.ts @@ -48,9 +48,7 @@ export interface EvmPresyncChainData { export interface CardanoPresyncChainData { network: string; networkType: ConfigNetworkType.CARDANO; - carpCursor: - | { kind: 'paginationCursor'; cdeId: number; cursor: string; finished: boolean } - | { kind: 'slot'; cdeId: number; slot: number; finished: boolean }; + carpCursor: { cdeId: number; cursor: string; finished: boolean }; extensionDatums: ChainDataExtensionDatum[]; } @@ -195,6 +193,7 @@ export interface CdeCardanoProjectedNFTDatum extends CdeDatumBase { cdeDatumType: ChainDataExtensionDatumType.CardanoProjectedNFT; payload: CdeDatumCardanoProjectedNFTPayload; scheduledPrefix: string | undefined; + paginationCursor: { cursor: string; finished: boolean }; } export interface CdeCardanoAssetUtxoDatum extends CdeDatumBase {