Skip to content

Commit

Permalink
migrate cardanoProjected nft to cursor-based pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Mar 4, 2024
1 parent 47dd957 commit b922ca9
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 170 deletions.
76 changes: 57 additions & 19 deletions packages/engine/paima-funnel/src/cde/cardanoProjectedNFT.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,49 +7,86 @@ import { ChainDataExtensionDatumType, DEFAULT_FUNNEL_TIMEOUT, timeout } from '@p
import { Routes, query } from '@dcspark/carp-client/client/src';
import { ProjectedNftStatus } from '@dcspark/carp-client/shared/models/ProjectedNftRange';
import type { ProjectedNftRangeResponse } from '@dcspark/carp-client/shared/models/ProjectedNftRange';
import { BlockTxPair } from '@dcspark/carp-client/shared/models/common';

export default async function getCdeProjectedNFTData(
url: string,
extension: ChainDataExtensionCardanoProjectedNFT,
fromAbsoluteSlot: number,
toAbsoluteSlot: number,
getBlockNumber: (slot: number) => number
getBlockNumber: (slot: number) => number,
isPresync: boolean,
untilBlock: string,
fromTx: BlockTxPair | undefined,
paginationLimit: number
): Promise<ChainDataExtensionDatum[]> {
const events = await timeout(
query(url, Routes.projectedNftEventsRange, {
range: { minSlot: fromAbsoluteSlot, maxSlot: toAbsoluteSlot },
address: undefined,
}),
DEFAULT_FUNNEL_TIMEOUT
);
let result = [] as ChainDataExtensionDatum[];

return events
.map(e => eventToCdeDatum(e, extension, getBlockNumber(e.actionSlot)))
.filter(e => e != null)
.map(e => e!);
while (true) {
const events = await timeout(
query(url, Routes.projectedNftEventsRange, {
address: undefined,
slotLimits: {
from: fromAbsoluteSlot,
to: toAbsoluteSlot,
},
limit: paginationLimit,
untilBlock,
after: fromTx,
}),
DEFAULT_FUNNEL_TIMEOUT
);

if (events.length > 0) {
const last = events[events.length - 1];

fromTx = {
tx: last.txId,
block: last.block,
};
}

events
.flatMap(event =>
event.payload.map(payload => ({ txId: event.txId, block: event.block, ...payload }))
)
.map(e => eventToCdeDatum(e, extension, getBlockNumber(e.actionSlot)))
.filter(e => e != null)
.map(e => e!)
.forEach(element => {
result.push(element);
});

if (events.length === 0 || isPresync) {
break;
}
}

return result;
}

