diff --git a/servers/mu/src/domain/api/pushMsg.js b/servers/mu/src/domain/api/pushMsg.js index be1afde79..63b8ca134 100644 --- a/servers/mu/src/domain/api/pushMsg.js +++ b/servers/mu/src/domain/api/pushMsg.js @@ -1,20 +1,46 @@ -import { Rejected, Resolved, of } from 'hyper-async' +import { Rejected, Resolved, fromPromise, of } from 'hyper-async' import { getCuAddressWith } from '../lib/get-cu-address.js' import { pullResultWith } from '../lib/pull-result.js' +import { graphqlReturnSchema } from '../dal.js' export function pushMsgWith ({ selectNode, fetchResult, + fetchTransactions, crank, logger, }) { const getCuAddress = getCuAddressWith({ selectNode, logger }) const pullResult = pullResultWith({ fetchResult, logger }) + const fetchTransactionsAsync = fromPromise(fetchTransactions) return (ctx) => { return of(ctx) + .chain((ctx) => { + return fetchTransactionsAsync([ctx.tx.id]) + }) + .chain(res => { + if(res?.data?.transactions?.edges?.length >= 1) { + if(res.data.transactions.edges[0].block?.timestamp) { + const oneDayAgo = Date.now() - 24 * 60 * 60 * 1000; + if (res.data.transactions.edges[0].block.timestamp >= oneDayAgo) { + return Resolved(ctx) + } + } + return Rejected(new Error('Message does not yet have a block', { cause: ctx })) + } + return Rejected(new Error('Message id not found on the gateway.', { cause: ctx })) + }) + .chain(getCuAddress) .chain(pullResult) + .chain((res) => { + const { msgs, number } = res + if(msgs.length <= number) { + return Rejected(new Error('Message number does not exist in the result.', { cause: ctx })) + } + return Resolved(res) + }) .bichain( (error) => { return of(error) @@ -29,15 +55,13 @@ export function pushMsgWith ({ * An opaque method to fetch the result of the message just forwarded * and then crank its results */ - crank: () => of({ ...res, initialTxId: res.tx.id }) - .chain(getCuAddress) - .chain(pullResult) + crank: () => of({ ...res }) .chain((ctx) => { - const { msgs, spawns, assigns, initialTxId, messageId: parentId } = ctx + const { msgs, initialTxId, messageId: parentId, number } = ctx return crank({ - msgs, - spawns, - assigns, + msgs: [msgs[number]], + spawns: [], + assigns: [], initialTxId, parentId }) diff --git a/servers/mu/src/domain/api/pushMsg.test.js b/servers/mu/src/domain/api/pushMsg.test.js new file mode 100644 index 000000000..c610c45d1 --- /dev/null +++ b/servers/mu/src/domain/api/pushMsg.test.js @@ -0,0 +1,59 @@ +import { describe, test } from 'node:test' +import * as assert from 'node:assert' + +import { pushMsgWith } from './pushMsg.js' +import { Resolved } from 'hyper-async' + +const logger = () => undefined +logger.tap = () => (args) => { + return args +} + +describe('pushMsgWith', () => { + describe('Push message from result', () => { + describe('push message', () => { + test('push message', async () => { + const pushMsg = pushMsgWith({ + selectNode: async (res) => 'cu-url', + fetchResult: (res) => { + const msg1 = { Tags: [{ name: 'Data-Protocol', value: 'ao' }], Target: 'target-pid', Anchor: '0000001' } + return { + ...res, + Messages: [msg1], + Spawns: [], + Assignments: [] + } + }, + fetchTransactions: async(ctx) => { + return { + data: { + transactions: { + edges: [ + { + block: { timestamp: Date.now() } + } + ] + } + } + } + }, + crank: (res) => { + assert.ok(res.msgs.length === 1) + return Resolved() + }, + logger, + }) + + const { crank } = await pushMsg({ + tx: { id: 'message-id', processId: 'process-id' }, + number: 0, + initialTxId: 'message-id', + logId: 'asdf', + messageId: 'message-id' + }).toPromise() + + await crank().toPromise() + }) + }) + }) +}) diff --git a/servers/mu/src/domain/clients/gateway.js b/servers/mu/src/domain/clients/gateway.js index fca79644a..ba89fda37 100644 --- a/servers/mu/src/domain/clients/gateway.js +++ b/servers/mu/src/domain/clients/gateway.js @@ -69,6 +69,85 @@ function isWalletWith ({ } } + +/** + * @name fetchTransactionDetails + * Fetches transaction details using the provided GraphQL query + * from the Arweave search endpoint. + * + * @param {string[]} ids - The list of transaction IDs to query. + * @param {function} fetch - The fetch implementation to use for HTTP requests. + * @returns {Promise} The GraphQL query result. + */ +function fetchTransactionDetailsWith({ fetch, GRAPHQL_URL }) { + return async (ids) => { + const query = ` + query { + transactions(ids: ${JSON.stringify(ids)}) { + pageInfo { + hasNextPage + } + edges { + cursor + node { + id + anchor + signature + recipient + block { + timestamp + } + owner { + address + key + } + fee { + winston + ar + } + quantity { + winston + ar + } + data { + size + type + } + tags { + name + value + } + block { + id + timestamp + height + previous + } + parent { + id + } + } + } + } + } + `; + + const response = await fetch(GRAPHQL_URL, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ query }) + }); + + if (!response.ok) { + throw new Error(`Failed to fetch transaction details: ${response.statusText}`); + } + + return response.json(); + } +} + + export default { - isWalletWith + isWalletWith, + fetchTransactionDetailsWith } diff --git a/servers/mu/src/domain/dal.js b/servers/mu/src/domain/dal.js index 7eef9ba05..11f243389 100644 --- a/servers/mu/src/domain/dal.js +++ b/servers/mu/src/domain/dal.js @@ -72,6 +72,14 @@ export const resultSchema = z.function() ) ) +export const graphqlReturnSchema = z.function() + .args( + z.array(), + ) + .returns( + z.any() + ) + /** * fetchCron * Given a process and a cursor, fetch the cron from the CU diff --git a/servers/mu/src/domain/index.js b/servers/mu/src/domain/index.js index b41544544..8b7c3bf2a 100644 --- a/servers/mu/src/domain/index.js +++ b/servers/mu/src/domain/index.js @@ -300,6 +300,7 @@ export const createApis = async (ctx) => { fetchResult: cuClient.resultWith({ fetch: fetchWithCache, histogram, CU_URL, logger: sendDataItemLogger }), crank, logger: pushMsgItemLogger, + fetchTransactions: gatewayClient.fetchTransactionDetailsWith({ fetch, GRAPHQL_URL }) }) return { diff --git a/servers/mu/src/routes/push.js b/servers/mu/src/routes/push.js index 7f4112a8f..20c83507d 100644 --- a/servers/mu/src/routes/push.js +++ b/servers/mu/src/routes/push.js @@ -6,7 +6,7 @@ import { randomBytes } from 'node:crypto' const withPushRoute = (app) => { app.post( - '/push/:processId/:id', + '/push/:id/:number', compose( withMiddleware, withMetrics(), @@ -14,13 +14,20 @@ const withPushRoute = (app) => { const { logger: _logger, domain: { apis: { pushMsg } }, - params: { processId, id } + params: { id, number }, + query: { + 'process-id': processId + } } = req const logger = _logger.child('POST_push') const logId = randomBytes(8).toString('hex') - await of({ tx: { id, processId }, logId, messageId: id }) + if (isNaN(Number(number))) { + return res.status(400).send({ error: `'number' parameter must be a valid number` }); + } + + await of({ tx: { id, processId }, number: Number(number), logId, messageId: id, initialTxId: id }) .chain(pushMsg) .bimap( (e) => {