From a87db6a163b9504051a97db0ffe0deba81ec0f05 Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Thu, 16 Jan 2025 10:55:03 -0500 Subject: [PATCH 1/9] feat(cu): implement hash chain validation on messages received from SU #1112 --- servers/cu/src/effects/ao-su.js | 59 ++++++++++- servers/cu/src/effects/ao-su.test.js | 140 ++++++++++++++++++++++++++- 2 files changed, 194 insertions(+), 5 deletions(-) diff --git a/servers/cu/src/effects/ao-su.js b/servers/cu/src/effects/ao-su.js index 0555fb218..73860dbe2 100644 --- a/servers/cu/src/effects/ao-su.js +++ b/servers/cu/src/effects/ao-su.js @@ -1,11 +1,16 @@ /* eslint-disable camelcase */ import { Transform, compose as composeStreams } from 'node:stream' +import { createHash } from 'node:crypto' + import { of } from 'hyper-async' import { always, applySpec, filter, has, ifElse, isNil, isNotNil, juxt, last, mergeAll, path, pathOr, pipe, prop } from 'ramda' import DataLoader from 'dataloader' import { backoff, mapForwardedBy, mapFrom, addressFrom, parseTags, strFromFetchError } from '../domain/utils.js' +const base64UrlToBytes = (b64Url) => + Buffer.from(b64Url, 'base64url') + const okRes = (res) => { if (res.ok) return res throw res @@ -13,6 +18,40 @@ const okRes = (res) => { const resToJson = (res) => res.json() +const hashChain = (prevHashChain, prevAssignmentId) => { + const hash = createHash('sha256') + + /** + * For the very first message, there is no previous id, + * so it is not included in the hashed bytes, to produce the very first + * hash chain + */ + if (prevAssignmentId) hash.update(base64UrlToBytes(prevAssignmentId)) + /** + * Always include the previous hash chain + */ + hash.update(base64UrlToBytes(prevHashChain)) + + return hash.digest('base64url') +} + +export const isHashChainValid = (scheduled) => { + const { prevAssignment, message } = scheduled + const actual = message['Hash-Chain'] + + /** + * This will match in cases where the SU returns no prevAssignment + * which is to say the feature isn't live on the SU. + * + * AND will match validating the first assignment, which needs + * no validation, besides needing to check its hashChain is present + */ + if (!prevAssignment?.id || !prevAssignment?.hashChain) return !!actual + + const expected = hashChain(prevAssignment.hashChain, prevAssignment.id) + return expected === actual +} + /** * See new shape in https://github.com/permaweb/ao/issues/563#issuecomment-2020597581 */ @@ -79,6 +118,10 @@ export const mapNode = pipe( ), // both applySpec({ + prevAssignment: applySpec({ + hashChain: pathOr(null, ['previous_assignment', 'hash_chain']), + id: pathOr(null, ['previous_assignment', 'id']) + }), isAssignment: pipe( path(['message', 'id']), isNil @@ -94,6 +137,7 @@ export const mapNode = pipe( name: `${fBoth.isAssignment ? 'Assigned' : 'Scheduled'} Message ${fBoth.isAssignment ? 
fAssignment.message.Message : fMessage.Id} ${fAssignment.message.Timestamp}:${fAssignment.message.Nonce}`, exclude: fAssignment.Exclude, isAssignment: fBoth.isAssignment, + prevAssignment: fBoth.prevAssignment, message: mergeAll([ fMessage, fAssignment.message, @@ -233,7 +277,7 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { } } - function mapAoMessage ({ processId, processOwner, processTags, moduleId, moduleOwner, moduleTags }) { + function mapAoMessage ({ processId, processOwner, processTags, moduleId, moduleOwner, moduleTags, logger }) { const AoGlobal = { Process: { Id: processId, Owner: processOwner, Tags: processTags }, Module: { Id: moduleId, Owner: moduleOwner, Tags: moduleTags } @@ -241,7 +285,7 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { return async function * (edges) { for await (const edge of edges) { - yield pipe( + const scheduled = pipe( prop('node'), /** * Map to the expected shape @@ -252,6 +296,15 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { return scheduled } )(edge) + + if (!isHashChainValid(scheduled)) { + logger('HashChain invalid on message "%s" scheduled on process "%s"', scheduled.message.Id, processId) + const err = new Error(`HashChain invalid on message ${scheduled.message.Id}`) + err.status = 422 + throw err + } + + yield scheduled } } } @@ -264,7 +317,7 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { * compose will convert the AsyncIterable into a readable Duplex */ fetchAllPages({ suUrl, processId, from, to })(), - Transform.from(mapAoMessage({ processId, processOwner, processTags, moduleId, moduleOwner, moduleTags })) + Transform.from(mapAoMessage({ processId, processOwner, processTags, moduleId, moduleOwner, moduleTags, logger })) ) }) .toPromise() diff --git a/servers/cu/src/effects/ao-su.test.js b/servers/cu/src/effects/ao-su.test.js index ce8b44b42..9265a74f7 100644 --- a/servers/cu/src/effects/ao-su.test.js +++ b/servers/cu/src/effects/ao-su.test.js @@ -1,11 +1,12 @@ /* eslint-disable no-throw-literal */ import { describe, test } from 'node:test' import assert from 'node:assert' +import { createHash } from 'node:crypto' import { loadMessageMetaSchema, loadProcessSchema, loadTimestampSchema } from '../domain/dal.js' import { messageSchema } from '../domain/model.js' import { createTestLogger } from '../domain/logger.js' -import { loadMessageMetaWith, loadProcessWith, loadTimestampWith, mapNode } from './ao-su.js' +import { isHashChainValid, loadMessageMetaWith, loadProcessWith, loadTimestampWith, mapNode } from './ao-su.js' const withoutAoGlobal = messageSchema.omit({ AoGlobal: true }) const logger = createTestLogger({ name: 'ao-cu:ao-su' }) @@ -20,6 +21,7 @@ describe('ao-su', () => { ordinate: '23', name: `Scheduled Message ${messageId} ${now}:23`, exclude: undefined, + prevAssignment: { id: null, hashChain: null }, message: { Id: messageId, Signature: 'sig-123', @@ -80,7 +82,7 @@ describe('ao-su', () => { assert.equal(res.isAssignment, false) }) - describe('should map an assigned tx', () => { + describe('should map an assignment tx', () => { const res = mapNode({ message: null, assignment: { @@ -132,6 +134,140 @@ describe('ao-su', () => { }) }) }) + + describe('should map the previous assignment data', () => { + const arg = { + message: null, + assignment: { + owner: { + address: 'su-123', + key: 'su-123' + }, + tags: [ + { name: 'Epoch', value: '0' }, + { name: 'Nonce', value: '23' }, + { name: 'Process', value: 
'process-123' }, + { name: 'Block-Height', value: '000000000123' }, + { name: 'Timestamp', value: `${now}` }, + { name: 'Hash-Chain', value: 'hash-123' }, + { name: 'Message', value: assignedMessageId } + ], + data: 'su-data-123' + } + } + + test('should map prevAssignment fields', () => { + const res = mapNode({ + ...arg, + previous_assignment: { + id: 'prev-assignment-id', + hash_chain: 'prev-hashchain' + } + }) + + assert.deepStrictEqual( + res.prevAssignment, + { id: 'prev-assignment-id', hashChain: 'prev-hashchain' } + ) + }) + + test('should set prevAssignment fields to null', () => { + const res = mapNode({ + ...arg, + no_previous_assigment: true + }) + + assert.deepStrictEqual( + res.prevAssignment, + { id: null, hashChain: null } + ) + }) + }) + }) + + describe('isHashChainValid', () => { + const now = new Date().getTime() + const messageId = 'message-123' + const arg = { + cron: undefined, + ordinate: '23', + name: `Scheduled Message ${messageId} ${now}:23`, + exclude: undefined, + message: { + Id: messageId, + Signature: 'sig-123', + Data: 'data-123', + Owner: 'owner-123', + Target: 'process-123', + Anchor: '00000000123', + From: 'owner-123', + 'Forwarded-By': undefined, + Tags: [{ name: 'Foo', value: 'Bar' }], + Epoch: 0, + Nonce: 23, + Timestamp: now, + 'Block-Height': 123, + 'Hash-Chain': 'hash-123', + Cron: false + }, + block: { + height: 123, + timestamp: now + } + } + + test('should return whether the hashChain exists if there is no previous assignment', () => { + // feature not rolled out on SU + assert(isHashChainValid(arg)) + // first assignment ergo has no prev assignment + assert(isHashChainValid({ + ...arg, + prevAssignment: { + hashChain: null, + id: 'foo' + } + })) + assert(isHashChainValid({ + ...arg, + prevAssignment: { + hashChain: 'foo', + id: null + } + })) + }) + + test('should calculate and compare the hashChain based on the previous assignment', () => { + const prevAssignmentId = Buffer.from('assignment-123', 'utf8').toString('base64url') + const prevHashChain = Buffer.from('hashchain-123', 'utf8').toString('base64url') + + const expected = createHash('sha256') + .update(Buffer.from(prevAssignmentId, 'base64url')) + .update(Buffer.from(prevHashChain, 'base64url')) + .digest('base64url') + + const arg = { + // ... + prevAssignment: { id: prevAssignmentId, hashChain: prevHashChain }, + message: { + // .... 
+ 'Hash-Chain': expected + } + } + + assert(isHashChainValid(arg)) + + const invalid = { + ...arg, + message: { + 'Hash-Chain': createHash('sha256') + .update(Buffer.from('something else', 'base64url')) + .update(Buffer.from(prevHashChain, 'base64url')) + .digest('base64url') + } + } + + assert(!isHashChainValid(invalid)) + }) }) describe('loadMessageMetaWith', () => { From c8979d6f34f46217fa0e9ff33a0e276256347c94 Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Fri, 17 Jan 2025 19:52:59 +0000 Subject: [PATCH 2/9] feat(cu): save, and pass around hashchain/assignment id between eval streams and checkpoints #1112 --- servers/cu/src/domain/dal.js | 7 +- servers/cu/src/domain/lib/evaluate.js | 15 +- servers/cu/src/domain/lib/evaluate.test.js | 174 ++++++++++++++++++ servers/cu/src/domain/lib/loadMessages.js | 1 + servers/cu/src/domain/lib/loadProcess.js | 35 +++- servers/cu/src/domain/lib/loadProcess.test.js | 24 ++- servers/cu/src/domain/model.js | 9 + servers/cu/src/effects/ao-process.js | 107 +++++++++-- servers/cu/src/effects/ao-process.test.js | 56 +++++- servers/cu/src/effects/ao-su.js | 82 +++++---- servers/cu/src/effects/ao-su.test.js | 54 +----- 11 files changed, 437 insertions(+), 127 deletions(-) diff --git a/servers/cu/src/domain/dal.js b/servers/cu/src/domain/dal.js index 606a8edaf..eed3272f1 100644 --- a/servers/cu/src/domain/dal.js +++ b/servers/cu/src/domain/dal.js @@ -67,6 +67,8 @@ export const saveLatestProcessMemorySchema = z.function() processId: z.string(), moduleId: z.string().nullish(), messageId: z.string().nullish(), + assignmentId: z.string().nullish(), + hashChain: z.string().nullish(), timestamp: z.coerce.number().nullish(), epoch: z.coerce.number().nullish(), nonce: z.coerce.number().nullish(), @@ -74,7 +76,6 @@ export const saveLatestProcessMemorySchema = z.function() cron: z.string().nullish(), blockHeight: z.coerce.number().nullish(), Memory: bufferSchema, - evalCount: z.number().nullish(), gasUsed: z.bigint().nullish() })) .returns(z.promise(z.any())) @@ -173,7 +174,9 @@ export const loadMessagesSchema = z.function() moduleTags: z.array(rawTagSchema), moduleOwner: z.string(), from: z.coerce.number().nullish(), - to: z.coerce.number().nullish() + to: z.coerce.number().nullish(), + assignmentId: z.string().nullish(), + hashChain: z.string().nullish() }) ) /** diff --git a/servers/cu/src/domain/lib/evaluate.js b/servers/cu/src/domain/lib/evaluate.js index 6e2db30fd..508214032 100644 --- a/servers/cu/src/domain/lib/evaluate.js +++ b/servers/cu/src/domain/lib/evaluate.js @@ -91,6 +91,8 @@ export function evaluateWith (env) { .chain(fromPromise(async (ctx) => { // A running tally of gas used in the eval stream let totalGasUsed = BigInt(0) + let mostRecentAssignmentId = ctx.mostRecentAssignmentId + let mostRecentHashChain = ctx.mostRecentHashChain let prev = applySpec({ /** * Ensure all result fields are initialized @@ -160,7 +162,7 @@ export function evaluateWith (env) { * and evaluate each one */ let first = true - for await (const { noSave, cron, ordinate, name, message, deepHash, isAssignment, AoGlobal } of messages) { + for await (const { noSave, cron, ordinate, name, message, deepHash, isAssignment, assignmentId, AoGlobal } of messages) { if (cron) { const key = toEvaledCron({ timestamp: message.Timestamp, cron }) if (evaledCrons.has(key)) continue @@ -170,6 +172,14 @@ export function evaluateWith (env) { * again in the eval stream */ else evaledCrons.set(key, true) + } else if (!noSave) { + /** + * As messages stream into the process to be evaluated, + * we need 
to keep track of the assignmentId
+         * and hashChain of the most recent scheduled message
+         */
+        mostRecentAssignmentId = assignmentId
+        mostRecentHashChain = message['Hash-Chain']
       }
 
       /**
@@ -306,6 +316,8 @@ export function evaluateWith (env) {
             processId: ctx.id,
             moduleId: ctx.moduleId,
             messageId: message.Id,
+            assignmentId: mostRecentAssignmentId,
+            hashChain: mostRecentHashChain,
             timestamp: message.Timestamp,
             nonce: message.Nonce,
             epoch: message.Epoch,
@@ -313,7 +325,6 @@ export function evaluateWith (env) {
             ordinate,
             cron,
             Memory: prev.Memory,
-            evalCount: ctx.stats.messages.scheduled + ctx.stats.messages.cron,
             gasUsed: totalGasUsed
           })
         }
diff --git a/servers/cu/src/domain/lib/evaluate.test.js b/servers/cu/src/domain/lib/evaluate.test.js
index 374393703..ef84e19ac 100644
--- a/servers/cu/src/domain/lib/evaluate.test.js
+++ b/servers/cu/src/domain/lib/evaluate.test.js
@@ -14,6 +14,8 @@ async function * toAsyncIterable (iterable) {
   while (iterable.length) yield iterable.shift()
 }
 
+const mockCounter = { inc: () => undefined }
+
 const moduleOptions = {
   format: 'wasm32-unknown-emscripten',
   inputEncoding: 'JSON-1',
@@ -601,4 +603,176 @@ describe('evaluate', () => {
       ])
     }).toPromise()
   })
+
+  describe('tracks and saves most recent assignmentId and hashChain', () => {
+    const cronOne = {
+      ordinate: 1,
+      isAssignment: false,
+      cron: '1-20-minutes',
+      message: {
+        Id: 'message-123',
+        Timestamp: 1702846520559,
+        Owner: 'owner-123',
+        Tags: [
+          { name: 'From', value: 'hello' },
+          { name: 'function', value: 'hello' },
+          { name: 'Owner', value: 'hello' }
+        ],
+        'Block-Height': 1234
+      },
+      AoGlobal: {
+        Process: {
+          Id: '1234',
+          Tags: []
+        }
+      }
+    }
+
+    const cronTwo = {
+      ordinate: 2,
+      isAssignment: false,
+      cron: '1-20-minutes',
+      message: {
+        Id: 'message-123',
+        Timestamp: 1702846520559,
+        Owner: 'owner-123',
+        Tags: [
+          { name: 'From', value: 'hello' },
+          { name: 'function', value: 'hello' },
+          { name: 'Owner', value: 'hello' }
+        ],
+        'Block-Height': 1234
+      },
+      AoGlobal: {
+        Process: {
+          Id: '1234',
+          Tags: []
+        }
+      }
+    }
+
+    const messageOne = {
+      ordinate: 1,
+      isAssignment: false,
+      assignmentId: 'assignment-1',
+      message: {
+        Id: 'message-123',
+        Timestamp: 1702846520559,
+        Owner: 'owner-123',
+        'Hash-Chain': 'hashchain-1',
+        Tags: [
+          { name: 'From', value: 'hello' },
+          { name: 'function', value: 'hello' },
+          { name: 'Owner', value: 'hello' }
+        ],
+        'Block-Height': 1234
+      },
+      AoGlobal: {
+        Process: {
+          Id: '1234',
+          Tags: []
+        }
+      }
+    }
+
+    const messageTwo = {
+      ordinate: 1,
+      isAssignment: false,
+      assignmentId: 'assignment-2',
+      message: {
+        Id: 'message-123',
+        Timestamp: 1702846520559,
+        Owner: 'owner-123',
+        'Hash-Chain': 'hashchain-2',
+        Tags: [
+          { name: 'From', value: 'hello' },
+          { name: 'function', value: 'hello' },
+          { name: 'Owner', value: 'hello' }
+        ],
+        'Block-Height': 1234
+      },
+      AoGlobal: {
+        Process: {
+          Id: '1234',
+          Tags: []
+        }
+      }
+    }
+
+    const args = {
+      id: 'ctr-1234',
+      from: new Date().getTime(),
+      moduleId: 'foo-module',
+      mostRecentAssignmentId: 'init-assignment-123',
+      mostRecentHashChain: 'init-hashchain-123',
+      moduleOptions,
+      stats: {
+        messages: {
+          scheduled: 0,
+          cron: 0,
+          error: 0
+        }
+      },
+      result: {
+        Memory: Buffer.from('Hello world')
+      }
+    }
+
+    test('when only cron', async () => {
+      const evaluate = evaluateWith({
+        findMessageBefore: async () => { throw { status: 404 } },
+        loadEvaluator: () => ({ message, close }) => ({ Memory: Buffer.from('Hello world') }),
+        saveLatestProcessMemory: async (a) => {
+          assert.equal(a.assignmentId, 
args.mostRecentAssignmentId) + assert.equal(a.hashChain, args.mostRecentHashChain) + }, + evaluationCounter: mockCounter, + gasCounter: mockCounter, + logger + }) + + await evaluate({ + ...args, + messages: toAsyncIterable([cronOne, cronTwo]) + }).toPromise() + }) + + test('intermingled cron and scheduled', async () => { + const evaluate = evaluateWith({ + findMessageBefore: async () => { throw { status: 404 } }, + loadEvaluator: () => ({ message, close }) => ({ Memory: Buffer.from('Hello world') }), + saveLatestProcessMemory: async (a) => { + assert.equal(a.assignmentId, messageOne.assignmentId) + assert.equal(a.hashChain, messageOne.message['Hash-Chain']) + }, + evaluationCounter: mockCounter, + gasCounter: mockCounter, + logger + }) + + await evaluate({ + ...args, + messages: toAsyncIterable([cronOne, messageOne, cronTwo]) + }).toPromise() + }) + + test('multiple scheduled', async () => { + const evaluate = evaluateWith({ + findMessageBefore: async () => { throw { status: 404 } }, + loadEvaluator: () => ({ message, close }) => ({ Memory: Buffer.from('Hello world') }), + saveLatestProcessMemory: async (a) => { + assert.equal(a.assignmentId, messageTwo.assignmentId) + assert.equal(a.hashChain, messageTwo.message['Hash-Chain']) + }, + evaluationCounter: mockCounter, + gasCounter: mockCounter, + logger + }) + + await evaluate({ + ...args, + messages: toAsyncIterable([cronOne, messageOne, cronTwo, messageTwo]) + }).toPromise() + }) + }) }) diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index ebb3485a4..28f2cdf49 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -489,6 +489,7 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) { moduleTags: ctx.moduleTags, from: ctx.from, // could be undefined to: ctx.to // could be undefined + // TODO: pass ctx.mostRecentHashChain and ctx.mostRecentAssignmentId to initialize hash chain verification of scheduled messages }) ) } diff --git a/servers/cu/src/domain/lib/loadProcess.js b/servers/cu/src/domain/lib/loadProcess.js index 099c5a13c..637c3e5ef 100644 --- a/servers/cu/src/domain/lib/loadProcess.js +++ b/servers/cu/src/domain/lib/loadProcess.js @@ -45,6 +45,16 @@ const ctxSchema = z.object({ * was not the result of a Cron message */ fromCron: z.string().nullish(), + /** + * The most RECENT SCHEDULED message assignmentId, needed + * in order to perform hashChain validation + */ + mostRecentAssignmentId: z.string().nullish(), + /** + * The most RECENT SCHEDULED message hashChain, needed + * in order to perform hashChain validation + */ + mostRecentHashChain: z.string().nullish(), /** * Whether the evaluation found is the exact evaluation being requested */ @@ -89,8 +99,6 @@ function loadLatestEvaluationWith ({ findEvaluation, findLatestProcessMemory, sa } function maybeCachedMemory (ctx) { - // logger('Checking cache for existing memory to start evaluation "%s"...', ctx.id) - return findLatestProcessMemory({ processId: ctx.id, timestamp: ctx.to, @@ -127,14 +135,12 @@ function loadLatestEvaluationWith ({ findEvaluation, findLatestProcessMemory, sa if (['cold_start'].includes(found.src)) return Resolved(found) if (['memory'].includes(found.src) && !found.fromFile) return Resolved(found) - // logger( - // 'Seeding cache with latest checkpoint found with parameters "%j"', - // omit(['Memory'], found) - // ) /** - * Immediatley attempt to save the memory loaded from a checkpoint - * into the LRU In-memory cache, which will cut - * down on 
calls to load checkpoints from arweave (it will load from cache instead)
+     * Immediately attempt to save the memory loaded from a less-hot checkpoint
+     * layer into the LRU In-memory cache.
+     *
+     * This will help mitigate concurrent evals of the same process
+     * from all making calls to a remote, i.e. Arweave
      */
     return saveLatestProcessMemory({
       processId: ctx.id,
@@ -144,6 +150,8 @@ function loadLatestEvaluationWith ({ findEvaluation, findLatestProcessMemory, sa
        */
       Memory: found.Memory,
       moduleId: found.moduleId,
+      assignmentId: found.assignmentId,
+      hashChain: found.hashChain,
       // messageId: found.messageId,
       timestamp: found.timestamp,
       epoch: found.epoch,
@@ -163,6 +171,15 @@ function loadLatestEvaluationWith ({ findEvaluation, findLatestProcessMemory, sa
           ordinate: found.ordinate,
           fromBlockHeight: found.blockHeight,
           fromCron: found.cron,
+          mostRecentAssignmentId: found.assignmentId,
+          mostRecentHashChain: found.hashChain,
+          /**
+           * The exact evaluation may not have been found in persistence,
+           * but it may be found in a caching tier.
+           *
+           * So we still signal to the caller whether an exact match
+           * was found, for potential optimizations.
+           */
           exact
         }))
     })
diff --git a/servers/cu/src/domain/lib/loadProcess.test.js b/servers/cu/src/domain/lib/loadProcess.test.js
index 274875692..b71b2740a 100644
--- a/servers/cu/src/domain/lib/loadProcess.test.js
+++ b/servers/cu/src/domain/lib/loadProcess.test.js
@@ -9,13 +9,15 @@ const PROCESS = 'process-123-9HdeqeuYQOgMgWucro'
 const logger = createTestLogger({ name: 'ao-cu:readState' })
 
 describe('loadProcess', () => {
-  test('appends result, from, fromCron, fromBlockHeight and evaluatedAt to ctx', async () => {
+  test('appends result to ctx', async () => {
     const loadProcess = loadProcessWith({
       findEvaluation: async () => { throw { status: 404 } },
       findLatestProcessMemory: async () => ({
         src: 'cold_start',
         Memory: null,
         moduleId: undefined,
+        assignmentId: undefined,
+        hashChain: undefined,
         timestamp: undefined,
         epoch: undefined,
         nonce: undefined,
@@ -30,11 +32,7 @@ describe('loadProcess', () => {
     const res = await loadProcess({ id: PROCESS, to: '1697574792000' }).toPromise()
 
     assert.deepStrictEqual(res.result, { Memory: null })
-    assert.equal(res.from, undefined)
-    assert.equal(res.fromCron, undefined)
     assert.equal(res.ordinate, '0')
-    assert.equal(res.fromBlockHeight, undefined)
-    assert.equal(res.evaluatedAt, undefined)
     assert.equal(res.id, PROCESS)
   })
 
@@ -76,6 +74,8 @@ describe('loadProcess', () => {
     assert.deepStrictEqual(res.ordinate, cachedEvaluation.ordinate)
     assert.deepStrictEqual(res.fromCron, cachedEvaluation.cron)
     assert.deepStrictEqual(res.fromBlockHeight, cachedEvaluation.blockHeight)
+    assert.equal(res.mostRecentAssignmentId, undefined)
+    assert.equal(res.mostRecentHashChain, undefined)
     assert.equal(res.id, PROCESS)
   })
 
@@ -84,6 +84,8 @@ describe('loadProcess', () => {
       src: 'memory',
       Memory: Buffer.from('hello world'),
       moduleId: 'module-123',
+      assignmentId: 'assignment-123',
+      hashChain: 'hashchain-123',
       timestamp: 1697574792000,
       epoch: 0,
       nonce: 11,
@@ -104,6 +106,8 @@ describe('loadProcess', () => {
     assert.deepStrictEqual(res.ordinate, cached.ordinate)
     assert.deepStrictEqual(res.fromCron, cached.cron)
     assert.deepStrictEqual(res.fromBlockHeight, cached.blockHeight)
+    assert.deepStrictEqual(res.mostRecentAssignmentId, cached.assignmentId)
+    assert.deepStrictEqual(res.mostRecentHashChain, cached.hashChain)
     assert.equal(res.id, PROCESS)
   })
 
@@ -111,6 +115,8 @@ describe('loadProcess', () => {
     const cached = {
       Memory: Buffer.from('hello world'), 
moduleId: 'module-123', + assignmentId: 'assignment-123', + hashChain: 'hashchain-123', timestamp: 1697574792000, epoch: 0, nonce: 11, @@ -125,9 +131,10 @@ describe('loadProcess', () => { saveLatestProcessMemory: async (args) => { assert.deepStrictEqual(args, { processId: PROCESS, - evalCount: 0, Memory: cached.Memory, moduleId: cached.moduleId, + assignmentId: cached.assignmentId, + hashChain: cached.hashChain, timestamp: cached.timestamp, epoch: cached.epoch, nonce: cached.nonce, @@ -150,6 +157,8 @@ describe('loadProcess', () => { const cached = { Memory: Buffer.from('hello world'), moduleId: 'module-123', + assignmentId: 'assignment-123', + hashChain: 'hashchain-123', timestamp: 1697574792000, epoch: 0, nonce: 11, @@ -164,9 +173,10 @@ describe('loadProcess', () => { saveLatestProcessMemory: async (args) => { assert.deepStrictEqual(args, { processId: PROCESS, - evalCount: 0, Memory: cached.Memory, moduleId: cached.moduleId, + assignmentId: cached.assignmentId, + hashChain: cached.hashChain, timestamp: cached.timestamp, epoch: cached.epoch, nonce: cached.nonce, diff --git a/servers/cu/src/domain/model.js b/servers/cu/src/domain/model.js index ec4759cb0..c8d77addf 100644 --- a/servers/cu/src/domain/model.js +++ b/servers/cu/src/domain/model.js @@ -252,6 +252,8 @@ export const processCheckpointSchema = z.object({ fromFile: z.string().nullish(), Memory: bufferSchema.nullish(), moduleId: z.string().nullish(), + assignmentId: z.string().nullish(), + hashChain: z.string().nullish(), timestamp: z.coerce.number().nullish(), epoch: z.coerce.number().nullish(), nonce: z.coerce.number().nullish(), @@ -307,6 +309,13 @@ export const messageSchema = z.object({ * Whether the message is a pure assignment of an on-chain message */ isAssignment: z.boolean().default(false), + /** + * The id of the assignment. 
+ * + * cron messages do not have an assignment, so this + * is optional + */ + assignmentId: z.string().nullish(), /** * For assignments, any exclusions to not be passed to the process, * and potentially not loaded from the gateway or arweave diff --git a/servers/cu/src/effects/ao-process.js b/servers/cu/src/effects/ao-process.js index ff4c13692..535acd00c 100644 --- a/servers/cu/src/effects/ao-process.js +++ b/servers/cu/src/effects/ao-process.js @@ -45,6 +45,8 @@ export const LATEST = 'LATEST' * @typedef Evaluation * @prop {string} processId * @prop {string} moduleId + * @prop {string} assignmentId + * @prop {string} hashChain * @prop {string} epoch * @prop {string} nonce * @prop {string} timestamp @@ -566,6 +568,14 @@ export function writeCheckpointRecordWith ({ db }) { evaluation: z.object({ processId: z.string().min(1), moduleId: z.string().min(1), + /** + * nullish for backwards compat + */ + assignmentId: z.string().nullish(), + /** + * nullish for backwards compat + */ + hashChain: z.string().nullish(), timestamp: z.coerce.number(), epoch: z.coerce.number().nullish(), nonce: z.coerce.number().nullish(), @@ -639,11 +649,11 @@ export function findFileCheckpointBeforeWith ({ db }) { .map(createQuery) .chain(fromPromise((query) => db.query(query))) ) - .map((results) => { - return results.map((result) => ({ - ...result, - file: pathOr('', ['file'])(result), - evaluation: JSON.parse(pathOr({}, ['evaluation'])(result)) + .map((fileCheckpoints) => { + return fileCheckpoints.map((fileCheckpoint) => ({ + ...fileCheckpoint, + file: pathOr('', ['file'])(fileCheckpoint), + evaluation: JSON.parse(pathOr({}, ['evaluation'])(fileCheckpoint)) })) }) .map((parsed) => { @@ -669,6 +679,8 @@ export function writeFileCheckpointRecordWith ({ db }) { evaluation: z.object({ processId: z.string().min(1), moduleId: z.string().min(1), + assignmentId: z.string().nullish(), + hashChain: z.string().nullish(), timestamp: z.coerce.number(), epoch: z.coerce.number().nullish(), nonce: z.coerce.number().nullish(), @@ -850,6 +862,8 @@ export function findLatestProcessMemoryWith ({ return { id: node.id, timestamp: parseInt(tags.Timestamp), + assignmentId: tags.Assignment, + hashChain: tags['Hash-Chain'], /** * Due to a previous bug, these tags may sometimes * be invalid values, so we've added a utility @@ -972,6 +986,8 @@ export function findLatestProcessMemoryWith ({ fromFile: file, Memory, moduleId: cached.evaluation.moduleId, + assignmentId: cached.evaluation.assignmentId, + hashChain: cached.evaluation.hashChain, timestamp: cached.evaluation.timestamp, blockHeight: cached.evaluation.blockHeight, epoch: cached.evaluation.epoch, @@ -1030,6 +1046,8 @@ export function findLatestProcessMemoryWith ({ src: 'file', Memory, moduleId: checkpoint.moduleId, + assignmentId: checkpoint.assignmentId, + hashChain: checkpoint.hashChain, timestamp: checkpoint.timestamp, blockHeight: checkpoint.blockHeight, epoch: checkpoint.epoch, @@ -1087,6 +1105,8 @@ export function findLatestProcessMemoryWith ({ src: 'record', Memory, moduleId: checkpoint.evaluation.moduleId, + assignmentId: checkpoint.evaluation.assignmentId, + hashChain: checkpoint.evaluation.hashChain, timestamp: checkpoint.evaluation.timestamp, epoch: checkpoint.evaluation.epoch, nonce: checkpoint.evaluation.nonce, @@ -1155,6 +1175,8 @@ export function findLatestProcessMemoryWith ({ src: 'arweave', Memory, moduleId: latestCheckpoint.module, + assignmentId: latestCheckpoint.assignmentId, + hashChain: latestCheckpoint.hashChain, timestamp: latestCheckpoint.timestamp, epoch: 
latestCheckpoint.epoch, nonce: latestCheckpoint.nonce, @@ -1186,6 +1208,8 @@ export function findLatestProcessMemoryWith ({ src: 'cold_start', Memory: null, moduleId: undefined, + assignmentId: undefined, + hashChain: undefined, timestamp: undefined, epoch: undefined, nonce: undefined, @@ -1289,7 +1313,7 @@ export function findLatestProcessMemoryWith ({ } export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD }) { - return async ({ processId, moduleId, messageId, timestamp, epoch, nonce, ordinate, cron, blockHeight, Memory, evalCount, gasUsed }) => { + return async ({ processId, moduleId, assignmentId, messageId, hashChain, timestamp, epoch, nonce, ordinate, cron, blockHeight, Memory, gasUsed }) => { const cached = cache.get(processId) /** @@ -1338,6 +1362,8 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA const evaluation = { processId, moduleId, + assignmentId, + hashChain, timestamp, epoch, nonce, @@ -1460,7 +1486,7 @@ export function saveCheckpointWith ({ ` function createCheckpointDataItem (args) { - const { moduleId, processId, epoch, nonce, ordinate, timestamp, blockHeight, cron, encoding, Memory } = args + const { moduleId, processId, assignmentId, hashChain, epoch, nonce, ordinate, timestamp, blockHeight, cron, encoding, Memory } = args return of(Memory) .chain((buffer) => @@ -1496,6 +1522,25 @@ export function saveCheckpointWith ({ ] } + /** + * A Cron message does not have an assignment + * (which is to say this is the assignment of the most recent + * Scheduled message) + * + * This is needed in order to perform hash chain verification + * on hot starts from a checkpoint + */ + if (assignmentId) dataItem.tags.push({ name: 'Assignment', value: assignmentId.trim() }) + /** + * A Cron message does not have a hashChain + * (which is to say this is the hashChain of the most recent + * Scheduled message) + * + * This is needed in order to perform hash chain verification + * on hot starts from a checkpoint + */ + if (hashChain) dataItem.tags.push({ name: 'Hash-Chain', value: hashChain.trim() }) + /** * Cron messages do not have an Epoch, * so only add the tag if the value is present @@ -1542,7 +1587,7 @@ export function saveCheckpointWith ({ */ if (DISABLE_PROCESS_FILE_CHECKPOINT_CREATION && DISABLE_PROCESS_CHECKPOINT_CREATION) return Resolved(args) - const { moduleId, processId, epoch, nonce, timestamp, blockHeight, cron, encoding, Memory, File } = args + const { moduleId, processId, assignmentId, hashChain, epoch, nonce, timestamp, blockHeight, cron, encoding, Memory, File } = args let file return of() @@ -1569,7 +1614,7 @@ export function saveCheckpointWith ({ logger( 'Process cache entry error for evaluation "%j". Entry contains neither Memory or File. 
Skipping saving of checkpoint...', - { moduleId, processId, epoch, nonce, timestamp, blockHeight, cron, encoding } + { moduleId, processId, assignmentId, hashChain, epoch, nonce, timestamp, blockHeight, cron, encoding } ) return Rejected('either File or Memory required') }) @@ -1577,7 +1622,7 @@ export function saveCheckpointWith ({ } function createFileCheckpoint (args) { - const { Memory, encoding, processId, moduleId, timestamp, epoch, ordinate, nonce, blockHeight, cron } = args + const { Memory, encoding, processId, moduleId, assignmentId, hashChain, timestamp, epoch, ordinate, nonce, blockHeight, cron } = args if (DISABLE_PROCESS_FILE_CHECKPOINT_CREATION) return Rejected('file checkpoint creation is disabled') /** @@ -1589,6 +1634,8 @@ export function saveCheckpointWith ({ const evaluation = { processId, moduleId, + assignmentId, + hashChain, timestamp, nonce, ordinate, @@ -1614,7 +1661,7 @@ export function saveCheckpointWith ({ } function createArweaveCheckpoint (args) { - const { encoding, processId, moduleId, timestamp, epoch, ordinate, nonce, blockHeight, cron } = args + const { encoding, processId, moduleId, assignmentId, hashChain, timestamp, epoch, ordinate, nonce, blockHeight, cron } = args if (DISABLE_PROCESS_CHECKPOINT_CREATION) return Rejected('arweave checkpoint creation is disabled') /** @@ -1624,7 +1671,7 @@ export function saveCheckpointWith ({ logger( 'Checking Gateway for existing Checkpoint for evaluation: %j', - { moduleId, processId, epoch, nonce, timestamp, blockHeight, cron, encoding } + { moduleId, processId, assignmentId, hashChain, epoch, nonce, timestamp, blockHeight, cron, encoding } ) return address() @@ -1668,7 +1715,7 @@ export function saveCheckpointWith ({ logger( 'Creating Checkpoint for evaluation: %j', - { moduleId, processId, epoch, nonce, timestamp, blockHeight, cron, encoding: 'gzip' } + { moduleId, processId, assignmentId, hashChain, epoch, nonce, timestamp, blockHeight, cron, encoding: 'gzip' } ) /** @@ -1717,6 +1764,8 @@ export function saveCheckpointWith ({ evaluation: { processId, moduleId, + assignmentId, + hashChain, timestamp, epoch, nonce, @@ -1769,8 +1818,36 @@ export function saveCheckpointWith ({ )) } - return async ({ Memory, File, encoding, processId, moduleId, timestamp, epoch, ordinate, nonce, blockHeight, cron }) => - maybeHydrateMemory({ Memory, File, encoding, processId, moduleId, timestamp, epoch, ordinate, nonce, blockHeight, cron }) + return async ({ + Memory, + File, + encoding, + processId, + moduleId, + assignmentId, + hashChain, + timestamp, + epoch, + ordinate, + nonce, + blockHeight, + cron + }) => + maybeHydrateMemory({ + Memory, + File, + encoding, + processId, + assignmentId, + hashChain, + moduleId, + timestamp, + epoch, + ordinate, + nonce, + blockHeight, + cron + }) .chain(createCheckpoints) .toPromise() } diff --git a/servers/cu/src/effects/ao-process.test.js b/servers/cu/src/effects/ao-process.test.js index 278299f29..16a4414b4 100644 --- a/servers/cu/src/effects/ao-process.test.js +++ b/servers/cu/src/effects/ao-process.test.js @@ -195,6 +195,8 @@ describe('ao-process', () => { const evaluation = { processId: 'process-123', moduleId: 'module-123', + assignmentId: 'assignment-123', + hashChain: 'hashchain-123', epoch: 0, nonce: 11, timestamp: now, @@ -218,7 +220,10 @@ describe('ao-process', () => { cron: undefined } }) - assert.deepStrictEqual(res, { file: 'state-process-123.dat', ...evaluation }) + assert.deepStrictEqual(res, { + file: 'state-process-123.dat', + ...evaluation + }) }) test('should return the 
latest checkpoint from a file BEFORE the before', async () => { @@ -296,6 +301,8 @@ describe('ao-process', () => { const cachedEval = { processId: PROCESS, moduleId: 'module-123', + assignmentId: 'assignment-123', + hashChain: 'hashchain-123', epoch: 0, nonce: 11, timestamp: tenSecondsAgo, @@ -400,6 +407,8 @@ describe('ao-process', () => { fromFile: undefined, Memory, moduleId: 'module-123', + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -417,6 +426,8 @@ describe('ao-process', () => { fromFile: undefined, Memory, moduleId: 'module-123', + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -449,6 +460,8 @@ describe('ao-process', () => { fromFile: 'state-process123.dat', Memory, moduleId: 'module-123', + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -509,6 +522,8 @@ describe('ao-process', () => { assert.deepStrictEqual(res, { src: 'file', moduleId: cachedEval.moduleId, + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -525,6 +540,8 @@ describe('ao-process', () => { assert.deepStrictEqual(res, { src: 'file', moduleId: cachedEval.moduleId, + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -541,6 +558,8 @@ describe('ao-process', () => { assert.deepStrictEqual(res, { src: 'file', moduleId: cachedEval.moduleId, + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -611,6 +630,8 @@ describe('ao-process', () => { assert.deepStrictEqual(res, { src: 'record', moduleId: cachedEval.moduleId, + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -627,6 +648,8 @@ describe('ao-process', () => { assert.deepStrictEqual(res, { src: 'record', moduleId: cachedEval.moduleId, + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -652,6 +675,8 @@ describe('ao-process', () => { assert.deepStrictEqual(res, { src: 'record', moduleId: cachedEval.moduleId, + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -699,6 +724,8 @@ describe('ao-process', () => { tags: [ { name: 'Module', value: `${cachedEval.moduleId}` }, { name: 'Timestamp', value: `${cachedEval.timestamp}` }, + { name: 'Assignment', value: `${cachedEval.assignmentId}` }, + { name: 'Hash-Chain', value: `${cachedEval.hashChain}` }, { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: `${cachedEval.nonce}` }, { name: 'Block-Height', value: `${cachedEval.blockHeight}` }, @@ -744,6 +771,8 @@ describe('ao-process', () => { assert.deepStrictEqual(res, { src: 'arweave', moduleId: cachedEval.moduleId, + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -760,6 
+789,8 @@ describe('ao-process', () => { assert.deepStrictEqual(res, { src: 'arweave', moduleId: cachedEval.moduleId, + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -783,6 +814,8 @@ describe('ao-process', () => { assert.deepStrictEqual(res, { src: 'arweave', moduleId: cachedEval.moduleId, + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -809,6 +842,8 @@ describe('ao-process', () => { assert.deepStrictEqual(res, { src: 'arweave', moduleId: cachedEval.moduleId, + assignmentId: cachedEval.assignmentId, + hashChain: cachedEval.hashChain, epoch: cachedEval.epoch, nonce: cachedEval.nonce, timestamp: cachedEval.timestamp, @@ -861,6 +896,8 @@ describe('ao-process', () => { id: 'tx-not-encoded', tags: [ { name: 'Module', value: `${cachedEval.moduleId}` }, + { name: 'Assignment', value: `${cachedEval.assignmentId}` }, + { name: 'Hash-Chain', value: `${cachedEval.hashChain}` }, { name: 'Timestamp', value: `${cachedEval.timestamp}` }, { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: `${cachedEval.nonce}` }, @@ -897,6 +934,8 @@ describe('ao-process', () => { ...edges[0].node, tags: [ { name: 'Module', value: `${cachedEval.moduleId}` }, + { name: 'Assignment', value: `${cachedEval.assignmentId}` }, + { name: 'Hash-Chain', value: `${cachedEval.hashChain}` }, { name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` }, { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: '12' }, @@ -931,6 +970,9 @@ describe('ao-process', () => { id: 'ignored', tags: [ { name: 'Module', value: `${cachedEval.moduleId}` }, + // purposefully omitted to ensure robustness + // { name: 'Assignment', value: `${cachedEval.assignmentId}` }, + // { name: 'Hash-Chain', value: `${cachedEval.hashChain}` }, { name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` }, { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: '12' }, @@ -968,6 +1010,8 @@ describe('ao-process', () => { ...edges[0].node, tags: [ { name: 'Module', value: `${cachedEval.moduleId}` }, + { name: 'Assignment', value: `${cachedEval.assignmentId}` }, + { name: 'Hash-Chain', value: `${cachedEval.hashChain}` }, { name: 'Timestamp', value: `${cachedEval.timestamp + 1000}` }, { name: 'Epoch', value: `${cachedEval.epoch}` }, { name: 'Nonce', value: '12' }, @@ -1056,6 +1100,8 @@ describe('ao-process', () => { src: 'cold_start', Memory: null, moduleId: undefined, + assignmentId: undefined, + hashChain: undefined, timestamp: undefined, epoch: undefined, nonce: undefined, @@ -1233,6 +1279,8 @@ describe('ao-process', () => { id: 'tx-123', tags: [ { name: 'Module', value: `${laterCachedEval.moduleId}` }, + { name: 'Assignment', value: `${cachedEval.assignmentId}` }, + { name: 'Hash-Chain', value: `${cachedEval.hashChain}` }, { name: 'Timestamp', value: `${laterCachedEval.timestamp}` }, { name: 'Epoch', value: `${laterCachedEval.epoch}` }, { name: 'Nonce', value: `${laterCachedEval.nonce}` }, @@ -1278,6 +1326,8 @@ describe('ao-process', () => { const cachedEval = { processId: PROCESS, moduleId: 'module-123', + assignmentId: 'assignment-123', + hashChain: 'hashchain-123', epoch: 0, nonce: 11, timestamp: tenSecondsAgo, @@ -1289,6 +1339,8 @@ describe('ao-process', () => { const cachedEvalFuture = { processId: PROCESS, moduleId: 'module-1234', + assignmentId: 'assignment-123', + hashChain: 
'hashchain-123', epoch: 0, nonce: 11, timestamp: now + 1000, @@ -1310,7 +1362,6 @@ describe('ao-process', () => { timestamp: now - 1000, ordinate: '13', cron: undefined, - evalCount: 5, gasUsed: BigInt(50) } const targetWithGasUsed = { @@ -1319,7 +1370,6 @@ describe('ao-process', () => { timestamp: now - 1000, ordinate: '13', cron: undefined, - evalCount: 15, gasUsed: BigInt(50) } diff --git a/servers/cu/src/effects/ao-su.js b/servers/cu/src/effects/ao-su.js index 73860dbe2..eac49e236 100644 --- a/servers/cu/src/effects/ao-su.js +++ b/servers/cu/src/effects/ao-su.js @@ -60,37 +60,45 @@ export const mapNode = pipe( // fromAssignment pipe( path(['assignment']), - (assignment) => parseTags(assignment.tags), - applySpec({ - /** - * There could be multiple Exclude tags, - * but parseTags will accumulate them into an array, - * - * which is what we want - */ - Exclude: path(['Exclude']), - /** - * Data from the assignment, to be placed - * on the message - */ - message: applySpec({ - Message: path(['Message']), - Target: path(['Process']), - Epoch: pipe(path(['Epoch']), parseInt), - Nonce: pipe(path(['Nonce']), parseInt), - Timestamp: pipe(path(['Timestamp']), parseInt), - 'Block-Height': pipe( + (assignment) => { + const tags = parseTags(assignment.tags) + + return applySpec({ /** - * Returns a left padded integer like '000001331218' + * The assignment id is needed in order to compute + * and verify the hash chain + */ + AssignmentId: always(assignment.id), + /** + * There could be multiple Exclude tags, + * but parseTags will accumulate them into an array, * - * So use parseInt to convert it into a number + * which is what we want */ - path(['Block-Height']), - parseInt - ), - 'Hash-Chain': path(['Hash-Chain']) - }) - }) + Exclude: path(['Exclude']), + /** + * Data from the assignment, to be placed + * on the message + */ + message: applySpec({ + Message: path(['Message']), + Target: path(['Process']), + Epoch: pipe(path(['Epoch']), parseInt), + Nonce: pipe(path(['Nonce']), parseInt), + Timestamp: pipe(path(['Timestamp']), parseInt), + 'Block-Height': pipe( + /** + * Returns a left padded integer like '000001331218' + * + * So use parseInt to convert it into a number + */ + path(['Block-Height']), + parseInt + ), + 'Hash-Chain': path(['Hash-Chain']) + }) + })(tags) + } ), // fromMessage pipe( @@ -118,10 +126,6 @@ export const mapNode = pipe( ), // both applySpec({ - prevAssignment: applySpec({ - hashChain: pathOr(null, ['previous_assignment', 'hash_chain']), - id: pathOr(null, ['previous_assignment', 'id']) - }), isAssignment: pipe( path(['message', 'id']), isNil @@ -137,7 +141,7 @@ export const mapNode = pipe( name: `${fBoth.isAssignment ? 'Assigned' : 'Scheduled'} Message ${fBoth.isAssignment ? 
fAssignment.message.Message : fMessage.Id} ${fAssignment.message.Timestamp}:${fAssignment.message.Nonce}`, exclude: fAssignment.Exclude, isAssignment: fBoth.isAssignment, - prevAssignment: fBoth.prevAssignment, + assignmentId: fAssignment.AssignmentId, message: mergeAll([ fMessage, fAssignment.message, @@ -297,12 +301,12 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { } )(edge) - if (!isHashChainValid(scheduled)) { - logger('HashChain invalid on message "%s" scheduled on process "%s"', scheduled.message.Id, processId) - const err = new Error(`HashChain invalid on message ${scheduled.message.Id}`) - err.status = 422 - throw err - } + // if (!isHashChainValid(scheduled)) { + // logger('HashChain invalid on message "%s" scheduled on process "%s"', scheduled.message.Id, processId) + // const err = new Error(`HashChain invalid on message ${scheduled.message.Id}`) + // err.status = 422 + // throw err + // } yield scheduled } diff --git a/servers/cu/src/effects/ao-su.test.js b/servers/cu/src/effects/ao-su.test.js index 9265a74f7..06e55ac31 100644 --- a/servers/cu/src/effects/ao-su.test.js +++ b/servers/cu/src/effects/ao-su.test.js @@ -15,13 +15,14 @@ describe('ao-su', () => { describe('mapNode', () => { const now = new Date().getTime() const messageId = 'message-123' + const assignmentId = 'assignment-123' const assignedMessageId = 'assigned-123' const expected = { cron: undefined, ordinate: '23', name: `Scheduled Message ${messageId} ${now}:23`, exclude: undefined, - prevAssignment: { id: null, hashChain: null }, + assignmentId, message: { Id: messageId, Signature: 'sig-123', @@ -59,6 +60,7 @@ describe('ao-su', () => { data: 'data-123' }, assignment: { + id: assignmentId, owner: { address: 'su-123', key: 'su-123' @@ -80,6 +82,7 @@ describe('ao-su', () => { withoutAoGlobal.parse(expected) ) assert.equal(res.isAssignment, false) + assert.equal(res.assignmentId, assignmentId) }) describe('should map an assignment tx', () => { @@ -134,55 +137,6 @@ describe('ao-su', () => { }) }) }) - - describe('should map the previous assignment data', () => { - const arg = { - message: null, - assignment: { - owner: { - address: 'su-123', - key: 'su-123' - }, - tags: [ - { name: 'Epoch', value: '0' }, - { name: 'Nonce', value: '23' }, - { name: 'Process', value: 'process-123' }, - { name: 'Block-Height', value: '000000000123' }, - { name: 'Timestamp', value: `${now}` }, - { name: 'Hash-Chain', value: 'hash-123' }, - { name: 'Message', value: assignedMessageId } - ], - data: 'su-data-123' - } - } - - test('should map prevAssignment fields', () => { - const res = mapNode({ - ...arg, - previous_assignment: { - id: 'prev-assignment-id', - hash_chain: 'prev-hashchain' - } - }) - - assert.deepStrictEqual( - res.prevAssignment, - { id: 'prev-assignment-id', hashChain: 'prev-hashchain' } - ) - }) - - test('should set prevAssignment fields to null', () => { - const res = mapNode({ - ...arg, - no_previous_assigment: true - }) - - assert.deepStrictEqual( - res.prevAssignment, - { id: null, hashChain: null } - ) - }) - }) }) describe('isHashChainValid', () => { From 3c6a3bd8a2caabd1a1faa3a95df99ff2ebc3249b Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Fri, 17 Jan 2025 19:54:16 +0000 Subject: [PATCH 3/9] chore(cu): bump deps --- servers/cu/package-lock.json | 100 ++++++++++++++++++----------------- servers/cu/package.json | 10 ++-- 2 files changed, 57 insertions(+), 53 deletions(-) diff --git a/servers/cu/package-lock.json b/servers/cu/package-lock.json index 14dedf508..8f90460ab 100644 --- 
a/servers/cu/package-lock.json +++ b/servers/cu/package-lock.json @@ -8,19 +8,19 @@ "name": "@permaweb/ao-cu", "version": "1.0.0", "dependencies": { - "@fastify/middie": "^9.0.2", + "@fastify/middie": "^9.0.3", "@permaweb/ao-loader": "^0.0.44", "@permaweb/ao-scheduler-utils": "^0.0.25", "@permaweb/weavedrive": "^0.0.18", "arweave": "^1.15.5", "async-lock": "^1.4.1", - "better-sqlite3": "^11.7.0", + "better-sqlite3": "^11.8.0", "bytes": "^3.1.2", "cors": "^2.8.5", "dataloader": "^2.2.3", "dotenv": "^16.4.7", - "fast-glob": "^3.3.2", - "fastify": "^5.2.0", + "fast-glob": "^3.3.3", + "fastify": "^5.2.1", "helmet": "^8.0.0", "hyper-async": "^1.1.2", "keccak": "^3.0.4", @@ -33,7 +33,7 @@ "pg": "^8.13.1", "prom-client": "^15.1.3", "ramda": "^0.30.1", - "undici": "^7.2.0", + "undici": "^7.2.3", "warp-arbundles": "^1.0.4", "winston": "^3.17.0", "workerpool": "^9.2.0", @@ -101,6 +101,12 @@ "fast-json-stringify": "^6.0.0" } }, + "node_modules/@fastify/forwarded": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/@fastify/forwarded/-/forwarded-3.0.0.tgz", + "integrity": "sha512-kJExsp4JCms7ipzg7SJ3y8DwmePaELHxKYtg+tZow+k0znUTf3cb+npgyqm8+ATZOdmfgfydIebPDWM172wfyA==", + "license": "MIT" + }, "node_modules/@fastify/merge-json-schemas": { "version": "0.1.1", "resolved": "https://registry.npmjs.org/@fastify/merge-json-schemas/-/merge-json-schemas-0.1.1.tgz", @@ -111,9 +117,19 @@ } }, "node_modules/@fastify/middie": { - "version": "9.0.2", - "resolved": "https://registry.npmjs.org/@fastify/middie/-/middie-9.0.2.tgz", - "integrity": "sha512-MHvAhUBxrefkpx4A8HtjOjAdlaCtY8j19PC6ORfm7KPMb/dklDeqBqR4xPRTtcBRPZUYq2jAJJWQCB4eO+dtKw==", + "version": "9.0.3", + "resolved": "https://registry.npmjs.org/@fastify/middie/-/middie-9.0.3.tgz", + "integrity": "sha512-7OYovKXp9UKYeVMcjcFLMcSpoMkmcZmfnG+eAvtdiatN35W7c+r9y1dRfpA+pfFVNuHGGqI3W+vDTmjvcfLcMA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], "license": "MIT", "dependencies": { "@fastify/error": "^4.0.0", @@ -122,6 +138,16 @@ "reusify": "^1.0.4" } }, + "node_modules/@fastify/proxy-addr": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/@fastify/proxy-addr/-/proxy-addr-5.0.0.tgz", + "integrity": "sha512-37qVVA1qZ5sgH7KpHkkC4z9SK6StIsIcOmpjvMPXNb3vx2GQxhZocogVYbr2PbbeLCQxYIPDok307xEvRZOzGA==", + "license": "MIT", + "dependencies": { + "@fastify/forwarded": "^3.0.0", + "ipaddr.js": "^2.1.0" + } + }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", @@ -396,9 +422,9 @@ } }, "node_modules/better-sqlite3": { - "version": "11.7.0", - "resolved": "https://registry.npmjs.org/better-sqlite3/-/better-sqlite3-11.7.0.tgz", - "integrity": "sha512-mXpa5jnIKKHeoGzBrUJrc65cXFKcILGZpU3FXR0pradUEm9MA7UZz02qfEejaMcm9iXrSOCenwwYMJ/tZ1y5Ig==", + "version": "11.8.0", + "resolved": "https://registry.npmjs.org/better-sqlite3/-/better-sqlite3-11.8.0.tgz", + "integrity": "sha512-aKv9s2dir7bsEX5RIjL9HHWB9uQ+f6Vch5B4qmeAOop4Y9OYHX+PNKLr+mpv6+d8L/ZYh4l7H8zPuVMbWkVMLw==", "hasInstallScript": true, "license": "MIT", "dependencies": { @@ -742,16 +768,16 @@ "license": "MIT" }, "node_modules/fast-glob": { - "version": "3.3.2", - "resolved": "https://registry.npmjs.org/fast-glob/-/fast-glob-3.3.2.tgz", - "integrity": "sha512-oX2ruAFQwf/Orj8m737Y5adxDQO0LAB7/S5MnxCdTNDd4p6BsyIVsv9JQsATbTSq8KHRpLwIHbVlUNatxd+1Ow==", + "version": "3.3.3", + 
"resolved": "https://registry.npmjs.org/fast-glob/-/fast-glob-3.3.3.tgz", + "integrity": "sha512-7MptL8U0cqcFdzIzwOTHoilX9x5BrNqye7Z/LuC7kCMRio1EMSyqRK3BEAUD7sXRq4iT4AzTVuZdhgQ2TCvYLg==", "license": "MIT", "dependencies": { "@nodelib/fs.stat": "^2.0.2", "@nodelib/fs.walk": "^1.2.3", "glob-parent": "^5.1.2", "merge2": "^1.3.0", - "micromatch": "^4.0.4" + "micromatch": "^4.0.8" }, "engines": { "node": ">=8.6.0" @@ -803,9 +829,9 @@ "license": "BSD-3-Clause" }, "node_modules/fastify": { - "version": "5.2.0", - "resolved": "https://registry.npmjs.org/fastify/-/fastify-5.2.0.tgz", - "integrity": "sha512-3s+Qt5S14Eq5dCpnE0FxTp3z4xKChI83ZnMv+k0FwX+VUoZrgCFoLAxpfdi/vT4y6Mk+g7aAMt9pgXDoZmkefQ==", + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/fastify/-/fastify-5.2.1.tgz", + "integrity": "sha512-rslrNBF67eg8/Gyn7P2URV8/6pz8kSAscFL4EThZJ8JBMaXacVdVE4hmUcnPNKERl5o/xTiBSLfdowBRhVF1WA==", "funding": [ { "type": "github", @@ -821,6 +847,7 @@ "@fastify/ajv-compiler": "^4.0.0", "@fastify/error": "^4.0.0", "@fastify/fast-json-stringify-compiler": "^5.0.0", + "@fastify/proxy-addr": "^5.0.0", "abstract-logging": "^2.0.1", "avvio": "^9.0.0", "fast-json-stringify": "^6.0.0", @@ -828,7 +855,6 @@ "light-my-request": "^6.0.0", "pino": "^9.0.0", "process-warning": "^4.0.0", - "proxy-addr": "^2.0.7", "rfdc": "^1.3.1", "secure-json-parse": "^3.0.1", "semver": "^7.6.0", @@ -894,15 +920,6 @@ "integrity": "sha512-GRnmB5gPyJpAhTQdSZTSp9uaPSvl09KoYcMQtsB9rQoOmzs9dH6ffeccH+Z+cv6P68Hu5bC6JjRh4Ah/mHSNRw==", "license": "MIT" }, - "node_modules/forwarded": { - "version": "0.2.0", - "resolved": "https://registry.npmjs.org/forwarded/-/forwarded-0.2.0.tgz", - "integrity": "sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==", - "license": "MIT", - "engines": { - "node": ">= 0.6" - } - }, "node_modules/fs-constants": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fs-constants/-/fs-constants-1.0.0.tgz", @@ -1007,12 +1024,12 @@ "license": "ISC" }, "node_modules/ipaddr.js": { - "version": "1.9.1", - "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", - "integrity": "sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==", + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.2.0.tgz", + "integrity": "sha512-Ag3wB2o37wslZS19hZqorUnrnzSkpOVy+IiiDEiTqNubEYpYuHWIf6K4psgN2ZWKExS4xhVCrRVfb/wfW8fWJA==", "license": "MIT", "engines": { - "node": ">= 0.10" + "node": ">= 10" } }, "node_modules/is-arrayish": { @@ -1638,19 +1655,6 @@ "node": "^16 || ^18 || >=20" } }, - "node_modules/proxy-addr": { - "version": "2.0.7", - "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", - "integrity": "sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg==", - "license": "MIT", - "dependencies": { - "forwarded": "0.2.0", - "ipaddr.js": "1.9.1" - }, - "engines": { - "node": ">= 0.10" - } - }, "node_modules/pstree.remy": { "version": "1.1.8", "resolved": "https://registry.npmjs.org/pstree.remy/-/pstree.remy-1.1.8.tgz", @@ -2117,9 +2121,9 @@ "license": "MIT" }, "node_modules/undici": { - "version": "7.2.0", - "resolved": "https://registry.npmjs.org/undici/-/undici-7.2.0.tgz", - "integrity": "sha512-klt+0S55GBViA9nsq48/NSCo4YX5mjydjypxD7UmHh/brMu8h/Mhd/F7qAeoH2NOO8SDTk6kjnTFc4WpzmfYpQ==", + "version": "7.2.3", + "resolved": "https://registry.npmjs.org/undici/-/undici-7.2.3.tgz", + "integrity": 
"sha512-2oSLHaDalSt2/O/wHA9M+/ZPAOcU2yrSP/cdBYJ+YxZskiPYDSqHbysLSlD7gq3JMqOoJI5O31RVU3BxX/MnAA==", "license": "MIT", "engines": { "node": ">=20.18.1" diff --git a/servers/cu/package.json b/servers/cu/package.json index 1fd01ce1e..3b27b5d90 100644 --- a/servers/cu/package.json +++ b/servers/cu/package.json @@ -11,19 +11,19 @@ "test": "node --experimental-wasm-memory64 --test" }, "dependencies": { - "@fastify/middie": "^9.0.2", + "@fastify/middie": "^9.0.3", "@permaweb/ao-loader": "^0.0.44", "@permaweb/ao-scheduler-utils": "^0.0.25", "@permaweb/weavedrive": "^0.0.18", "arweave": "^1.15.5", "async-lock": "^1.4.1", - "better-sqlite3": "^11.7.0", + "better-sqlite3": "^11.8.0", "bytes": "^3.1.2", "cors": "^2.8.5", "dataloader": "^2.2.3", "dotenv": "^16.4.7", - "fast-glob": "^3.3.2", - "fastify": "^5.2.0", + "fast-glob": "^3.3.3", + "fastify": "^5.2.1", "helmet": "^8.0.0", "hyper-async": "^1.1.2", "keccak": "^3.0.4", @@ -36,7 +36,7 @@ "pg": "^8.13.1", "prom-client": "^15.1.3", "ramda": "^0.30.1", - "undici": "^7.2.0", + "undici": "^7.2.3", "warp-arbundles": "^1.0.4", "winston": "^3.17.0", "workerpool": "^9.2.0", From 6ed66953a62b4766117e0ff618c6e5e1fa405d0c Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Fri, 17 Jan 2025 20:40:43 +0000 Subject: [PATCH 4/9] feat(cu): perform hash chain validation on scheduled messages #1112 --- servers/cu/src/domain/lib/loadMessages.js | 5 +- servers/cu/src/effects/ao-su.js | 60 +++++++++++++++-------- servers/cu/src/effects/ao-su.test.js | 35 ++++++------- 3 files changed, 57 insertions(+), 43 deletions(-) diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index 28f2cdf49..14da5c1b0 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -488,8 +488,9 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) { moduleOwner: ctx.moduleOwner, moduleTags: ctx.moduleTags, from: ctx.from, // could be undefined - to: ctx.to // could be undefined - // TODO: pass ctx.mostRecentHashChain and ctx.mostRecentAssignmentId to initialize hash chain verification of scheduled messages + to: ctx.to, // could be undefined + assignmentId: ctx.mostRecentAssignmentId, + hashChain: ctx.mostRecentHashChain }) ) } diff --git a/servers/cu/src/effects/ao-su.js b/servers/cu/src/effects/ao-su.js index eac49e236..3ffb17299 100644 --- a/servers/cu/src/effects/ao-su.js +++ b/servers/cu/src/effects/ao-su.js @@ -20,7 +20,6 @@ const resToJson = (res) => res.json() const hashChain = (prevHashChain, prevAssignmentId) => { const hash = createHash('sha256') - /** * For the very first message, there is no previous id, * so it is not included in the hashed bytes, to produce the very first @@ -31,24 +30,21 @@ const hashChain = (prevHashChain, prevAssignmentId) => { * Always include the previous hash chain */ hash.update(base64UrlToBytes(prevHashChain)) - return hash.digest('base64url') } -export const isHashChainValid = (scheduled) => { - const { prevAssignment, message } = scheduled +export const isHashChainValid = (prev, scheduled) => { + const { assignmentId: prevAssignmentId, hashChain: prevHashChain } = prev + const { message } = scheduled const actual = message['Hash-Chain'] /** - * This will match in cases where the SU returns no prevAssignment - * which is to say the feature isn't live on the SU. 
- * - * AND will match validating the first assignment, which needs - * no validation, besides needing to check its hashChain is present + * Depending on the source of the hot-start, there may not be + * a prev assignmentId and hashChain */ - if (!prevAssignment?.id || !prevAssignment?.hashChain) return !!actual + if (!prevAssignmentId || !prevHashChain) return !!actual - const expected = hashChain(prevAssignment.hashChain, prevAssignment.id) + const expected = hashChain(prevHashChain, prevAssignmentId) return expected === actual } @@ -281,12 +277,14 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { } } - function mapAoMessage ({ processId, processOwner, processTags, moduleId, moduleOwner, moduleTags, logger }) { + function mapAoMessage ({ processId, assignmentId, hashChain, processOwner, processTags, moduleId, moduleOwner, moduleTags, logger }) { const AoGlobal = { Process: { Id: processId, Owner: processOwner, Tags: processTags }, Module: { Id: moduleId, Owner: moduleOwner, Tags: moduleTags } } + const prevAssignmentId = assignmentId + const prevHashChain = hashChain return async function * (edges) { for await (const edge of edges) { const scheduled = pipe( @@ -301,12 +299,12 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { } )(edge) - // if (!isHashChainValid(scheduled)) { - // logger('HashChain invalid on message "%s" scheduled on process "%s"', scheduled.message.Id, processId) - // const err = new Error(`HashChain invalid on message ${scheduled.message.Id}`) - // err.status = 422 - // throw err - // } + if (!isHashChainValid({ assignmentId: prevAssignmentId, hashChain: prevHashChain }, scheduled)) { + logger('HashChain invalid on message "%s" scheduled on process "%s"', scheduled.message.Id, processId) + const err = new Error(`HashChain invalid on message ${scheduled.message.Id}`) + err.status = 422 + throw err + } yield scheduled } @@ -315,13 +313,35 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { return (args) => of(args) - .map(({ suUrl, processId, owner: processOwner, tags: processTags, moduleId, moduleOwner, moduleTags, from, to }) => { + .map(({ + suUrl, + processId, + owner: processOwner, + tags: processTags, + moduleId, + moduleOwner, + moduleTags, + from, + to, + assignmentId, + hashChain + }) => { return composeStreams( /** * compose will convert the AsyncIterable into a readable Duplex */ fetchAllPages({ suUrl, processId, from, to })(), - Transform.from(mapAoMessage({ processId, processOwner, processTags, moduleId, moduleOwner, moduleTags, logger })) + Transform.from(mapAoMessage({ + processId, + assignmentId, + hashChain, + processOwner, + processTags, + moduleId, + moduleOwner, + moduleTags, + logger + })) ) }) .toPromise() diff --git a/servers/cu/src/effects/ao-su.test.js b/servers/cu/src/effects/ao-su.test.js index 06e55ac31..56e315d30 100644 --- a/servers/cu/src/effects/ao-su.test.js +++ b/servers/cu/src/effects/ao-su.test.js @@ -142,7 +142,7 @@ describe('ao-su', () => { describe('isHashChainValid', () => { const now = new Date().getTime() const messageId = 'message-123' - const arg = { + const scheduled = { cron: undefined, ordinate: '23', name: `Scheduled Message ${messageId} ${now}:23`, @@ -171,47 +171,40 @@ describe('ao-su', () => { } test('should return whether the hashChain exists if there is no previous assignment', () => { - // feature not rolled out on SU - assert(isHashChainValid(arg)) + // no prev info + assert(isHashChainValid({}, scheduled)) // first assignment ergo 
has no prev assignment assert(isHashChainValid({ - ...arg, - prevAssignment: { - hashChain: null, - id: 'foo' - } - })) + hashChain: null, + assignmentId: 'foo' + }, scheduled)) assert(isHashChainValid({ - ...arg, - prevAssignment: { - hashChain: 'foo', - id: null - } - })) + hashChain: 'foo', + assignmentId: null + }, scheduled)) }) test('should calculate and compare the hashChain based on the previous assignment', () => { const prevAssignmentId = Buffer.from('assignment-123', 'utf8').toString('base64url') const prevHashChain = Buffer.from('hashchain-123', 'utf8').toString('base64url') + const prev = { assignmentId: prevAssignmentId, hashChain: prevHashChain } + const expected = createHash('sha256') .update(Buffer.from(prevAssignmentId, 'base64url')) .update(Buffer.from(prevHashChain, 'base64url')) .digest('base64url') - const arg = { - // ... - prevAssignment: { id: prevAssignmentId, hashChain: prevHashChain }, + const valid = { message: { // .... 'Hash-Chain': expected } } - assert(isHashChainValid(arg)) + assert(isHashChainValid(prev, valid)) const invalid = { - ...arg, message: { 'Hash-Chain': createHash('sha256') .update(Buffer.from('something else', 'base64url')) @@ -220,7 +213,7 @@ describe('ao-su', () => { } } - assert(!isHashChainValid(invalid)) + assert(!isHashChainValid(prev, invalid)) }) }) From d951aebabb39da84d34ecb8bd33943d6d84d6f5f Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Fri, 17 Jan 2025 23:15:11 +0000 Subject: [PATCH 5/9] chore(cu): enable hash chain validation #1112 --- servers/cu/src/domain/dal.js | 1 + servers/cu/src/domain/lib/loadMessages.js | 1 + servers/cu/src/effects/ao-su.js | 32 +++++++++++++++++------ 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/servers/cu/src/domain/dal.js b/servers/cu/src/domain/dal.js index eed3272f1..d453aea7e 100644 --- a/servers/cu/src/domain/dal.js +++ b/servers/cu/src/domain/dal.js @@ -168,6 +168,7 @@ export const loadMessagesSchema = z.function() z.object({ suUrl: z.string().url(), processId: z.string(), + block: blockSchema, owner: z.string(), tags: z.array(rawTagSchema), moduleId: z.string(), diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index 14da5c1b0..99f5edd6d 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -482,6 +482,7 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) { loadMessages({ suUrl: ctx.suUrl, processId: ctx.id, + block: ctx.block, owner: ctx.owner, tags: ctx.tags, moduleId: ctx.moduleId, diff --git a/servers/cu/src/effects/ao-su.js b/servers/cu/src/effects/ao-su.js index 3ffb17299..ca0729bab 100644 --- a/servers/cu/src/effects/ao-su.js +++ b/servers/cu/src/effects/ao-su.js @@ -277,14 +277,23 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { } } - function mapAoMessage ({ processId, assignmentId, hashChain, processOwner, processTags, moduleId, moduleOwner, moduleTags, logger }) { + function mapAoMessage ({ processId, processBlock, assignmentId, hashChain, processOwner, processTags, moduleId, moduleOwner, moduleTags, logger }) { const AoGlobal = { Process: { Id: processId, Owner: processOwner, Tags: processTags }, Module: { Id: moduleId, Owner: moduleOwner, Tags: moduleTags } } + /** + * Only perform hash chain validation on processes + * spawned after arweave day, since old hash chains are invalid + * anyway. 
+ */ + const isHashChainValidationEnabled = processBlock.height >= 1440000 + if (!isHashChainValidationEnabled) { + logger('HashChain validation disabled for old process "%s" at block [%j]', [processId, processBlock]) + } - const prevAssignmentId = assignmentId - const prevHashChain = hashChain + let prevAssignmentId = assignmentId + let prevHashChain = hashChain return async function * (edges) { for await (const edge of edges) { const scheduled = pipe( @@ -299,13 +308,18 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { } )(edge) - if (!isHashChainValid({ assignmentId: prevAssignmentId, hashChain: prevHashChain }, scheduled)) { - logger('HashChain invalid on message "%s" scheduled on process "%s"', scheduled.message.Id, processId) - const err = new Error(`HashChain invalid on message ${scheduled.message.Id}`) - err.status = 422 - throw err + if (isHashChainValidationEnabled) { + if (!isHashChainValid({ assignmentId: prevAssignmentId, hashChain: prevHashChain }, scheduled)) { + logger('HashChain invalid on message "%s" scheduled on process "%s"', scheduled.message.Id, processId) + const err = new Error(`HashChain invalid on message ${scheduled.message.Id}`) + err.status = 422 + throw err + } } + prevAssignmentId = scheduled.assignmentId + prevHashChain = scheduled.message['Hash-Chain'] + yield scheduled } } @@ -316,6 +330,7 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { .map(({ suUrl, processId, + block: processBlock, owner: processOwner, tags: processTags, moduleId, @@ -333,6 +348,7 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { fetchAllPages({ suUrl, processId, from, to })(), Transform.from(mapAoMessage({ processId, + processBlock, assignmentId, hashChain, processOwner, processTags, moduleId, moduleOwner, moduleTags, logger })) From 7fb2e9904fc18170ca2d7232371fa6b7ee226731 Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Sat, 18 Jan 2025 20:28:00 +0000 Subject: [PATCH 6/9] perf(cu): calculate hash chains on a thread pool to prevent blocking main thread #1112 --- servers/cu/src/bootstrap.js | 15 +++++++++- servers/cu/src/effects/ao-su.js | 29 ++++--------------- servers/cu/src/effects/ao-su.test.js | 18 +++++++----- .../cu/src/effects/worker/hashChain/index.js | 8 +++++ .../cu/src/effects/worker/hashChain/main.js | 19 ++++++++++++ 5 files changed, 57 insertions(+), 32 deletions(-) create mode 100644 servers/cu/src/effects/worker/hashChain/index.js create mode 100644 servers/cu/src/effects/worker/hashChain/main.js diff --git a/servers/cu/src/bootstrap.js b/servers/cu/src/bootstrap.js index d3e0aa633..539b1ab2b 100644 --- a/servers/cu/src/bootstrap.js +++ b/servers/cu/src/bootstrap.js @@ -135,6 +135,14 @@ export const createApis = async (ctx) => { Math.ceil(ctx.WASM_EVALUATION_MAX_WORKERS * (ctx.WASM_EVALUATION_PRIMARY_WORKERS_PERCENTAGE / 100)) ) + /** + * node's crypto module is synchronous, which blocks the main thread. + * Since hash chain validation is done for _every single scheduled message_, we offload the + * work to a worker thread, so at least the main thread isn't blocked.
+ */ + const hashChainWorkerPath = join(__dirname, 'effects', 'worker', 'hashChain', 'index.js') + const hashChainWorker = workerpool.pool(hashChainWorkerPath, { maxWorkers: maxPrimaryWorkerThreads }) + const worker = join(__dirname, 'effects', 'worker', 'evaluator', 'index.js') const primaryWorkerPool = workerpool.pool(worker, { maxWorkers: maxPrimaryWorkerThreads, @@ -367,7 +375,12 @@ export const createApis = async (ctx) => { findMessageBefore: AoEvaluationClient.findMessageBeforeWith({ db, logger }), loadTimestamp: AoSuClient.loadTimestampWith({ fetch: ctx.fetch, logger }), loadProcess: AoSuClient.loadProcessWith({ fetch: ctx.fetch, logger }), - loadMessages: AoSuClient.loadMessagesWith({ fetch: ctx.fetch, pageSize: 1000, logger }), + loadMessages: AoSuClient.loadMessagesWith({ + hashChain: (...args) => hashChainWorker.exec('hashChain', args), + fetch: ctx.fetch, + pageSize: 1000, + logger + }), locateProcess: locateDataloader.load.bind(locateDataloader), isModuleMemoryLimitSupported: WasmClient.isModuleMemoryLimitSupportedWith({ PROCESS_WASM_MEMORY_MAX_LIMIT: ctx.PROCESS_WASM_MEMORY_MAX_LIMIT }), isModuleComputeLimitSupported: WasmClient.isModuleComputeLimitSupportedWith({ PROCESS_WASM_COMPUTE_MAX_LIMIT: ctx.PROCESS_WASM_COMPUTE_MAX_LIMIT }), diff --git a/servers/cu/src/effects/ao-su.js b/servers/cu/src/effects/ao-su.js index ca0729bab..0040cff27 100644 --- a/servers/cu/src/effects/ao-su.js +++ b/servers/cu/src/effects/ao-su.js @@ -1,6 +1,5 @@ /* eslint-disable camelcase */ import { Transform, compose as composeStreams } from 'node:stream' -import { createHash } from 'node:crypto' import { of } from 'hyper-async' import { always, applySpec, filter, has, ifElse, isNil, isNotNil, juxt, last, mergeAll, path, pathOr, pipe, prop } from 'ramda' @@ -8,9 +7,6 @@ import DataLoader from 'dataloader' import { backoff, mapForwardedBy, mapFrom, addressFrom, parseTags, strFromFetchError } from '../domain/utils.js' -const base64UrlToBytes = (b64Url) => - Buffer.from(b64Url, 'base64url') - const okRes = (res) => { if (res.ok) return res throw res @@ -18,22 +14,7 @@ const okRes = (res) => { const resToJson = (res) => res.json() -const hashChain = (prevHashChain, prevAssignmentId) => { - const hash = createHash('sha256') - /** - * For the very first message, there is no previous id, - * so it is not included in the hashed bytes, to produce the very first - * hash chain - */ - if (prevAssignmentId) hash.update(base64UrlToBytes(prevAssignmentId)) - /** - * Always include the previous hash chain - */ - hash.update(base64UrlToBytes(prevHashChain)) - return hash.digest('base64url') -} - -export const isHashChainValid = (prev, scheduled) => { +export const isHashChainValidWith = ({ hashChain }) => async (prev, scheduled) => { const { assignmentId: prevAssignmentId, hashChain: prevHashChain } = prev const { message } = scheduled const actual = message['Hash-Chain'] @@ -44,7 +25,7 @@ export const isHashChainValid = (prev, scheduled) => { */ if (!prevAssignmentId || !prevHashChain) return !!actual - const expected = hashChain(prevHashChain, prevAssignmentId) + const expected = await hashChain(prevHashChain, prevAssignmentId) return expected === actual } @@ -176,9 +157,11 @@ export const mapNode = pipe( }) ) -export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { +export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize }) => { const logger = _logger.child('ao-su:loadMessages') + const isHashChainValid = isHashChainValidWith({ hashChain }) + const fetchPageDataloader = 
new DataLoader(async (args) => { fetchPageDataloader.clearAll() @@ -309,7 +292,7 @@ export const loadMessagesWith = ({ fetch, logger: _logger, pageSize }) => { )(edge) if (isHashChainValidationEnabled) { - if (!isHashChainValid({ assignmentId: prevAssignmentId, hashChain: prevHashChain }, scheduled)) { + if (!(await isHashChainValid({ assignmentId: prevAssignmentId, hashChain: prevHashChain }, scheduled))) { logger('HashChain invalid on message "%s" scheduled on process "%s"', scheduled.message.Id, processId) const err = new Error(`HashChain invalid on message ${scheduled.message.Id}`) err.status = 422 diff --git a/servers/cu/src/effects/ao-su.test.js b/servers/cu/src/effects/ao-su.test.js index 56e315d30..a6182c2e6 100644 --- a/servers/cu/src/effects/ao-su.test.js +++ b/servers/cu/src/effects/ao-su.test.js @@ -6,7 +6,8 @@ import { createHash } from 'node:crypto' import { loadMessageMetaSchema, loadProcessSchema, loadTimestampSchema } from '../domain/dal.js' import { messageSchema } from '../domain/model.js' import { createTestLogger } from '../domain/logger.js' -import { isHashChainValid, loadMessageMetaWith, loadProcessWith, loadTimestampWith, mapNode } from './ao-su.js' +import { isHashChainValidWith, loadMessageMetaWith, loadProcessWith, loadTimestampWith, mapNode } from './ao-su.js' +import { hashChain } from './worker/hashChain/main.js' const withoutAoGlobal = messageSchema.omit({ AoGlobal: true }) const logger = createTestLogger({ name: 'ao-cu:ao-su' }) @@ -140,6 +141,7 @@ describe('ao-su', () => { }) describe('isHashChainValid', () => { + const isHashChainValid = isHashChainValidWith({ hashChain }) const now = new Date().getTime() const messageId = 'message-123' const scheduled = { @@ -170,21 +172,21 @@ describe('ao-su', () => { } } - test('should return whether the hashChain exists if there is no previous assignment', () => { + test('should return whether the hashChain exists if there is no previous assignment', async () => { // no prev info - assert(isHashChainValid({}, scheduled)) + assert(await isHashChainValid({}, scheduled)) // first assignment ergo has no prev assignment - assert(isHashChainValid({ + assert(await isHashChainValid({ hashChain: null, assignmentId: 'foo' }, scheduled)) - assert(isHashChainValid({ + assert(await isHashChainValid({ hashChain: 'foo', assignmentId: null }, scheduled)) }) - test('should calculate and compare the hashChain based on the previous assignment', () => { + test('should calculate and compare the hashChain based on the previous assignment', async () => { const prevAssignmentId = Buffer.from('assignment-123', 'utf8').toString('base64url') const prevHashChain = Buffer.from('hashchain-123', 'utf8').toString('base64url') @@ -202,7 +204,7 @@ describe('ao-su', () => { } } - assert(isHashChainValid(prev, valid)) + assert(await isHashChainValid(prev, valid)) const invalid = { message: { @@ -213,7 +215,7 @@ describe('ao-su', () => { } } - assert(!isHashChainValid(prev, invalid)) + assert(!(await isHashChainValid(prev, invalid))) }) }) diff --git a/servers/cu/src/effects/worker/hashChain/index.js b/servers/cu/src/effects/worker/hashChain/index.js new file mode 100644 index 000000000..1055937f5 --- /dev/null +++ b/servers/cu/src/effects/worker/hashChain/index.js @@ -0,0 +1,8 @@ +import { worker } from 'workerpool' + +import { hashChain } from './main.js' + +/** + * Expose worker api + */ +worker({ hashChain }) diff --git a/servers/cu/src/effects/worker/hashChain/main.js b/servers/cu/src/effects/worker/hashChain/main.js new file mode 100644 index 
000000000..4272fdbe3 --- /dev/null +++ b/servers/cu/src/effects/worker/hashChain/main.js @@ -0,0 +1,19 @@ +import { createHash } from 'node:crypto' + +const base64UrlToBytes = (b64Url) => + Buffer.from(b64Url, 'base64url') + +export const hashChain = (prevHashChain, prevAssignmentId) => { + const hash = createHash('sha256') + /** + * For the very first message, there is no previous id, + * so it is not included in the hashed bytes, to produce the very first + * hash chain + */ + if (prevAssignmentId) hash.update(base64UrlToBytes(prevAssignmentId)) + /** + * Always include the previous hash chain + */ + hash.update(base64UrlToBytes(prevHashChain)) + return hash.digest('base64url') +} From 43b701f1ecf0177fe74365c4ea94fdc1b8945b1d Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Mon, 20 Jan 2025 00:14:41 +0000 Subject: [PATCH 7/9] refactor(cu): eval step uses pipeline and async generator transform --- servers/cu/src/domain/lib/evaluate.js | 335 ++++++++++++-------------- servers/cu/src/domain/utils.js | 6 + 2 files changed, 162 insertions(+), 179 deletions(-) diff --git a/servers/cu/src/domain/lib/evaluate.js b/servers/cu/src/domain/lib/evaluate.js index 508214032..b6e38c081 100644 --- a/servers/cu/src/domain/lib/evaluate.js +++ b/servers/cu/src/domain/lib/evaluate.js @@ -1,4 +1,5 @@ -import { Transform, compose as composeStreams, finished } from 'node:stream' +import { Transform, compose as composeStreams } from 'node:stream' +import { pipeline } from 'node:stream/promises' import { always, applySpec, evolve, mergeLeft, mergeRight, pathOr, pipe } from 'ramda' import { Rejected, Resolved, fromPromise, of } from 'hyper-async' @@ -7,7 +8,7 @@ import { LRUCache } from 'lru-cache' import { evaluatorSchema, findMessageBeforeSchema, saveLatestProcessMemorySchema } from '../dal.js' import { evaluationSchema } from '../model.js' -import { removeTagsByNameMaybeValue } from '../utils.js' +import { readableToAsyncGenerator, removeTagsByNameMaybeValue } from '../utils.js' /** * The result that is produced from this step @@ -108,195 +109,171 @@ export function evaluateWith (env) { noSave: always(true) })(ctx) - await new Promise((resolve, reject) => { - const cleanup = finished( + const evalStream = async function (messages) { + /** + * There seems to be duplicate Cron Message evaluations occurring and it's been difficult + * to pin down why. My hunch is that the very first message can be a duplicate of the 'from', if 'from' + * is itself a Cron Message. + * + * So to get around this, we maintain a set of strings that uniquely identify + * Cron messages (timestamp+cron interval). We will add each cron message identifier. + * If an iteration comes across an identifier already present in this list, then we consider it + * a duplicate and remove it from the eval stream. + * + * This should prevent duplicate Cron Messages from being duplicate evaluated, thus not "tainting" + * Memory that is folded as part of the eval stream.
+ * + * Since this will only happen at the end and beginning of boundaries generated in loadMessages + * this cache can be small, which prevents bloating memory on long cron runs + */ + const evaledCrons = new LRUCache({ maxSize: 100, sizeCalculation: always(1) }) + /** + * If the starting point ('from') is itself a Cron Message, + * then that will be our first identifier added to the set + */ + if (ctx.fromCron) evaledCrons.set(toEvaledCron({ timestamp: ctx.from, cron: ctx.fromCron }), true) + + /** + * Iterate over the async iterable of messages, + * and evaluate each one + */ + let first = true + for await (const { noSave, cron, ordinate, name, message, deepHash, isAssignment, assignmentId, AoGlobal } of messages) { + if (cron) { + const key = toEvaledCron({ timestamp: message.Timestamp, cron }) + if (evaledCrons.has(key)) continue + /** + * We add the cron's identifier to the cache, + * thus preventing a duplicate evaluation if we come across it + * again in the eval stream + */ + else evaledCrons.set(key, true) + } else if (!noSave) { + /** + * As messages stream into the process to be evaluated, + * we need to keep track of the assignmentId + * and hashChain of the most recent scheduled message + */ + mostRecentAssignmentId = assignmentId + mostRecentHashChain = message['Hash-Chain'] + } + /** + * We make sure to remove duplicate pushed (matching deepHash) + * and duplicate assignments (matching messageId) from the eval stream + * + * Which prevents them from corrupting the process. + * + * NOTE: We should only need to check if the message is an assignment + * or a pushed message, since the SU rejects any messages with the same id, + * that are not an assignment * - * messages will flow into evaluation, respecting backpressure, bubbling errors, - * and cleaning up resources when finished. + * TODO: should the CU check every message, ergo not trusting the SU? */ - composeStreams( - ctx.messages, - Transform.from(async function * removeInvalidTags ($messages) { - for await (const message of $messages) { - yield evolve( - { - message: { - Tags: pipe( - removeTagsByNameMaybeValue('From'), - removeTagsByNameMaybeValue('Owner') - ) - } - }, - message - ) - } - }), - async function (messages) { - /** - * There seems to be duplicate Cron Message evaluations occurring and it's been difficult - * to pin down why. My hunch is that the very first message can be a duplicate of the 'from', if 'from' - * is itself a Cron Message. - * - * So to get around this, we maintain a set of strings that unique identify - * Cron messages (timestamp+cron interval). We will add each cron message identifier. - * If an iteration comes across an identifier already present in this list, then we consider it - * a duplicate and remove it from the eval stream. - * - * This should prevent duplicate Cron Messages from being duplicate evaluated, thus not "tainting" - * Memory that is folded as part of the eval stream.
- * - * Since this will only happen at the end and beginning of boundaries generated in loadMessages - * this cache can be small, which prevents bloating memory on long cron runs - */ - const evaledCrons = new LRUCache({ maxSize: 100, sizeCalculation: always(1) }) - /** - * If the starting point ('from') is itself a Cron Message, - * then that will be our first identifier added to the set - */ - if (ctx.fromCron) evaledCrons.set(toEvaledCron({ timestamp: ctx.from, cron: ctx.fromCron }), true) - - /** - * Iterate over the async iterable of messages, - * and evaluate each one - */ - let first = true - for await (const { noSave, cron, ordinate, name, message, deepHash, isAssignment, assignmentId, AoGlobal } of messages) { - if (cron) { - const key = toEvaledCron({ timestamp: message.Timestamp, cron }) - if (evaledCrons.has(key)) continue - /** - * We add the crons identifier to the cache, - * thus preventing a duplicate evaluation if we come across it - * again in the eval stream - */ - else evaledCrons.set(key, true) - } else if (!noSave) { - /** - * As messages stream into the process to be evaluated, - * we need to keep track of the most assignmentId - * and hashChain of the most recent scheduled message - */ - mostRecentAssignmentId = assignmentId - mostRecentHashChain = message['Hash-Chain'] - } + if ( + (deepHash || isAssignment) && + await doesMessageExist({ + messageId: message.Id, + deepHash, + isAssignment, + processId: ctx.id, + epoch: message.Epoch, + nonce: message.Nonce + }).toPromise() + ) { + const log = deepHash ? 'deepHash of' : 'assigned id' + logger( + `Prior Message to process "%s" with ${log} "%s" was found and therefore has already been evaluated. Removing "%s" from eval stream`, + ctx.id, + deepHash || message.Id, + name + ) + continue + } + prev = await Promise.resolve(prev) + .then((prev) => + Promise.resolve(prev.Memory) /** - * We make sure to remove duplicate pushed (matching deepHash) - * and duplicate assignments (matching messageId) from the eval stream - * - * Which prevents them from corrupting the process. - * - * NOTE: We should only need to check if the message is an assignment - * or a pushed message, since the SU rejects any messages with the same id, - * that are not an assignment - * - * TODO: should the CU check every message, ergo not trusting the SU? + * Where the actual evaluation is performed */ - if ( - (deepHash || isAssignment) && - await doesMessageExist({ - messageId: message.Id, - deepHash, - isAssignment, - processId: ctx.id, - epoch: message.Epoch, - nonce: message.Nonce - }).toPromise() - ) { - const log = deepHash ? 'deepHash of' : 'assigned id' - logger( - `Prior Message to process "%s" with ${log} "%s" was found and therefore has already been evaluated. Removing "%s" from eval stream`, - ctx.id, - deepHash || message.Id, - name - ) - continue - } + .then((Memory) => ctx.evaluator({ first, noSave, name, deepHash, cron, ordinate, isAssignment, processId: ctx.id, Memory, message, AoGlobal })) + /** + * These values are folded, + * so that we can potentially update the process memory cache + * at the end of evaluation + */ + .then(mergeLeft({ noSave, message, cron, ordinate })) + .then(async (output) => { + /** + * Make sure to set first to false + * for all subsequent evaluations for this evaluation stream + */ + if (first) first = false + if (output.GasUsed) totalGasUsed += BigInt(output.GasUsed ?? 
0) - prev = await Promise.resolve(prev) - .then((prev) => - Promise.resolve(prev.Memory) - /** - * Where the actual evaluation is performed - */ - .then((Memory) => ctx.evaluator({ first, noSave, name, deepHash, cron, ordinate, isAssignment, processId: ctx.id, Memory, message, AoGlobal })) - /** - * These values are folded, - * so that we can potentially update the process memory cache - * at the end of evaluation - */ - .then(mergeLeft({ noSave, message, cron, ordinate })) - .then(async (output) => { - /** - * Make sure to set first to false - * for all subsequent evaluations for this evaluation stream - */ - if (first) first = false - if (output.GasUsed) totalGasUsed += BigInt(output.GasUsed ?? 0) + if (cron) ctx.stats.messages.cron++ + else ctx.stats.messages.scheduled++ - if (cron) ctx.stats.messages.cron++ - else ctx.stats.messages.scheduled++ + if (output.Error) { + logger( + 'Error occurred when applying message "%s" to process "%s": "%s', + name, + ctx.id, + output.Error + ) + ctx.stats.messages.error = ctx.stats.messages.error || 0 + ctx.stats.messages.error++ + } - if (output.Error) { - logger( - 'Error occurred when applying message "%s" to process "%s": "%s', - name, - ctx.id, - output.Error - ) - ctx.stats.messages.error = ctx.stats.messages.error || 0 - ctx.stats.messages.error++ - } + /** + * Increments gauges for total evaluations for both: + * + * total evaluations (no labels) + * specific kinds of evaluations (type of eval stream, type of message, whether an error occurred or not) + */ + evaluationCounter.inc(1) + evaluationCounter.inc( + 1, + { + stream_type: ctx.dryRun ? 'dry-run' : 'primary', + message_type: ctx.dryRun ? 'dry-run' : cron ? 'cron' : isAssignment ? 'assignment' : 'scheduled', + process_error: Boolean(output.Error) + }, + { processId: ctx.id } + ) + /** + * TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens + */ + // gasCounter.inc(output.GasUsed ?? 0, { cron: Boolean(cron), dryRun: Boolean(ctx.dryRun) }, { processId: ctx.id, error: Boolean(output.Error) }) - /** - * Increments gauges for total evaluations for both: - * - * total evaluations (no labels) - * specific kinds of evaluations (type of eval stream, type of message, whether an error occurred or not) - */ - evaluationCounter.inc(1) - evaluationCounter.inc( - 1, - { - stream_type: ctx.dryRun ? 'dry-run' : 'primary', - message_type: ctx.dryRun ? 'dry-run' : cron ? 'cron' : isAssignment ? 'assignment' : 'scheduled', - process_error: Boolean(output.Error) - }, - { processId: ctx.id } - ) - /** - * TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens - */ - // gasCounter.inc(output.GasUsed ?? 0, { cron: Boolean(cron), dryRun: Boolean(ctx.dryRun) }, { processId: ctx.id, error: Boolean(output.Error) }) + return output + }) + ) + } + } - return output - }) - ) - } + await pipeline( + composeStreams( + readableToAsyncGenerator(ctx.messages), + Transform.from(async function * removeInvalidTags ($messages) { + for await (const message of $messages) { + yield evolve( + { + message: { + Tags: pipe( + removeTagsByNameMaybeValue('From'), + removeTagsByNameMaybeValue('Owner') + ) + } + }, + message + ) } - ), - (err) => { - /** - * finished() will leave dangling event listeners even after this callback - * has been invoked, so that unexpected errors do not cause full-on crashes. - * - * finished() returns a callback fn to cleanup these dangling listeners, so we make sure - * to always call it here to prevent memory leaks. 
- * - * See https://nodejs.org/api/stream.html#streamfinishedstream-options-callback - */ - cleanup() - /** - * Signal the evaluator to close any resources spun up as part - * of handling this eval stream - */ - ctx.evaluator({ close: true }) - err ? reject(err) : resolve() - } - ) - }) + }) + ), + evalStream + ) /** * Make sure to attempt to cache the last result diff --git a/servers/cu/src/domain/utils.js b/servers/cu/src/domain/utils.js index 53d26b36f..d54dd9028 100644 --- a/servers/cu/src/domain/utils.js +++ b/servers/cu/src/domain/utils.js @@ -529,3 +529,9 @@ export const addressFrom = ({ address, key }) => { return _address } + +export function readableToAsyncGenerator (stream) { + return (async function * () { + for await (const d of stream) yield d + })() +} From a001fd04bdbebf19b2354f7fa414c52556a3f862 Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Mon, 20 Jan 2025 02:40:44 +0000 Subject: [PATCH 8/9] fix(cu): prevent streams from stalling out by hacking stream destroy on error --- servers/cu/src/domain/dal.js | 4 +- servers/cu/src/domain/lib/evaluate.js | 67 ++++++++++++++------ servers/cu/src/domain/lib/evaluate.test.js | 21 +++--- servers/cu/src/domain/lib/hydrateMessages.js | 41 ++++-------- servers/cu/src/domain/lib/loadMessages.js | 37 +++-------- servers/cu/src/effects/ao-su.js | 11 ++-- 6 files changed, 84 insertions(+), 97 deletions(-) diff --git a/servers/cu/src/domain/dal.js b/servers/cu/src/domain/dal.js index d453aea7e..c5e026867 100644 --- a/servers/cu/src/domain/dal.js +++ b/servers/cu/src/domain/dal.js @@ -1,6 +1,6 @@ import { z } from 'zod' -import { blockSchema, evaluationSchema, processSchema, moduleSchema, rawTagSchema, streamSchema, processCheckpointSchema, bufferSchema } from './model.js' +import { blockSchema, evaluationSchema, processSchema, moduleSchema, rawTagSchema, processCheckpointSchema, bufferSchema } from './model.js' // Arweave @@ -184,7 +184,7 @@ export const loadMessagesSchema = z.function() * Returns a Stream that wraps an AsyncIterable, which is not something that Zod can * parse natively, so we make sure the returned value implements a pipe api */ - .returns(z.promise(streamSchema)) + .returns(z.promise(z.any())) export const loadProcessSchema = z.function() .args(z.object({ diff --git a/servers/cu/src/domain/lib/evaluate.js b/servers/cu/src/domain/lib/evaluate.js index b6e38c081..630a6f12b 100644 --- a/servers/cu/src/domain/lib/evaluate.js +++ b/servers/cu/src/domain/lib/evaluate.js @@ -8,7 +8,7 @@ import { LRUCache } from 'lru-cache' import { evaluatorSchema, findMessageBeforeSchema, saveLatestProcessMemorySchema } from '../dal.js' import { evaluationSchema } from '../model.js' -import { readableToAsyncGenerator, removeTagsByNameMaybeValue } from '../utils.js' +import { removeTagsByNameMaybeValue } from '../utils.js' /** * The result that is produced from this step @@ -253,27 +253,52 @@ export function evaluateWith (env) { } } - await pipeline( - composeStreams( - readableToAsyncGenerator(ctx.messages), - Transform.from(async function * removeInvalidTags ($messages) { - for await (const message of $messages) { - yield evolve( - { - message: { - Tags: pipe( - removeTagsByNameMaybeValue('From'), - removeTagsByNameMaybeValue('Owner') - ) - } - }, - message - ) - } + const removeInvalidTags = Transform.from(async function * ($messages) { + for await (const message of $messages) { + yield evolve( + { + message: { + Tags: pipe( + removeTagsByNameMaybeValue('From'), + removeTagsByNameMaybeValue('Owner') + ) + } + }, + message + ) + } + }) + + /** 
+ * ABANDON ALL HOPE YE WHO ENTER HERE + * + * compose from node:streams has some incredibly hard to debug issues, when it comes to destroying + * streams and propagating errors, when composing multiple levels of streams. + * + * In order to circumvent these issues, we've hacked the steps to instead build a list of + * streams, then combine them all here. + * + * THEN we add an error listener such that any stream erroring + * results in all streams being cleaned up and destroyed with that error. + * + * Finally, if an error was thrown from any stream, we re-throw such that the eval promise rejects, + * bubbling the error to the caller + */ + if (!Array.isArray(ctx.messages)) ctx.messages = [ctx.messages] + const streams = [...ctx.messages, removeInvalidTags] + streams.push(composeStreams(...streams)) + let e + streams.forEach(s => { + s.on('error', (err) => { + e = err + streams.forEach(s => { + s.emit('end') + s.destroy(err) }) - ), - evalStream - ) + }) + }) + await pipeline(streams[streams.length - 1], evalStream) + if (e) throw e /** * Make sure to attempt to cache the last result diff --git a/servers/cu/src/domain/lib/evaluate.test.js b/servers/cu/src/domain/lib/evaluate.test.js index ef84e19ac..cc453088b 100644 --- a/servers/cu/src/domain/lib/evaluate.test.js +++ b/servers/cu/src/domain/lib/evaluate.test.js @@ -2,6 +2,7 @@ import { describe, test } from 'node:test' import assert from 'node:assert' import { readFileSync } from 'node:fs' +import { Readable } from 'node:stream' import AoLoader from '@permaweb/ao-loader' @@ -10,8 +11,8 @@ import { evaluateWith } from './evaluate.js' const logger = createTestLogger({ name: 'ao-cu:readState' }) -async function * toAsyncIterable (iterable) { - while (iterable.length) yield iterable.shift() +function toReadable (iterable) { + return Readable.from(iterable) } const mockCounter = { inc: () => undefined } @@ -71,7 +72,7 @@ describe('evaluate', () => { result: { Memory: null }, - messages: toAsyncIterable([ + messages: toReadable([ { ordinate: 1, isAssignment: false, @@ -206,7 +207,7 @@ describe('evaluate', () => { result: { Memory: null }, - messages: toAsyncIterable([ + messages: toReadable([ // assignment { ordinate: 1, @@ -321,7 +322,7 @@ describe('evaluate', () => { result: { Memory: null }, - messages: toAsyncIterable([ + messages: toReadable([ // duplicate of starting point { ordinate: 1, @@ -468,7 +469,7 @@ describe('evaluate', () => { result: { Memory: null }, - messages: toAsyncIterable([ + messages: toReadable([ { // Will include an error in result.error ordinate: 1, @@ -578,7 +579,7 @@ describe('evaluate', () => { result: { Memory: Buffer.from('Hello world') }, - messages: toAsyncIterable([ + messages: toReadable([ { ordinate: 1, isAssignment: false, @@ -733,7 +734,7 @@ describe('evaluate', () => { await evaluate({ ...args, - messages: toAsyncIterable([cronOne, cronTwo]) + messages: toReadable([cronOne, cronTwo]) }).toPromise() }) @@ -752,7 +753,7 @@ describe('evaluate', () => { await evaluate({ ...args, - messages: toAsyncIterable([cronOne, messageOne, cronTwo]) + messages: toReadable([cronOne, messageOne, cronTwo]) }).toPromise() }) @@ -771,7 +772,7 @@ describe('evaluate', () => { await evaluate({ ...args, - messages: toAsyncIterable([cronOne, messageOne, cronTwo, messageTwo]) + messages: toReadable([cronOne, messageOne, cronTwo, messageTwo]) }).toPromise() }) }) diff --git a/servers/cu/src/domain/lib/hydrateMessages.js b/servers/cu/src/domain/lib/hydrateMessages.js index a00165999..079d5483f 100644 --- 
a/servers/cu/src/domain/lib/hydrateMessages.js +++ b/servers/cu/src/domain/lib/hydrateMessages.js @@ -1,27 +1,15 @@ -import { compose as composeStreams, PassThrough, Transform } from 'node:stream' +import { Transform } from 'node:stream' import { of } from 'hyper-async' import { mergeRight, isNil } from 'ramda' -import { z } from 'zod' import WarpArBundles from 'warp-arbundles' import { loadTransactionDataSchema, loadTransactionMetaSchema } from '../dal.js' -import { messageSchema, streamSchema } from '../model.js' +import { messageSchema } from '../model.js' import { mapFrom, addressFrom } from '../utils.js' const { createData } = WarpArBundles -/** - * The result that is produced from this step - * and added to ctx. - * - * This is used to parse the output to ensure the correct shape - * is always added to context - */ -const ctxSchema = z.object({ - messages: streamSchema -}).passthrough() - function loadFromChainWith ({ loadTransactionData, loadTransactionMeta }) { loadTransactionData = loadTransactionDataSchema.implement(loadTransactionData) loadTransactionMeta = loadTransactionMetaSchema.implement(loadTransactionMeta) @@ -219,7 +207,7 @@ export function hydrateMessagesWith (env) { */ // $messages.on('error', () => $messages.emit('end')) - return composeStreams( + return [ /** * There is some sort of bug in pipeline which will consistently cause this stream * to not end IFF it emits an error. @@ -238,20 +226,15 @@ export function hydrateMessagesWith (env) { * thus closing the pipeline, and resolving the promise wrapping the stream * (see finished in evaluate.js) */ - $messages, - composeStreams( - Transform.from(maybeMessageId), - Transform.from(maybeAoAssignment), - // Ensure every message emitted satisfies the schema - Transform.from(async function * (messages) { - for await (const cur of messages) yield messageSchema.parse(cur) - }) - ), - new PassThrough({ objectMode: true }) - ) + ...$messages, + Transform.from(maybeMessageId), + Transform.from(maybeAoAssignment), + // Ensure every message emitted satisfies the schema + Transform.from(async function * (messages) { + for await (const cur of messages) yield messageSchema.parse(cur) + }) + ] }) - .map(messages => ({ messages })) - .map(mergeRight(ctx)) - .map(ctxSchema.parse) + .map(messages => ({ ...ctx, messages })) } } diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index 99f5edd6d..60f1ebef5 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -1,11 +1,9 @@ -import { Transform, compose as composeStreams } from 'node:stream' +import { Transform } from 'node:stream' import { Resolved, fromPromise, of } from 'hyper-async' -import { T, always, ascend, cond, equals, identity, ifElse, isNil, last, length, mergeRight, pipe, prop, reduce, uniqBy } from 'ramda' -import { z } from 'zod' +import { T, always, ascend, cond, equals, identity, ifElse, isNil, last, length, pipe, prop, reduce, uniqBy } from 'ramda' import ms from 'ms' -import { streamSchema } from '../model.js' import { mapFrom, parseTags } from '../utils.js' import { findBlocksSchema, loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, saveBlocksSchema } from '../dal.js' @@ -587,8 +585,8 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, load }) ) .map(({ leftMost, rightMostTimestamp, $scheduled, genCronMessages }) => { - return composeStreams( - $scheduled, + return [ + ...$scheduled, /** * Given a left-most and right-most boundary, 
return an async generator, * that given a list of values, emits sequential binary tuples dervied from those values. @@ -695,7 +693,7 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, load if (doEmitRight) yield right } }) - ) + ] }) }) /** @@ -707,28 +705,13 @@ function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, load * construct and emit a 'message' for the process */ .map($messages => { - return composeStreams( - $messages, + return [ + ...$messages, Transform.from(maybePrependProcessMessage(ctx, logger, loadTransactionData)) - ) + ] }) - .map(messages => ({ messages })) } -/** - * The result that is produced from this step - * and added to ctx. - * - * This is used to parse the output to ensure the correct shape - * is always added to context - */ -const ctxSchema = z.object({ - /** - * Messages to be evaluated, as a stream - */ - messages: streamSchema -}).passthrough() - /** * @typedef LoadMessagesArgs * @property {string} id - the contract id @@ -761,7 +744,5 @@ export function loadMessagesWith (env) { of(ctx) .chain(loadScheduledMessages) .chain($scheduled => loadCronMessages({ ...ctx, $scheduled })) - // { messages } - .map(mergeRight(ctx)) - .map(ctxSchema.parse) + .map(messages => ({ ...ctx, messages })) } diff --git a/servers/cu/src/effects/ao-su.js b/servers/cu/src/effects/ao-su.js index 0040cff27..12bd2d833 100644 --- a/servers/cu/src/effects/ao-su.js +++ b/servers/cu/src/effects/ao-su.js @@ -1,5 +1,5 @@ /* eslint-disable camelcase */ -import { Transform, compose as composeStreams } from 'node:stream' +import { Transform, Readable } from 'node:stream' import { of } from 'hyper-async' import { always, applySpec, filter, has, ifElse, isNil, isNotNil, juxt, last, mergeAll, path, pathOr, pipe, prop } from 'ramda' @@ -324,11 +324,8 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize } assignmentId, hashChain }) => { - return composeStreams( - /** - * compose will convert the AsyncIterable into a readable Duplex - */ - fetchAllPages({ suUrl, processId, from, to })(), + return [ + Readable.from(fetchAllPages({ suUrl, processId, from, to })()), Transform.from(mapAoMessage({ processId, processBlock, @@ -341,7 +338,7 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize } moduleTags, logger })) - ) + ] }) .toPromise() } From a9a717cdfaa83f5e22c52e0dc358f46688cf5997 Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Mon, 20 Jan 2025 15:08:06 +0000 Subject: [PATCH 9/9] chore(cu): improve checkpoint logging. simulateError flag in su for quicker debugging. cleanup #1112 --- servers/cu/src/domain/lib/hydrateMessages.js | 18 ------------------ servers/cu/src/effects/ao-process.js | 4 ++-- servers/cu/src/effects/ao-su.js | 10 ++++++++++ 3 files changed, 12 insertions(+), 20 deletions(-) diff --git a/servers/cu/src/domain/lib/hydrateMessages.js b/servers/cu/src/domain/lib/hydrateMessages.js index 079d5483f..25ba598f9 100644 --- a/servers/cu/src/domain/lib/hydrateMessages.js +++ b/servers/cu/src/domain/lib/hydrateMessages.js @@ -208,24 +208,6 @@ export function hydrateMessagesWith (env) { // $messages.on('error', () => $messages.emit('end')) return [ - /** - * There is some sort of bug in pipeline which will consistently cause this stream - * to not end IFF it emits an error. - * - * When errors are thrown in other points in the stream, pipeline seems to work and - * close the stream just fine, so not sure what's going on here. 
- * - * Before, we had a workaround to manually emit 'end' from the stream on eror, which seemed - * to work (See above commented out .on()) - * - * That was UNTIL we composed more than 2 Transform streams after it, which caused that - * workaround to no longer work -- very strange. - * - * For some reason, wrapping the subsequent Transforms in another compose, - * AND adding a PassThrough stream at the end successfully ends the stream on errors, - * thus closing the pipeline, and resolving the promise wrapping the stream - * (see finished in evaluate.js) - */ ...$messages, Transform.from(maybeMessageId), Transform.from(maybeAoAssignment), diff --git a/servers/cu/src/effects/ao-process.js b/servers/cu/src/effects/ao-process.js index 535acd00c..197417bf1 100644 --- a/servers/cu/src/effects/ao-process.js +++ b/servers/cu/src/effects/ao-process.js @@ -1653,7 +1653,7 @@ export function saveCheckpointWith ({ logger( 'Successfully created file checkpoint for process "%s" on evaluation "%j"', processId, - { file, processId, nonce, timestamp, cron } + { file, processId, assignmentId, hashChain, nonce, timestamp, cron } ) return { file, ...evaluation } }) @@ -1730,7 +1730,7 @@ export function saveCheckpointWith ({ logger( 'Successfully uploaded Checkpoint DataItem for process "%s" on evaluation "%j"', processId, - { checkpointTxId: res.id, processId, nonce, timestamp, cron, encoding: 'gzip' } + { checkpointTxId: res.id, processId, assignmentId, hashChain, nonce, timestamp, cron, encoding: 'gzip' } ) /** * Track that we've recently created a checkpoint for this diff --git a/servers/cu/src/effects/ao-su.js b/servers/cu/src/effects/ao-su.js index 12bd2d833..0ad7f897e 100644 --- a/servers/cu/src/effects/ao-su.js +++ b/servers/cu/src/effects/ao-su.js @@ -275,6 +275,9 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize } logger('HashChain validation disabled for old process "%s" at block [%j]', [processId, processBlock]) } + // Set this to simulate a stream error + // eslint-disable-next-line + let simulateError = false let prevAssignmentId = assignmentId let prevHashChain = hashChain return async function * (edges) { @@ -291,6 +294,13 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize } } )(edge) + if (simulateError) { + logger(' message "%s" scheduled on process "%s"', scheduled.message.Id, processId) + const err = new Error(`Simulated Error on message ${scheduled.message.Id}`) + err.status = 422 + throw err + } + if (isHashChainValidationEnabled) { if (!(await isHashChainValid({ assignmentId: prevAssignmentId, hashChain: prevHashChain }, scheduled))) { logger('HashChain invalid on message "%s" scheduled on process "%s"', scheduled.message.Id, processId)