Skip to content

Commit

Permalink
fix(cu): respect backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
TillaTheHun0 committed Nov 13, 2023
1 parent c898905 commit ab9f1ab
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 7 deletions.
4 changes: 2 additions & 2 deletions servers/cu/src/domain/client/ao-su.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable camelcase */
import { pipeline } from 'node:stream'
import { Transform, pipeline } from 'node:stream'
import { of } from 'hyper-async'
import { always, applySpec, evolve, filter, isNotNil, last, path, pathOr, pipe, prop } from 'ramda'

Expand Down Expand Up @@ -175,7 +175,7 @@ export const loadMessagesWith = ({ fetch, SU_URL, logger: _logger, pageSize }) =
.map(({ processId, owner: processOwner, from, to }) => {
return pipeline(
fetchAllPages({ processId, from, to }),
mapAoMessage({ processId, processOwner }),
Transform.from(mapAoMessage({ processId, processOwner })),
(err) => {
if (err) logger('Encountered err when mapping Sequencer Messages', err)
}
Expand Down
1 change: 0 additions & 1 deletion servers/cu/src/domain/lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ function saveEvaluationWith ({ saveEvaluation, logger }) {

return (evaluation) =>
of(evaluation)
.map(logger.tap('Caching evaluation %O'))
.chain(saveEvaluation)
/**
* Always ensure this Async resolves
Expand Down
8 changes: 4 additions & 4 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { pipeline } from 'node:stream'
import { Transform, pipeline } from 'node:stream'

import { Rejected, Resolved, fromPromise, of } from 'hyper-async'
import { T, always, ascend, cond, equals, ifElse, length, mergeRight, pipe, prop, reduce } from 'ramda'
Expand Down Expand Up @@ -447,8 +447,8 @@ function loadScheduledMessagesWith ({ loadTimestamp, loadBlocksMeta, logger }) {
*/
return pipeline(
$sequenced,
genTuplesWithBoundaries({ left: leftMost, right: rightMost }),
async function * (boundaries) {
Transform.from(genTuplesWithBoundaries({ left: leftMost, right: rightMost })),
Transform.from(async function * (boundaries) {
let tuple = await boundaries.next()
while (!tuple.done) {
const [left, right] = tuple.value
Expand Down Expand Up @@ -476,7 +476,7 @@ function loadScheduledMessagesWith ({ loadTimestamp, loadBlocksMeta, logger }) {
*/
tuple = next
}
},
}),
(err) => {
if (err) logger('Encountered err when merging sequenced and scheduled messages', err)
}
Expand Down

0 comments on commit ab9f1ab

Please sign in to comment.