Skip to content

Commit

Permalink
Merge pull request #1114 from permaweb/tillathehun0/cu-hashchain-vali…
Browse files Browse the repository at this point in the history
…dation

CU hashchain validation
  • Loading branch information
TillaTheHun0 authored Jan 20, 2025
2 parents be93939 + a9a717c commit 601d6aa
Show file tree
Hide file tree
Showing 18 changed files with 911 additions and 389 deletions.
100 changes: 52 additions & 48 deletions servers/cu/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions servers/cu/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
"test": "node --experimental-wasm-memory64 --test"
},
"dependencies": {
"@fastify/middie": "^9.0.2",
"@fastify/middie": "^9.0.3",
"@permaweb/ao-loader": "^0.0.44",
"@permaweb/ao-scheduler-utils": "^0.0.25",
"@permaweb/weavedrive": "^0.0.18",
"arweave": "^1.15.5",
"async-lock": "^1.4.1",
"better-sqlite3": "^11.7.0",
"better-sqlite3": "^11.8.0",
"bytes": "^3.1.2",
"cors": "^2.8.5",
"dataloader": "^2.2.3",
"dotenv": "^16.4.7",
"fast-glob": "^3.3.2",
"fastify": "^5.2.0",
"fast-glob": "^3.3.3",
"fastify": "^5.2.1",
"helmet": "^8.0.0",
"hyper-async": "^1.1.2",
"keccak": "^3.0.4",
Expand All @@ -36,7 +36,7 @@
"pg": "^8.13.1",
"prom-client": "^15.1.3",
"ramda": "^0.30.1",
"undici": "^7.2.0",
"undici": "^7.2.3",
"warp-arbundles": "^1.0.4",
"winston": "^3.17.0",
"workerpool": "^9.2.0",
Expand Down
15 changes: 14 additions & 1 deletion servers/cu/src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ export const createApis = async (ctx) => {
Math.ceil(ctx.WASM_EVALUATION_MAX_WORKERS * (ctx.WASM_EVALUATION_PRIMARY_WORKERS_PERCENTAGE / 100))
)

/**
* node's crypto module is synchronous, which blocks the main thread.
* Since hash chain valiation is done for _every single scheduled message_, we offload the
* work to a worker thread, so at least the main thread isn't blocked.
*/
const hashChainWorkerPath = join(__dirname, 'effects', 'worker', 'hashChain', 'index.js')
const hashChainWorker = workerpool.pool(hashChainWorkerPath, { maxWorkers: maxPrimaryWorkerThreads })

const worker = join(__dirname, 'effects', 'worker', 'evaluator', 'index.js')
const primaryWorkerPool = workerpool.pool(worker, {
maxWorkers: maxPrimaryWorkerThreads,
Expand Down Expand Up @@ -367,7 +375,12 @@ export const createApis = async (ctx) => {
findMessageBefore: AoEvaluationClient.findMessageBeforeWith({ db, logger }),
loadTimestamp: AoSuClient.loadTimestampWith({ fetch: ctx.fetch, logger }),
loadProcess: AoSuClient.loadProcessWith({ fetch: ctx.fetch, logger }),
loadMessages: AoSuClient.loadMessagesWith({ fetch: ctx.fetch, pageSize: 1000, logger }),
loadMessages: AoSuClient.loadMessagesWith({
hashChain: (...args) => hashChainWorker.exec('hashChain', args),
fetch: ctx.fetch,
pageSize: 1000,
logger
}),
locateProcess: locateDataloader.load.bind(locateDataloader),
isModuleMemoryLimitSupported: WasmClient.isModuleMemoryLimitSupportedWith({ PROCESS_WASM_MEMORY_MAX_LIMIT: ctx.PROCESS_WASM_MEMORY_MAX_LIMIT }),
isModuleComputeLimitSupported: WasmClient.isModuleComputeLimitSupportedWith({ PROCESS_WASM_COMPUTE_MAX_LIMIT: ctx.PROCESS_WASM_COMPUTE_MAX_LIMIT }),
Expand Down
12 changes: 8 additions & 4 deletions servers/cu/src/domain/dal.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { z } from 'zod'

import { blockSchema, evaluationSchema, processSchema, moduleSchema, rawTagSchema, streamSchema, processCheckpointSchema, bufferSchema } from './model.js'
import { blockSchema, evaluationSchema, processSchema, moduleSchema, rawTagSchema, processCheckpointSchema, bufferSchema } from './model.js'

// Arweave

Expand Down Expand Up @@ -67,14 +67,15 @@ export const saveLatestProcessMemorySchema = z.function()
processId: z.string(),
moduleId: z.string().nullish(),
messageId: z.string().nullish(),
assignmentId: z.string().nullish(),
hashChain: z.string().nullish(),
timestamp: z.coerce.number().nullish(),
epoch: z.coerce.number().nullish(),
nonce: z.coerce.number().nullish(),
ordinate: z.coerce.string().nullish(),
cron: z.string().nullish(),
blockHeight: z.coerce.number().nullish(),
Memory: bufferSchema,
evalCount: z.number().nullish(),
gasUsed: z.bigint().nullish()
}))
.returns(z.promise(z.any()))
Expand Down Expand Up @@ -167,20 +168,23 @@ export const loadMessagesSchema = z.function()
z.object({
suUrl: z.string().url(),
processId: z.string(),
block: blockSchema,
owner: z.string(),
tags: z.array(rawTagSchema),
moduleId: z.string(),
moduleTags: z.array(rawTagSchema),
moduleOwner: z.string(),
from: z.coerce.number().nullish(),
to: z.coerce.number().nullish()
to: z.coerce.number().nullish(),
assignmentId: z.string().nullish(),
hashChain: z.string().nullish()
})
)
/**
* Returns a Stream that wraps an AsyncIterable, which is not something that Zod can
* parse natively, so we make sure the returned value implements a pipe api
*/
.returns(z.promise(streamSchema))
.returns(z.promise(z.any()))

export const loadProcessSchema = z.function()
.args(z.object({
Expand Down
Loading

0 comments on commit 601d6aa

Please sign in to comment.