Skip to content

Commit

Permalink
revert: feat(stream): make Project expr evaluation concurrent (#14558) (
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored and TennyZhuang committed Feb 2, 2024
1 parent bb3be10 commit 918c21a
Showing 1 changed file with 45 additions and 111 deletions.
156 changes: 45 additions & 111 deletions src/stream/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,13 @@

use std::fmt::{Debug, Formatter};

use auto_enums::auto_enum;
use futures::stream::Stream;
use multimap::MultiMap;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::NonStrictExpression;
use rw_futures_util::{RwFutureExt, RwTryStreamExt};

use super::*;

Expand All @@ -33,8 +30,6 @@ use super::*;
pub struct ProjectExecutor {
input: BoxedExecutor,
inner: Inner,
/// The mutable parts of inner fields.
vars: ExecutionVars,
}

struct Inner {
Expand All @@ -48,17 +43,14 @@ struct Inner {
watermark_derivations: MultiMap<usize, usize>,
/// Indices of nondecreasing expressions in the expression list.
nondecreasing_expr_indices: Vec<usize>,
/// Last seen values of nondecreasing expressions, buffered to periodically produce watermarks.
last_nondec_expr_values: Vec<Option<ScalarImpl>>,

/// the selectivity threshold which should be in `[0,1]`. for the chunk with selectivity less
/// than the threshold, the Project executor will construct a new chunk before expr evaluation,
materialize_selectivity_threshold: f64,
}

struct ExecutionVars {
/// Last seen values of nondecreasing expressions, buffered to periodically produce watermarks.
last_nondec_expr_values: Vec<Option<ScalarImpl>>,
}

impl ProjectExecutor {
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand All @@ -79,10 +71,8 @@ impl ProjectExecutor {
exprs,
watermark_derivations,
nondecreasing_expr_indices,
materialize_selectivity_threshold,
},
vars: ExecutionVars {
last_nondec_expr_values: vec![None; n_nondecreasing_exprs],
materialize_selectivity_threshold,
},
}
}
Expand Down Expand Up @@ -110,7 +100,7 @@ impl Executor for ProjectExecutor {
}

fn execute(self: Box<Self>) -> BoxedMessageStream {
self.inner.execute(self.input, self.vars).boxed()
self.inner.execute(self.input).boxed()
}
}

Expand Down Expand Up @@ -160,110 +150,54 @@ impl Inner {
Ok(ret)
}

fn execute(
self,
input: BoxedExecutor,
mut vars: ExecutionVars,
) -> impl Stream<Item = MessageStreamItem> {
let return_types: Vec<_> = self.exprs.iter().map(|expr| expr.return_type()).collect();

// Phase 1: only evaluating the expression, which can be concurrent.

enum Phase1Item {
Chunk(Option<StreamChunk>),
Barrier(Barrier),
Watermark(Vec<Watermark>),
}

let this = Arc::new(self);

let this2 = this.clone();

let st = input.execute().map(move |msg| {
let this = this.clone();
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute(mut self, input: BoxedExecutor) {
#[for_await]
for msg in input.execute() {
let msg = msg?;
let is_fence: bool;
#[auto_enum(Future)]
let fut = match msg {
Message::Chunk(chunk) => {
is_fence = false;
async move {
let new_chunk = this.map_filter_chunk(chunk).await?;
Ok(Phase1Item::Chunk(new_chunk)) as StreamExecutorResult<_>
}
}
Message::Watermark(watermark) => {
is_fence = false;
async move {
let watermarks = this.handle_watermark(watermark).await?;
Ok(Phase1Item::Watermark(watermarks))
match msg {
Message::Watermark(w) => {
let watermarks = self.handle_watermark(w).await?;
for watermark in watermarks {
yield Message::Watermark(watermark)
}
}
Message::Barrier(barrier) => {
is_fence = true;
async { Ok(Phase1Item::Barrier(barrier)) }
}
};

let fut = fut.with_fence(is_fence);

Ok(fut) as StreamExecutorResult<_>
});

// Make the phase 1 concurrent.
let st = st.try_buffered_with_fence(16);

let this = this2;

// Phase 2: Handle the watermark related logicals, and output them all. The phase is executed one by one.
#[try_stream]
async move {
#[for_await]
for msg in st {
let msg = msg?;
match msg {
Phase1Item::Watermark(watermarks) => {
for watermark in watermarks {
yield Message::Watermark(watermark)
}
}
Phase1Item::Chunk(new_chunk) => match new_chunk {
Some(new_chunk) => {
if !this.nondecreasing_expr_indices.is_empty() {
if let Some((_, first_visible_row)) = new_chunk.rows().next() {
// it's ok to use the first row here, just one chunk delay
first_visible_row
.project(&this.nondecreasing_expr_indices)
.iter()
.enumerate()
.for_each(|(idx, value)| {
vars.last_nondec_expr_values[idx] =
Some(value.to_owned_datum().expect(
"non-decreasing expression should never be NULL",
));
});
}
Message::Chunk(chunk) => match self.map_filter_chunk(chunk).await? {
Some(new_chunk) => {
if !self.nondecreasing_expr_indices.is_empty() {
if let Some((_, first_visible_row)) = new_chunk.rows().next() {
// it's ok to use the first row here, just one chunk delay
first_visible_row
.project(&self.nondecreasing_expr_indices)
.iter()
.enumerate()
.for_each(|(idx, value)| {
self.last_nondec_expr_values[idx] =
Some(value.to_owned_datum().expect(
"non-decreasing expression should never be NULL",
));
});
}
yield Message::Chunk(new_chunk)
}
None => continue,
},
Phase1Item::Barrier(barrier) => {
for (&expr_idx, value) in this
.nondecreasing_expr_indices
.iter()
.zip_eq_fast(&mut vars.last_nondec_expr_values)
{
if let Some(value) = std::mem::take(value) {
yield Message::Watermark(Watermark::new(
expr_idx,
return_types[expr_idx].clone(),
value,
))
}
yield Message::Chunk(new_chunk)
}
None => continue,
},
barrier @ Message::Barrier(_) => {
for (&expr_idx, value) in self
.nondecreasing_expr_indices
.iter()
.zip_eq_fast(&mut self.last_nondec_expr_values)
{
if let Some(value) = std::mem::take(value) {
yield Message::Watermark(Watermark::new(
expr_idx,
self.exprs[expr_idx].return_type(),
value,
))
}
yield Message::Barrier(barrier);
}
yield barrier;
}
}
}
Expand Down

0 comments on commit 918c21a

Please sign in to comment.