Skip to content

Commit

Permalink
Simplify presync progress tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
SebastienGllmt committed Nov 18, 2023
1 parent ac57618 commit 0382de6
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 131 deletions.
15 changes: 0 additions & 15 deletions packages/engine/paima-sm/src/cde-processing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import processErc20DepositDatum from './cde-erc20-deposit.js';
import processErc6551RegistryDatum from './cde-erc6551-registry.js';
import processGenericDatum from './cde-generic.js';
import type { SQLUpdate } from '@paima/db';
import { getSpecificCdeBlockheight } from '@paima/db';
import assertNever from 'assert-never';

export async function cdeTransitionFunction(
Expand All @@ -34,17 +33,3 @@ export async function cdeTransitionFunction(
assertNever(cdeDatum);
}
}

export async function getProcessedCdeDatumCount(
readonlyDBConn: PoolClient,
blockHeight: number
): Promise<number> {
const cdeStatus = await getSpecificCdeBlockheight.run(
{ block_height: blockHeight },
readonlyDBConn
);
if (cdeStatus.length === 0) {
return 0;
}
return cdeStatus[0].datum_count;
}
17 changes: 1 addition & 16 deletions packages/engine/paima-sm/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import {
getLatestProcessedBlockHeight,
getScheduledDataByBlockHeight,
saveLastBlockHeight,
markCdeDatumProcessed,
markCdeBlockheightProcessed,
getLatestProcessedCdeBlockheight,
} from '@paima/db';
import Prando from '@paima/prando';

import { randomnessRouter } from './randomness.js';
import { cdeTransitionFunction, getProcessedCdeDatumCount } from './cde-processing.js';
import { cdeTransitionFunction } from './cde-processing.js';

