Skip to content

Commit

Permalink
feat(mu): modify push endpoint #1093
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceJuliano committed Dec 20, 2024
1 parent ed00824 commit c5128b7
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 12 deletions.
40 changes: 32 additions & 8 deletions servers/mu/src/domain/api/pushMsg.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,46 @@
import { Rejected, Resolved, of } from 'hyper-async'
import { Rejected, Resolved, fromPromise, of } from 'hyper-async'

import { getCuAddressWith } from '../lib/get-cu-address.js'
import { pullResultWith } from '../lib/pull-result.js'
import { graphqlReturnSchema } from '../dal.js'

export function pushMsgWith ({
selectNode,
fetchResult,
fetchTransactions,
crank,
logger,
}) {
const getCuAddress = getCuAddressWith({ selectNode, logger })
const pullResult = pullResultWith({ fetchResult, logger })
const fetchTransactionsAsync = fromPromise(fetchTransactions)

return (ctx) => {
return of(ctx)
.chain((ctx) => {
return fetchTransactionsAsync([ctx.tx.id])
})
.chain(res => {
if(res?.data?.transactions?.edges?.length >= 1) {
if(res.data.transactions.edges[0].block?.timestamp) {
const oneDayAgo = Date.now() - 24 * 60 * 60 * 1000;
if (res.data.transactions.edges[0].block.timestamp >= oneDayAgo) {
return Resolved(ctx)
}
}
return Rejected(new Error('Message does not yet have a block', { cause: ctx }))
}
return Rejected(new Error('Message id not found on the gateway.', { cause: ctx }))
})
.chain(getCuAddress)
.chain(pullResult)
.chain((res) => {
const { msgs, number } = res
if(msgs.length <= number) {
return Rejected(new Error('Message number does not exist in the result.', { cause: ctx }))
}
return Resolved(res)
})
.bichain(
(error) => {
return of(error)
Expand All @@ -29,15 +55,13 @@ export function pushMsgWith ({
* An opaque method to fetch the result of the message just forwarded
* and then crank its results
*/
crank: () => of({ ...res, initialTxId: res.tx.id })
.chain(getCuAddress)
.chain(pullResult)
crank: () => of({ ...res })
.chain((ctx) => {
const { msgs, spawns, assigns, initialTxId, messageId: parentId } = ctx
const { msgs, initialTxId, messageId: parentId, number } = ctx
return crank({
msgs,
spawns,
assigns,
msgs: [msgs[number]],
spawns: [],
assigns: [],
initialTxId,
parentId
})
Expand Down
59 changes: 59 additions & 0 deletions servers/mu/src/domain/api/pushMsg.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { describe, test } from 'node:test'
import * as assert from 'node:assert'

import { pushMsgWith } from './pushMsg.js'
import { Resolved } from 'hyper-async'

const logger = () => undefined
logger.tap = () => (args) => {
return args
}

describe('pushMsgWith', () => {
describe('Push message from result', () => {
describe('push message', () => {
test('push message', async () => {
const pushMsg = pushMsgWith({
selectNode: async (res) => 'cu-url',
fetchResult: (res) => {
const msg1 = { Tags: [{ name: 'Data-Protocol', value: 'ao' }], Target: 'target-pid', Anchor: '0000001' }
return {
...res,
Messages: [msg1],
Spawns: [],
Assignments: []
}
},
fetchTransactions: async(ctx) => {
return {
data: {
transactions: {
edges: [
{
block: { timestamp: Date.now() }
}
]
}
}
}
},
crank: (res) => {
assert.ok(res.msgs.length === 1)
return Resolved()
},
logger,
})

const { crank } = await pushMsg({
tx: { id: 'message-id', processId: 'process-id' },
number: 0,
initialTxId: 'message-id',
logId: 'asdf',
messageId: 'message-id'
}).toPromise()

await crank().toPromise()
})
})
})
})
81 changes: 80 additions & 1 deletion servers/mu/src/domain/clients/gateway.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,85 @@ function isWalletWith ({
}
}


/**
* @name fetchTransactionDetails
* Fetches transaction details using the provided GraphQL query
* from the Arweave search endpoint.
*
* @param {string[]} ids - The list of transaction IDs to query.
* @param {function} fetch - The fetch implementation to use for HTTP requests.
* @returns {Promise<object>} The GraphQL query result.
*/
function fetchTransactionDetailsWith({ fetch, GRAPHQL_URL }) {
return async (ids) => {
const query = `
query {
transactions(ids: ${JSON.stringify(ids)}) {
pageInfo {
hasNextPage
}
edges {
cursor
node {
id
anchor
signature
recipient
block {
timestamp
}
owner {
address
key
}
fee {
winston
ar
}
quantity {
winston
ar
}
data {
size
type
}
tags {
name
value
}
block {
id
timestamp
height
previous
}
parent {
id
}
}
}
}
}
`;

const response = await fetch(GRAPHQL_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query })
});

if (!response.ok) {
throw new Error(`Failed to fetch transaction details: ${response.statusText}`);
}

return response.json();
}
}


export default {
isWalletWith
isWalletWith,
fetchTransactionDetailsWith
}
8 changes: 8 additions & 0 deletions servers/mu/src/domain/dal.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ export const resultSchema = z.function()
)
)

export const graphqlReturnSchema = z.function()
.args(
z.array(),
)
.returns(
z.any()
)

/**
* fetchCron
* Given a process and a cursor, fetch the cron from the CU
Expand Down
1 change: 1 addition & 0 deletions servers/mu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ export const createApis = async (ctx) => {
fetchResult: cuClient.resultWith({ fetch: fetchWithCache, histogram, CU_URL, logger: sendDataItemLogger }),
crank,
logger: pushMsgItemLogger,
fetchTransactions: gatewayClient.fetchTransactionDetailsWith({ fetch, GRAPHQL_URL })
})

return {
Expand Down
13 changes: 10 additions & 3 deletions servers/mu/src/routes/push.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,28 @@ import { randomBytes } from 'node:crypto'

const withPushRoute = (app) => {
app.post(
'/push/:processId/:id',
'/push/:id/:number',
compose(
withMiddleware,
withMetrics(),
always(async (req, res) => {
const {
logger: _logger,
domain: { apis: { pushMsg } },
params: { processId, id }
params: { id, number },
query: {
'process-id': processId
}
} = req

const logger = _logger.child('POST_push')
const logId = randomBytes(8).toString('hex')

await of({ tx: { id, processId }, logId, messageId: id })
if (isNaN(Number(number))) {
return res.status(400).send({ error: `'number' parameter must be a valid number` });
}

await of({ tx: { id, processId }, number: Number(number), logId, messageId: id, initialTxId: id })
.chain(pushMsg)
.bimap(
(e) => {
Expand Down

0 comments on commit c5128b7

Please sign in to comment.