function eventToCdeDatum(
event: ProjectedNftRangeResponse[0],
event: { txId: string; block: string } & ProjectedNftRangeResponse[0]['payload'][0],
extension: ChainDataExtensionCardanoProjectedNFT,
blockNumber: number
): CdeCardanoProjectedNFTDatum | null {
if (
event.actionTxId === null ||
event.actionTxId == '' ||
event.status === ProjectedNftStatus.Invalid
) {
if (event.txId === null || event.txId == '' || event.status === ProjectedNftStatus.Invalid) {
return null;
}

const cursor: BlockTxPair = {
block: event.block,
tx: event.txId,
};

return {
cdeId: extension.cdeId,
cdeDatumType: ChainDataExtensionDatumType.CardanoProjectedNFT,
blockNumber,
payload: {
ownerAddress: event.ownerAddress != null ? event.ownerAddress : '',

actionTxId: event.actionTxId,
actionTxId: event.txId,
actionOutputIndex: event.actionOutputIndex != null ? event.actionOutputIndex : undefined,

previousTxHash: event.previousTxHash != null ? event.previousTxHash : undefined,
Expand All @@ -65,5 +102,6 @@ function eventToCdeDatum(
forHowLong: event.forHowLong != null ? event.forHowLong : undefined,
},
scheduledPrefix: extension.scheduledPrefix,
paginationCursor: { cursor: JSON.stringify(cursor), finished: false },
};
}
11 changes: 2 additions & 9 deletions packages/engine/paima-funnel/src/funnels/FunnelCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ export type CarpFunnelCacheEntryState = {
epoch: number | undefined;
cursors:
| {
[cdeId: number]:
| { kind: 'paginationCursor'; cursor: string; finished: boolean }
| { kind: 'slot'; slot: number; finished: boolean };
[cdeId: number]: { cursor: string; finished: boolean };
}
| undefined;
};
Expand Down Expand Up @@ -112,12 +110,7 @@ export class CarpFunnelCacheEntry implements FunnelCacheEntry {
}
}

public updateCursor(
cdeId: number,
presyncCursor:
| { kind: 'paginationCursor'; cursor: string; finished: boolean }
| { kind: 'slot'; slot: number; finished: boolean }
): void {
public updateCursor(cdeId: number, presyncCursor: { cursor: string; finished: boolean }): void {
if (this.state) {
if (!this.state.cursors) {
this.state.cursors = {};
Expand Down
171 changes: 37 additions & 134 deletions packages/engine/paima-funnel/src/funnels/carp/funnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ import { Routes } from '@dcspark/carp-client/shared/routes';
import { FUNNEL_PRESYNC_FINISHED, InternalEventType } from '@paima/utils';
import { CarpFunnelCacheEntry } from '../FunnelCache.js';
import { getCardanoEpoch, getCarpCursors } from '@paima/db';
import type { BlockTxPair } from 'tmp-carp-client/shared/models/common';
import { BlockTxPair } from '@dcspark/carp-client/shared/models/common';

const delayForWaitingForFinalityLoop = 1000;
const DEFAULT_PRESYNC_SLOT_RANGE = 10000;

type Era = {
firstSlot: number;
Expand Down Expand Up @@ -223,31 +222,6 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel {
return data;
}

const getSlotRange = ({
cdeId,
startSlot,
}: {
cdeId: number;
startSlot: number;
}): { from: number; to: number } => {
const cursors = this.cache.getState().cursors;
const from: number = (cursors && (cursors[cdeId] as { slot: number }).slot) || startSlot;
const to = from + DEFAULT_PRESYNC_SLOT_RANGE;

// the cache gets invalidated on error, so we can update the cursor before
// even returning the event without risk.
this.cache.updateCursor(cdeId, {
kind: 'slot',
slot: to,
finished: to >= this.cache.getState().startingSlot,
});

return {
from,
to,
};
};

const cache = this.cache;
const mapCursorPaginatedData = (cdeId: number) => (datums: any) => {
// we are providing the entire indexed range, so if carp
Expand All @@ -256,7 +230,6 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel {
const finished = datums.length === 0 || datums.length < this.config.paginationLimit;

cache.updateCursor(cdeId, {
kind: 'paginationCursor',
cursor: datums[datums.length - 1] ? datums[datums.length - 1].paginationCursor.cursor : '',
finished,
});
Expand Down Expand Up @@ -296,8 +269,6 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel {

const cursor = cursors && cursors[extension.cdeId];

console.log('min', Math.min(startingSlot, extension.stopSlot || startingSlot));

const data = getCdePoolData(
this.carpUrl,
extension,
Expand All @@ -307,9 +278,7 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel {
slot => absoluteSlotToEpoch(this.era, slot),
true,
stableBlock.block.hash,
cursor && cursor.kind === 'paginationCursor'
? (JSON.parse(cursor.cursor) as BlockTxPair)
: undefined,
cursor ? (JSON.parse(cursor.cursor) as BlockTxPair) : undefined,
this.config.paginationLimit
).then(mapCursorPaginatedData(extension.cdeId));

Expand All @@ -320,15 +289,23 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel {
}));
}
case ChainDataExtensionType.CardanoProjectedNFT: {
const { from, to } = getSlotRange(extension);
const cursors = this.cache.getState().cursors;
const startingSlot = this.cache.getState().startingSlot - 1;

const cursor = cursors && cursors[extension.cdeId];

const data = getCdeProjectedNFTData(
this.carpUrl,
extension,
from,
Math.min(to, this.cache.getState().startingSlot - 1),
slot => slot
);

extension.startSlot,
Math.min(startingSlot, extension.stopSlot || startingSlot),
slot => slot,
true,
stableBlock.block.hash,
cursor ? (JSON.parse(cursor.cursor) as BlockTxPair) : undefined,
this.config.paginationLimit
).then(mapCursorPaginatedData(extension.cdeId));

return data.then(data => ({
cdeId: extension.cdeId,
Expand All @@ -350,9 +327,7 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel {
slot => slot,
true,
stableBlock.block.hash,
cursor && cursor.kind === 'paginationCursor'
? (JSON.parse(cursor.cursor) as BlockTxPair)
: undefined,
cursor ? (JSON.parse(cursor.cursor) as BlockTxPair) : undefined,
this.config.paginationLimit
).then(mapCursorPaginatedData(extension.cdeId));

Expand All @@ -376,9 +351,7 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel {
slot => slot,
true,
stableBlock.block.hash,
cursor && cursor.kind === 'paginationCursor'
? (JSON.parse(cursor.cursor) as BlockTxPair)
: undefined,
cursor ? (JSON.parse(cursor.cursor) as BlockTxPair) : undefined,
this.config.paginationLimit
).then(mapCursorPaginatedData(extension.cdeId));

Expand All @@ -399,75 +372,17 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel {
const list: CardanoPresyncChainData[] = [];

for (const events of carpEvents) {
if (
events.cdeType === ChainDataExtensionType.CardanoTransfer ||
events.cdeType === ChainDataExtensionType.CardanoPool
) {
for (const event of events.data || []) {
list.push({
extensionDatums: [event],
network: this.chainName,
networkType: ConfigNetworkType.CARDANO,
carpCursor: {
kind: 'paginationCursor',
cdeId: event.cdeId,
cursor: event.paginationCursor.cursor,
finished: event.paginationCursor.finished,
},
});
}
} else {
// handle the cde's that are still on slot range based 'pagination' (not
// really pagination, but emulated)
const cursor = cursors && cursors[events.cdeId];
const finished = (cursor && cursor.kind === 'slot' && cursor.finished) || false;

// add an empty event so that the slot range gets updated even if there
// are no events, since it's not a real pagination cursor
if (events.data.length === 0) {
const slot = (cursor && cursor.kind === 'slot' && cursor.slot) || 0;
list.push({
extensionDatums: [],
networkType: ConfigNetworkType.CARDANO,
network: this.chainName,
carpCursor: {
kind: 'slot',
cdeId: events.cdeId,
slot,
finished,
},
});
}

// group by slot by traversing in order, if two consecutive entries have
// the same slot then add them to the last entry.
//
// it's important to group by slot since that's how the 'cursor' is updated.
// this code can be removed when we implement pagination by (tx,block).
let slot;
for (let i = 0; i < events.data.length; i++) {
const event = events.data[i];
const isLastAndFinished = (i == events.data.length - 1 && finished) || false;

if (slot && slot === event.blockNumber) {
list[list.length - 1].extensionDatums.push(event);
list[list.length - 1].carpCursor.finished = isLastAndFinished;
} else {
slot = event.blockNumber;

list.push({
extensionDatums: [event],
networkType: ConfigNetworkType.CARDANO,
network: this.chainName,
carpCursor: {
kind: 'slot',
cdeId: event.cdeId,
slot: event.blockNumber,
finished: isLastAndFinished,
},
});
}
}
for (const event of events.data || []) {
list.push({
extensionDatums: [event],
network: this.chainName,
networkType: ConfigNetworkType.CARDANO,
carpCursor: {
cdeId: event.cdeId,
cursor: event.paginationCursor.cursor,
finished: event.paginationCursor.finished,
},
});
}
}

Expand Down Expand Up @@ -514,26 +429,10 @@ export class CarpFunnel extends BaseFunnel implements ChainFunnel {
const cursors = await getCarpCursors.run(undefined, dbTx);

for (const cursor of cursors) {
const kind = sharedData.extensions.find(extension => extension.cdeId === cursor.cde_id);

const slotBased =
kind?.cdeType !== ChainDataExtensionType.CardanoTransfer &&
kind?.cdeType !== ChainDataExtensionType.CardanoPool &&
kind?.cdeType !== ChainDataExtensionType.CardanoAssetUtxo;

if (slotBased) {
newEntry.updateCursor(cursor.cde_id, {
kind: 'slot',
slot: Number(cursor.cursor),
finished: cursor.finished,
});
} else {
newEntry.updateCursor(cursor.cde_id, {
kind: 'paginationCursor',
cursor: cursor.cursor,
finished: cursor.finished,
});
}
newEntry.updateCursor(cursor.cde_id, {
cursor: cursor.cursor,
finished: cursor.finished,
});
}

return newEntry;
Expand Down Expand Up @@ -640,7 +539,11 @@ async function readDataInternal(
extension,
min,
Math.min(max, extension.stopSlot || max),
mapSlotToBlockNumber
mapSlotToBlockNumber,
false, // not presync
stableBlockId,
undefined, // we want everything in the range, so no starting point for the pagination
paginationLimit
);
return projectedNFTData;
case ChainDataExtensionType.CardanoAssetUtxo:
Expand Down
Loading

0 comments on commit b922ca9

Please sign in to comment.