Skip to content

Commit

Permalink
add row iter with vnode
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Mar 20, 2024
1 parent d853fbd commit 8edc97c
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,28 @@ pub(crate) fn compute_bounds(
}
}

pub(crate) enum RowAndVnodeStreamItem {
Finished { vnode: VirtualNode },
Record { vnode: VirtualNode, row: OwnedRow },
}

#[try_stream(ok = RowAndVnodeStreamItem, error = StreamExecutorError)]
pub(crate) async fn owned_row_iter_with_vnode<S, E>(storage_iter: S, vnode: VirtualNode)
where
StreamExecutorError: From<E>,
S: Stream<Item = Result<KeyedRow<Bytes>, E>>,
{
pin_mut!(storage_iter);
while let Some(row) = storage_iter.next().await {
let row = row?;
yield RowAndVnodeStreamItem::Record {
vnode,
row: row.into_owned_row(),
}
}
yield RowAndVnodeStreamItem::Finished { vnode }
}

#[try_stream(ok = OwnedRow, error = StreamExecutorError)]
pub(crate) async fn owned_row_iter<S, E>(storage_iter: S)
where
Expand Down

0 comments on commit 8edc97c

Please sign in to comment.