Skip to content

Commit

Permalink
feat(cu): send non-persisted process message as first message on cold…
Browse files Browse the repository at this point in the history
… start #293
  • Loading branch information
TillaTheHun0 committed Jan 9, 2024
1 parent 04f0267 commit 6f30a94
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 6 deletions.
13 changes: 9 additions & 4 deletions servers/cu/src/domain/lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export function evaluateWith (env) {
* Iterate over the async iterable of messages,
* and evaluate each one
*/
for await (const { cron, ordinate, message, deepHash, AoGlobal } of ctx.messages) {
for await (const { noSave, cron, ordinate, message, deepHash, AoGlobal } of ctx.messages) {
/**
* We skip over forwarded messages (which we've calculated a deepHash for - see hydrateMessages)
* if their deepHash is found in the cache, this prevents duplicate evals
Expand Down Expand Up @@ -219,8 +219,13 @@ export function evaluateWith (env) {
/**
* Create a new evaluation to be cached in the local db
*/
.then((output) =>
saveEvaluation({
.then((output) => {
/**
* Noop saving the evaluation is noSave flag is set
*/
if (noSave) return output

return saveEvaluation({
deepHash,
cron,
ordinate,
Expand All @@ -233,7 +238,7 @@ export function evaluateWith (env) {
})
.map(() => output)
.toPromise()
)
})
.catch(logger.tap(
'Error occurred when applying message with id "%s" to process "%s" %o',
message.Id || `Cron Message ${cron}`,
Expand Down
15 changes: 15 additions & 0 deletions servers/cu/src/domain/lib/evaluate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,21 @@ describe('evaluate', () => {
module: readFileSync('./test/processes/happy/process.wasm'),
Memory: null,
messages: toAsyncIterable([
// noSave should noop and not call saveInteraction
{
noSave: true,
ordinate: 0,
message: {
Id: 'message-123',
Timestamp: 1702846520559,
Owner: 'owner-123',
Tags: [
{ name: 'function', value: 'hello' }
],
'Block-Height': 1234
},
AoGlobal: {}
},
{
ordinate: 1,
message: {
Expand Down
62 changes: 60 additions & 2 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
* messages between them, and so emitting them one at a time will respect
* backpressure.
*/
for await (const message of genCronMessages(left, right)) yield messageSchema.parse(message)
for await (const message of genCronMessages(left, right)) yield message
/**
* We need to emit the right boundary since it will always be a message,
* EXCEPT for final tuple, since THAT right boundary will be our right-most boundary,
Expand All @@ -429,13 +429,71 @@ function loadCronMessagesWith ({ loadTimestamp, locateScheduler, loadBlocksMeta,
* Otherwise, we would end up with duplicates at each boundary
*/
const next = await boundaries.next()
if (!next.done) yield messageSchema.parse(right)
if (!next.done) yield right
/**
* Set up the next boundary to generate cron messages between
*/
tuple = next
}
}),
/**
* If this is a process cold start, the first 'message' evaluated by the module
* will be the process, itself, meaning the process tags, data, etc.
*
* So we check to see if this is a cold state, by checking if the 'from' is undefined.
* If so, we know the evaluation is starting from the beginning (cold start), and we will
* construct and emit a 'message' for the process
*/
Transform.from(async function * (messages) {
const isColdStart = !ctx.from

/**
* Generate and emit a message that represents the process itself.
*
* It will be the first message evaluated by the module
*/
if (isColdStart) {
const processMessage = messageSchema.parse({
/**
* Ensure the noSave flag is set, so evaluation does not persist
* this process message
*/
noSave: true,
ordinate: '^',
message: {
Id: ctx.id,
Signature: ctx.signature,
Data: ctx.data,
Owner: ctx.owner,
/**
* the target of the process message is itself
*/
Target: ctx.id,
Anchor: ctx.anchor,
/**
* the process message is from the owner of the process
*/
From: ctx.owner,
Tags: ctx.tags,
Epoch: undefined,
Nonce: undefined,
Timestamp: ctx.block.timestamp,
'Block-Height': ctx.block.height,
Cron: false
},
AoGlobal: {
Process: { Id: ctx.id, Owner: ctx.owner, Tags: ctx.tags }
}
})
logger('Emitting process message at beginning of evaluation stream for process %s cold start: %o', ctx.id, processMessage)
yield processMessage
}

/**
* Emit the merged stream of Cron and Scheduled Messages
*/
for await (const message of messages) yield messageSchema.parse(message)
}),
(err) => {
if (err) logger('Encountered err when merging scheduled and cron messages', err)
}
Expand Down

0 comments on commit 6f30a94

Please sign in to comment.