From 8edc97c7235e484c65d624e851ae1b5424a594cb Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 20 Mar 2024 18:42:05 +0800 Subject: [PATCH] add row iter with vnode --- src/stream/src/executor/backfill/utils.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index a0e85f33c810a..93fd7c652cc2d 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -660,6 +660,28 @@ pub(crate) fn compute_bounds( } } +pub(crate) enum RowAndVnodeStreamItem { + Finished { vnode: VirtualNode }, + Record { vnode: VirtualNode, row: OwnedRow }, +} + +#[try_stream(ok = RowAndVnodeStreamItem, error = StreamExecutorError)] +pub(crate) async fn owned_row_iter_with_vnode(storage_iter: S, vnode: VirtualNode) +where + StreamExecutorError: From, + S: Stream, E>>, +{ + pin_mut!(storage_iter); + while let Some(row) = storage_iter.next().await { + let row = row?; + yield RowAndVnodeStreamItem::Record { + vnode, + row: row.into_owned_row(), + } + } + yield RowAndVnodeStreamItem::Finished { vnode } +} + #[try_stream(ok = OwnedRow, error = StreamExecutorError)] pub(crate) async fn owned_row_iter(storage_iter: S) where