diff --git a/e2e_test/streaming/bug_fixes/issue_19352.slt b/e2e_test/streaming/bug_fixes/issue_19352.slt new file mode 100644 index 0000000000000..d4594de1a3566 --- /dev/null +++ b/e2e_test/streaming/bug_fixes/issue_19352.slt @@ -0,0 +1,11 @@ +# https://github.com/risingwavelabs/risingwave/issues/11915 +# https://github.com/risingwavelabs/risingwave/pull/17156 +# https://github.com/risingwavelabs/risingwave/issues/19352 + +statement ok +set streaming_parallelism to 1; + +include ./issue_11915.slt + +statement ok +set streaming_parallelism to default; diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index dff51a39255cf..99bab0c2edffa 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -165,6 +165,16 @@ impl Inner { Op::Delete | Op::UpdateDelete => Op::Delete, Op::Insert | Op::UpdateInsert => Op::Insert, }; + + // Whether the output corresponds to the current input row. + let is_current_input = |i| { + assert!( + i >= row_idx, + "unexpectedly operating on previous input, i: {i}, row_idx: {row_idx}", + ); + i == row_idx + }; + // for each output row for projected_row_id in 0i64.. { // SAFETY: @@ -174,22 +184,33 @@ impl Inner { unsafe { std::mem::transmute(row.as_mut_slice()) }; row[0] = Some(projected_row_id.into()); - // if any of the set columns has a value - let mut valid = false; + + // Whether all table functions has exhausted or has failed for current input row. + let mut fully_consumed = true; + // for each column for (item, value) in results.iter_mut().zip_eq_fast(&mut row[1..]) { *value = match item { Either::Left(state) => { if let Some((i, result)) = state.peek() - && i == row_idx + && is_current_input(i) { match result { Ok(value) => { - valid = true; + fully_consumed = false; value } Err(err) => { self.error_report.report(err); + // When we encounter an error from one of the table functions, + // + // - if there are other successful table functions, `fully_consumed` will still be + // set to `false`, a `NULL` will be set in the output row for the failed table function, + // that's why we set `None` here. + // + // - if there are no other successful table functions (or we are the only table function), + // `fully_consumed` will be set to `true`, we won't output the row at all but skip + // the whole result set of the given row. Setting `None` here is no-op. None } } @@ -200,23 +221,36 @@ impl Inner { Either::Right(array) => array.value_at(row_idx), }; } - if !valid { - // no more output rows for the input row + + if fully_consumed { + // Skip the current input row and break the loop to handle the next input row. + // - If all exhausted, this is no-op. + // - If all failed, this skips remaining outputs of the current input row. + for item in &mut results { + if let Either::Left(state) = item { + while let Some((i, _)) = state.peek() + && is_current_input(i) + { + state.next().await?; + } + } + } break; - } - if let Some(chunk) = builder.append_row(op, &*row) { - self.update_last_nondec_expr_values( - &mut last_nondec_expr_values, - &chunk, - ); - yield Message::Chunk(chunk); - } - // move to the next row - for item in &mut results { - if let Either::Left(state) = item - && matches!(state.peek(), Some((i, _)) if i == row_idx) - { - state.next().await?; + } else { + if let Some(chunk) = builder.append_row(op, &*row) { + self.update_last_nondec_expr_values( + &mut last_nondec_expr_values, + &chunk, + ); + yield Message::Chunk(chunk); + } + // move to the next row + for item in &mut results { + if let Either::Left(state) = item + && matches!(state.peek(), Some((i, _)) if is_current_input(i)) + { + state.next().await?; + } } } }