From 27dbb08a290a28e4cee5b4155090cd20cad81e0c Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini <48031343+ecioppettini@users.noreply.github.com> Date: Tue, 30 Apr 2024 13:35:58 -0300 Subject: [PATCH] migrated from postgres to pg --- packages/engine/paima-funnel/package.json | 2 +- .../paima-funnel/src/cde/minaGeneric.ts | 98 ++++++++++--------- .../paima-funnel/src/funnels/FunnelCache.ts | 6 +- .../paima-funnel/src/funnels/mina/funnel.ts | 59 +++++------ packages/engine/paima-funnel/src/reading.ts | 2 + 5 files changed, 84 insertions(+), 83 deletions(-) diff --git a/packages/engine/paima-funnel/package.json b/packages/engine/paima-funnel/package.json index a9d1dad5c..f59aa0d18 100644 --- a/packages/engine/paima-funnel/package.json +++ b/packages/engine/paima-funnel/package.json @@ -20,6 +20,6 @@ "assert-never": "^1.2.1", "@dcspark/carp-client": "^3.1.0", "@dcspark/cardano-multiplatform-lib-nodejs": "5.2.0", - "postgres": "^3.3.5" + "pg": "^8.11.3" } } diff --git a/packages/engine/paima-funnel/src/cde/minaGeneric.ts b/packages/engine/paima-funnel/src/cde/minaGeneric.ts index 149cd32ff..17f924e40 100644 --- a/packages/engine/paima-funnel/src/cde/minaGeneric.ts +++ b/packages/engine/paima-funnel/src/cde/minaGeneric.ts @@ -5,10 +5,10 @@ import type { ChainDataExtensionMinaEventGeneric, } from '@paima/sm'; import { ChainDataExtensionDatumType } from '@paima/utils'; -import postgres from 'postgres'; +import pg from 'pg'; export async function getEventCdeData(args: { - pg: postgres.Sql; + pg: pg.Client; extension: ChainDataExtensionMinaEventGeneric; fromTimestamp: number; toTimestamp: number; @@ -36,7 +36,7 @@ export async function getEventCdeData(args: { } export async function getActionCdeData(args: { - pg: postgres.Sql; + pg: pg.Client; extension: ChainDataExtensionMinaActionGeneric; fromTimestamp: number; toTimestamp: number; @@ -68,7 +68,7 @@ export async function getCdeData( cdeDatumType: | ChainDataExtensionDatumType.MinaActionGeneric | ChainDataExtensionDatumType.MinaEventGeneric, - pg: postgres.Sql, + pg: pg.Client, extension: ChainDataExtensionMinaEventGeneric | ChainDataExtensionMinaActionGeneric, fromTimestamp: number, toTimestamp: number, @@ -81,7 +81,10 @@ export async function getCdeData( ): Promise<(CdeMinaActionGenericDatum | CdeMinaEventGenericDatum)[]> { const result = [] as (CdeMinaActionGenericDatum | CdeMinaEventGenericDatum)[]; + console.log('fromTo', fromTimestamp, toTimestamp); + while (true) { + console.log('making query'); const unmapped = await query( pg, extension.address, @@ -92,7 +95,9 @@ export async function getCdeData( fromBlockHeight?.toString() ); - const grouped = groupByTx(unmapped); + console.log('unmapped', console.log(unmapped)); + + const grouped = groupByTx(unmapped.rows); const events = grouped.flatMap(perBlock => perBlock.eventsData.map(txEvent => ({ @@ -124,7 +129,7 @@ export async function getCdeData( return result; } -function groupByTx(events: postgres.RowList) { +function groupByTx(events: PerBlock[]) { const grouped = [] as { blockInfo: { height: number; @@ -157,13 +162,8 @@ function groupByTx(events: postgres.RowList) { return grouped; } -function canonicalChainCTE( - db_client: postgres.Sql, - toTimestamp?: string, - fromTimestamp?: string, - fromBlockHeight?: string -) { - return db_client` +function canonicalChainCTE(toTimestamp?: string, fromTimestamp?: string, fromBlockHeight?: string) { + return ` canonical_chain AS ( SELECT id, state_hash, height, global_slot_since_genesis, timestamp @@ -171,28 +171,28 @@ function canonicalChainCTE( blocks b WHERE 1=1 - ${fromTimestamp ? db_client`AND b.timestamp::decimal >= ${fromTimestamp}::decimal` : db_client``} - ${toTimestamp ? db_client`AND b.timestamp::decimal <= ${toTimestamp}::decimal` : db_client``} - ${fromBlockHeight ? db_client`AND b.height::decimal >= ${fromBlockHeight}::decimal` : db_client``} + ${fromTimestamp ? `AND b.timestamp::decimal >= ${fromTimestamp}::decimal` : ``} + ${toTimestamp ? `AND b.timestamp::decimal <= ${toTimestamp}::decimal` : ``} + ${fromBlockHeight ? `AND b.height::decimal >= ${fromBlockHeight}::decimal` : ``} ORDER BY height ) `; } -function accountIdentifierCTE(db_client: postgres.Sql, address: string) { - return db_client` +function accountIdentifierCTE(address: string) { + return ` account_identifier AS ( SELECT id AS requesting_zkapp_account_identifier_id FROM account_identifiers ai WHERE - ai.public_key_id = (SELECT id FROM public_keys WHERE value = ${address}) + ai.public_key_id = (SELECT id FROM public_keys WHERE value = '${address}') )`; } -function blocksAccessedCTE(db_client: postgres.Sql) { - return db_client` +function blocksAccessedCTE() { + return ` blocks_accessed AS ( SELECT @@ -212,8 +212,8 @@ function blocksAccessedCTE(db_client: postgres.Sql) { )`; } -function emittedZkAppCommandsCTE(db_client: postgres.Sql, after?: string) { - return db_client` +function emittedZkAppCommandsCTE(after?: string) { + return ` emitted_zkapp_commands AS ( SELECT blocks_accessed.*, @@ -234,14 +234,14 @@ function emittedZkAppCommandsCTE(db_client: postgres.Sql, after?: string) { INNER JOIN zkapp_account_update zkcu ON zkcu.id = ANY(zkc.zkapp_account_updates_ids) INNER JOIN zkapp_account_update_body zkcu_body ON zkcu_body.id = zkcu.body_id AND zkcu_body.account_identifier_id = requesting_zkapp_account_identifier_id - ${after ? db_client`AND zkc.id > (SELECT id FROM zkapp_commands WHERE zkapp_commands.hash = ${after})` : db_client``} + ${after ? `AND zkc.id > (SELECT id FROM zkapp_commands WHERE zkapp_commands.hash = '${after}')` : ``} WHERE bzkc.status <> 'failed' )`; } -function emittedEventsCTE(db_client: postgres.Sql) { - return db_client` +function emittedEventsCTE() { + return ` emitted_events AS ( SELECT *, @@ -257,8 +257,8 @@ function emittedEventsCTE(db_client: postgres.Sql) { `; } -function emittedActionsCTE(db_client: postgres.Sql) { - return db_client` +function emittedActionsCTE() { + return ` emitted_actions AS ( SELECT *, @@ -274,8 +274,8 @@ function emittedActionsCTE(db_client: postgres.Sql) { `; } -function emittedActionStateCTE(db_client: postgres.Sql) { - return db_client` +function emittedActionStateCTE() { + return ` emitted_action_state AS ( SELECT emitted_actions.* @@ -297,7 +297,7 @@ type PerBlock = { }; export function getEventsQuery( - db_client: postgres.Sql, + db_client: pg.Client, address: string, toTimestamp?: string, fromTimestamp?: string, @@ -305,13 +305,13 @@ export function getEventsQuery( limit?: string, fromBlockHeight?: string ) { - return db_client` + let query = ` WITH - ${canonicalChainCTE(db_client, toTimestamp, fromTimestamp, fromBlockHeight)}, - ${accountIdentifierCTE(db_client, address)}, - ${blocksAccessedCTE(db_client)}, - ${emittedZkAppCommandsCTE(db_client, after)}, - ${emittedEventsCTE(db_client)}, + ${canonicalChainCTE(toTimestamp, fromTimestamp, fromBlockHeight)}, + ${accountIdentifierCTE(address)}, + ${blocksAccessedCTE()}, + ${emittedZkAppCommandsCTE(after)}, + ${emittedEventsCTE()}, grouped_events AS ( SELECT MAX(timestamp) timestamp, @@ -332,12 +332,14 @@ export function getEventsQuery( FROM grouped_events GROUP BY height ORDER BY height - ${limit ? db_client`LIMIT ${limit}` : db_client``} + ${limit ? `LIMIT ${limit}` : ``} `; + + return db_client.query(query); } export function getActionsQuery( - db_client: postgres.Sql, + db_client: pg.Client, address: string, toTimestamp?: string, fromTimestamp?: string, @@ -345,14 +347,14 @@ export function getActionsQuery( limit?: string, fromBlockHeight?: string ) { - return db_client` + const query = ` WITH - ${canonicalChainCTE(db_client, toTimestamp, fromTimestamp, fromBlockHeight)}, - ${accountIdentifierCTE(db_client, address)}, - ${blocksAccessedCTE(db_client)}, - ${emittedZkAppCommandsCTE(db_client, after)}, - ${emittedActionsCTE(db_client)}, - ${emittedActionStateCTE(db_client)}, + ${canonicalChainCTE(toTimestamp, fromTimestamp, fromBlockHeight)}, + ${accountIdentifierCTE(address)}, + ${blocksAccessedCTE()}, + ${emittedZkAppCommandsCTE(after)}, + ${emittedActionsCTE()}, + ${emittedActionStateCTE()}, grouped_events AS ( SELECT MAX(timestamp) timestamp, @@ -373,6 +375,8 @@ export function getActionsQuery( FROM grouped_events GROUP BY height ORDER BY height - ${limit ? db_client`LIMIT ${limit}` : db_client``} + ${limit ? `LIMIT ${limit}` : ``} `; + + return db_client.query(query); } diff --git a/packages/engine/paima-funnel/src/funnels/FunnelCache.ts b/packages/engine/paima-funnel/src/funnels/FunnelCache.ts index 839652c06..f8e0effcd 100644 --- a/packages/engine/paima-funnel/src/funnels/FunnelCache.ts +++ b/packages/engine/paima-funnel/src/funnels/FunnelCache.ts @@ -1,5 +1,5 @@ import type { ChainData } from '@paima/sm'; -import postgres from 'postgres'; +import pg from 'pg'; export interface FunnelCacheEntry { /** @@ -184,7 +184,7 @@ export class EvmFunnelCacheEntry implements FunnelCacheEntry { export type MinaFunnelCacheEntryState = { startingSlotTimestamp: number; lastPoint: { timestamp: number } | undefined; - pg: postgres.Sql; + pg: pg.Client; cursors: | { [cdeId: number]: { cursor: string; finished: boolean }; @@ -196,7 +196,7 @@ export class MinaFunnelCacheEntry implements FunnelCacheEntry { private state: MinaFunnelCacheEntryState | null = null; public static readonly SYMBOL = Symbol('MinaFunnelStartingSlot'); - public updateStartingTimestamp(startingSlotTimestamp: number, pg: postgres.Sql): void { + public updateStartingTimestamp(startingSlotTimestamp: number, pg: pg.Client): void { this.state = { startingSlotTimestamp, lastPoint: this.state?.lastPoint, diff --git a/packages/engine/paima-funnel/src/funnels/mina/funnel.ts b/packages/engine/paima-funnel/src/funnels/mina/funnel.ts index 855c85213..845acfc4e 100644 --- a/packages/engine/paima-funnel/src/funnels/mina/funnel.ts +++ b/packages/engine/paima-funnel/src/funnels/mina/funnel.ts @@ -22,17 +22,19 @@ import { getMinaCheckpoint, getPaginationCursors } from '@paima/db'; import { getActionCdeData, getEventCdeData } from '../../cde/minaGeneric.js'; import type { MinaConfig } from '@paima/utils'; import { MinaFunnelCacheEntry } from '../FunnelCache.js'; -import postgres from 'postgres'; +import pg from 'pg'; +const { Client } = pg; const delayForWaitingForFinalityLoop = 1000; async function findMinaConfirmedTimestamp( - pg: postgres.Sql, - confirmationDepth?: number, + db: pg.Client, + confirmationDepth?: number ): Promise { let row; if (confirmationDepth) { - row = await pg` + row = ( + await db.query(` WITH RECURSIVE chain AS ( (SELECT parent_id, id, timestamp, height FROM blocks b WHERE height = (select MAX(height) from blocks) ORDER BY timestamp ASC @@ -44,20 +46,21 @@ async function findMinaConfirmedTimestamp( ) SELECT timestamp FROM chain c LIMIT 1 OFFSET ${confirmationDepth}; - `; + `) + ).rows; } else { - row = - await pg`select timestamp from blocks where chain_status = 'canonical' order by height desc limit 1;`; + const res = await db.query( + `select timestamp from blocks where chain_status = 'canonical' order by height desc limit 1;` + ); + + row = res.rows; } - return Number.parseInt(row[0]["timestamp"], 10); + return Number.parseInt(row[0]['timestamp'], 10); } // mina timestamps are in milliseconds, while evm timestamps are in seconds. -function baseChainTimestampToMina( - baseChainTimestamp: number, - delay: number, -): number { +function baseChainTimestampToMina(baseChainTimestamp: number, delay: number): number { return Math.max((baseChainTimestamp - delay) * 1000, 0); } @@ -92,13 +95,13 @@ export class MinaFunnel extends BaseFunnel implements ChainFunnel { const maxBaseTimestamp = baseChainTimestampToMina( baseData[baseData.length - 1].timestamp, - this.config.delay, + this.config.delay ); while (true) { const confirmedTimestamp = await findMinaConfirmedTimestamp( cachedState.pg, - this.config.confirmationDepth, + this.config.confirmationDepth ); if (confirmedTimestamp >= maxBaseTimestamp) { @@ -118,10 +121,7 @@ export class MinaFunnel extends BaseFunnel implements ChainFunnel { const getBlockNumber = (state: { curr: number }) => (ts: number) => { while ( state.curr < baseData.length && - baseChainTimestampToMina( - baseData[state.curr].timestamp, - this.config.delay, - ) <= ts + baseChainTimestampToMina(baseData[state.curr].timestamp, this.config.delay) <= ts ) state.curr++; @@ -192,10 +192,7 @@ export class MinaFunnel extends BaseFunnel implements ChainFunnel { chainData.internalEvents.push({ type: InternalEventType.MinaLastTimestamp, - timestamp: baseChainTimestampToMina( - chainData.timestamp, - this.config.delay, - ).toString(), + timestamp: baseChainTimestampToMina(chainData.timestamp, this.config.delay).toString(), network: this.chainName, }); } @@ -203,9 +200,9 @@ export class MinaFunnel extends BaseFunnel implements ChainFunnel { return composed; } - public override async readPresyncData( - args: ReadPresyncDataFrom - ): Promise<{ [network: string]: PresyncChainData[] | typeof FUNNEL_PRESYNC_FINISHED }> { + public override async readPresyncData(args: ReadPresyncDataFrom): Promise<{ + [network: string]: PresyncChainData[] | typeof FUNNEL_PRESYNC_FINISHED; + }> { const basePromise = this.baseFunnel.readPresyncData(args); const cache = this.cache.getState(); @@ -354,15 +351,13 @@ export class MinaFunnel extends BaseFunnel implements ChainFunnel { const newEntry = new MinaFunnelCacheEntry(); sharedData.cacheManager.cacheEntries[MinaFunnelCacheEntry.SYMBOL] = newEntry; - const pg = postgres(config.archiveConnectionString); + const pg = new Client({ connectionString: config.archiveConnectionString }); + await pg.connect(); - const startingBlockTimestamp = (await sharedData.web3.eth.getBlock(startingBlockHeight)) - .timestamp as number; + const startingBlock = await sharedData.web3.eth.getBlock(startingBlockHeight); + const startingBlockTimestamp = startingBlock.timestamp as number; - const minaTimestamp = baseChainTimestampToMina( - startingBlockTimestamp, - config.delay, - ); + const minaTimestamp = baseChainTimestampToMina(startingBlockTimestamp, config.delay); newEntry.updateStartingTimestamp(minaTimestamp, pg); diff --git a/packages/engine/paima-funnel/src/reading.ts b/packages/engine/paima-funnel/src/reading.ts index 1e2b4bebf..7f8abf78e 100644 --- a/packages/engine/paima-funnel/src/reading.ts +++ b/packages/engine/paima-funnel/src/reading.ts @@ -80,6 +80,8 @@ export async function getMultipleBlockData( ): Promise { const batch = new web3.BatchRequest(); + // console.log('getting multiple block data', fromBlock, toBlock); + const blockRange = Array.from({ length: toBlock - fromBlock + 1 }, (_, i) => i + fromBlock); const blockPromises = blockRange.map(blockNumber => { return new Promise((resolve, reject) => {