const SM: GameStateMachineInitializer = {
initialize: (
Expand Down Expand Up @@ -185,26 +184,12 @@ async function processCdeData(
return 0;
}

let processedCdeDatumCount = await getProcessedCdeDatumCount(dbTx, blockHeight);
if (processedCdeDatumCount > 0) {
cdeData = cdeData.slice(processedCdeDatumCount);
}

for (const datum of cdeData) {
const sqlQueries = await cdeTransitionFunction(dbTx, datum);
try {
processedCdeDatumCount++;
const datumCount = processedCdeDatumCount;
for (const [query, params] of sqlQueries) {
await query.run(params, dbTx);
}
await markCdeDatumProcessed.run(
{
block_height: blockHeight,
datum_count: datumCount,
},
dbTx
);
} catch (err) {
doLog(`[paima-sm] Database error: ${err}`);
}
Expand Down
4 changes: 1 addition & 3 deletions packages/node-sdk/paima-db/migrations/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ CREATE TABLE historical_game_inputs (
);

CREATE TABLE cde_tracking (
block_height INTEGER PRIMARY KEY,
datum_count INTEGER NOT NULL,
done BOOLEAN NOT NULL DEFAULT false
block_height INTEGER PRIMARY KEY
);

CREATE TABLE chain_data_extensions (
Expand Down
10 changes: 2 additions & 8 deletions packages/node-sdk/paima-db/src/paima-tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,14 @@ const TABLE_DATA_HISTORICAL: TableData = {

const QUERY_CREATE_TABLE_CDE_TRACKING = `
CREATE TABLE cde_tracking (
block_height INTEGER PRIMARY KEY,
datum_count INTEGER NOT NULL,
done BOOLEAN NOT NULL DEFAULT false
block_height INTEGER PRIMARY KEY
);
`;

const TABLE_DATA_CDE_TRACKING: TableData = {
tableName: 'cde_tracking',
primaryKeyColumns: ['block_height'],
columnData: packTuples([
['block_height', 'integer', 'NO', ''],
['datum_count', 'integer', 'NO', ''],
['done', 'boolean', 'NO', 'false'],
]),
columnData: packTuples([['block_height', 'integer', 'NO', '']]),
serialColumns: [],
creationQuery: QUERY_CREATE_TABLE_CDE_TRACKING,
};
Expand Down
75 changes: 4 additions & 71 deletions packages/node-sdk/paima-db/src/sql/cde-tracking.queries.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,6 @@
/** Types generated for queries found in "src/sql/cde-tracking.sql" */
import { PreparedQuery } from '@pgtyped/runtime';

/** 'MarkCdeDatumProcessed' parameters type */
export interface IMarkCdeDatumProcessedParams {
block_height: number;
datum_count?: number | null | void;
}

/** 'MarkCdeDatumProcessed' return type */
export type IMarkCdeDatumProcessedResult = void;

/** 'MarkCdeDatumProcessed' query type */
export interface IMarkCdeDatumProcessedQuery {
params: IMarkCdeDatumProcessedParams;
result: IMarkCdeDatumProcessedResult;
}

const markCdeDatumProcessedIR: any = {"usedParamSet":{"block_height":true,"datum_count":true},"params":[{"name":"block_height","required":true,"transform":{"type":"scalar"},"locs":[{"a":66,"b":79}]},{"name":"datum_count","required":false,"transform":{"type":"scalar"},"locs":[{"a":82,"b":93}]}],"statement":"INSERT INTO cde_tracking(block_height, datum_count, done)\nVALUES (:block_height!, :datum_count, FALSE)\nON CONFLICT (block_height)\nDO UPDATE SET\nblock_height = EXCLUDED.block_height,\ndatum_count = EXCLUDED.datum_count,\ndone = EXCLUDED.done"};

/**
* Query generated from SQL:
* ```
* INSERT INTO cde_tracking(block_height, datum_count, done)
* VALUES (:block_height!, :datum_count, FALSE)
* ON CONFLICT (block_height)
* DO UPDATE SET
* block_height = EXCLUDED.block_height,
* datum_count = EXCLUDED.datum_count,
* done = EXCLUDED.done
* ```
*/
export const markCdeDatumProcessed = new PreparedQuery<IMarkCdeDatumProcessedParams,IMarkCdeDatumProcessedResult>(markCdeDatumProcessedIR);


/** 'MarkCdeBlockheightProcessed' parameters type */
export interface IMarkCdeBlockheightProcessedParams {
block_height: number;
Expand All @@ -47,58 +15,24 @@ export interface IMarkCdeBlockheightProcessedQuery {
result: IMarkCdeBlockheightProcessedResult;
}

const markCdeBlockheightProcessedIR: any = {"usedParamSet":{"block_height":true},"params":[{"name":"block_height","required":true,"transform":{"type":"scalar"},"locs":[{"a":59,"b":72}]}],"statement":"UPDATE cde_tracking\nSET\n done = TRUE\nWHERE block_height = :block_height!"};
const markCdeBlockheightProcessedIR: any = {"usedParamSet":{"block_height":true},"params":[{"name":"block_height","required":true,"transform":{"type":"scalar"},"locs":[{"a":47,"b":60}]}],"statement":"INSERT INTO cde_tracking(block_height)\nVALUES (:block_height!)"};

/**
* Query generated from SQL:
* ```
* UPDATE cde_tracking
* SET
* done = TRUE
* WHERE block_height = :block_height!
* INSERT INTO cde_tracking(block_height)
* VALUES (:block_height!)
* ```
*/
export const markCdeBlockheightProcessed = new PreparedQuery<IMarkCdeBlockheightProcessedParams,IMarkCdeBlockheightProcessedResult>(markCdeBlockheightProcessedIR);


/** 'GetSpecificCdeBlockheight' parameters type */
export interface IGetSpecificCdeBlockheightParams {
block_height: number;
}

/** 'GetSpecificCdeBlockheight' return type */
export interface IGetSpecificCdeBlockheightResult {
block_height: number;
datum_count: number;
done: boolean;
}

/** 'GetSpecificCdeBlockheight' query type */
export interface IGetSpecificCdeBlockheightQuery {
params: IGetSpecificCdeBlockheightParams;
result: IGetSpecificCdeBlockheightResult;
}

const getSpecificCdeBlockheightIR: any = {"usedParamSet":{"block_height":true},"params":[{"name":"block_height","required":true,"transform":{"type":"scalar"},"locs":[{"a":48,"b":61}]}],"statement":"SELECT * FROM cde_tracking\nWHERE block_height = :block_height!"};

/**
* Query generated from SQL:
* ```
* SELECT * FROM cde_tracking
* WHERE block_height = :block_height!
* ```
*/
export const getSpecificCdeBlockheight = new PreparedQuery<IGetSpecificCdeBlockheightParams,IGetSpecificCdeBlockheightResult>(getSpecificCdeBlockheightIR);


/** 'GetLatestProcessedCdeBlockheight' parameters type */
export type IGetLatestProcessedCdeBlockheightParams = void;

/** 'GetLatestProcessedCdeBlockheight' return type */
export interface IGetLatestProcessedCdeBlockheightResult {
block_height: number;
datum_count: number;
done: boolean;
}

/** 'GetLatestProcessedCdeBlockheight' query type */
Expand All @@ -107,13 +41,12 @@ export interface IGetLatestProcessedCdeBlockheightQuery {
result: IGetLatestProcessedCdeBlockheightResult;
}

const getLatestProcessedCdeBlockheightIR: any = {"usedParamSet":{},"params":[],"statement":"SELECT * FROM cde_tracking\nWHERE done IS TRUE\nORDER BY block_height DESC\nLIMIT 1"};
const getLatestProcessedCdeBlockheightIR: any = {"usedParamSet":{},"params":[],"statement":"SELECT * FROM cde_tracking\nORDER BY block_height DESC\nLIMIT 1"};

/**
* Query generated from SQL:
* ```
* SELECT * FROM cde_tracking
* WHERE done IS TRUE
* ORDER BY block_height DESC
* LIMIT 1
* ```
Expand Down
20 changes: 2 additions & 18 deletions packages/node-sdk/paima-db/src/sql/cde-tracking.sql
Original file line number Diff line number Diff line change
@@ -1,24 +1,8 @@
/* @name markCdeDatumProcessed */
INSERT INTO cde_tracking(block_height, datum_count, done)
VALUES (:block_height!, :datum_count, FALSE)
ON CONFLICT (block_height)
DO UPDATE SET
block_height = EXCLUDED.block_height,
datum_count = EXCLUDED.datum_count,
done = EXCLUDED.done;

/* @name markCdeBlockheightProcessed */
UPDATE cde_tracking
SET
done = TRUE
WHERE block_height = :block_height!;

/* @name getSpecificCdeBlockheight */
SELECT * FROM cde_tracking
WHERE block_height = :block_height!;
INSERT INTO cde_tracking(block_height)
VALUES (:block_height!);

/* @name getLatestProcessedCdeBlockheight */
SELECT * FROM cde_tracking
WHERE done IS TRUE
ORDER BY block_height DESC
LIMIT 1;

0 comments on commit 0382de6

Please sign in to comment.