Skip to content

Commit

Permalink
fix(streaming): correctly skip problematic table-function input (#19353)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Nov 13, 2024
1 parent 1aed314 commit ccba7e3
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 20 deletions.
11 changes: 11 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_19352.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# https://github.com/risingwavelabs/risingwave/issues/11915
# https://github.com/risingwavelabs/risingwave/pull/17156
# https://github.com/risingwavelabs/risingwave/issues/19352

statement ok
set streaming_parallelism to 1;

include ./issue_11915.slt

statement ok
set streaming_parallelism to default;
74 changes: 54 additions & 20 deletions src/stream/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ impl Inner {
Op::Delete | Op::UpdateDelete => Op::Delete,
Op::Insert | Op::UpdateInsert => Op::Insert,
};

// Whether the output corresponds to the current input row.
let is_current_input = |i| {
assert!(
i >= row_idx,
"unexpectedly operating on previous input, i: {i}, row_idx: {row_idx}",
);
i == row_idx
};

// for each output row
for projected_row_id in 0i64.. {
// SAFETY:
Expand All @@ -174,22 +184,33 @@ impl Inner {
unsafe { std::mem::transmute(row.as_mut_slice()) };

row[0] = Some(projected_row_id.into());
// if any of the set columns has a value
let mut valid = false;

// Whether all table functions has exhausted or has failed for current input row.
let mut fully_consumed = true;

// for each column
for (item, value) in results.iter_mut().zip_eq_fast(&mut row[1..]) {
*value = match item {
Either::Left(state) => {
if let Some((i, result)) = state.peek()
&& i == row_idx
&& is_current_input(i)
{
match result {
Ok(value) => {
valid = true;
fully_consumed = false;
value
}
Err(err) => {
self.error_report.report(err);
// When we encounter an error from one of the table functions,
//
// - if there are other successful table functions, `fully_consumed` will still be
// set to `false`, a `NULL` will be set in the output row for the failed table function,
// that's why we set `None` here.
//
// - if there are no other successful table functions (or we are the only table function),
// `fully_consumed` will be set to `true`, we won't output the row at all but skip
// the whole result set of the given row. Setting `None` here is no-op.
None
}
}
Expand All @@ -200,23 +221,36 @@ impl Inner {
Either::Right(array) => array.value_at(row_idx),
};
}
if !valid {
// no more output rows for the input row

if fully_consumed {
// Skip the current input row and break the loop to handle the next input row.
// - If all exhausted, this is no-op.
// - If all failed, this skips remaining outputs of the current input row.
for item in &mut results {
if let Either::Left(state) = item {
while let Some((i, _)) = state.peek()
&& is_current_input(i)
{
state.next().await?;
}
}
}
break;
}
if let Some(chunk) = builder.append_row(op, &*row) {
self.update_last_nondec_expr_values(
&mut last_nondec_expr_values,
&chunk,
);
yield Message::Chunk(chunk);
}
// move to the next row
for item in &mut results {
if let Either::Left(state) = item
&& matches!(state.peek(), Some((i, _)) if i == row_idx)
{
state.next().await?;
} else {
if let Some(chunk) = builder.append_row(op, &*row) {
self.update_last_nondec_expr_values(
&mut last_nondec_expr_values,
&chunk,
);
yield Message::Chunk(chunk);
}
// move to the next row
for item in &mut results {
if let Either::Left(state) = item
&& matches!(state.peek(), Some((i, _)) if is_current_input(i))
{
state.next().await?;
}
}
}
}
Expand Down

0 comments on commit ccba7e3

Please sign in to comment.