diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs index 8cbd7e66e4897..1cada8cfd1abf 100644 --- a/src/stream/src/executor/project.rs +++ b/src/stream/src/executor/project.rs @@ -14,8 +14,6 @@ use std::fmt::{Debug, Formatter}; -use auto_enums::auto_enum; -use futures::stream::Stream; use multimap::MultiMap; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; @@ -23,7 +21,6 @@ use risingwave_common::row::{Row, RowExt}; use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::expr::NonStrictExpression; -use rw_futures_util::{RwFutureExt, RwTryStreamExt}; use super::*; @@ -33,8 +30,6 @@ use super::*; pub struct ProjectExecutor { input: BoxedExecutor, inner: Inner, - /// The mutable parts of inner fields. - vars: ExecutionVars, } struct Inner { @@ -48,17 +43,14 @@ struct Inner { watermark_derivations: MultiMap, /// Indices of nondecreasing expressions in the expression list. nondecreasing_expr_indices: Vec, + /// Last seen values of nondecreasing expressions, buffered to periodically produce watermarks. + last_nondec_expr_values: Vec>, /// the selectivity threshold which should be in `[0,1]`. for the chunk with selectivity less /// than the threshold, the Project executor will construct a new chunk before expr evaluation, materialize_selectivity_threshold: f64, } -struct ExecutionVars { - /// Last seen values of nondecreasing expressions, buffered to periodically produce watermarks. - last_nondec_expr_values: Vec>, -} - impl ProjectExecutor { #[allow(clippy::too_many_arguments)] pub fn new( @@ -79,10 +71,8 @@ impl ProjectExecutor { exprs, watermark_derivations, nondecreasing_expr_indices, - materialize_selectivity_threshold, - }, - vars: ExecutionVars { last_nondec_expr_values: vec![None; n_nondecreasing_exprs], + materialize_selectivity_threshold, }, } } @@ -110,7 +100,7 @@ impl Executor for ProjectExecutor { } fn execute(self: Box) -> BoxedMessageStream { - self.inner.execute(self.input, self.vars).boxed() + self.inner.execute(self.input).boxed() } } @@ -160,110 +150,54 @@ impl Inner { Ok(ret) } - fn execute( - self, - input: BoxedExecutor, - mut vars: ExecutionVars, - ) -> impl Stream { - let return_types: Vec<_> = self.exprs.iter().map(|expr| expr.return_type()).collect(); - - // Phase 1: only evaluating the expression, which can be concurrent. - - enum Phase1Item { - Chunk(Option), - Barrier(Barrier), - Watermark(Vec), - } - - let this = Arc::new(self); - - let this2 = this.clone(); - - let st = input.execute().map(move |msg| { - let this = this.clone(); + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn execute(mut self, input: BoxedExecutor) { + #[for_await] + for msg in input.execute() { let msg = msg?; - let is_fence: bool; - #[auto_enum(Future)] - let fut = match msg { - Message::Chunk(chunk) => { - is_fence = false; - async move { - let new_chunk = this.map_filter_chunk(chunk).await?; - Ok(Phase1Item::Chunk(new_chunk)) as StreamExecutorResult<_> - } - } - Message::Watermark(watermark) => { - is_fence = false; - async move { - let watermarks = this.handle_watermark(watermark).await?; - Ok(Phase1Item::Watermark(watermarks)) + match msg { + Message::Watermark(w) => { + let watermarks = self.handle_watermark(w).await?; + for watermark in watermarks { + yield Message::Watermark(watermark) } } - Message::Barrier(barrier) => { - is_fence = true; - async { Ok(Phase1Item::Barrier(barrier)) } - } - }; - - let fut = fut.with_fence(is_fence); - - Ok(fut) as StreamExecutorResult<_> - }); - - // Make the phase 1 concurrent. - let st = st.try_buffered_with_fence(16); - - let this = this2; - - // Phase 2: Handle the watermark related logicals, and output them all. The phase is executed one by one. - #[try_stream] - async move { - #[for_await] - for msg in st { - let msg = msg?; - match msg { - Phase1Item::Watermark(watermarks) => { - for watermark in watermarks { - yield Message::Watermark(watermark) - } - } - Phase1Item::Chunk(new_chunk) => match new_chunk { - Some(new_chunk) => { - if !this.nondecreasing_expr_indices.is_empty() { - if let Some((_, first_visible_row)) = new_chunk.rows().next() { - // it's ok to use the first row here, just one chunk delay - first_visible_row - .project(&this.nondecreasing_expr_indices) - .iter() - .enumerate() - .for_each(|(idx, value)| { - vars.last_nondec_expr_values[idx] = - Some(value.to_owned_datum().expect( - "non-decreasing expression should never be NULL", - )); - }); - } + Message::Chunk(chunk) => match self.map_filter_chunk(chunk).await? { + Some(new_chunk) => { + if !self.nondecreasing_expr_indices.is_empty() { + if let Some((_, first_visible_row)) = new_chunk.rows().next() { + // it's ok to use the first row here, just one chunk delay + first_visible_row + .project(&self.nondecreasing_expr_indices) + .iter() + .enumerate() + .for_each(|(idx, value)| { + self.last_nondec_expr_values[idx] = + Some(value.to_owned_datum().expect( + "non-decreasing expression should never be NULL", + )); + }); } - yield Message::Chunk(new_chunk) } - None => continue, - }, - Phase1Item::Barrier(barrier) => { - for (&expr_idx, value) in this - .nondecreasing_expr_indices - .iter() - .zip_eq_fast(&mut vars.last_nondec_expr_values) - { - if let Some(value) = std::mem::take(value) { - yield Message::Watermark(Watermark::new( - expr_idx, - return_types[expr_idx].clone(), - value, - )) - } + yield Message::Chunk(new_chunk) + } + None => continue, + }, + barrier @ Message::Barrier(_) => { + for (&expr_idx, value) in self + .nondecreasing_expr_indices + .iter() + .zip_eq_fast(&mut self.last_nondec_expr_values) + { + if let Some(value) = std::mem::take(value) { + yield Message::Watermark(Watermark::new( + expr_idx, + self.exprs[expr_idx].return_type(), + value, + )) } - yield Message::Barrier(barrier); } + yield barrier; } } }