From ab9f1ab636547bdfd34adffff229cd91b772c190 Mon Sep 17 00:00:00 2001 From: Tyler Hall Date: Mon, 13 Nov 2023 20:36:41 +0000 Subject: [PATCH] fix(cu): respect backpressure --- servers/cu/src/domain/client/ao-su.js | 4 ++-- servers/cu/src/domain/lib/evaluate.js | 1 - servers/cu/src/domain/lib/loadMessages.js | 8 ++++---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/servers/cu/src/domain/client/ao-su.js b/servers/cu/src/domain/client/ao-su.js index 43a208d3f..d3c719c1d 100644 --- a/servers/cu/src/domain/client/ao-su.js +++ b/servers/cu/src/domain/client/ao-su.js @@ -1,5 +1,5 @@ /* eslint-disable camelcase */ -import { pipeline } from 'node:stream' +import { Transform, pipeline } from 'node:stream' import { of } from 'hyper-async' import { always, applySpec, evolve, filter, isNotNil, last, path, pathOr, pipe, prop } from 'ramda' @@ -175,7 +175,7 @@ export const loadMessagesWith = ({ fetch, SU_URL, logger: _logger, pageSize }) = .map(({ processId, owner: processOwner, from, to }) => { return pipeline( fetchAllPages({ processId, from, to }), - mapAoMessage({ processId, processOwner }), + Transform.from(mapAoMessage({ processId, processOwner })), (err) => { if (err) logger('Encountered err when mapping Sequencer Messages', err) } diff --git a/servers/cu/src/domain/lib/evaluate.js b/servers/cu/src/domain/lib/evaluate.js index 8b432cd4a..e85d0df0e 100644 --- a/servers/cu/src/domain/lib/evaluate.js +++ b/servers/cu/src/domain/lib/evaluate.js @@ -36,7 +36,6 @@ function saveEvaluationWith ({ saveEvaluation, logger }) { return (evaluation) => of(evaluation) - .map(logger.tap('Caching evaluation %O')) .chain(saveEvaluation) /** * Always ensure this Async resolves diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index 8af434a62..92d3ec0f4 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -1,4 +1,4 @@ -import { pipeline } from 'node:stream' +import { Transform, pipeline } from 'node:stream' import { Rejected, Resolved, fromPromise, of } from 'hyper-async' import { T, always, ascend, cond, equals, ifElse, length, mergeRight, pipe, prop, reduce } from 'ramda' @@ -447,8 +447,8 @@ function loadScheduledMessagesWith ({ loadTimestamp, loadBlocksMeta, logger }) { */ return pipeline( $sequenced, - genTuplesWithBoundaries({ left: leftMost, right: rightMost }), - async function * (boundaries) { + Transform.from(genTuplesWithBoundaries({ left: leftMost, right: rightMost })), + Transform.from(async function * (boundaries) { let tuple = await boundaries.next() while (!tuple.done) { const [left, right] = tuple.value @@ -476,7 +476,7 @@ function loadScheduledMessagesWith ({ loadTimestamp, loadBlocksMeta, logger }) { */ tuple = next } - }, + }), (err) => { if (err) logger('Encountered err when merging sequenced and scheduled messages', err) }