diff --git a/servers/cu/package-lock.json b/servers/cu/package-lock.json index 14dedf508..8f90460ab 100644 --- a/servers/cu/package-lock.json +++ b/servers/cu/package-lock.json @@ -8,19 +8,19 @@ "name": "@permaweb/ao-cu", "version": "1.0.0", "dependencies": { - "@fastify/middie": "^9.0.2", + "@fastify/middie": "^9.0.3", "@permaweb/ao-loader": "^0.0.44", "@permaweb/ao-scheduler-utils": "^0.0.25", "@permaweb/weavedrive": "^0.0.18", "arweave": "^1.15.5", "async-lock": "^1.4.1", - "better-sqlite3": "^11.7.0", + "better-sqlite3": "^11.8.0", "bytes": "^3.1.2", "cors": "^2.8.5", "dataloader": "^2.2.3", "dotenv": "^16.4.7", - "fast-glob": "^3.3.2", - "fastify": "^5.2.0", + "fast-glob": "^3.3.3", + "fastify": "^5.2.1", "helmet": "^8.0.0", "hyper-async": "^1.1.2", "keccak": "^3.0.4", @@ -33,7 +33,7 @@ "pg": "^8.13.1", "prom-client": "^15.1.3", "ramda": "^0.30.1", - "undici": "^7.2.0", + "undici": "^7.2.3", "warp-arbundles": "^1.0.4", "winston": "^3.17.0", "workerpool": "^9.2.0", @@ -101,6 +101,12 @@ "fast-json-stringify": "^6.0.0" } }, + "node_modules/@fastify/forwarded": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/@fastify/forwarded/-/forwarded-3.0.0.tgz", + "integrity": "sha512-kJExsp4JCms7ipzg7SJ3y8DwmePaELHxKYtg+tZow+k0znUTf3cb+npgyqm8+ATZOdmfgfydIebPDWM172wfyA==", + "license": "MIT" + }, "node_modules/@fastify/merge-json-schemas": { "version": "0.1.1", "resolved": "https://registry.npmjs.org/@fastify/merge-json-schemas/-/merge-json-schemas-0.1.1.tgz", @@ -111,9 +117,19 @@ } }, "node_modules/@fastify/middie": { - "version": "9.0.2", - "resolved": "https://registry.npmjs.org/@fastify/middie/-/middie-9.0.2.tgz", - "integrity": "sha512-MHvAhUBxrefkpx4A8HtjOjAdlaCtY8j19PC6ORfm7KPMb/dklDeqBqR4xPRTtcBRPZUYq2jAJJWQCB4eO+dtKw==", + "version": "9.0.3", + "resolved": "https://registry.npmjs.org/@fastify/middie/-/middie-9.0.3.tgz", + "integrity": "sha512-7OYovKXp9UKYeVMcjcFLMcSpoMkmcZmfnG+eAvtdiatN35W7c+r9y1dRfpA+pfFVNuHGGqI3W+vDTmjvcfLcMA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], "license": "MIT", "dependencies": { "@fastify/error": "^4.0.0", @@ -122,6 +138,16 @@ "reusify": "^1.0.4" } }, + "node_modules/@fastify/proxy-addr": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/@fastify/proxy-addr/-/proxy-addr-5.0.0.tgz", + "integrity": "sha512-37qVVA1qZ5sgH7KpHkkC4z9SK6StIsIcOmpjvMPXNb3vx2GQxhZocogVYbr2PbbeLCQxYIPDok307xEvRZOzGA==", + "license": "MIT", + "dependencies": { + "@fastify/forwarded": "^3.0.0", + "ipaddr.js": "^2.1.0" + } + }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", @@ -396,9 +422,9 @@ } }, "node_modules/better-sqlite3": { - "version": "11.7.0", - "resolved": "https://registry.npmjs.org/better-sqlite3/-/better-sqlite3-11.7.0.tgz", - "integrity": "sha512-mXpa5jnIKKHeoGzBrUJrc65cXFKcILGZpU3FXR0pradUEm9MA7UZz02qfEejaMcm9iXrSOCenwwYMJ/tZ1y5Ig==", + "version": "11.8.0", + "resolved": "https://registry.npmjs.org/better-sqlite3/-/better-sqlite3-11.8.0.tgz", + "integrity": "sha512-aKv9s2dir7bsEX5RIjL9HHWB9uQ+f6Vch5B4qmeAOop4Y9OYHX+PNKLr+mpv6+d8L/ZYh4l7H8zPuVMbWkVMLw==", "hasInstallScript": true, "license": "MIT", "dependencies": { @@ -742,16 +768,16 @@ "license": "MIT" }, "node_modules/fast-glob": { - "version": "3.3.2", - "resolved": "https://registry.npmjs.org/fast-glob/-/fast-glob-3.3.2.tgz", - "integrity": 
"sha512-oX2ruAFQwf/Orj8m737Y5adxDQO0LAB7/S5MnxCdTNDd4p6BsyIVsv9JQsATbTSq8KHRpLwIHbVlUNatxd+1Ow==", + "version": "3.3.3", + "resolved": "https://registry.npmjs.org/fast-glob/-/fast-glob-3.3.3.tgz", + "integrity": "sha512-7MptL8U0cqcFdzIzwOTHoilX9x5BrNqye7Z/LuC7kCMRio1EMSyqRK3BEAUD7sXRq4iT4AzTVuZdhgQ2TCvYLg==", "license": "MIT", "dependencies": { "@nodelib/fs.stat": "^2.0.2", "@nodelib/fs.walk": "^1.2.3", "glob-parent": "^5.1.2", "merge2": "^1.3.0", - "micromatch": "^4.0.4" + "micromatch": "^4.0.8" }, "engines": { "node": ">=8.6.0" @@ -803,9 +829,9 @@ "license": "BSD-3-Clause" }, "node_modules/fastify": { - "version": "5.2.0", - "resolved": "https://registry.npmjs.org/fastify/-/fastify-5.2.0.tgz", - "integrity": "sha512-3s+Qt5S14Eq5dCpnE0FxTp3z4xKChI83ZnMv+k0FwX+VUoZrgCFoLAxpfdi/vT4y6Mk+g7aAMt9pgXDoZmkefQ==", + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/fastify/-/fastify-5.2.1.tgz", + "integrity": "sha512-rslrNBF67eg8/Gyn7P2URV8/6pz8kSAscFL4EThZJ8JBMaXacVdVE4hmUcnPNKERl5o/xTiBSLfdowBRhVF1WA==", "funding": [ { "type": "github", @@ -821,6 +847,7 @@ "@fastify/ajv-compiler": "^4.0.0", "@fastify/error": "^4.0.0", "@fastify/fast-json-stringify-compiler": "^5.0.0", + "@fastify/proxy-addr": "^5.0.0", "abstract-logging": "^2.0.1", "avvio": "^9.0.0", "fast-json-stringify": "^6.0.0", @@ -828,7 +855,6 @@ "light-my-request": "^6.0.0", "pino": "^9.0.0", "process-warning": "^4.0.0", - "proxy-addr": "^2.0.7", "rfdc": "^1.3.1", "secure-json-parse": "^3.0.1", "semver": "^7.6.0", @@ -894,15 +920,6 @@ "integrity": "sha512-GRnmB5gPyJpAhTQdSZTSp9uaPSvl09KoYcMQtsB9rQoOmzs9dH6ffeccH+Z+cv6P68Hu5bC6JjRh4Ah/mHSNRw==", "license": "MIT" }, - "node_modules/forwarded": { - "version": "0.2.0", - "resolved": "https://registry.npmjs.org/forwarded/-/forwarded-0.2.0.tgz", - "integrity": "sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==", - "license": "MIT", - "engines": { - "node": ">= 0.6" - } - }, "node_modules/fs-constants": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz", @@ -1007,12 +1024,12 @@ "license": "ISC" }, "node_modules/ipaddr.js": { - "version": "1.9.1", - "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", - "integrity": "sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==", + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.2.0.tgz", + "integrity": "sha512-Ag3wB2o37wslZS19hZqorUnrnzSkpOVy+IiiDEiTqNubEYpYuHWIf6K4psgN2ZWKExS4xhVCrRVfb/wfW8fWJA==", "license": "MIT", "engines": { - "node": ">= 0.10" + "node": ">= 10" } }, "node_modules/is-arrayish": { @@ -1638,19 +1655,6 @@ "node": "^16 || ^18 || >=20" } }, - "node_modules/proxy-addr": { - "version": "2.0.7", - "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", - "integrity": "sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg==", - "license": "MIT", - "dependencies": { - "forwarded": "0.2.0", - "ipaddr.js": "1.9.1" - }, - "engines": { - "node": ">= 0.10" - } - }, "node_modules/pstree.remy": { "version": "1.1.8", "resolved": "https://registry.npmjs.org/pstree.remy/-/pstree.remy-1.1.8.tgz", @@ -2117,9 +2121,9 @@ "license": "MIT" }, "node_modules/undici": { - "version": "7.2.0", - "resolved": "https://registry.npmjs.org/undici/-/undici-7.2.0.tgz", - "integrity": "sha512-klt+0S55GBViA9nsq48/NSCo4YX5mjydjypxD7UmHh/brMu8h/Mhd/F7qAeoH2NOO8SDTk6kjnTFc4WpzmfYpQ==", + "version": "7.2.3", + 
"resolved": "https://registry.npmjs.org/undici/-/undici-7.2.3.tgz", + "integrity": "sha512-2oSLHaDalSt2/O/wHA9M+/ZPAOcU2yrSP/cdBYJ+YxZskiPYDSqHbysLSlD7gq3JMqOoJI5O31RVU3BxX/MnAA==", "license": "MIT", "engines": { "node": ">=20.18.1" diff --git a/servers/cu/package.json b/servers/cu/package.json index 1fd01ce1e..3b27b5d90 100644 --- a/servers/cu/package.json +++ b/servers/cu/package.json @@ -11,19 +11,19 @@ "test": "node --experimental-wasm-memory64 --test" }, "dependencies": { - "@fastify/middie": "^9.0.2", + "@fastify/middie": "^9.0.3", "@permaweb/ao-loader": "^0.0.44", "@permaweb/ao-scheduler-utils": "^0.0.25", "@permaweb/weavedrive": "^0.0.18", "arweave": "^1.15.5", "async-lock": "^1.4.1", - "better-sqlite3": "^11.7.0", + "better-sqlite3": "^11.8.0", "bytes": "^3.1.2", "cors": "^2.8.5", "dataloader": "^2.2.3", "dotenv": "^16.4.7", - "fast-glob": "^3.3.2", - "fastify": "^5.2.0", + "fast-glob": "^3.3.3", + "fastify": "^5.2.1", "helmet": "^8.0.0", "hyper-async": "^1.1.2", "keccak": "^3.0.4", @@ -36,7 +36,7 @@ "pg": "^8.13.1", "prom-client": "^15.1.3", "ramda": "^0.30.1", - "undici": "^7.2.0", + "undici": "^7.2.3", "warp-arbundles": "^1.0.4", "winston": "^3.17.0", "workerpool": "^9.2.0", diff --git a/servers/cu/src/bootstrap.js b/servers/cu/src/bootstrap.js index d3e0aa633..539b1ab2b 100644 --- a/servers/cu/src/bootstrap.js +++ b/servers/cu/src/bootstrap.js @@ -135,6 +135,14 @@ export const createApis = async (ctx) => { Math.ceil(ctx.WASM_EVALUATION_MAX_WORKERS * (ctx.WASM_EVALUATION_PRIMARY_WORKERS_PERCENTAGE / 100)) ) + /** + * node's crypto module is synchronous, which blocks the main thread. + * Since hash chain valiation is done for _every single scheduled message_, we offload the + * work to a worker thread, so at least the main thread isn't blocked. 
+ */ + const hashChainWorkerPath = join(__dirname, 'effects', 'worker', 'hashChain', 'index.js') + const hashChainWorker = workerpool.pool(hashChainWorkerPath, { maxWorkers: maxPrimaryWorkerThreads }) + const worker = join(__dirname, 'effects', 'worker', 'evaluator', 'index.js') const primaryWorkerPool = workerpool.pool(worker, { maxWorkers: maxPrimaryWorkerThreads, @@ -367,7 +375,12 @@ export const createApis = async (ctx) => { findMessageBefore: AoEvaluationClient.findMessageBeforeWith({ db, logger }), loadTimestamp: AoSuClient.loadTimestampWith({ fetch: ctx.fetch, logger }), loadProcess: AoSuClient.loadProcessWith({ fetch: ctx.fetch, logger }), - loadMessages: AoSuClient.loadMessagesWith({ fetch: ctx.fetch, pageSize: 1000, logger }), + loadMessages: AoSuClient.loadMessagesWith({ + hashChain: (...args) => hashChainWorker.exec('hashChain', args), + fetch: ctx.fetch, + pageSize: 1000, + logger + }), locateProcess: locateDataloader.load.bind(locateDataloader), isModuleMemoryLimitSupported: WasmClient.isModuleMemoryLimitSupportedWith({ PROCESS_WASM_MEMORY_MAX_LIMIT: ctx.PROCESS_WASM_MEMORY_MAX_LIMIT }), isModuleComputeLimitSupported: WasmClient.isModuleComputeLimitSupportedWith({ PROCESS_WASM_COMPUTE_MAX_LIMIT: ctx.PROCESS_WASM_COMPUTE_MAX_LIMIT }), diff --git a/servers/cu/src/domain/dal.js b/servers/cu/src/domain/dal.js index 606a8edaf..c5e026867 100644 --- a/servers/cu/src/domain/dal.js +++ b/servers/cu/src/domain/dal.js @@ -1,6 +1,6 @@ import { z } from 'zod' -import { blockSchema, evaluationSchema, processSchema, moduleSchema, rawTagSchema, streamSchema, processCheckpointSchema, bufferSchema } from './model.js' +import { blockSchema, evaluationSchema, processSchema, moduleSchema, rawTagSchema, processCheckpointSchema, bufferSchema } from './model.js' // Arweave @@ -67,6 +67,8 @@ export const saveLatestProcessMemorySchema = z.function() processId: z.string(), moduleId: z.string().nullish(), messageId: z.string().nullish(), + assignmentId: z.string().nullish(), + hashChain: z.string().nullish(), timestamp: z.coerce.number().nullish(), epoch: z.coerce.number().nullish(), nonce: z.coerce.number().nullish(), @@ -74,7 +76,6 @@ export const saveLatestProcessMemorySchema = z.function() cron: z.string().nullish(), blockHeight: z.coerce.number().nullish(), Memory: bufferSchema, - evalCount: z.number().nullish(), gasUsed: z.bigint().nullish() })) .returns(z.promise(z.any())) @@ -167,20 +168,23 @@ export const loadMessagesSchema = z.function() z.object({ suUrl: z.string().url(), processId: z.string(), + block: blockSchema, owner: z.string(), tags: z.array(rawTagSchema), moduleId: z.string(), moduleTags: z.array(rawTagSchema), moduleOwner: z.string(), from: z.coerce.number().nullish(), - to: z.coerce.number().nullish() + to: z.coerce.number().nullish(), + assignmentId: z.string().nullish(), + hashChain: z.string().nullish() }) ) /** * Returns a Stream that wraps an AsyncIterable, which is not something that Zod can * parse natively, so we make sure the returned value implements a pipe api */ - .returns(z.promise(streamSchema)) + .returns(z.promise(z.any())) export const loadProcessSchema = z.function() .args(z.object({ diff --git a/servers/cu/src/domain/lib/evaluate.js b/servers/cu/src/domain/lib/evaluate.js index 6e2db30fd..630a6f12b 100644 --- a/servers/cu/src/domain/lib/evaluate.js +++ b/servers/cu/src/domain/lib/evaluate.js @@ -1,4 +1,5 @@ -import { Transform, compose as composeStreams, finished } from 'node:stream' +import { Transform, compose as composeStreams } from 'node:stream' +import { 
pipeline } from 'node:stream/promises' import { always, applySpec, evolve, mergeLeft, mergeRight, pathOr, pipe } from 'ramda' import { Rejected, Resolved, fromPromise, of } from 'hyper-async' @@ -91,6 +92,8 @@ export function evaluateWith (env) { .chain(fromPromise(async (ctx) => { // A running tally of gas used in the eval stream let totalGasUsed = BigInt(0) + let mostRecentAssignmentId = ctx.mostRecentAssignmentId + let mostRecentHashChain = ctx.mostRecentHashChain let prev = applySpec({ /** * Ensure all result fields are initialized */ @@ -106,187 +109,196 @@ noSave: always(true) })(ctx) - await new Promise((resolve, reject) => { - const cleanup = finished( + const evalStream = async function (messages) { + /** + * There seems to be duplicate Cron Message evaluations occurring and it's been difficult + * to pin down why. My hunch is that the very first message can be a duplicate of the 'from', if 'from' + * is itself a Cron Message. + * + * So to get around this, we maintain a set of strings that uniquely identify + * Cron messages (timestamp+cron interval). We will add each cron message identifier. + * If an iteration comes across an identifier already present in this list, then we consider it + * a duplicate and remove it from the eval stream. + * + * This should prevent duplicate Cron Messages from being evaluated twice, thus not "tainting" + * Memory that is folded as part of the eval stream. + * + * Since this will only happen at the end and beginning of boundaries generated in loadMessages + * this cache can be small, which prevents bloating memory on long cron runs + */ + const evaledCrons = new LRUCache({ maxSize: 100, sizeCalculation: always(1) }) + /** + * If the starting point ('from') is itself a Cron Message, + * then that will be our first identifier added to the set + */ + if (ctx.fromCron) evaledCrons.set(toEvaledCron({ timestamp: ctx.from, cron: ctx.fromCron }), true) + + /** + * Iterate over the async iterable of messages, + * and evaluate each one + */ + let first = true + for await (const { noSave, cron, ordinate, name, message, deepHash, isAssignment, assignmentId, AoGlobal } of messages) { + if (cron) { + const key = toEvaledCron({ timestamp: message.Timestamp, cron }) + if (evaledCrons.has(key)) continue + /** + * We add the cron's identifier to the cache, + * thus preventing a duplicate evaluation if we come across it + * again in the eval stream + */ + else evaledCrons.set(key, true) + } else if (!noSave) { + /** + * As messages stream into the process to be evaluated, + * we need to keep track of the assignmentId + * and hashChain of the most recent scheduled message + */ + mostRecentAssignmentId = assignmentId + mostRecentHashChain = message['Hash-Chain'] + } + /** - * Where the entire eval stream is composed and concludes. + * We make sure to remove duplicate pushed messages (matching deepHash) + * and duplicate assignments (matching messageId) from the eval stream * - * messages will flow into evaluation, respecting backpressure, bubbling errors, - * and cleaning up resources when finished. + * Which prevents them from corrupting the process. + * + * NOTE: We should only need to check if the message is an assignment + * or a pushed message, since the SU rejects any messages with the same id, + * that are not an assignment + * + * TODO: should the CU check every message, ergo not trusting the SU?
*/ - composeStreams( - ctx.messages, - Transform.from(async function * removeInvalidTags ($messages) { - for await (const message of $messages) { - yield evolve( - { - message: { - Tags: pipe( - removeTagsByNameMaybeValue('From'), - removeTagsByNameMaybeValue('Owner') - ) - } - }, - message - ) - } - }), - async function (messages) { - /** - * There seems to be duplicate Cron Message evaluations occurring and it's been difficult - * to pin down why. My hunch is that the very first message can be a duplicate of the 'from', if 'from' - * is itself a Cron Message. - * - * So to get around this, we maintain a set of strings that unique identify - * Cron messages (timestamp+cron interval). We will add each cron message identifier. - * If an iteration comes across an identifier already present in this list, then we consider it - * a duplicate and remove it from the eval stream. - * - * This should prevent duplicate Cron Messages from being duplicate evaluated, thus not "tainting" - * Memory that is folded as part of the eval stream. - * - * Since this will only happen at the end and beginning of boundaries generated in loadMessages - * this cache can be small, which prevents bloating memory on long cron runs - */ - const evaledCrons = new LRUCache({ maxSize: 100, sizeCalculation: always(1) }) - /** - * If the starting point ('from') is itself a Cron Message, - * then that will be our first identifier added to the set - */ - if (ctx.fromCron) evaledCrons.set(toEvaledCron({ timestamp: ctx.from, cron: ctx.fromCron }), true) - - /** - * Iterate over the async iterable of messages, - * and evaluate each one - */ - let first = true - for await (const { noSave, cron, ordinate, name, message, deepHash, isAssignment, AoGlobal } of messages) { - if (cron) { - const key = toEvaledCron({ timestamp: message.Timestamp, cron }) - if (evaledCrons.has(key)) continue - /** - * We add the crons identifier to the cache, - * thus preventing a duplicate evaluation if we come across it - * again in the eval stream - */ - else evaledCrons.set(key, true) - } + if ( + (deepHash || isAssignment) && + await doesMessageExist({ + messageId: message.Id, + deepHash, + isAssignment, + processId: ctx.id, + epoch: message.Epoch, + nonce: message.Nonce + }).toPromise() + ) { + const log = deepHash ? 'deepHash of' : 'assigned id' + logger( + `Prior Message to process "%s" with ${log} "%s" was found and therefore has already been evaluated. Removing "%s" from eval stream`, + ctx.id, + deepHash || message.Id, + name + ) + continue + } + prev = await Promise.resolve(prev) + .then((prev) => + Promise.resolve(prev.Memory) /** - * We make sure to remove duplicate pushed (matching deepHash) - * and duplicate assignments (matching messageId) from the eval stream - * - * Which prevents them from corrupting the process. - * - * NOTE: We should only need to check if the message is an assignment - * or a pushed message, since the SU rejects any messages with the same id, - * that are not an assignment - * - * TODO: should the CU check every message, ergo not trusting the SU? + * Where the actual evaluation is performed */ - if ( - (deepHash || isAssignment) && - await doesMessageExist({ - messageId: message.Id, - deepHash, - isAssignment, - processId: ctx.id, - epoch: message.Epoch, - nonce: message.Nonce - }).toPromise() - ) { - const log = deepHash ? 'deepHash of' : 'assigned id' - logger( - `Prior Message to process "%s" with ${log} "%s" was found and therefore has already been evaluated. 
Removing "%s" from eval stream`, - ctx.id, - deepHash || message.Id, - name - ) - continue - } + .then((Memory) => ctx.evaluator({ first, noSave, name, deepHash, cron, ordinate, isAssignment, processId: ctx.id, Memory, message, AoGlobal })) + /** + * These values are folded, + * so that we can potentially update the process memory cache + * at the end of evaluation + */ + .then(mergeLeft({ noSave, message, cron, ordinate })) + .then(async (output) => { + /** + * Make sure to set first to false + * for all subsequent evaluations for this evaluation stream + */ + if (first) first = false + if (output.GasUsed) totalGasUsed += BigInt(output.GasUsed ?? 0) - prev = await Promise.resolve(prev) - .then((prev) => - Promise.resolve(prev.Memory) - /** - * Where the actual evaluation is performed - */ - .then((Memory) => ctx.evaluator({ first, noSave, name, deepHash, cron, ordinate, isAssignment, processId: ctx.id, Memory, message, AoGlobal })) - /** - * These values are folded, - * so that we can potentially update the process memory cache - * at the end of evaluation - */ - .then(mergeLeft({ noSave, message, cron, ordinate })) - .then(async (output) => { - /** - * Make sure to set first to false - * for all subsequent evaluations for this evaluation stream - */ - if (first) first = false - if (output.GasUsed) totalGasUsed += BigInt(output.GasUsed ?? 0) + if (cron) ctx.stats.messages.cron++ + else ctx.stats.messages.scheduled++ - if (cron) ctx.stats.messages.cron++ - else ctx.stats.messages.scheduled++ + if (output.Error) { + logger( + 'Error occurred when applying message "%s" to process "%s": "%s', + name, + ctx.id, + output.Error + ) + ctx.stats.messages.error = ctx.stats.messages.error || 0 + ctx.stats.messages.error++ + } - if (output.Error) { - logger( - 'Error occurred when applying message "%s" to process "%s": "%s', - name, - ctx.id, - output.Error - ) - ctx.stats.messages.error = ctx.stats.messages.error || 0 - ctx.stats.messages.error++ - } + /** + * Increments gauges for total evaluations for both: + * + * total evaluations (no labels) + * specific kinds of evaluations (type of eval stream, type of message, whether an error occurred or not) + */ + evaluationCounter.inc(1) + evaluationCounter.inc( + 1, + { + stream_type: ctx.dryRun ? 'dry-run' : 'primary', + message_type: ctx.dryRun ? 'dry-run' : cron ? 'cron' : isAssignment ? 'assignment' : 'scheduled', + process_error: Boolean(output.Error) + }, + { processId: ctx.id } + ) + /** + * TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens + */ + // gasCounter.inc(output.GasUsed ?? 0, { cron: Boolean(cron), dryRun: Boolean(ctx.dryRun) }, { processId: ctx.id, error: Boolean(output.Error) }) - /** - * Increments gauges for total evaluations for both: - * - * total evaluations (no labels) - * specific kinds of evaluations (type of eval stream, type of message, whether an error occurred or not) - */ - evaluationCounter.inc(1) - evaluationCounter.inc( - 1, - { - stream_type: ctx.dryRun ? 'dry-run' : 'primary', - message_type: ctx.dryRun ? 'dry-run' : cron ? 'cron' : isAssignment ? 'assignment' : 'scheduled', - process_error: Boolean(output.Error) - }, - { processId: ctx.id } - ) - /** - * TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens - */ - // gasCounter.inc(output.GasUsed ?? 
0, { cron: Boolean(cron), dryRun: Boolean(ctx.dryRun) }, { processId: ctx.id, error: Boolean(output.Error) }) + return output + }) + ) + } + } - return output - }) - ) + const removeInvalidTags = Transform.from(async function * ($messages) { + for await (const message of $messages) { + yield evolve( + { + message: { + Tags: pipe( + removeTagsByNameMaybeValue('From'), + removeTagsByNameMaybeValue('Owner') + ) } - } - ), - (err) => { - /** - * finished() will leave dangling event listeners even after this callback - * has been invoked, so that unexpected errors do not cause full-on crashes. - * - * finished() returns a callback fn to cleanup these dangling listeners, so we make sure - * to always call it here to prevent memory leaks. - * - * See https://nodejs.org/api/stream.html#streamfinishedstream-options-callback - */ - cleanup() - /** - * Signal the evaluator to close any resources spun up as part - * of handling this eval stream - */ - ctx.evaluator({ close: true }) - err ? reject(err) : resolve() - } - ) + }, + message + ) + } + }) + + /** + * ABANDON ALL HOPE YE WHO ENTER HERE + * + * compose from node:streams has some incredibly hard to debug issues, when it comes to destroying + * streams and propagating errors, when composing multiple levels of streams. + * + * In order to circumvent these issues, we've hacked the steps to instead build a list of + * streams, then combine them all here. + * + * THEN we add an error listener such that any stream erroring + * results in all streams being cleaned up and destroyed with that error. + * + * Finally, if an error was thrown from any stream, we re-throw such that the eval promise rejects, + * bubbling the error to the caller + */ + if (!Array.isArray(ctx.messages)) ctx.messages = [ctx.messages] + const streams = [...ctx.messages, removeInvalidTags] + streams.push(composeStreams(...streams)) + let e + streams.forEach(s => { + s.on('error', (err) => { + e = err + streams.forEach(s => { + s.emit('end') + s.destroy(err) + }) + }) }) + await pipeline(streams[streams.length - 1], evalStream) + if (e) throw e /** * Make sure to attempt to cache the last result @@ -306,6 +318,8 @@ export function evaluateWith (env) { processId: ctx.id, moduleId: ctx.moduleId, messageId: message.Id, + assignmentId: mostRecentAssignmentId, + hashChain: mostRecentHashChain, timestamp: message.Timestamp, nonce: message.Nonce, epoch: message.Epoch, @@ -313,7 +327,6 @@ export function evaluateWith (env) { ordinate, cron, Memory: prev.Memory, - evalCount: ctx.stats.messages.scheduled + ctx.stats.messages.cron, gasUsed: totalGasUsed }) } diff --git a/servers/cu/src/domain/lib/evaluate.test.js b/servers/cu/src/domain/lib/evaluate.test.js index 374393703..cc453088b 100644 --- a/servers/cu/src/domain/lib/evaluate.test.js +++ b/servers/cu/src/domain/lib/evaluate.test.js @@ -2,6 +2,7 @@ import { describe, test } from 'node:test' import assert from 'node:assert' import { readFileSync } from 'node:fs' +import { Readable } from 'node:stream' import AoLoader from '@permaweb/ao-loader' @@ -10,10 +11,12 @@ import { evaluateWith } from './evaluate.js' const logger = createTestLogger({ name: 'ao-cu:readState' }) -async function * toAsyncIterable (iterable) { - while (iterable.length) yield iterable.shift() +function toReadable (iterable) { + return Readable.from(iterable) } +const mockCounter = { inc: () => undefined } + const moduleOptions = { format: 'wasm32-unknown-emscripten', inputEncoding: 'JSON-1', @@ -69,7 +72,7 @@ describe('evaluate', () => { result: { Memory: null }, - 
messages: toAsyncIterable([ + messages: toReadable([ { ordinate: 1, isAssignment: false, @@ -204,7 +207,7 @@ describe('evaluate', () => { result: { Memory: null }, - messages: toAsyncIterable([ + messages: toReadable([ // assignment { ordinate: 1, @@ -319,7 +322,7 @@ describe('evaluate', () => { result: { Memory: null }, - messages: toAsyncIterable([ + messages: toReadable([ // duplicate of starting point { ordinate: 1, @@ -466,7 +469,7 @@ describe('evaluate', () => { result: { Memory: null }, - messages: toAsyncIterable([ + messages: toReadable([ { // Will include an error in result.error ordinate: 1, @@ -576,7 +579,7 @@ describe('evaluate', () => { result: { Memory: Buffer.from('Hello world') }, - messages: toAsyncIterable([ + messages: toReadable([ { ordinate: 1, isAssignment: false, @@ -601,4 +604,176 @@ describe('evaluate', () => { ]) }).toPromise() }) + + describe('tracks and saves most recent assignmentId and hashChain', () => { + const cronOne = { + ordinate: 1, + isAssignment: false, + cron: '1-20-minutes', + message: { + Id: 'message-123', + Timestamp: 1702846520559, + Owner: 'owner-123', + Tags: [ + { name: 'From', value: 'hello' }, + { name: 'function', value: 'hello' }, + { name: 'Owner', value: 'hello' } + ], + 'Block-Height': 1234 + }, + AoGlobal: { + Process: { + Id: '1234', + Tags: [] + } + } + } + + const cronTwo = { + ordinate: 2, + isAssignment: false, + cron: '1-20-minutes', + message: { + Id: 'message-123', + Timestamp: 1702846520559, + Owner: 'owner-123', + Tags: [ + { name: 'From', value: 'hello' }, + { name: 'function', value: 'hello' }, + { name: 'Owner', value: 'hello' } + ], + 'Block-Height': 1234 + }, + AoGlobal: { + Process: { + Id: '1234', + Tags: [] + } + } + } + + const messageOne = { + ordinate: 1, + isAssignment: false, + assignmentId: 'assignment-1', + message: { + Id: 'message-123', + Timestamp: 1702846520559, + Owner: 'owner-123', + 'Hash-Chain': 'hashchain-1', + Tags: [ + { name: 'From', value: 'hello' }, + { name: 'function', value: 'hello' }, + { name: 'Owner', value: 'hello' } + ], + 'Block-Height': 1234 + }, + AoGlobal: { + Process: { + Id: '1234', + Tags: [] + } + } + } + + const messageTwo = { + ordinate: 1, + isAssignment: false, + assignmentId: 'assignment-2', + message: { + Id: 'message-123', + Timestamp: 1702846520559, + Owner: 'owner-123', + 'Hash-Chain': 'hashchain-2', + Tags: [ + { name: 'From', value: 'hello' }, + { name: 'function', value: 'hello' }, + { name: 'Owner', value: 'hello' } + ], + 'Block-Height': 1234 + }, + AoGlobal: { + Process: { + Id: '1234', + Tags: [] + } + } + } + + const args = { + id: 'ctr-1234', + from: new Date().getTime(), + moduleId: 'foo-module', + mostRecentAssignmentId: 'init-assignment-123', + mostRecentHashChain: 'init-hashchain-123', + moduleOptions, + stats: { + messages: { + scheduled: 0, + cron: 0, + error: 0 + } + }, + result: { + Memory: Buffer.from('Hello world') + } + } + + test('when only cron', async () => { + const evaluate = evaluateWith({ + findMessageBefore: async () => { throw { status: 404 } }, + loadEvaluator: () => ({ message, close }) => ({ Memory: Buffer.from('Hello world') }), + saveLatestProcessMemory: async (a) => { + assert.equal(a.assignmentId, args.mostRecentAssignmentId) + assert.equal(a.hashChain, args.mostRecentHashChain) + }, + evaluationCounter: mockCounter, + gasCounter: mockCounter, + logger + }) + + await evaluate({ + ...args, + messages: toReadable([cronOne, cronTwo]) + }).toPromise() + }) + + test('intermingled cron and scheduled', async () => { + const evaluate = 
evaluateWith({ + findMessageBefore: async () => { throw { status: 404 } }, + loadEvaluator: () => ({ message, close }) => ({ Memory: Buffer.from('Hello world') }), + saveLatestProcessMemory: async (a) => { + assert.equal(a.assignmentId, messageOne.assignmentId) + assert.equal(a.hashChain, messageOne.message['Hash-Chain']) + }, + evaluationCounter: mockCounter, + gasCounter: mockCounter, + logger + }) + + await evaluate({ + ...args, + messages: toReadable([cronOne, messageOne, cronTwo]) + }).toPromise() + }) + + test('multiple scheduled', async () => { + const evaluate = evaluateWith({ + findMessageBefore: async () => { throw { status: 404 } }, + loadEvaluator: () => ({ message, close }) => ({ Memory: Buffer.from('Hello world') }), + saveLatestProcessMemory: async (a) => { + assert.equal(a.assignmentId, messageTwo.assignmentId) + assert.equal(a.hashChain, messageTwo.message['Hash-Chain']) + }, + evaluationCounter: mockCounter, + gasCounter: mockCounter, + logger + }) + + await evaluate({ + ...args, + messages: toReadable([cronOne, messageOne, cronTwo, messageTwo]) + }).toPromise() + }) + }) }) diff --git a/servers/cu/src/domain/lib/hydrateMessages.js b/servers/cu/src/domain/lib/hydrateMessages.js index a00165999..25ba598f9 100644 --- a/servers/cu/src/domain/lib/hydrateMessages.js +++ b/servers/cu/src/domain/lib/hydrateMessages.js @@ -1,27 +1,15 @@ -import { compose as composeStreams, PassThrough, Transform } from 'node:stream' +import { Transform } from 'node:stream' import { of } from 'hyper-async' import { mergeRight, isNil } from 'ramda' -import { z } from 'zod' import WarpArBundles from 'warp-arbundles' import { loadTransactionDataSchema, loadTransactionMetaSchema } from '../dal.js' -import { messageSchema, streamSchema } from '../model.js' +import { messageSchema } from '../model.js' import { mapFrom, addressFrom } from '../utils.js' const { createData } = WarpArBundles -/** - * The result that is produced from this step - * and added to ctx. - * - * This is used to parse the output to ensure the correct shape - * is always added to context - */ -const ctxSchema = z.object({ - messages: streamSchema -}).passthrough() - function loadFromChainWith ({ loadTransactionData, loadTransactionMeta }) { loadTransactionData = loadTransactionDataSchema.implement(loadTransactionData) loadTransactionMeta = loadTransactionMetaSchema.implement(loadTransactionMeta) @@ -219,39 +207,16 @@ export function hydrateMessagesWith (env) { */ // $messages.on('error', () => $messages.emit('end')) - return composeStreams( - /** - * There is some sort of bug in pipeline which will consistently cause this stream - * to not end IFF it emits an error. - * - * When errors are thrown in other points in the stream, pipeline seems to work and - * close the stream just fine, so not sure what's going on here. - * - * Before, we had a workaround to manually emit 'end' from the stream on eror, which seemed - * to work (See above commented out .on()) - * - * That was UNTIL we composed more than 2 Transform streams after it, which caused that - * workaround to no longer work -- very strange. 
- * - * For some reason, wrapping the subsequent Transforms in another compose, - * AND adding a PassThrough stream at the end successfully ends the stream on errors, - * thus closing the pipeline, and resolving the promise wrapping the stream - * (see finished in evaluate.js) - */ - $messages, - composeStreams( - Transform.from(maybeMessageId), - Transform.from(maybeAoAssignment), - // Ensure every message emitted satisfies the schema - Transform.from(async function * (messages) { - for await (const cur of messages) yield messageSchema.parse(cur) - }) - ), - new PassThrough({ objectMode: true }) - ) + return [ + ...$messages, + Transform.from(maybeMessageId), + Transform.from(maybeAoAssignment), + // Ensure every message emitted satisfies the schema + Transform.from(async function * (messages) { + for await (const cur of messages) yield messageSchema.parse(cur) + }) + ] }) - .map(messages => ({ messages })) - .map(mergeRight(ctx)) - .map(ctxSchema.parse) + .map(messages => ({ ...ctx, messages })) } } diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index ebb3485a4..60f1ebef5 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -1,11 +1,9 @@ -import { Transform, compose as composeStreams } from 'node:stream' +import { Transform } from 'node:stream' import { Resolved, fromPromise, of } from 'hyper-async' -import { T, always, ascend, cond, equals, identity, ifElse, isNil, last, length, mergeRight, pipe, prop, reduce, uniqBy } from 'ramda' -import { z } from 'zod' +import { T, always, ascend, cond, equals, identity, ifElse, isNil, last, length, pipe, prop, reduce, uniqBy } from 'ramda' import ms from 'ms' -import { streamSchema } from '../model.js' import { mapFrom, parseTags } from '../utils.js' import { findBlocksSchema, loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, saveBlocksSchema } from '../dal.js' @@ -482,13 +480,16 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) { loadMessages({ suUrl: ctx.suUrl, processId: ctx.id, + block: ctx.block, owner: ctx.owner, tags: ctx.tags, moduleId: ctx.moduleId, moduleOwner: ctx.moduleOwner, moduleTags: ctx.moduleTags, from: ctx.from, // could be undefined - to: ctx.to // could be undefined + to: ctx.to, // could be undefined + assignmentId: ctx.mostRecentAssignmentId, + hashChain: ctx.mostRecentHashChain }) ) } @@ -584,8 +585,8 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, load }) ) .map(({ leftMost, rightMostTimestamp, $scheduled, genCronMessages }) => { - return composeStreams( - $scheduled, + return [ + ...$scheduled, /** * Given a left-most and right-most boundary, return an async generator, * that given a list of values, emits sequential binary tuples dervied from those values. @@ -692,7 +693,7 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, load if (doEmitRight) yield right } }) - ) + ] }) }) /** @@ -704,28 +705,13 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, load * construct and emit a 'message' for the process */ .map($messages => { - return composeStreams( - $messages, + return [ + ...$messages, Transform.from(maybePrependProcessMessage(ctx, logger, loadTransactionData)) - ) + ] }) - .map(messages => ({ messages })) } -/** - * The result that is produced from this step - * and added to ctx. 
- * - * This is used to parse the output to ensure the correct shape - * is always added to context - */ -const ctxSchema = z.object({ - /** - * Messages to be evaluated, as a stream - */ - messages: streamSchema -}).passthrough() - /** * @typedef LoadMessagesArgs * @property {string} id - the contract id @@ -758,7 +744,5 @@ export function loadMessagesWith (env) { of(ctx) .chain(loadScheduledMessages) .chain($scheduled => loadCronMessages({ ...ctx, $scheduled })) - // { messages } - .map(mergeRight(ctx)) - .map(ctxSchema.parse) + .map(messages => ({ ...ctx, messages })) } diff --git a/servers/cu/src/domain/lib/loadProcess.js b/servers/cu/src/domain/lib/loadProcess.js index 099c5a13c..637c3e5ef 100644 --- a/servers/cu/src/domain/lib/loadProcess.js +++ b/servers/cu/src/domain/lib/loadProcess.js @@ -45,6 +45,16 @@ const ctxSchema = z.object({ * was not the result of a Cron message */ fromCron: z.string().nullish(), + /** + * The most RECENT SCHEDULED message assignmentId, needed + * in order to perform hashChain validation + */ + mostRecentAssignmentId: z.string().nullish(), + /** + * The most RECENT SCHEDULED message hashChain, needed + * in order to perform hashChain validation + */ + mostRecentHashChain: z.string().nullish(), /** * Whether the evaluation found is the exact evaluation being requested */ @@ -89,8 +99,6 @@ function loadLatestEvaluationWith ({ findEvaluation, findLatestProcessMemory, sa } function maybeCachedMemory (ctx) { - // logger('Checking cache for existing memory to start evaluation "%s"...', ctx.id) - return findLatestProcessMemory({ processId: ctx.id, timestamp: ctx.to, @@ -127,14 +135,12 @@ function loadLatestEvaluationWith ({ findEvaluation, findLatestProcessMemory, sa if (['cold_start'].includes(found.src)) return Resolved(found) if (['memory'].includes(found.src) && !found.fromFile) return Resolved(found) - // logger( - // 'Seeding cache with latest checkpoint found with parameters "%j"', - // omit(['Memory'], found) - // ) /** - * Immediatley attempt to save the memory loaded from a checkpoint - * into the LRU In-memory cache, which will cut - * down on calls to load checkpoints from arweave (it will load from cache instead) + * Immediately attempt to save the memory loaded from a less-hot checkpoint + * layer into the LRU In-memory cache. + * + * This will help mitigate concurrent evals of the same process + * from all making calls to a remote, i.e. Arweave */ return saveLatestProcessMemory({ processId: ctx.id, /** @@ -144,6 +150,8 @@ */ Memory: found.Memory, moduleId: found.moduleId, + assignmentId: found.assignmentId, + hashChain: found.hashChain, // messageId: found.messageId, timestamp: found.timestamp, epoch: found.epoch, @@ -163,6 +171,15 @@ ordinate: found.ordinate, fromBlockHeight: found.blockHeight, fromCron: found.cron, + mostRecentAssignmentId: found.assignmentId, + mostRecentHashChain: found.hashChain, + /** + * The exact evaluation may not have been found in persistence, + * but it may be found in a caching tier. + * + * So we still signal to the caller whether an exact match + * was found, for potential optimizations.
+ */ exact })) }) diff --git a/servers/cu/src/domain/lib/loadProcess.test.js b/servers/cu/src/domain/lib/loadProcess.test.js index 274875692..b71b2740a 100644 --- a/servers/cu/src/domain/lib/loadProcess.test.js +++ b/servers/cu/src/domain/lib/loadProcess.test.js @@ -9,13 +9,15 @@ const PROCESS = 'process-123-9HdeqeuYQOgMgWucro' const logger = createTestLogger({ name: 'ao-cu:readState' }) describe('loadProcess', () => { - test('appends result, from, fromCron, fromBlockHeight and evaluatedAt to ctx', async () => { + test('appends result to ctx', async () => { const loadProcess = loadProcessWith({ findEvaluation: async () => { throw { status: 404 } }, findLatestProcessMemory: async () => ({ src: 'cold_start', Memory: null, moduleId: undefined, + assignmentId: undefined, + hashChain: undefined, timestamp: undefined, epoch: undefined, nonce: undefined, @@ -30,11 +32,7 @@ describe('loadProcess', () => { const res = await loadProcess({ id: PROCESS, to: '1697574792000' }).toPromise() assert.deepStrictEqual(res.result, { Memory: null }) - assert.equal(res.from, undefined) - assert.equal(res.fromCron, undefined) assert.equal(res.ordinate, '0') - assert.equal(res.fromBlockHeight, undefined) - assert.equal(res.evaluatedAt, undefined) assert.equal(res.id, PROCESS) }) @@ -76,6 +74,8 @@ describe('loadProcess', () => { assert.deepStrictEqual(res.ordinate, cachedEvaluation.ordinate) assert.deepStrictEqual(res.fromCron, cachedEvaluation.cron) assert.deepStrictEqual(res.fromBlockHeight, cachedEvaluation.blockHeight) + assert.equal(res.mostRecentAssignmentId, undefined) + assert.equal(res.mostRecentHashChain, undefined) assert.equal(res.id, PROCESS) }) @@ -84,6 +84,8 @@ describe('loadProcess', () => { src: 'memory', Memory: Buffer.from('hello world'), moduleId: 'module-123', + assignmentId: 'assignment-123', + hashChain: 'hashchain-123', timestamp: 1697574792000, epoch: 0, nonce: 11, @@ -104,6 +106,8 @@ describe('loadProcess', () => { assert.deepStrictEqual(res.ordinate, cached.ordinate) assert.deepStrictEqual(res.fromCron, cached.cron) assert.deepStrictEqual(res.fromBlockHeight, cached.blockHeight) + assert.deepStrictEqual(res.mostRecentAssignmentId, cached.assignmentId) + assert.deepStrictEqual(res.mostRecentHashChain, cached.hashChain) assert.equal(res.id, PROCESS) }) @@ -111,6 +115,8 @@ describe('loadProcess', () => { const cached = { Memory: Buffer.from('hello world'), moduleId: 'module-123', + assignmentId: 'assignment-123', + hashChain: 'hashchain-123', timestamp: 1697574792000, epoch: 0, nonce: 11, @@ -125,9 +131,10 @@ describe('loadProcess', () => { saveLatestProcessMemory: async (args) => { assert.deepStrictEqual(args, { processId: PROCESS, - evalCount: 0, Memory: cached.Memory, moduleId: cached.moduleId, + assignmentId: cached.assignmentId, + hashChain: cached.hashChain, timestamp: cached.timestamp, epoch: cached.epoch, nonce: cached.nonce, @@ -150,6 +157,8 @@ describe('loadProcess', () => { const cached = { Memory: Buffer.from('hello world'), moduleId: 'module-123', + assignmentId: 'assignment-123', + hashChain: 'hashchain-123', timestamp: 1697574792000, epoch: 0, nonce: 11, @@ -164,9 +173,10 @@ describe('loadProcess', () => { saveLatestProcessMemory: async (args) => { assert.deepStrictEqual(args, { processId: PROCESS, - evalCount: 0, Memory: cached.Memory, moduleId: cached.moduleId, + assignmentId: cached.assignmentId, + hashChain: cached.hashChain, timestamp: cached.timestamp, epoch: cached.epoch, nonce: cached.nonce, diff --git a/servers/cu/src/domain/model.js b/servers/cu/src/domain/model.js 
index ec4759cb0..c8d77addf 100644 --- a/servers/cu/src/domain/model.js +++ b/servers/cu/src/domain/model.js @@ -252,6 +252,8 @@ export const processCheckpointSchema = z.object({ fromFile: z.string().nullish(), Memory: bufferSchema.nullish(), moduleId: z.string().nullish(), + assignmentId: z.string().nullish(), + hashChain: z.string().nullish(), timestamp: z.coerce.number().nullish(), epoch: z.coerce.number().nullish(), nonce: z.coerce.number().nullish(), @@ -307,6 +309,13 @@ export const messageSchema = z.object({ * Whether the message is a pure assignment of an on-chain message */ isAssignment: z.boolean().default(false), + /** + * The id of the assignment. + * + * cron messages do not have an assignment, so this + * is optional + */ + assignmentId: z.string().nullish(), /** * For assignments, any exclusions to not be passed to the process, * and potentially not loaded from the gateway or arweave diff --git a/servers/cu/src/domain/utils.js b/servers/cu/src/domain/utils.js index 53d26b36f..d54dd9028 100644 --- a/servers/cu/src/domain/utils.js +++ b/servers/cu/src/domain/utils.js @@ -529,3 +529,9 @@ export const addressFrom = ({ address, key }) => { return _address } + +export function readableToAsyncGenerator (stream) { + return (async function * () { + for await (const d of stream) yield d + })() +} diff --git a/servers/cu/src/effects/ao-process.js b/servers/cu/src/effects/ao-process.js index ff4c13692..197417bf1 100644 --- a/servers/cu/src/effects/ao-process.js +++ b/servers/cu/src/effects/ao-process.js @@ -45,6 +45,8 @@ export const LATEST = 'LATEST' * @typedef Evaluation * @prop {string} processId * @prop {string} moduleId + * @prop {string} assignmentId + * @prop {string} hashChain * @prop {string} epoch * @prop {string} nonce * @prop {string} timestamp @@ -566,6 +568,14 @@ export function writeCheckpointRecordWith ({ db }) { evaluation: z.object({ processId: z.string().min(1), moduleId: z.string().min(1), + /** + * nullish for backwards compat + */ + assignmentId: z.string().nullish(), + /** + * nullish for backwards compat + */ + hashChain: z.string().nullish(), timestamp: z.coerce.number(), epoch: z.coerce.number().nullish(), nonce: z.coerce.number().nullish(), @@ -639,11 +649,11 @@ export function findFileCheckpointBeforeWith ({ db }) { .map(createQuery) .chain(fromPromise((query) => db.query(query))) ) - .map((results) => { - return results.map((result) => ({ - ...result, - file: pathOr('', ['file'])(result), - evaluation: JSON.parse(pathOr({}, ['evaluation'])(result)) + .map((fileCheckpoints) => { + return fileCheckpoints.map((fileCheckpoint) => ({ + ...fileCheckpoint, + file: pathOr('', ['file'])(fileCheckpoint), + evaluation: JSON.parse(pathOr({}, ['evaluation'])(fileCheckpoint)) })) }) .map((parsed) => { @@ -669,6 +679,8 @@ export function writeFileCheckpointRecordWith ({ db }) { evaluation: z.object({ processId: z.string().min(1), moduleId: z.string().min(1), + assignmentId: z.string().nullish(), + hashChain: z.string().nullish(), timestamp: z.coerce.number(), epoch: z.coerce.number().nullish(), nonce: z.coerce.number().nullish(), @@ -850,6 +862,8 @@ export function findLatestProcessMemoryWith ({ return { id: node.id, timestamp: parseInt(tags.Timestamp), + assignmentId: tags.Assignment, + hashChain: tags['Hash-Chain'], /** * Due to a previous bug, these tags may sometimes * be invalid values, so we've added a utility @@ -972,6 +986,8 @@ export function findLatestProcessMemoryWith ({ fromFile: file, Memory, moduleId: cached.evaluation.moduleId, + assignmentId: 
cached.evaluation.assignmentId, + hashChain: cached.evaluation.hashChain, timestamp: cached.evaluation.timestamp, blockHeight: cached.evaluation.blockHeight, epoch: cached.evaluation.epoch, @@ -1030,6 +1046,8 @@ export function findLatestProcessMemoryWith ({ src: 'file', Memory, moduleId: checkpoint.moduleId, + assignmentId: checkpoint.assignmentId, + hashChain: checkpoint.hashChain, timestamp: checkpoint.timestamp, blockHeight: checkpoint.blockHeight, epoch: checkpoint.epoch, @@ -1087,6 +1105,8 @@ export function findLatestProcessMemoryWith ({ src: 'record', Memory, moduleId: checkpoint.evaluation.moduleId, + assignmentId: checkpoint.evaluation.assignmentId, + hashChain: checkpoint.evaluation.hashChain, timestamp: checkpoint.evaluation.timestamp, epoch: checkpoint.evaluation.epoch, nonce: checkpoint.evaluation.nonce, @@ -1155,6 +1175,8 @@ export function findLatestProcessMemoryWith ({ src: 'arweave', Memory, moduleId: latestCheckpoint.module, + assignmentId: latestCheckpoint.assignmentId, + hashChain: latestCheckpoint.hashChain, timestamp: latestCheckpoint.timestamp, epoch: latestCheckpoint.epoch, nonce: latestCheckpoint.nonce, @@ -1186,6 +1208,8 @@ export function findLatestProcessMemoryWith ({ src: 'cold_start', Memory: null, moduleId: undefined, + assignmentId: undefined, + hashChain: undefined, timestamp: undefined, epoch: undefined, nonce: undefined, @@ -1289,7 +1313,7 @@ export function findLatestProcessMemoryWith ({ } export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD }) { - return async ({ processId, moduleId, messageId, timestamp, epoch, nonce, ordinate, cron, blockHeight, Memory, evalCount, gasUsed }) => { + return async ({ processId, moduleId, assignmentId, messageId, hashChain, timestamp, epoch, nonce, ordinate, cron, blockHeight, Memory, gasUsed }) => { const cached = cache.get(processId) /** @@ -1338,6 +1362,8 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA const evaluation = { processId, moduleId, + assignmentId, + hashChain, timestamp, epoch, nonce, @@ -1460,7 +1486,7 @@ export function saveCheckpointWith ({ ` function createCheckpointDataItem (args) { - const { moduleId, processId, epoch, nonce, ordinate, timestamp, blockHeight, cron, encoding, Memory } = args + const { moduleId, processId, assignmentId, hashChain, epoch, nonce, ordinate, timestamp, blockHeight, cron, encoding, Memory } = args return of(Memory) .chain((buffer) => @@ -1496,6 +1522,25 @@ export function saveCheckpointWith ({ ] } + /** + * A Cron message does not have an assignment + * (which is to say this is the assignment of the most recent + * Scheduled message) + * + * This is needed in order to perform hash chain verification + * on hot starts from a checkpoint + */ + if (assignmentId) dataItem.tags.push({ name: 'Assignment', value: assignmentId.trim() }) + /** + * A Cron message does not have a hashChain + * (which is to say this is the hashChain of the most recent + * Scheduled message) + * + * This is needed in order to perform hash chain verification + * on hot starts from a checkpoint + */ + if (hashChain) dataItem.tags.push({ name: 'Hash-Chain', value: hashChain.trim() }) + /** * Cron messages do not have an Epoch, * so only add the tag if the value is present @@ -1542,7 +1587,7 @@ export function saveCheckpointWith ({ */ if (DISABLE_PROCESS_FILE_CHECKPOINT_CREATION && DISABLE_PROCESS_CHECKPOINT_CREATION) return Resolved(args) - const { moduleId, processId, epoch, nonce, timestamp, 
blockHeight, cron, encoding, Memory, File } = args + const { moduleId, processId, assignmentId, hashChain, epoch, nonce, timestamp, blockHeight, cron, encoding, Memory, File } = args let file return of() @@ -1569,7 +1614,7 @@ export function saveCheckpointWith ({ logger( 'Process cache entry error for evaluation "%j". Entry contains neither Memory or File. Skipping saving of checkpoint...', - { moduleId, processId, epoch, nonce, timestamp, blockHeight, cron, encoding } + { moduleId, processId, assignmentId, hashChain, epoch, nonce, timestamp, blockHeight, cron, encoding } ) return Rejected('either File or Memory required') }) @@ -1577,7 +1622,7 @@ export function saveCheckpointWith ({ } function createFileCheckpoint (args) { - const { Memory, encoding, processId, moduleId, timestamp, epoch, ordinate, nonce, blockHeight, cron } = args + const { Memory, encoding, processId, moduleId, assignmentId, hashChain, timestamp, epoch, ordinate, nonce, blockHeight, cron } = args if (DISABLE_PROCESS_FILE_CHECKPOINT_CREATION) return Rejected('file checkpoint creation is disabled') /** @@ -1589,6 +1634,8 @@ export function saveCheckpointWith ({ const evaluation = { processId, moduleId, + assignmentId, + hashChain, timestamp, nonce, ordinate, @@ -1606,7 +1653,7 @@ export function saveCheckpointWith ({ logger( 'Successfully created file checkpoint for process "%s" on evaluation "%j"', processId, - { file, processId, nonce, timestamp, cron } + { file, processId, assignmentId, hashChain, nonce, timestamp, cron } ) return { file, ...evaluation } }) @@ -1614,7 +1661,7 @@ export function saveCheckpointWith ({ } function createArweaveCheckpoint (args) { - const { encoding, processId, moduleId, timestamp, epoch, ordinate, nonce, blockHeight, cron } = args + const { encoding, processId, moduleId, assignmentId, hashChain, timestamp, epoch, ordinate, nonce, blockHeight, cron } = args if (DISABLE_PROCESS_CHECKPOINT_CREATION) return Rejected('arweave checkpoint creation is disabled') /** @@ -1624,7 +1671,7 @@ export function saveCheckpointWith ({ logger( 'Checking Gateway for existing Checkpoint for evaluation: %j', - { moduleId, processId, epoch, nonce, timestamp, blockHeight, cron, encoding } + { moduleId, processId, assignmentId, hashChain, epoch, nonce, timestamp, blockHeight, cron, encoding } ) return address() @@ -1668,7 +1715,7 @@ export function saveCheckpointWith ({ logger( 'Creating Checkpoint for evaluation: %j', - { moduleId, processId, epoch, nonce, timestamp, blockHeight, cron, encoding: 'gzip' } + { moduleId, processId, assignmentId, hashChain, epoch, nonce, timestamp, blockHeight, cron, encoding: 'gzip' } ) /** @@ -1683,7 +1730,7 @@ export function saveCheckpointWith ({ logger( 'Successfully uploaded Checkpoint DataItem for process "%s" on evaluation "%j"', processId, - { checkpointTxId: res.id, processId, nonce, timestamp, cron, encoding: 'gzip' } + { checkpointTxId: res.id, processId, assignmentId, hashChain, nonce, timestamp, cron, encoding: 'gzip' } ) /** * Track that we've recently created a checkpoint for this @@ -1717,6 +1764,8 @@ export function saveCheckpointWith ({ evaluation: { processId, moduleId, + assignmentId, + hashChain, timestamp, epoch, nonce, @@ -1769,8 +1818,36 @@ export function saveCheckpointWith ({ )) } - return async ({ Memory, File, encoding, processId, moduleId, timestamp, epoch, ordinate, nonce, blockHeight, cron }) => - maybeHydrateMemory({ Memory, File, encoding, processId, moduleId, timestamp, epoch, ordinate, nonce, blockHeight, cron }) + return async ({ + Memory, + 
diff --git a/servers/cu/src/effects/ao-process.test.js b/servers/cu/src/effects/ao-process.test.js
index 278299f29..16a4414b4 100644
--- a/servers/cu/src/effects/ao-process.test.js
+++ b/servers/cu/src/effects/ao-process.test.js
@@ -195,6 +195,8 @@ describe('ao-process', () => {
       const evaluation = {
         processId: 'process-123',
         moduleId: 'module-123',
+        assignmentId: 'assignment-123',
+        hashChain: 'hashchain-123',
         epoch: 0,
         nonce: 11,
         timestamp: now,
@@ -218,7 +220,10 @@ describe('ao-process', () => {
           cron: undefined
         }
       })
-      assert.deepStrictEqual(res, { file: 'state-process-123.dat', ...evaluation })
+      assert.deepStrictEqual(res, {
+        file: 'state-process-123.dat',
+        ...evaluation
+      })
     })
 
     test('should return the latest checkpoint from a file BEFORE the before', async () => {
@@ -296,6 +301,8 @@ describe('ao-process', () => {
       const cachedEval = {
         processId: PROCESS,
         moduleId: 'module-123',
+        assignmentId: 'assignment-123',
+        hashChain: 'hashchain-123',
         epoch: 0,
         nonce: 11,
         timestamp: tenSecondsAgo,
@@ -400,6 +407,8 @@ describe('ao-process', () => {
           fromFile: undefined,
           Memory,
           moduleId: 'module-123',
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -417,6 +426,8 @@ describe('ao-process', () => {
           fromFile: undefined,
           Memory,
           moduleId: 'module-123',
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -449,6 +460,8 @@ describe('ao-process', () => {
           fromFile: 'state-process123.dat',
           Memory,
           moduleId: 'module-123',
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -509,6 +522,8 @@ describe('ao-process', () => {
         assert.deepStrictEqual(res, {
           src: 'file',
           moduleId: cachedEval.moduleId,
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -525,6 +540,8 @@ describe('ao-process', () => {
         assert.deepStrictEqual(res, {
           src: 'file',
           moduleId: cachedEval.moduleId,
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -541,6 +558,8 @@ describe('ao-process', () => {
         assert.deepStrictEqual(res, {
           src: 'file',
           moduleId: cachedEval.moduleId,
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -611,6 +630,8 @@ describe('ao-process', () => {
         assert.deepStrictEqual(res, {
           src: 'record',
           moduleId: cachedEval.moduleId,
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -627,6 +648,8 @@ describe('ao-process', () => {
         assert.deepStrictEqual(res, {
           src: 'record',
           moduleId: cachedEval.moduleId,
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -652,6 +675,8 @@ describe('ao-process', () => {
         assert.deepStrictEqual(res, {
           src: 'record',
           moduleId: cachedEval.moduleId,
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -699,6 +724,8 @@ describe('ao-process', () => {
             tags: [
               { name: 'Module', value: `${cachedEval.moduleId}` },
               { name: 'Timestamp', value: `${cachedEval.timestamp}` },
+              { name: 'Assignment', value: `${cachedEval.assignmentId}` },
+              { name: 'Hash-Chain', value: `${cachedEval.hashChain}` },
               { name: 'Epoch', value: `${cachedEval.epoch}` },
               { name: 'Nonce', value: `${cachedEval.nonce}` },
               { name: 'Block-Height', value: `${cachedEval.blockHeight}` },
@@ -744,6 +771,8 @@ describe('ao-process', () => {
         assert.deepStrictEqual(res, {
           src: 'arweave',
           moduleId: cachedEval.moduleId,
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -760,6 +789,8 @@ describe('ao-process', () => {
         assert.deepStrictEqual(res, {
           src: 'arweave',
           moduleId: cachedEval.moduleId,
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -783,6 +814,8 @@ describe('ao-process', () => {
         assert.deepStrictEqual(res, {
           src: 'arweave',
           moduleId: cachedEval.moduleId,
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -809,6 +842,8 @@ describe('ao-process', () => {
         assert.deepStrictEqual(res, {
           src: 'arweave',
           moduleId: cachedEval.moduleId,
+          assignmentId: cachedEval.assignmentId,
+          hashChain: cachedEval.hashChain,
           epoch: cachedEval.epoch,
           nonce: cachedEval.nonce,
           timestamp: cachedEval.timestamp,
@@ -861,6 +896,8 @@ describe('ao-process', () => {
             id: 'tx-not-encoded',
             tags: [
               { name: 'Module', value: `${cachedEval.moduleId}` },
+              { name: 'Assignment', value: `${cachedEval.assignmentId}` },
+              { name: 'Hash-Chain', value: `${cachedEval.hashChain}` },
               { name: 'Timestamp', value: `${cachedEval.timestamp}` },
               { name: 'Epoch', value: `${cachedEval.epoch}` },
               { name: 'Nonce', value: `${cachedEval.nonce}` },
@@ -897,6 +934,8 @@ describe('ao-process', () => {
             ...edges[0].node,
             tags: [
               { name: 'Module', value: `${cachedEval.moduleId}` },
+              { name: 'Assignment', value: `${cachedEval.assignmentId}` },
+              { name: 'Hash-Chain', value: `${cachedEval.hashChain}` },
               { name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` },
               { name: 'Epoch', value: `${cachedEval.epoch}` },
               { name: 'Nonce', value: '12' },
@@ -931,6 +970,9 @@ describe('ao-process', () => {
             id: 'ignored',
             tags: [
               { name: 'Module', value: `${cachedEval.moduleId}` },
+              // purposefully omitted to ensure robustness
+              // { name: 'Assignment', value: `${cachedEval.assignmentId}` },
+              // { name: 'Hash-Chain', value: `${cachedEval.hashChain}` },
               { name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` },
               { name: 'Epoch', value: `${cachedEval.epoch}` },
               { name: 'Nonce', value: '12' },
@@ -968,6 +1010,8 @@ describe('ao-process', () => {
             ...edges[0].node,
             tags: [
               { name: 'Module', value: `${cachedEval.moduleId}` },
+              { name: 'Assignment', value: `${cachedEval.assignmentId}` },
+              { name: 'Hash-Chain', value: `${cachedEval.hashChain}` },
               { name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` },
               { name: 'Epoch', value: `${cachedEval.epoch}` },
               { name: 'Nonce', value: '12' },
@@ -1056,6 +1100,8 @@ describe('ao-process', () => {
           src: 'cold_start',
           Memory: null,
           moduleId: undefined,
+          assignmentId: undefined,
+          hashChain: undefined,
           timestamp: undefined,
           epoch: undefined,
           nonce: undefined,
@@ -1233,6 +1279,8 @@ describe('ao-process', () => {
             id: 'tx-123',
             tags: [
               { name: 'Module', value: `${laterCachedEval.moduleId}` },
+              { name: 'Assignment', value: `${cachedEval.assignmentId}` },
+              { name: 'Hash-Chain', value: `${cachedEval.hashChain}` },
               { name: 'Timestamp', value: `${laterCachedEval.timestamp}` },
               { name: 'Epoch', value: `${laterCachedEval.epoch}` },
               { name: 'Nonce', value: `${laterCachedEval.nonce}` },
@@ -1278,6 +1326,8 @@ describe('ao-process', () => {
       const cachedEval = {
         processId: PROCESS,
         moduleId: 'module-123',
+        assignmentId: 'assignment-123',
+        hashChain: 'hashchain-123',
         epoch: 0,
         nonce: 11,
         timestamp: tenSecondsAgo,
@@ -1289,6 +1339,8 @@ describe('ao-process', () => {
       const cachedEvalFuture = {
         processId: PROCESS,
         moduleId: 'module-1234',
+        assignmentId: 'assignment-123',
+        hashChain: 'hashchain-123',
         epoch: 0,
         nonce: 11,
         timestamp: now + 1000,
@@ -1310,7 +1362,6 @@ describe('ao-process', () => {
         timestamp: now - 1000,
         ordinate: '13',
         cron: undefined,
-        evalCount: 5,
         gasUsed: BigInt(50)
       }
       const targetWithGasUsed = {
@@ -1319,7 +1370,6 @@ describe('ao-process', () => {
         timestamp: now - 1000,
         ordinate: '13',
         cron: undefined,
-        evalCount: 15,
         gasUsed: BigInt(50)
       }
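One fixture above purposefully omits the new Assignment and Hash-Chain tags to prove that pre-existing Checkpoints still load. A sketch of the tolerant mapping that behavior implies, assuming a find-based tag lookup; tagValue and fromCheckpointTags are illustrative names, not the module's actual helpers:

// Illustrative only: old Checkpoints simply yield undefined for the new fields
const tagValue = (tags, name) => tags.find((t) => t.name === name)?.value
const fromCheckpointTags = (tags) => ({
  moduleId: tagValue(tags, 'Module'),
  assignmentId: tagValue(tags, 'Assignment'), // undefined on pre-upgrade Checkpoints
  hashChain: tagValue(tags, 'Hash-Chain'), // undefined on pre-upgrade Checkpoints
  timestamp: parseInt(tagValue(tags, 'Timestamp')),
  epoch: parseInt(tagValue(tags, 'Epoch')),
  nonce: parseInt(tagValue(tags, 'Nonce')),
  blockHeight: parseInt(tagValue(tags, 'Block-Height'))
})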
diff --git a/servers/cu/src/effects/ao-su.js b/servers/cu/src/effects/ao-su.js
index 0555fb218..0ad7f897e 100644
--- a/servers/cu/src/effects/ao-su.js
+++ b/servers/cu/src/effects/ao-su.js
@@ -1,5 +1,6 @@
 /* eslint-disable camelcase */
-import { Transform, compose as composeStreams } from 'node:stream'
+import { Transform, Readable } from 'node:stream'
+
 import { of } from 'hyper-async'
 import { always, applySpec, filter, has, ifElse, isNil, isNotNil, juxt, last, mergeAll, path, pathOr, pipe, prop } from 'ramda'
 import DataLoader from 'dataloader'
@@ -13,6 +14,21 @@ const okRes = (res) => {
 
 const resToJson = (res) => res.json()
 
+export const isHashChainValidWith = ({ hashChain }) => async (prev, scheduled) => {
+  const { assignmentId: prevAssignmentId, hashChain: prevHashChain } = prev
+  const { message } = scheduled
+  const actual = message['Hash-Chain']
+
+  /**
+   * Depending on the source of the hot-start, there may not be
+   * a prev assignmentId and hashChain
+   */
+  if (!prevAssignmentId || !prevHashChain) return !!actual
+
+  const expected = await hashChain(prevHashChain, prevAssignmentId)
+  return expected === actual
+}
+
 /**
  * See new shape in https://github.com/permaweb/ao/issues/563#issuecomment-2020597581
  */
@@ -21,37 +37,45 @@ export const mapNode = pipe(
   // fromAssignment
   pipe(
     path(['assignment']),
-    (assignment) => parseTags(assignment.tags),
-    applySpec({
-      /**
-       * There could be multiple Exclude tags,
-       * but parseTags will accumulate them into an array,
-       *
-       * which is what we want
-       */
-      Exclude: path(['Exclude']),
-      /**
-       * Data from the assignment, to be placed
-       * on the message
-       */
-      message: applySpec({
-        Message: path(['Message']),
-        Target: path(['Process']),
-        Epoch: pipe(path(['Epoch']), parseInt),
-        Nonce: pipe(path(['Nonce']), parseInt),
-        Timestamp: pipe(path(['Timestamp']), parseInt),
-        'Block-Height': pipe(
+    (assignment) => {
+      const tags = parseTags(assignment.tags)
+
+      return applySpec({
         /**
-         * Returns a left padded integer like '000001331218'
+         * The assignment id is needed in order to compute
+         * and verify the hash chain
+         */
+        AssignmentId: always(assignment.id),
+        /**
+         * There could be multiple Exclude tags,
+         * but parseTags will accumulate them into an array,
          *
-         * So use parseInt to convert it into a number
+         * which is what we want
          */
-        path(['Block-Height']),
-        parseInt
-      ),
-      'Hash-Chain': path(['Hash-Chain'])
-    })
-  })
+        Exclude: path(['Exclude']),
+        /**
+         * Data from the assignment, to be placed
+         * on the message
+         */
+        message: applySpec({
+          Message: path(['Message']),
+          Target: path(['Process']),
+          Epoch: pipe(path(['Epoch']), parseInt),
+          Nonce: pipe(path(['Nonce']), parseInt),
+          Timestamp: pipe(path(['Timestamp']), parseInt),
+          'Block-Height': pipe(
+            /**
+             * Returns a left padded integer like '000001331218'
+             *
+             * So use parseInt to convert it into a number
+             */
+            path(['Block-Height']),
+            parseInt
+          ),
+          'Hash-Chain': path(['Hash-Chain'])
+        })
+      })(tags)
+    }
   ),
   // fromMessage
   pipe(
@@ -94,6 +118,7 @@ export const mapNode = pipe(
       name: `${fBoth.isAssignment ? 'Assigned' : 'Scheduled'} Message ${fBoth.isAssignment ? fAssignment.message.Message : fMessage.Id} ${fAssignment.message.Timestamp}:${fAssignment.message.Nonce}`,
       exclude: fAssignment.Exclude,
       isAssignment: fBoth.isAssignment,
+      assignmentId: fAssignment.AssignmentId,
       message: mergeAll([
         fMessage,
         fAssignment.message,
@@ -132,9 +157,11 @@ export const mapNode = pipe(
   })
 )
 
-export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => {
+export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize }) => {
   const logger = _logger.child('ao-su:loadMessages')
 
+  const isHashChainValid = isHashChainValidWith({ hashChain })
+
   const fetchPageDataloader = new DataLoader(async (args) => {
     fetchPageDataloader.clearAll()
@@ -233,15 +260,29 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => {
     }
   }
 
-  function mapAoMessage ({ processId, processOwner, processTags, moduleId, moduleOwner, moduleTags }) {
+  function mapAoMessage ({ processId, processBlock, assignmentId, hashChain, processOwner, processTags, moduleId, moduleOwner, moduleTags, logger }) {
     const AoGlobal = {
       Process: { Id: processId, Owner: processOwner, Tags: processTags },
       Module: { Id: moduleId, Owner: moduleOwner, Tags: moduleTags }
     }
 
+    /**
+     * Only perform hash chain validation on processes
+     * spawned after arweave day, since old hash chains are invalid
+     * anyway.
+     */
+    const isHashChainValidationEnabled = processBlock.height >= 1440000
+    if (!isHashChainValidationEnabled) {
+      logger('HashChain validation disabled for old process "%s" at block [%j]', [processId, processBlock])
+    }
+    // Set this to simulate a stream error
+    // eslint-disable-next-line
+    let simulateError = false
+    let prevAssignmentId = assignmentId
+    let prevHashChain = hashChain
     return async function * (edges) {
       for await (const edge of edges) {
-        yield pipe(
+        const scheduled = pipe(
           prop('node'),
           /**
            * Map to the expected shape
@@ -252,20 +293,62 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => {
             return scheduled
           }
         )(edge)
+
+        if (simulateError) {
+          logger(' message "%s" scheduled on process "%s"', scheduled.message.Id, processId)
+          const err = new Error(`Simulated Error on message ${scheduled.message.Id}`)
+          err.status = 422
+          throw err
+        }
+
+        if (isHashChainValidationEnabled) {
+          if (!(await isHashChainValid({ assignmentId: prevAssignmentId, hashChain: prevHashChain }, scheduled))) {
+            logger('HashChain invalid on message "%s" scheduled on process "%s"', scheduled.message.Id, processId)
+            const err = new Error(`HashChain invalid on message ${scheduled.message.Id}`)
+            err.status = 422
+            throw err
+          }
+        }
+
+        prevAssignmentId = scheduled.assignmentId
+        prevHashChain = scheduled.message['Hash-Chain']
+
+        yield scheduled
       }
     }
   }
 
   return (args) =>
     of(args)
-      .map(({ suUrl, processId, owner: processOwner, tags: processTags, moduleId, moduleOwner, moduleTags, from, to }) => {
-        return composeStreams(
-          /**
-           * compose will convert the AsyncIterable into a readable Duplex
-           */
-          fetchAllPages({ suUrl, processId, from, to })(),
-          Transform.from(mapAoMessage({ processId, processOwner, processTags, moduleId, moduleOwner, moduleTags }))
-        )
+      .map(({
+        suUrl,
+        processId,
+        block: processBlock,
+        owner: processOwner,
+        tags: processTags,
+        moduleId,
+        moduleOwner,
+        moduleTags,
+        from,
+        to,
+        assignmentId,
+        hashChain
+      }) => {
+        return [
+          Readable.from(fetchAllPages({ suUrl, processId, from, to })()),
+          Transform.from(mapAoMessage({
+            processId,
+            processBlock,
+            assignmentId,
+            hashChain,
+            processOwner,
+            processTags,
+            moduleId,
+            moduleOwner,
+            moduleTags,
+            logger
+          }))
+        ]
      })
      .toPromise()
 }
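Two things worth calling out in the ao-su.js change above: loadMessagesWith now returns the [Readable, Transform] pair rather than a pre-composed duplex, which appears to leave stream composition (and handling of the 422 errors thrown mid-stream) to the caller; and the validation rule itself reduces to a single digest comparison. A self-contained sketch of that rule, matching isHashChainValidWith above and the worker implementation at the end of this diff:

import { createHash } from 'node:crypto'

// expected = sha256(prevAssignmentId bytes || prevHashChain bytes), base64url encoded.
// A scheduled message is valid when its 'Hash-Chain' equals this digest
// computed from the previous assignment in the sequence.
const expectedHashChain = (prevAssignmentId, prevHashChain) =>
  createHash('sha256')
    .update(Buffer.from(prevAssignmentId, 'base64url'))
    .update(Buffer.from(prevHashChain, 'base64url'))
    .digest('base64url')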
diff --git a/servers/cu/src/effects/ao-su.test.js b/servers/cu/src/effects/ao-su.test.js
index ce8b44b42..a6182c2e6 100644
--- a/servers/cu/src/effects/ao-su.test.js
+++ b/servers/cu/src/effects/ao-su.test.js
@@ -1,11 +1,13 @@
 /* eslint-disable no-throw-literal */
 import { describe, test } from 'node:test'
 import assert from 'node:assert'
+import { createHash } from 'node:crypto'
 
 import { loadMessageMetaSchema, loadProcessSchema, loadTimestampSchema } from '../domain/dal.js'
 import { messageSchema } from '../domain/model.js'
 import { createTestLogger } from '../domain/logger.js'
-import { loadMessageMetaWith, loadProcessWith, loadTimestampWith, mapNode } from './ao-su.js'
+import { isHashChainValidWith, loadMessageMetaWith, loadProcessWith, loadTimestampWith, mapNode } from './ao-su.js'
+import { hashChain } from './worker/hashChain/main.js'
 
 const withoutAoGlobal = messageSchema.omit({ AoGlobal: true })
 const logger = createTestLogger({ name: 'ao-cu:ao-su' })
@@ -14,12 +16,14 @@ describe('ao-su', () => {
   describe('mapNode', () => {
     const now = new Date().getTime()
     const messageId = 'message-123'
+    const assignmentId = 'assignment-123'
     const assignedMessageId = 'assigned-123'
     const expected = {
       cron: undefined,
       ordinate: '23',
       name: `Scheduled Message ${messageId} ${now}:23`,
       exclude: undefined,
+      assignmentId,
       message: {
         Id: messageId,
         Signature: 'sig-123',
@@ -57,6 +61,7 @@ describe('ao-su', () => {
         data: 'data-123'
       },
       assignment: {
+        id: assignmentId,
         owner: {
           address: 'su-123',
           key: 'su-123'
@@ -78,9 +83,10 @@ describe('ao-su', () => {
         withoutAoGlobal.parse(expected)
       )
       assert.equal(res.isAssignment, false)
+      assert.equal(res.assignmentId, assignmentId)
     })
 
-    describe('should map an assigned tx', () => {
+    describe('should map an assignment tx', () => {
       const res = mapNode({
         message: null,
         assignment: {
@@ -134,6 +140,85 @@ describe('ao-su', () => {
     })
   })
 
+  describe('isHashChainValid', () => {
+    const isHashChainValid = isHashChainValidWith({ hashChain })
+    const now = new Date().getTime()
+    const messageId = 'message-123'
+    const scheduled = {
+      cron: undefined,
+      ordinate: '23',
+      name: `Scheduled Message ${messageId} ${now}:23`,
+      exclude: undefined,
+      message: {
+        Id: messageId,
+        Signature: 'sig-123',
+        Data: 'data-123',
+        Owner: 'owner-123',
+        Target: 'process-123',
+        Anchor: '00000000123',
+        From: 'owner-123',
+        'Forwarded-By': undefined,
+        Tags: [{ name: 'Foo', value: 'Bar' }],
+        Epoch: 0,
+        Nonce: 23,
+        Timestamp: now,
+        'Block-Height': 123,
+        'Hash-Chain': 'hash-123',
+        Cron: false
+      },
+      block: {
+        height: 123,
+        timestamp: now
+      }
+    }
+
+    test('should return whether the hashChain exists if there is no previous assignment', async () => {
+      // no prev info
+      assert(await isHashChainValid({}, scheduled))
+      // first assignment ergo has no prev assignment
+      assert(await isHashChainValid({
+        hashChain: null,
+        assignmentId: 'foo'
+      }, scheduled))
+      assert(await isHashChainValid({
+        hashChain: 'foo',
+        assignmentId: null
+      }, scheduled))
+    })
+
+    test('should calculate and compare the hashChain based on the previous assignment', async () => {
+      const prevAssignmentId = Buffer.from('assignment-123', 'utf8').toString('base64url')
+      const prevHashChain = Buffer.from('hashchain-123', 'utf8').toString('base64url')
+
+      const prev = { assignmentId: prevAssignmentId, hashChain: prevHashChain }
+
+      const expected = createHash('sha256')
+        .update(Buffer.from(prevAssignmentId, 'base64url'))
+        .update(Buffer.from(prevHashChain, 'base64url'))
+        .digest('base64url')
+
+      const valid = {
+        message: {
+          // ....
+          'Hash-Chain': expected
+        }
+      }
+
+      assert(await isHashChainValid(prev, valid))
+
+      const invalid = {
+        message: {
+          'Hash-Chain': createHash('sha256')
+            .update(Buffer.from('something else', 'base64url'))
+            .update(Buffer.from(prevHashChain, 'base64url'))
+            .digest('base64url')
+        }
+      }
+
+      assert(!(await isHashChainValid(prev, invalid)))
+    })
+  })
+
   describe('loadMessageMetaWith', () => {
     test('return the message meta', async () => {
       const loadMessageMeta = loadMessageMetaSchema.implement(
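For reviewers reconstructing the chain by hand, a usage sketch of the exported hashChain (defined in main.js below, imported here via the same relative path the test uses): the genesis link hashes only the seed hash chain, and every later link folds in the assignment id that produced the previous message. The seed and id values here are made up for illustration:

import { hashChain } from './worker/hashChain/main.js'

const seed = Buffer.from('genesis', 'utf8').toString('base64url') // illustrative seed value
const assignment1 = Buffer.from('assignment-1', 'utf8').toString('base64url') // illustrative id

const link1 = hashChain(seed, undefined) // first message: no previous assignment id is hashed
const link2 = hashChain(link1, assignment1) // every later link includes the prev assignment id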
diff --git a/servers/cu/src/effects/worker/hashChain/index.js b/servers/cu/src/effects/worker/hashChain/index.js
new file mode 100644
index 000000000..1055937f5
--- /dev/null
+++ b/servers/cu/src/effects/worker/hashChain/index.js
@@ -0,0 +1,8 @@
+import { worker } from 'workerpool'
+
+import { hashChain } from './main.js'
+
+/**
+ * Expose worker api
+ */
+worker({ hashChain })
diff --git a/servers/cu/src/effects/worker/hashChain/main.js b/servers/cu/src/effects/worker/hashChain/main.js
new file mode 100644
index 000000000..4272fdbe3
--- /dev/null
+++ b/servers/cu/src/effects/worker/hashChain/main.js
@@ -0,0 +1,19 @@
+import { createHash } from 'node:crypto'
+
+const base64UrlToBytes = (b64Url) =>
+  Buffer.from(b64Url, 'base64url')
+
+export const hashChain = (prevHashChain, prevAssignmentId) => {
+  const hash = createHash('sha256')
+  /**
+   * For the very first message, there is no previous id,
+   * so it is not included in the hashed bytes, to produce the very first
+   * hash chain
+   */
+  if (prevAssignmentId) hash.update(base64UrlToBytes(prevAssignmentId))
+  /**
+   * Always include the previous hash chain
+   */
+  hash.update(base64UrlToBytes(prevHashChain))
+  return hash.digest('base64url')
+}
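The index.js/main.js split keeps the pure function in main.js while index.js registers it with workerpool. The pool wiring itself is not part of this diff; a hypothetical sketch of how a caller might exec the worker, with the pool path and inputs as placeholders:

import workerpool from 'workerpool'

// Placeholder path; the actual pool construction lives elsewhere in the CU
const pool = workerpool.pool('./src/effects/worker/hashChain/index.js')

async function verify (prevHashChain, prevAssignmentId, actual) {
  // exec proxies to the hashChain function registered via worker() in index.js
  const expected = await pool.exec('hashChain', [prevHashChain, prevAssignmentId])
  return expected === actual
}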