feat(stream): merge stream chunks at MergeExecutor #17968

Merged: 14 commits, Oct 28, 2024
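The core of this change is a new stream adapter in the MergeExecutor that coalesces small `StreamChunk`s arriving from upstream exchanges into chunks of up to the configured `chunk_size`. Below is a minimal, self-contained sketch of that buffering pattern (it depends only on the `futures` crate); the `BufferBatches` type and the plain `Vec<i64>` batches are invented for illustration, and unlike the real executor it carries no watermark or barrier messages that would force a flush:

// Sketch of the buffering idea in this PR, with Vec<i64> standing in for
// StreamChunk. Not RisingWave code: names and types here are illustrative.
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::{Stream, StreamExt};

struct BufferBatches<S> {
    inner: S,
    buf: Vec<i64>,               // plays the role of StreamChunkBuilder
    capacity: usize,             // plays the role of chunk_size
    pending: VecDeque<Vec<i64>>, // completed batches awaiting emission
}

impl<S> BufferBatches<S> {
    fn new(inner: S, capacity: usize) -> Self {
        assert!(capacity > 0);
        Self {
            inner,
            buf: Vec::with_capacity(capacity),
            capacity,
            pending: VecDeque::new(),
        }
    }
}

impl<S: Stream<Item = Vec<i64>> + Unpin> Stream for BufferBatches<S> {
    type Item = Vec<i64>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Emit any batch completed on a previous loop iteration first.
            if let Some(batch) = self.pending.pop_front() {
                return Poll::Ready(Some(batch));
            }
            match self.inner.poll_next_unpin(cx) {
                // Upstream has no ready item: flush the partial batch rather
                // than waiting, so buffering only lasts while items are ready.
                Poll::Pending => {
                    return if self.buf.is_empty() {
                        Poll::Pending
                    } else {
                        Poll::Ready(Some(std::mem::take(&mut self.buf)))
                    };
                }
                // Upstream yielded a batch: fold its rows into the buffer,
                // queueing a full batch whenever `capacity` is reached.
                Poll::Ready(Some(rows)) => {
                    for row in rows {
                        self.buf.push(row);
                        if self.buf.len() >= self.capacity {
                            let full = std::mem::take(&mut self.buf);
                            self.pending.push_back(full);
                        }
                    }
                }
                // Upstream ended: flush the remainder, then terminate.
                // (The real MergeExecutor treats end-of-stream as unreachable.)
                Poll::Ready(None) => {
                    return if self.buf.is_empty() {
                        Poll::Ready(None)
                    } else {
                        Poll::Ready(Some(std::mem::take(&mut self.buf)))
                    };
                }
            }
        }
    }
}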
10 changes: 6 additions & 4 deletions src/stream/src/executor/integration_tests.rs
@@ -159,20 +159,22 @@ async fn test_merger_sum_aggr() {
let actor_ctx = ActorContext::for_test(gen_next_actor_id());

// use a merge operator to collect data from dispatchers before sending them to aggregator
+ let schema = Schema::new(vec![
+     Field::unnamed(DataType::Int64),
+     Field::unnamed(DataType::Int64),
+ ]);
let merger = Executor::new(
ExecutorInfo {
// output schema of local simple agg
- schema: Schema::new(vec![
-     Field::unnamed(DataType::Int64),
-     Field::unnamed(DataType::Int64),
- ]),
+ schema: schema.clone(),
pk_indices: PkIndices::new(),
identity: "MergeExecutor".to_string(),
},
MergeExecutor::for_test(
actor_ctx.id,
outputs,
barrier_test_env.shared_context.clone(),
+ schema,
)
.boxed(),
);
199 changes: 193 additions & 6 deletions src/stream/src/executor/merge.rs
@@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- use std::collections::BTreeMap;
+ use std::collections::{BTreeMap, VecDeque};
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::Context as _;
use futures::stream::{FusedStream, FuturesUnordered, StreamFuture};
use prometheus::Histogram;
+ use risingwave_common::array::StreamChunkBuilder;
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::LabelGuardedMetric;
use tokio::time::Instant;
@@ -50,6 +51,12 @@ pub struct MergeExecutor {

/// Streaming metrics.
metrics: Arc<StreamingMetrics>,

+ /// Chunk size for the StreamChunkBuilder
+ chunk_size: usize,
+
+ /// Data types for the StreamChunkBuilder
+ schema: Schema,
}

impl MergeExecutor {
@@ -60,8 +67,9 @@ impl MergeExecutor {
upstream_fragment_id: FragmentId,
inputs: Vec<BoxedInput>,
context: Arc<SharedContext>,
- _receiver_id: u64,
metrics: Arc<StreamingMetrics>,
+ chunk_size: usize,
+ schema: Schema,
) -> Self {
Self {
actor_context: ctx,
@@ -70,6 +78,8 @@
upstream_fragment_id,
context,
metrics,
+ chunk_size,
+ schema,
}
}

@@ -78,6 +88,7 @@
actor_id: ActorId,
inputs: Vec<super::exchange::permit::Receiver>,
shared_context: Arc<SharedContext>,
+ schema: Schema,
) -> Self {
use super::exchange::input::LocalInput;
use crate::executor::exchange::input::Input;
@@ -100,8 +111,9 @@
})
.collect(),
shared_context,
- 810,
StreamingMetrics::unused().into(),
+ 1024,
+ schema,
)
}

@@ -126,6 +138,7 @@
self.upstreams,
merge_barrier_align_duration.clone(),
);
+ let select_all = BufferChunks::new(select_all, self.chunk_size, self.schema);
let actor_id = self.actor_context.id;

let mut metrics = self.metrics.new_actor_input_metrics(
@@ -464,6 +477,92 @@ impl SelectReceivers {
}
}

[Review comment, Member] Nit: Maybe we can call this BufferedChunkReader since it's very much like tokio::io::BufReader and java.io.BufferedReader.

/// A wrapper that buffers the `StreamChunk`s from upstream until no more ready items are available.
/// In addition, any message other than `StreamChunk` triggers the buffered `StreamChunk`s
/// to be emitted immediately, followed by the message itself.
struct BufferChunks<S: Stream> {
inner: S,
chunk_builder: StreamChunkBuilder,

/// The items to be emitted. Whenever there's something here, we should return a `Poll::Ready` immediately.
pending_items: VecDeque<S::Item>,
}

impl<S: Stream> BufferChunks<S> {
pub(super) fn new(inner: S, chunk_size: usize, schema: Schema) -> Self {
assert!(chunk_size > 0);
let chunk_builder = StreamChunkBuilder::new(chunk_size, schema.data_types());
Self {
inner,
chunk_builder,
pending_items: VecDeque::new(),
}
}
}

impl<S: Stream> std::ops::Deref for BufferChunks<S> {
type Target = S;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<S: Stream> std::ops::DerefMut for BufferChunks<S> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl<S: Stream> Stream for BufferChunks<S>
where
S: Stream<Item = std::result::Result<Message, StreamExecutorError>> + Unpin,
{
type Item = std::result::Result<Message, StreamExecutorError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(item) = self.pending_items.pop_front() {
return Poll::Ready(Some(item));
}

match self.inner.poll_next_unpin(cx) {
Poll::Pending => {
return if self.chunk_builder.size() > 0 {
let chunk_out = self.chunk_builder.take().unwrap();
Poll::Ready(Some(Ok(Message::Chunk(chunk_out))))
} else {
Poll::Pending
}
}

Poll::Ready(Some(result)) => {
if let Ok(Message::Chunk(chunk)) = result {
for row in chunk.records() {
if let Some(chunk_out) = self.chunk_builder.append_record(row) {
self.pending_items.push_back(Ok(Message::Chunk(chunk_out)));
}
}
} else {
return if self.chunk_builder.size() > 0 {
let chunk_out = self.chunk_builder.take().unwrap();
self.pending_items.push_back(result);
Poll::Ready(Some(Ok(Message::Chunk(chunk_out))))
} else {
Poll::Ready(Some(result))
};
}
}

Poll::Ready(None) => {
// See also the comments in `SelectReceivers::poll_next`.
unreachable!("SelectReceivers should never return None");
}
}
}
}
}
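For reference, `poll_next` above leans on three `StreamChunkBuilder` methods: `append_record`, which returns a finished chunk once the size limit is reached; `size`, the number of buffered rows; and `take`, which flushes a partial chunk. The toy builder below mirrors that assumed contract with `i64` rows; it is my reading of the call sites, not the actual StreamChunkBuilder implementation:

// Toy stand-in for StreamChunkBuilder, matching only the contract that
// BufferChunks::poll_next relies on. Names and behavior are assumptions.
struct ToyChunkBuilder {
    capacity: usize,
    rows: Vec<i64>,
}

impl ToyChunkBuilder {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Self { capacity, rows: Vec::with_capacity(capacity) }
    }

    /// Appends one row; returns a completed batch once `capacity` rows accumulate.
    fn append_record(&mut self, row: i64) -> Option<Vec<i64>> {
        self.rows.push(row);
        (self.rows.len() >= self.capacity).then(|| std::mem::take(&mut self.rows))
    }

    /// Number of rows currently buffered.
    fn size(&self) -> usize {
        self.rows.len()
    }

    /// Flushes the partially filled batch, if any.
    fn take(&mut self) -> Option<Vec<i64>> {
        (!self.rows.is_empty()).then(|| std::mem::take(&mut self.rows))
    }
}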

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
@@ -486,7 +585,7 @@ mod tests {
use tonic::{Request, Response, Status, Streaming};

use super::*;
- use crate::executor::exchange::input::{Input, RemoteInput};
+ use crate::executor::exchange::input::{Input, LocalInput, RemoteInput};
use crate::executor::exchange::permit::channel_for_test;
use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
use crate::task::barrier_test_utils::LocalBarrierTestEnv;
@@ -498,6 +597,88 @@
StreamChunk::new(ops, vec![])
}

#[tokio::test]
async fn test_buffer_chunks() {
let test_env = LocalBarrierTestEnv::for_test().await;

let (tx, rx) = channel_for_test();
let input = LocalInput::new(
rx,
1,
2,
test_env.shared_context.local_barrier_manager.clone(),
)
.boxed_input();
let mut buffer = BufferChunks::new(input, 100, Schema::new(vec![]));

// Send a chunk
tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap();
assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
assert_eq!(chunk.ops().len() as u64, 10);
});

// Send 2 chunks and expect them to be merged.
tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap();
tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap();
assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
assert_eq!(chunk.ops().len() as u64, 20);
});

// Send a watermark.
tx.send(Message::Watermark(Watermark {
col_idx: 0,
data_type: DataType::Int64,
val: ScalarImpl::Int64(233),
}))
.await
.unwrap();
assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Watermark(watermark) => {
assert_eq!(watermark.val, ScalarImpl::Int64(233));
});

// Send 2 chunks before a watermark. Expect the 2 chunks to be merged and the watermark to be emitted.
tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap();
tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap();
tx.send(Message::Watermark(Watermark {
col_idx: 0,
data_type: DataType::Int64,
val: ScalarImpl::Int64(233),
}))
.await
.unwrap();
assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
assert_eq!(chunk.ops().len() as u64, 20);
});
assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Watermark(watermark) => {
assert_eq!(watermark.val, ScalarImpl::Int64(233));
});

// Send a barrier.
let barrier = Barrier::new_test_barrier(test_epoch(1));
test_env.inject_barrier(&barrier, [], [2]);
tx.send(Message::Barrier(barrier.clone().into_dispatcher()))
.await
.unwrap();
assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Barrier(Barrier { epoch: barrier_epoch, mutation: _, .. }) => {
assert_eq!(barrier_epoch.curr, test_epoch(1));
});

// Send 2 chunks before a barrier. Expect the 2 chunks to be merged and the barrier to be emitted.
tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap();
tx.send(Message::Chunk(build_test_chunk(10))).await.unwrap();
let barrier = Barrier::new_test_barrier(test_epoch(2));
test_env.inject_barrier(&barrier, [], [2]);
tx.send(Message::Barrier(barrier.clone().into_dispatcher()))
.await
.unwrap();
assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
assert_eq!(chunk.ops().len() as u64, 20);
});
assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Barrier(Barrier { epoch: barrier_epoch, mutation: _, .. }) => {
assert_eq!(barrier_epoch.curr, test_epoch(2));
});
}

#[tokio::test]
async fn test_merger() {
const CHANNEL_NUMBER: usize = 10;
Expand All @@ -509,7 +690,12 @@ mod tests {
rxs.push(rx);
}
let barrier_test_env = LocalBarrierTestEnv::for_test().await;
- let merger = MergeExecutor::for_test(233, rxs, barrier_test_env.shared_context.clone());
+ let merger = MergeExecutor::for_test(
+     233,
+     rxs,
+     barrier_test_env.shared_context.clone(),
+     Schema::new(vec![]),
+ );
let actor_id = merger.actor_context.id;
let mut handles = Vec::with_capacity(CHANNEL_NUMBER);

@@ -640,8 +826,9 @@
upstream_fragment_id,
inputs,
ctx.clone(),
- 233,
metrics.clone(),
+ 1024,
+ Schema::new(vec![]),
)
.boxed()
.execute();
16 changes: 8 additions & 8 deletions src/stream/src/executor/project.rs
@@ -43,7 +43,7 @@ struct Inner {

/// the selectivity threshold which should be in `[0,1]`. for the chunk with selectivity less
/// than the threshold, the Project executor will construct a new chunk before expr evaluation,
- materialize_selectivity_threshold: f64,
+ _materialize_selectivity_threshold: f64,

/// Whether there are likely no-op updates in the output chunks, so that eliminating them with
/// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial.
@@ -58,7 +58,7 @@ impl ProjectExecutor {
exprs: Vec<NonStrictExpression>,
watermark_derivations: MultiMap<usize, usize>,
nondecreasing_expr_indices: Vec<usize>,
- materialize_selectivity_threshold: f64,
+ _materialize_selectivity_threshold: f64,
noop_update_hint: bool,
) -> Self {
let n_nondecreasing_exprs = nondecreasing_expr_indices.len();
@@ -70,7 +70,7 @@
watermark_derivations,
nondecreasing_expr_indices,
last_nondec_expr_values: vec![None; n_nondecreasing_exprs],
- materialize_selectivity_threshold,
+ _materialize_selectivity_threshold,
noop_update_hint,
},
}
@@ -96,11 +96,11 @@ impl Inner {
&self,
chunk: StreamChunk,
) -> StreamExecutorResult<Option<StreamChunk>> {
- let chunk = if chunk.selectivity() <= self.materialize_selectivity_threshold {
-     chunk.compact()
- } else {
-     chunk
- };
+ // let chunk = if chunk.selectivity() <= self.materialize_selectivity_threshold {
+ //     chunk.compact()
+ // } else {
+ //     chunk
+ // };

[Review comment, Member Author] This was somewhat of a workaround before. As we now compact all data after Exchange, the problem should be mostly resolved.
let (data_chunk, ops) = chunk.into_parts();
let mut projected_columns = Vec::new();

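For context on the commented-out branch: `chunk.compact()` rebuilds a chunk so that rows hidden by the visibility bitmap are physically dropped before expression evaluation. A rough sketch of the retired threshold heuristic, over a simplified (visible, value) row encoding rather than real RisingWave types:

// Sketch of the retired selectivity heuristic; the (bool, i64) rows and the
// function name are illustrative, not RisingWave APIs.
fn maybe_compact(rows: Vec<(bool, i64)>, threshold: f64) -> Vec<(bool, i64)> {
    let visible = rows.iter().filter(|(vis, _)| *vis).count();
    let selectivity = visible as f64 / rows.len().max(1) as f64;
    if selectivity <= threshold {
        // Mostly-hidden chunk: pay for one rewrite now so downstream
        // expression evaluation skips hidden rows entirely.
        rows.into_iter().filter(|(vis, _)| *vis).collect()
    } else {
        rows
    }
}

With chunks now compacted as they are merged after Exchange (per the author's comment above), the per-executor check is commented out and its threshold parameter left in place but unused, hence the leading-underscore rename.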
3 changes: 2 additions & 1 deletion src/stream/src/from_proto/merge.rs
@@ -74,8 +74,9 @@ impl ExecutorBuilder for MergeExecutorBuilder {
upstream_fragment_id,
inputs,
params.shared_context.clone(),
- params.operator_id,
params.executor_stats.clone(),
+ params.env.config().developer.chunk_size,
+ params.info.schema.clone(),
)
.boxed()
};