Skip to content

Commit

Permalink
migrated from postgres to pg
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Apr 30, 2024
1 parent 24546d0 commit 27dbb08
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 83 deletions.
2 changes: 1 addition & 1 deletion packages/engine/paima-funnel/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@
"assert-never": "^1.2.1",
"@dcspark/carp-client": "^3.1.0",
"@dcspark/cardano-multiplatform-lib-nodejs": "5.2.0",
"postgres": "^3.3.5"
"pg": "^8.11.3"
}
}
98 changes: 51 additions & 47 deletions packages/engine/paima-funnel/src/cde/minaGeneric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import type {
ChainDataExtensionMinaEventGeneric,
} from '@paima/sm';
import { ChainDataExtensionDatumType } from '@paima/utils';
import postgres from 'postgres';
import pg from 'pg';

export async function getEventCdeData(args: {
pg: postgres.Sql;
pg: pg.Client;
extension: ChainDataExtensionMinaEventGeneric;
fromTimestamp: number;
toTimestamp: number;
Expand Down Expand Up @@ -36,7 +36,7 @@ export async function getEventCdeData(args: {
}

export async function getActionCdeData(args: {
pg: postgres.Sql;
pg: pg.Client;
extension: ChainDataExtensionMinaActionGeneric;
fromTimestamp: number;
toTimestamp: number;
Expand Down Expand Up @@ -68,7 +68,7 @@ export async function getCdeData(
cdeDatumType:
| ChainDataExtensionDatumType.MinaActionGeneric
| ChainDataExtensionDatumType.MinaEventGeneric,
pg: postgres.Sql,
pg: pg.Client,
extension: ChainDataExtensionMinaEventGeneric | ChainDataExtensionMinaActionGeneric,
fromTimestamp: number,
toTimestamp: number,
Expand All @@ -81,7 +81,10 @@ export async function getCdeData(
): Promise<(CdeMinaActionGenericDatum | CdeMinaEventGenericDatum)[]> {
const result = [] as (CdeMinaActionGenericDatum | CdeMinaEventGenericDatum)[];

console.log('fromTo', fromTimestamp, toTimestamp);

while (true) {
console.log('making query');
const unmapped = await query(
pg,
extension.address,
Expand All @@ -92,7 +95,9 @@ export async function getCdeData(
fromBlockHeight?.toString()
);

const grouped = groupByTx(unmapped);
console.log('unmapped', console.log(unmapped));

const grouped = groupByTx(unmapped.rows);

const events = grouped.flatMap(perBlock =>
perBlock.eventsData.map(txEvent => ({
Expand Down Expand Up @@ -124,7 +129,7 @@ export async function getCdeData(
return result;
}

function groupByTx(events: postgres.RowList<PerBlock[]>) {
function groupByTx(events: PerBlock[]) {
const grouped = [] as {
blockInfo: {
height: number;
Expand Down Expand Up @@ -157,42 +162,37 @@ function groupByTx(events: postgres.RowList<PerBlock[]>) {
return grouped;
}

function canonicalChainCTE(
db_client: postgres.Sql,
toTimestamp?: string,
fromTimestamp?: string,
fromBlockHeight?: string
) {
return db_client`
function canonicalChainCTE(toTimestamp?: string, fromTimestamp?: string, fromBlockHeight?: string) {
return `
canonical_chain AS (
SELECT
id, state_hash, height, global_slot_since_genesis, timestamp
FROM
blocks b
WHERE
1=1
${fromTimestamp ? db_client`AND b.timestamp::decimal >= ${fromTimestamp}::decimal` : db_client``}
${toTimestamp ? db_client`AND b.timestamp::decimal <= ${toTimestamp}::decimal` : db_client``}
${fromBlockHeight ? db_client`AND b.height::decimal >= ${fromBlockHeight}::decimal` : db_client``}
${fromTimestamp ? `AND b.timestamp::decimal >= ${fromTimestamp}::decimal` : ``}
${toTimestamp ? `AND b.timestamp::decimal <= ${toTimestamp}::decimal` : ``}
${fromBlockHeight ? `AND b.height::decimal >= ${fromBlockHeight}::decimal` : ``}
ORDER BY height
)
`;
}

function accountIdentifierCTE(db_client: postgres.Sql, address: string) {
return db_client`
function accountIdentifierCTE(address: string) {
return `
account_identifier AS (
SELECT
id AS requesting_zkapp_account_identifier_id
FROM
account_identifiers ai
WHERE
ai.public_key_id = (SELECT id FROM public_keys WHERE value = ${address})
ai.public_key_id = (SELECT id FROM public_keys WHERE value = '${address}')
)`;
}

function blocksAccessedCTE(db_client: postgres.Sql) {
return db_client`
function blocksAccessedCTE() {
return `
blocks_accessed AS
(
SELECT
Expand All @@ -212,8 +212,8 @@ function blocksAccessedCTE(db_client: postgres.Sql) {
)`;
}

function emittedZkAppCommandsCTE(db_client: postgres.Sql, after?: string) {
return db_client`
function emittedZkAppCommandsCTE(after?: string) {
return `
emitted_zkapp_commands AS (
SELECT
blocks_accessed.*,
Expand All @@ -234,14 +234,14 @@ function emittedZkAppCommandsCTE(db_client: postgres.Sql, after?: string) {
INNER JOIN zkapp_account_update zkcu ON zkcu.id = ANY(zkc.zkapp_account_updates_ids)
INNER JOIN zkapp_account_update_body zkcu_body ON zkcu_body.id = zkcu.body_id
AND zkcu_body.account_identifier_id = requesting_zkapp_account_identifier_id
${after ? db_client`AND zkc.id > (SELECT id FROM zkapp_commands WHERE zkapp_commands.hash = ${after})` : db_client``}
${after ? `AND zkc.id > (SELECT id FROM zkapp_commands WHERE zkapp_commands.hash = '${after}')` : ``}
WHERE
bzkc.status <> 'failed'
)`;
}

function emittedEventsCTE(db_client: postgres.Sql) {
return db_client`
function emittedEventsCTE() {
return `
emitted_events AS (
SELECT
*,
Expand All @@ -257,8 +257,8 @@ function emittedEventsCTE(db_client: postgres.Sql) {
`;
}

function emittedActionsCTE(db_client: postgres.Sql) {
return db_client`
function emittedActionsCTE() {
return `
emitted_actions AS (
SELECT
*,
Expand All @@ -274,8 +274,8 @@ function emittedActionsCTE(db_client: postgres.Sql) {
`;
}

function emittedActionStateCTE(db_client: postgres.Sql) {
return db_client`
function emittedActionStateCTE() {
return `
emitted_action_state AS (
SELECT
emitted_actions.*
Expand All @@ -297,21 +297,21 @@ type PerBlock = {
};

export function getEventsQuery(
db_client: postgres.Sql,
db_client: pg.Client,
address: string,
toTimestamp?: string,
fromTimestamp?: string,
after?: string,
limit?: string,
fromBlockHeight?: string
) {
return db_client<PerBlock[]>`
let query = `
WITH
${canonicalChainCTE(db_client, toTimestamp, fromTimestamp, fromBlockHeight)},
${accountIdentifierCTE(db_client, address)},
${blocksAccessedCTE(db_client)},
${emittedZkAppCommandsCTE(db_client, after)},
${emittedEventsCTE(db_client)},
${canonicalChainCTE(toTimestamp, fromTimestamp, fromBlockHeight)},
${accountIdentifierCTE(address)},
${blocksAccessedCTE()},
${emittedZkAppCommandsCTE(after)},
${emittedEventsCTE()},
grouped_events AS (
SELECT
MAX(timestamp) timestamp,
Expand All @@ -332,27 +332,29 @@ export function getEventsQuery(
FROM grouped_events
GROUP BY height
ORDER BY height
${limit ? db_client`LIMIT ${limit}` : db_client``}
${limit ? `LIMIT ${limit}` : ``}
`;

return db_client.query(query);
}

export function getActionsQuery(
db_client: postgres.Sql,
db_client: pg.Client,
address: string,
toTimestamp?: string,
fromTimestamp?: string,
after?: string,
limit?: string,
fromBlockHeight?: string
) {
return db_client<PerBlock[]>`
const query = `
WITH
${canonicalChainCTE(db_client, toTimestamp, fromTimestamp, fromBlockHeight)},
${accountIdentifierCTE(db_client, address)},
${blocksAccessedCTE(db_client)},
${emittedZkAppCommandsCTE(db_client, after)},
${emittedActionsCTE(db_client)},
${emittedActionStateCTE(db_client)},
${canonicalChainCTE(toTimestamp, fromTimestamp, fromBlockHeight)},
${accountIdentifierCTE(address)},
${blocksAccessedCTE()},
${emittedZkAppCommandsCTE(after)},
${emittedActionsCTE()},
${emittedActionStateCTE()},
grouped_events AS (
SELECT
MAX(timestamp) timestamp,
Expand All @@ -373,6 +375,8 @@ export function getActionsQuery(
FROM grouped_events
GROUP BY height
ORDER BY height
${limit ? db_client`LIMIT ${limit}` : db_client``}
${limit ? `LIMIT ${limit}` : ``}
`;

return db_client.query(query);
}
6 changes: 3 additions & 3 deletions packages/engine/paima-funnel/src/funnels/FunnelCache.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ChainData } from '@paima/sm';
import postgres from 'postgres';
import pg from 'pg';

export interface FunnelCacheEntry {
/**
Expand Down Expand Up @@ -184,7 +184,7 @@ export class EvmFunnelCacheEntry implements FunnelCacheEntry {
export type MinaFunnelCacheEntryState = {
startingSlotTimestamp: number;
lastPoint: { timestamp: number } | undefined;
pg: postgres.Sql;
pg: pg.Client;
cursors:
| {
[cdeId: number]: { cursor: string; finished: boolean };
Expand All @@ -196,7 +196,7 @@ export class MinaFunnelCacheEntry implements FunnelCacheEntry {
private state: MinaFunnelCacheEntryState | null = null;
public static readonly SYMBOL = Symbol('MinaFunnelStartingSlot');

public updateStartingTimestamp(startingSlotTimestamp: number, pg: postgres.Sql): void {
public updateStartingTimestamp(startingSlotTimestamp: number, pg: pg.Client): void {
this.state = {
startingSlotTimestamp,
lastPoint: this.state?.lastPoint,
Expand Down
Loading

0 comments on commit 27dbb08

Please sign in to comment.