From c1ef666eb4c435138011ecd6426523832292c943 Mon Sep 17 00:00:00 2001
From: Bohan Zhang
Date: Tue, 16 Apr 2024 11:06:25 +0800
Subject: [PATCH] refactor: move rate limit inside executor (#15948)

Signed-off-by: tabVersion
Co-authored-by: Noel Kwan
---
 src/meta/src/controller/streaming_job.rs      |  17 ++-
 src/meta/src/manager/catalog/fragment.rs      |  15 ++-
 .../executor/backfill/arrangement_backfill.rs |  42 +++---
 .../src/executor/backfill/cdc/cdc_backfill.rs |  11 ++
 .../backfill/cdc/upstream_table/snapshot.rs   |  24 +++-
 .../executor/backfill/no_shuffle_backfill.rs  |  76 ++++++-----
 src/stream/src/executor/backfill/utils.rs     |  19 +++
 src/stream/src/executor/flow_control.rs       | 123 ------------------
 src/stream/src/executor/mod.rs                |   2 -
 .../src/executor/source/fetch_executor.rs     |  19 ++-
 .../src/executor/source/fs_source_executor.rs |  34 ++++-
 .../source/source_backfill_executor.rs        |   6 +-
 .../src/executor/source/source_executor.rs    |  66 +++++++++-
 src/stream/src/from_proto/source/fs_fetch.rs  |   8 +-
 .../src/from_proto/source/trad_source.rs      |   8 +-
 src/stream/src/from_proto/stream_cdc_scan.rs  |  10 +-
 src/stream/src/from_proto/stream_scan.rs      |  12 +-
 .../tests/integration_tests/backfill_tests.rs |   2 +-
 18 files changed, 274 insertions(+), 220 deletions(-)
 delete mode 100644 src/stream/src/executor/flow_control.rs

diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 68bbc5fd025d..271860ee4956 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -951,12 +951,21 @@ impl CatalogController {
         fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
             let mut found = false;
-            if *fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0 {
-                visit_stream_node(stream_node, |node| {
-                    if let PbNodeBody::StreamScan(node) = node {
+            if (*fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0)
+                || (*fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0)
+            {
+                visit_stream_node(stream_node, |node| match node {
+                    PbNodeBody::StreamScan(node) => {
                         node.rate_limit = rate_limit;
                         found = true;
                     }
+                    PbNodeBody::Source(node) => {
+                        if let Some(inner) = node.source_inner.as_mut() {
+                            inner.rate_limit = rate_limit;
+                            found = true;
+                        }
+                    }
+                    _ => {}
                 });
             }
             found
@@ -964,7 +973,7 @@

         if fragments.is_empty() {
             return Err(MetaError::invalid_parameter(format!(
-                "stream scan node not found in job id {job_id}"
+                "stream scan node or source node not found in job id {job_id}"
             )));
         }
         let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index 97fce87ebfb8..feb3dc3026db 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -952,15 +952,24 @@ impl FragmentManager {
         let mut fragment_to_apply = HashMap::new();

         for fragment in fragment.fragments.values_mut() {
-            if (fragment.get_fragment_type_mask() & FragmentTypeFlag::StreamScan as u32) != 0 {
+            if (fragment.get_fragment_type_mask() & FragmentTypeFlag::StreamScan as u32) != 0
+                || (fragment.get_fragment_type_mask() & FragmentTypeFlag::Source as u32) != 0
+            {
                 let mut actor_to_apply = Vec::new();
                 for actor in &mut fragment.actors {
                     if let Some(node) = actor.nodes.as_mut() {
-                        visit_stream_node(node, |node_body| {
-                            if let NodeBody::StreamScan(ref mut node) = node_body {
+                        visit_stream_node(node, |node_body| match node_body {
+                            NodeBody::StreamScan(ref mut node) => {
                                 node.rate_limit = rate_limit;
                                 actor_to_apply.push(actor.actor_id);
                             }
+                            NodeBody::Source(ref mut node) => {
+                                if let Some(ref mut node_inner) = node.source_inner {
+                                    node_inner.rate_limit = rate_limit;
+                                    actor_to_apply.push(actor.actor_id);
+                                }
+                            }
+                            _ => {}
                         })
                     };
                 }
diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index 6a4c11717426..2fea4b313c88 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -33,9 +33,9 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
 #[cfg(debug_assertions)]
 use crate::executor::backfill::utils::METADATA_STATE_LEN;
 use crate::executor::backfill::utils::{
-    compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message,
-    mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode,
-    BackfillProgressPerVnode, BackfillState,
+    compute_bounds, create_builder, create_limiter, get_progress_per_vnode, mapping_chunk,
+    mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
+    update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState,
 };
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
@@ -215,6 +215,9 @@ where
         let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
         let mut pending_barrier: Option<Barrier> = None;

+        let rate_limiter = self.rate_limit.and_then(create_limiter);
+        let rate_limit = self.rate_limit;
+
         let backfill_snapshot_read_row_count_metric = self
             .metrics
             .backfill_snapshot_read_row_count
@@ -243,10 +246,14 @@
             {
                 let left_upstream = upstream.by_ref().map(Either::Left);

+                // Check if stream paused
+                let paused = paused || matches!(rate_limit, Some(0));
+                // Create the snapshot stream
                 let right_snapshot = pin!(Self::make_snapshot_stream(
                     &upstream_table,
                     backfill_state.clone(), // FIXME: Use mutable reference instead.
                     paused,
+                    &rate_limiter,
                 )
                 .map(Either::Right));
@@ -290,7 +297,7 @@
                         // Consume remaining rows in the builder.
                         for (vnode, builder) in &mut builders {
                             if let Some(data_chunk) = builder.consume_all() {
-                                yield Self::handle_snapshot_chunk(
+                                yield Message::Chunk(Self::handle_snapshot_chunk(
                                     data_chunk,
                                     *vnode,
                                     &pk_in_output_indices,
@@ -298,7 +305,7 @@
                                     &mut cur_barrier_snapshot_processed_rows,
                                     &mut total_snapshot_processed_rows,
                                     &self.output_indices,
-                                )?;
+                                )?);
                             }
                         }
@@ -326,7 +333,7 @@
                             Some((vnode, row)) => {
                                 let builder = builders.get_mut(&vnode).unwrap();
                                 if let Some(chunk) = builder.append_one_row(row) {
-                                    yield Self::handle_snapshot_chunk(
+                                    yield Message::Chunk(Self::handle_snapshot_chunk(
                                         chunk,
                                         vnode,
                                         &pk_in_output_indices,
@@ -334,7 +341,7 @@
                                         &mut cur_barrier_snapshot_processed_rows,
                                         &mut total_snapshot_processed_rows,
                                         &self.output_indices,
-                                    )?;
+                                    )?);
                                 }
                             }
                         }
@@ -366,7 +373,7 @@
                     Some((vnode, row)) => {
                         let builder = builders.get_mut(&vnode).unwrap();
                         if let Some(chunk) = builder.append_one_row(row) {
-                            yield Self::handle_snapshot_chunk(
+                            yield Message::Chunk(Self::handle_snapshot_chunk(
                                 chunk,
                                 vnode,
                                 &pk_in_output_indices,
@@ -374,7 +381,7 @@
                                 &mut cur_barrier_snapshot_processed_rows,
                                 &mut total_snapshot_processed_rows,
                                 &self.output_indices,
-                            )?;
+                            )?);
                         }

                         break;
@@ -585,10 +592,11 @@
     }

     #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
-    async fn make_snapshot_stream(
-        upstream_table: &ReplicatedStateTable<S, SD>,
+    async fn make_snapshot_stream<'a>(
+        upstream_table: &'a ReplicatedStateTable<S, SD>,
         backfill_state: BackfillState,
         paused: bool,
+        rate_limiter: &'a Option<BackfillRateLimiter>,
     ) {
         if paused {
             #[for_await]
@@ -596,9 +604,14 @@
                 yield None;
             }
         } else {
+            // Checked the rate limit is not zero.
             #[for_await]
             for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
-                yield r?;
+                let r = r?;
+                if let Some(rate_limit) = rate_limiter {
+                    rate_limit.until_ready().await;
+                }
+                yield r;
             }
         }
     }
@@ -611,7 +624,7 @@
         cur_barrier_snapshot_processed_rows: &mut u64,
         total_snapshot_processed_rows: &mut u64,
         output_indices: &[usize],
-    ) -> StreamExecutorResult<Message> {
+    ) -> StreamExecutorResult<StreamChunk> {
         let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
         // Raise the current position.
         // As snapshot read streams are ordered by pk, so we can
@@ -628,8 +641,7 @@
         let chunk_cardinality = chunk.cardinality() as u64;
         *cur_barrier_snapshot_processed_rows += chunk_cardinality;
         *total_snapshot_processed_rows += chunk_cardinality;
-        let chunk = Message::Chunk(mapping_chunk(chunk, output_indices));
-        Ok(chunk)
+        Ok(mapping_chunk(chunk, output_indices))
     }

     /// Read snapshot per vnode.
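Both backfill executors above share the same throttling scheme: a rate limit of zero is treated as "paused" (no limiter is built and the snapshot stream yields nothing), while any non-zero limit becomes a governor rate limiter that admits one snapshot row per permit via `until_ready`. The following standalone sketch illustrates that scheme with hypothetical names (`Limiter`, `create_limiter_sketch`, the 4-rows/s value); it assumes only the `governor` and `tokio` crates and is not the executor code itself.

    use std::num::NonZeroU32;
    use std::time::Instant;

    use governor::clock::MonotonicClock;
    use governor::middleware::NoOpMiddleware;
    use governor::state::{InMemoryState, NotKeyed};
    use governor::{Quota, RateLimiter};

    // Same shape as the `BackfillRateLimiter` alias introduced in utils.rs.
    type Limiter = RateLimiter<NotKeyed, InMemoryState, MonotonicClock, NoOpMiddleware<Instant>>;

    // Mirrors `create_limiter`: zero means "paused" and is handled by the
    // caller, so no limiter is constructed for it.
    fn create_limiter_sketch(rate_limit: usize) -> Option<Limiter> {
        if rate_limit == 0 {
            return None;
        }
        let quota = Quota::per_second(NonZeroU32::new(rate_limit as u32).unwrap());
        Some(RateLimiter::direct_with_clock(quota, &MonotonicClock))
    }

    #[tokio::main]
    async fn main() {
        let limiter = create_limiter_sketch(4); // 4 rows/s, illustrative value
        for row in 0..8u32 {
            if let Some(limiter) = &limiter {
                // One permit per snapshot row, as in `make_snapshot_stream`.
                limiter.until_ready().await;
            }
            println!("yield row {row}");
        }
    }
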
diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
index 9ccaa74ce6f3..234eceea9fc9 100644
--- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
+++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -272,6 +272,17 @@ impl CdcBackfillExecutor {
                                         paused = false;
                                         valve.resume();
                                     }
+                                    Mutation::Throttle(some) => {
+                                        if let Some(rate_limit) =
+                                            some.get(&self.actor_ctx.id)
+                                        {
+                                            self.chunk_size = rate_limit
+                                                .map(|x| x as usize)
+                                                .unwrap_or(self.chunk_size);
+                                            // rebuild the new reader stream with new chunk size
+                                            continue 'backfill_loop;
+                                        }
+                                    }
                                     _ => (),
                                 }
                             }
diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
index 6835b7cd2c77..be98a6822035 100644
--- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
+++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
@@ -13,9 +13,12 @@
 // limitations under the License.

 use std::future::Future;
+use std::num::NonZeroU32;

 use futures::{pin_mut, Stream};
 use futures_async_stream::try_stream;
+use governor::clock::MonotonicClock;
+use governor::{Quota, RateLimiter};
 use itertools::Itertools;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::row::OwnedRow;
@@ -104,9 +107,28 @@ impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
         let mut builder =
             DataChunkBuilder::new(self.inner.schema().data_types(), args.chunk_size);
         let chunk_stream = iter_chunks(row_stream, &mut builder);
+
+        if args.chunk_size == 0 {
+            // If limit is 0, we should not read any data from the upstream table.
+            // Keep waiting until the stream is rebuilt.
+            let future = futures::future::pending::<()>();
+            future.await;
+        }
+        let limiter = {
+            let quota = Quota::per_second(NonZeroU32::new(args.chunk_size as u32).unwrap());
+            let clock = MonotonicClock;
+            RateLimiter::direct_with_clock(quota, &clock)
+        };
         #[for_await]
         for chunk in chunk_stream {
-            yield Some(chunk?);
+            let chunk = chunk?;
+            if chunk.cardinality() != 0 {
+                limiter
+                    .until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap())
+                    .await
+                    .unwrap();
+            }
+            yield Some(chunk);
         }
         yield None;
     }
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 46597d941427..b409e183f5a9 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -33,8 +33,9 @@ use risingwave_storage::StateStore;
 use crate::common::table::state_table::StateTable;
 use crate::executor::backfill::utils;
 use crate::executor::backfill::utils::{
-    compute_bounds, construct_initial_finished_state, create_builder, get_new_pos, mapping_chunk,
-    mapping_message, mark_chunk, owned_row_iter, METADATA_STATE_LEN,
+    compute_bounds, construct_initial_finished_state, create_builder, create_limiter, get_new_pos,
+    mapping_chunk, mapping_message, mark_chunk, owned_row_iter, BackfillRateLimiter,
+    METADATA_STATE_LEN,
 };
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
@@ -165,6 +166,8 @@ where
         tracing::trace!(is_finished, row_count, "backfill state recovered");

         let data_types = self.upstream_table.schema().data_types();
+
+        // Chunk builder will be instantiated with min(rate_limit, self.chunk_size) as the chunk's max size.
         let mut builder = create_builder(rate_limit, self.chunk_size, data_types.clone());

         // Use this buffer to construct state,
@@ -216,6 +219,7 @@ where
         if !is_finished {
             let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
             let mut pending_barrier: Option<Barrier> = None;
+            let mut rate_limiter = rate_limit.and_then(create_limiter);

             let backfill_snapshot_read_row_count_metric = self
                 .metrics
@@ -244,12 +248,13 @@
                 {
                     let left_upstream = upstream.by_ref().map(Either::Left);
-
+                    let paused = paused || matches!(rate_limit, Some(0));
                     let right_snapshot = pin!(Self::make_snapshot_stream(
                         &self.upstream_table,
                         snapshot_read_epoch,
                         current_pos.clone(),
-                        paused
+                        paused,
+                        &rate_limiter,
                     )
                     .map(Either::Right));
@@ -293,14 +298,14 @@
                         None => {
                             // Consume remaining rows in the builder.
                             if let Some(data_chunk) = builder.consume_all() {
-                                yield Self::handle_snapshot_chunk(
+                                yield Message::Chunk(Self::handle_snapshot_chunk(
                                     data_chunk,
                                     &mut current_pos,
                                     &mut cur_barrier_snapshot_processed_rows,
                                     &mut total_snapshot_processed_rows,
                                     &pk_indices,
                                     &self.output_indices,
-                                );
+                                ));
                             }

                             // End of the snapshot read stream.
@@ -327,14 +332,14 @@
                         Some(record) => {
                             // Buffer the snapshot read row.
                             if let Some(data_chunk) = builder.append_one_row(record) {
-                                yield Self::handle_snapshot_chunk(
+                                yield Message::Chunk(Self::handle_snapshot_chunk(
                                     data_chunk,
                                     &mut current_pos,
                                     &mut cur_barrier_snapshot_processed_rows,
                                     &mut total_snapshot_processed_rows,
                                     &pk_indices,
                                     &self.output_indices,
-                                );
+                                ));
                             }
                         }
                     }
@@ -367,14 +372,14 @@
                 }
                 Some(row) => {
                     let chunk = DataChunk::from_rows(&[row], &data_types);
-                    yield Self::handle_snapshot_chunk(
+                    yield Message::Chunk(Self::handle_snapshot_chunk(
                         chunk,
                         &mut current_pos,
                         &mut cur_barrier_snapshot_processed_rows,
                         &mut total_snapshot_processed_rows,
                         &pk_indices,
                         &self.output_indices,
-                    );
+                    ));
                     break;
                 }
             }
@@ -397,14 +402,14 @@
             // Consume snapshot rows left in builder
             let chunk = builder.consume_all();
             if let Some(chunk) = chunk {
-                yield Self::handle_snapshot_chunk(
+                yield Message::Chunk(Self::handle_snapshot_chunk(
                     chunk,
                     &mut current_pos,
                     &mut cur_barrier_snapshot_processed_rows,
                     &mut total_snapshot_processed_rows,
                     &pk_indices,
                     &self.output_indices,
-                );
+                ));
             }

             // Consume upstream buffer chunk
@@ -467,18 +472,26 @@
                         Mutation::Throttle(actor_to_apply) => {
                             let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
                             if let Some(new_rate_limit) = new_rate_limit_entry {
-                                rate_limit = new_rate_limit.as_ref().map(|x| *x as _);
-                                tracing::info!(
-                                    id = self.actor_id,
-                                    new_rate_limit = ?self.rate_limit,
-                                    "actor rate limit changed",
-                                );
-                                assert!(builder.is_empty());
-                                builder = create_builder(
-                                    rate_limit,
-                                    self.chunk_size,
-                                    self.upstream_table.schema().data_types(),
-                                );
+                                let new_rate_limit = new_rate_limit.as_ref().map(|x| *x as _);
+                                if new_rate_limit != rate_limit {
+                                    rate_limit = new_rate_limit;
+                                    tracing::info!(
+                                        id = self.actor_id,
+                                        new_rate_limit = ?self.rate_limit,
+                                        "actor rate limit changed",
+                                    );
+                                    // The builder is emptied above via `DataChunkBuilder::consume_all`.
+                                    assert!(
+                                        builder.is_empty(),
+                                        "builder should already be emptied"
+                                    );
+                                    builder = create_builder(
+                                        rate_limit,
+                                        self.chunk_size,
+                                        self.upstream_table.schema().data_types(),
+                                    );
+                                    rate_limiter = new_rate_limit.and_then(create_limiter);
+                                }
                             }
                         }
                         _ => (),
@@ -633,11 +646,12 @@
     }

     #[try_stream(ok = Option<OwnedRow>, error = StreamExecutorError)]
-    async fn make_snapshot_stream(
-        upstream_table: &StorageTable<S>,
+    async fn make_snapshot_stream<'a>(
+        upstream_table: &'a StorageTable<S>,
         epoch: u64,
         current_pos: Option<OwnedRow>,
         paused: bool,
+        rate_limiter: &'a Option<BackfillRateLimiter>,
     ) {
         if paused {
             #[for_await]
@@ -645,8 +659,12 @@
                 yield None;
             }
         } else {
+            // Checked the rate limit is not zero.
             #[for_await]
             for r in Self::snapshot_read(upstream_table, epoch, current_pos) {
+                if let Some(rate_limit) = &rate_limiter {
+                    rate_limit.until_ready().await;
+                }
                 yield r?;
             }
         }
@@ -721,7 +739,7 @@
     /// 2. Update the current position.
     /// 3. Update Metrics
     /// 4. Map the chunk according to output indices, return
-    ///    the stream message to be yielded downstream.
+    ///    the stream chunk, which the caller wraps into a message.
     fn handle_snapshot_chunk(
         data_chunk: DataChunk,
         current_pos: &mut Option<OwnedRow>,
@@ -729,7 +747,7 @@
         cur_barrier_snapshot_processed_rows: &mut u64,
         total_snapshot_processed_rows: &mut u64,
         pk_indices: &[usize],
         output_indices: &[usize],
-    ) -> Message {
+    ) -> StreamChunk {
         let ops = vec![Op::Insert; data_chunk.capacity()];
         let chunk = StreamChunk::from_parts(ops, data_chunk);
         // Raise the current position.
@@ -741,7 +759,7 @@
         let chunk_cardinality = chunk.cardinality() as u64;
         *cur_barrier_snapshot_processed_rows += chunk_cardinality;
         *total_snapshot_processed_rows += chunk_cardinality;
-        Message::Chunk(mapping_chunk(chunk, output_indices))
+        mapping_chunk(chunk, output_indices)
     }
 }
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index a0e85f33c810..1d469b311fd2 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -14,13 +14,19 @@

 use std::cmp::Ordering;
 use std::collections::HashMap;
+use std::num::NonZeroU32;
 use std::ops::Bound;
+use std::time::Instant;

 use await_tree::InstrumentAwait;
 use bytes::Bytes;
 use futures::future::try_join_all;
 use futures::{pin_mut, Stream, StreamExt};
 use futures_async_stream::try_stream;
+use governor::clock::MonotonicClock;
+use governor::middleware::NoOpMiddleware;
+use governor::state::{InMemoryState, NotKeyed};
+use governor::{Quota, RateLimiter};
 use risingwave_common::array::stream_record::Record;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::bail;
@@ -799,6 +805,9 @@ pub(crate) async fn persist_state(
     Ok(())
 }

+pub type BackfillRateLimiter =
+    RateLimiter<NotKeyed, InMemoryState, MonotonicClock, NoOpMiddleware<Instant>>;
+
 /// Creates a data chunk builder for snapshot read.
 /// If the `rate_limit` is smaller than `chunk_size`, it will take precedence.
 /// This is so we can partition snapshot read into smaller chunks than chunk size.
@@ -809,9 +818,19 @@ pub fn create_builder(
 ) -> DataChunkBuilder {
     if let Some(rate_limit) = rate_limit
         && rate_limit < chunk_size
+        && rate_limit > 0
     {
         DataChunkBuilder::new(data_types, rate_limit)
     } else {
         DataChunkBuilder::new(data_types, chunk_size)
     }
 }
+
+pub fn create_limiter(rate_limit: usize) -> Option<BackfillRateLimiter> {
+    if rate_limit == 0 {
+        return None;
+    }
+    let quota = Quota::per_second(NonZeroU32::new(rate_limit as u32).unwrap());
+    let clock = MonotonicClock;
+    Some(RateLimiter::direct_with_clock(quota, &clock))
+}
diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs
deleted file mode 100644
index 984dddab9fb3..000000000000
--- a/src/stream/src/executor/flow_control.rs
+++ /dev/null
@@ -1,123 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::fmt::{Debug, Formatter};
-use std::num::NonZeroU32;
-
-use futures::StreamExt;
-use futures_async_stream::try_stream;
-use governor::clock::MonotonicClock;
-use governor::{Quota, RateLimiter};
-
-use super::{
-    ActorContextRef, BoxedMessageStream, Execute, Executor, Message, Mutation, StreamExecutorError,
-};
-
-/// Flow Control Executor is used to control the rate of the input executor.
-///
-/// Currently it is placed after the `BackfillExecutor`:
-/// upstream `MaterializeExecutor` -> `BackfillExecutor` -> `FlowControlExecutor`
-///
-/// The rate limit is set statically at the moment, and cannot be changed in a running
-/// stream graph.
-///
-/// It is used to throttle problematic MVs that are consuming too much resources.
-pub struct FlowControlExecutor {
-    input: Executor,
-    actor_ctx: ActorContextRef,
-    rate_limit: Option<u32>,
-}
-
-impl FlowControlExecutor {
-    pub fn new(input: Executor, actor_ctx: ActorContextRef, rate_limit: Option<u32>) -> Self {
-        Self {
-            input,
-            actor_ctx,
-            rate_limit,
-        }
-    }
-
-    #[try_stream(ok = Message, error = StreamExecutorError)]
-    async fn execute_inner(mut self) {
-        let get_rate_limiter = |rate_limit: u32| {
-            let quota = Quota::per_second(NonZeroU32::new(rate_limit).unwrap());
-            let clock = MonotonicClock;
-            RateLimiter::direct_with_clock(quota, &clock)
-        };
-        let mut rate_limiter = self.rate_limit.map(get_rate_limiter);
-        if self.rate_limit.is_some() {
-            tracing::info!(rate_limit = ?self.rate_limit, "actor starts with rate limit");
-        }
-
-        #[for_await]
-        for msg in self.input.execute() {
-            let msg = msg?;
-            match msg {
-                Message::Chunk(chunk) => {
-                    let chunk_cardinality = chunk.cardinality();
-                    let Some(n) = NonZeroU32::new(chunk_cardinality as u32) else {
-                        // Handle case where chunk is empty
-                        continue;
-                    };
-                    if let Some(rate_limiter) = &rate_limiter {
-                        let limit = NonZeroU32::new(self.rate_limit.unwrap()).unwrap();
-                        if n <= limit {
-                            // `InsufficientCapacity` should never happen because we have done the check
-                            rate_limiter.until_n_ready(n).await.unwrap();
-                            yield Message::Chunk(chunk);
-                        } else {
-                            // Cut the chunk into smaller chunks
-                            for chunk in chunk.split(limit.get() as usize) {
-                                let n = NonZeroU32::new(chunk.cardinality() as u32).unwrap();
-                                // Ditto.
-                                rate_limiter.until_n_ready(n).await.unwrap();
-                                yield Message::Chunk(chunk);
-                            }
-                        }
-                    } else {
-                        yield Message::Chunk(chunk);
-                    }
-                }
-                Message::Barrier(barrier) => {
-                    if let Some(mutation) = barrier.mutation.as_ref() {
-                        if let Mutation::Throttle(actor_to_apply) = mutation.as_ref() {
-                            if let Some(limit) = actor_to_apply.get(&self.actor_ctx.id) {
-                                self.rate_limit = *limit;
-                                rate_limiter = self.rate_limit.map(get_rate_limiter);
-                                tracing::info!(new_rate_limit = ?self.rate_limit, "actor rate limit changed");
-                            }
-                        }
-                    }
-
-                    yield Message::Barrier(barrier);
-                }
-                _ => yield msg,
-            }
-        }
-    }
-}
-
-impl Debug for FlowControlExecutor {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("FlowControlExecutor")
-            .field("rate_limit", &self.rate_limit)
-            .finish()
-    }
-}
-
-impl Execute for FlowControlExecutor {
-    fn execute(self: Box<Self>) -> BoxedMessageStream {
-        self.execute_inner().boxed()
-    }
-}
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index a8878763422b..ab53b02ef7ce 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -65,7 +65,6 @@ mod dynamic_filter;
 pub mod error;
 mod expand;
 mod filter;
-mod flow_control;
 mod hash_agg;
 pub mod hash_join;
 mod hop_window;
@@ -119,7 +118,6 @@ pub use dynamic_filter::DynamicFilterExecutor;
 pub use error::{StreamExecutorError, StreamExecutorResult};
 pub use expand::ExpandExecutor;
 pub use filter::FilterExecutor;
-pub use flow_control::FlowControlExecutor;
 pub use hash_agg::HashAggExecutor;
 pub use hash_join::*;
 pub use hop_window::HopWindowExecutor;
diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs
index 3c6d89fe59a5..1d6e409c63a2 100644
--- a/src/stream/src/executor/source/fetch_executor.rs
+++ b/src/stream/src/executor/source/fetch_executor.rs
@@ -39,6 +39,7 @@ use risingwave_storage::StateStore;
 use thiserror_ext::AsReport;

 use super::{get_split_offset_col_idx, SourceStateTableHandler};
+use crate::executor::source_executor::apply_rate_limit;
 use crate::executor::stream_reader::StreamReaderWithPause;
 use crate::executor::{
     expect_first_barrier, get_split_offset_mapping_from_chunk, prune_additional_cols,
@@ -152,11 +153,13 @@ impl FsFetchExecutor {
         source_desc: &SourceDesc,
         batch: SplitBatch,
     ) -> StreamExecutorResult<BoxChunkSourceStream> {
-        source_desc
+        let rate_limit = source_ctx.source_ctrl_opts.rate_limit;
+        let stream = source_desc
             .source
             .to_stream(batch, column_ids, Arc::new(source_ctx))
             .await
-            .map_err(StreamExecutorError::connector_error)
+            .map_err(StreamExecutorError::connector_error)?;
+        Ok(apply_rate_limit(stream, rate_limit).boxed())
     }

     fn build_source_ctx(
@@ -234,10 +237,20 @@ impl FsFetchExecutor {
                     Either::Left(msg) => {
                         match &msg {
                             Message::Barrier(barrier) => {
+                                let mut need_rebuild_reader = false;
+
                                 if let Some(mutation) = barrier.mutation.as_deref() {
                                     match mutation {
                                         Mutation::Pause => stream.pause_stream(),
                                         Mutation::Resume => stream.resume_stream(),
+                                        Mutation::Throttle(actor_to_apply) => {
+                                            if let Some(throttle) =
+                                                actor_to_apply.get(&self.actor_ctx.id)
+                                            {
+                                                self.source_ctrl_opts.rate_limit = *throttle;
+                                                need_rebuild_reader = true;
+                                            }
+                                        }
                                         _ => (),
                                     }
                                 }
@@ -261,7 +274,7 @@ impl FsFetchExecutor {
                             }
                         }

-                        if splits_on_fetch == 0 {
+                        if splits_on_fetch == 0 || need_rebuild_reader {
                             Self::replace_with_new_batch_reader(
                                 &mut splits_on_fetch,
                                 &state_store_handler,
diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs
index 73b76c247864..13b876c05c2b 100644
--- a/src/stream/src/executor/source/fs_source_executor.rs
+++ b/src/stream/src/executor/source/fs_source_executor.rs
@@ -38,6 +38,7 @@ use super::executor_core::StreamSourceCore;
 use crate::error::StreamResult;
 use crate::executor::error::StreamExecutorError;
 use crate::executor::monitor::StreamingMetrics;
+use crate::executor::source_executor::apply_rate_limit;
 use crate::executor::stream_reader::StreamReaderWithPause;
 use crate::executor::*;

@@ -104,11 +105,33 @@ impl FsSourceExecutor {
             source_desc.source.config.clone(),
             self.stream_source_core.source_name.clone(),
         );
-        source_desc
+        let stream = source_desc
             .source
             .to_stream(state, column_ids, Arc::new(source_ctx))
             .await
-            .map_err(StreamExecutorError::connector_error)
+            .map_err(StreamExecutorError::connector_error)?;
+
+        Ok(apply_rate_limit(stream, self.source_ctrl_opts.rate_limit).boxed())
+    }
+
+    async fn rebuild_stream_reader(
+        &mut self,
+        source_desc: &FsSourceDesc,
+        stream: &mut StreamReaderWithPause<true, StreamChunk>,
+    ) -> StreamExecutorResult<()> {
+        let target_state: Vec<SplitImpl> = self
+            .stream_source_core
+            .latest_split_info
+            .values()
+            .cloned()
+            .collect();
+        let reader = self
+            .build_stream_source_reader(source_desc, Some(target_state))
+            .await?
+            .map_err(StreamExecutorError::connector_error);
+        stream.replace_data_stream(reader);
+
+        Ok(())
     }

     async fn apply_split_change(
@@ -387,6 +410,13 @@ impl FsSourceExecutor {
                             )
                             .await?;
                         }
+                        Mutation::Throttle(actor_to_apply) => {
+                            if let Some(throttle) = actor_to_apply.get(&self.actor_ctx.id) {
+                                self.source_ctrl_opts.rate_limit = *throttle;
+                                self.rebuild_stream_reader(&source_desc, &mut stream)
+                                    .await?;
+                            }
+                        }
                         _ => {}
                     }
                 }
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index 6f8fca84bafd..be752b4598a3 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -41,6 +41,7 @@ use thiserror_ext::AsReport;

 use super::executor_core::StreamSourceCore;
 use super::source_backfill_state_table::BackfillStateTableHandler;
 use crate::executor::monitor::StreamingMetrics;
+use crate::executor::source_executor::apply_rate_limit;
 use crate::executor::*;

 #[derive(Clone, Debug, Deserialize, Serialize)]
@@ -214,11 +215,12 @@ impl SourceBackfillExecutorInner {
             source_desc.source.config.clone(),
             self.stream_source_core.source_name.clone(),
         );
-        source_desc
+        let stream = source_desc
             .source
             .to_stream(Some(splits), column_ids, Arc::new(source_ctx))
             .await
-            .map_err(StreamExecutorError::connector_error)
+            .map_err(StreamExecutorError::connector_error)?;
+        Ok(apply_rate_limit(stream, self.source_ctrl_opts.rate_limit).boxed())
     }

     #[try_stream(ok = Message, error = StreamExecutorError)]
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 65635d0e4074..d86a85ec5bde 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 use std::fmt::Formatter;
+use std::num::NonZeroU32;
 use std::str::FromStr;
 use std::time::Duration;

@@ -20,9 +21,12 @@ use anyhow::anyhow;
 use either::Either;
 use futures::{StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
+use governor::clock::MonotonicClock;
+use governor::{Quota, RateLimiter};
 use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
 use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
 use risingwave_common::system_param::reader::SystemParamsRead;
+use risingwave_connector::error::ConnectorError;
 use risingwave_connector::source::cdc::jni_source;
 use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
 use risingwave_connector::source::{
@@ -63,6 +67,54 @@ pub struct SourceExecutor {
     source_ctrl_opts: SourceCtrlOpts,
 }

+#[try_stream(ok = StreamChunk, error = ConnectorError)]
+pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit: Option<u32>) {
+    if let Some(limit) = rate_limit
+        && limit == 0
+    {
+        // block the stream until the rate limit is reset
+        let future = futures::future::pending::<()>();
+        future.await;
+    }
+    let get_rate_limiter = |rate_limit: u32| {
+        let quota = Quota::per_second(NonZeroU32::new(rate_limit).unwrap());
+        let clock = MonotonicClock;
+        RateLimiter::direct_with_clock(quota, &clock)
+    };
+    let limiter = rate_limit.map(get_rate_limiter);
+    if rate_limit.is_some() {
+        tracing::info!(rate_limit = ?rate_limit, "applied rate limit");
+    }
+    #[for_await]
+    for batch in stream {
+        let chunk: StreamChunk = batch?;
+        let chunk_cardinality = chunk.cardinality();
+        let Some(n) = NonZeroU32::new(chunk_cardinality as u32) else {
+            // pass empty chunk
+            yield chunk;
+            continue;
+        };
+        if let Some(limiter) = &limiter {
+            let limit = NonZeroU32::new(rate_limit.unwrap()).unwrap();
+            if n <= limit {
+                // `InsufficientCapacity` should never happen because we have done the check
+                limiter.until_n_ready(n).await.unwrap();
+                yield chunk;
+            } else {
+                // Cut the chunk into smaller chunks
+                for chunk in chunk.split(limit.get() as usize) {
+                    let n = NonZeroU32::new(chunk.cardinality() as u32).unwrap();
+                    // Ditto.
+                    limiter.until_n_ready(n).await.unwrap();
+                    yield chunk;
+                }
+            }
+        } else {
+            yield chunk;
+        }
+    }
+}
+
 impl<S: StateStore> SourceExecutor<S> {
     pub fn new(
         actor_ctx: ActorContextRef,
@@ -117,11 +169,13 @@ impl SourceExecutor {
                 .source_name
                 .clone(),
         );
-        source_desc
+        let stream = source_desc
             .source
             .to_stream(state, column_ids, Arc::new(source_ctx))
             .await
-            .map_err(StreamExecutorError::connector_error)
+            .map_err(StreamExecutorError::connector_error);
+
+        Ok(apply_rate_limit(stream?, self.source_ctrl_opts.rate_limit).boxed())
     }

     /// `source_id | source_name | actor_id | fragment_id`
@@ -500,6 +554,14 @@ impl SourceExecutor {
                             )
                             .await?;
                         }
+                        Mutation::Throttle(actor_to_apply) => {
+                            if let Some(throttle) = actor_to_apply.get(&self.actor_ctx.id) {
+                                self.source_ctrl_opts.rate_limit = *throttle;
+                                // recreate from latest_split_info
+                                self.rebuild_stream_reader(&source_desc, &mut stream)
+                                    .await?;
+                            }
+                        }
                         _ => {}
                     }
                 }
diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs
index d3aa7e0be40a..e07670fb97a6 100644
--- a/src/stream/src/from_proto/source/fs_fetch.rs
+++ b/src/stream/src/from_proto/source/fs_fetch.rs
@@ -25,8 +25,7 @@ use risingwave_storage::StateStore;

 use crate::error::StreamResult;
 use crate::executor::{
-    Execute, Executor, FlowControlExecutor, FsFetchExecutor, SourceStateTableHandler,
-    StreamSourceCore,
+    Execute, Executor, FsFetchExecutor, SourceStateTableHandler, StreamSourceCore,
 };
 use crate::from_proto::ExecutorBuilder;
 use crate::task::ExecutorParams;
@@ -119,11 +118,6 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
             }
             _ => unreachable!(),
         };
-        let mut info = params.info.clone();
-        info.identity = format!("{} (flow controlled)", info.identity);
-
-        let rate_limit = source.rate_limit.map(|x| x as _);
-        let exec = FlowControlExecutor::new((info, exec).into(), params.actor_context, rate_limit);
         Ok((params.info, exec).into())
     }
 }
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 7f8bff9d3c6f..ca057ced81f6 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -38,7 +38,7 @@ use super::*;
 use crate::executor::source::{FsListExecutor, StreamSourceCore};
 use crate::executor::source_executor::SourceExecutor;
 use crate::executor::state_table_handler::SourceStateTableHandler;
-use crate::executor::{FlowControlExecutor, TroublemakerExecutor};
+use crate::executor::TroublemakerExecutor;

 const FS_CONNECTORS: &[&str] = &["s3"];
 pub struct SourceExecutorBuilder;
@@ -244,12 +244,6 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                     .boxed()
             }
         };
-        let mut info = params.info.clone();
-        info.identity = format!("{} (flow controlled)", info.identity);
-
-        let rate_limit = source.rate_limit.map(|x| x as _);
-        let exec =
-            FlowControlExecutor::new((info, exec).into(), params.actor_context, rate_limit);

         if crate::consistency::insane() {
             let mut info = params.info.clone();
diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs
index 2736fdd712cb..2f035fc4f270 100644
--- a/src/stream/src/from_proto/stream_cdc_scan.rs
+++ b/src/stream/src/from_proto/stream_cdc_scan.rs
@@ -23,7 +23,7 @@ use risingwave_pb::stream_plan::StreamCdcScanNode;

 use super::*;
 use crate::common::table::state_table::StateTable;
-use crate::executor::{CdcBackfillExecutor, Executor, ExternalStorageTable, FlowControlExecutor};
+use crate::executor::{CdcBackfillExecutor, Executor, ExternalStorageTable};

 pub struct StreamCdcScanExecutorBuilder;

@@ -106,14 +106,6 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder {
             backfill_chunk_size,
             disable_backfill,
         );
-        let mut info = params.info.clone();
-        info.identity = format!("{} (flow controlled)", info.identity);
-
-        let exec = FlowControlExecutor::new(
-            (info, exec).into(),
-            params.actor_context,
-            node.rate_limit.map(|x| x as _),
-        );
         Ok((params.info, exec).into())
     }
 }
diff --git a/src/stream/src/from_proto/stream_scan.rs b/src/stream/src/from_proto/stream_scan.rs
index 669b9a247245..56ee7bd36f6e 100644
--- a/src/stream/src/from_proto/stream_scan.rs
+++ b/src/stream/src/from_proto/stream_scan.rs
@@ -24,8 +24,8 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable;
 use super::*;
 use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
 use crate::executor::{
-    ArrangementBackfillExecutor, BackfillExecutor, ChainExecutor, FlowControlExecutor,
-    RearrangedChainExecutor, TroublemakerExecutor,
+    ArrangementBackfillExecutor, BackfillExecutor, ChainExecutor, RearrangedChainExecutor,
+    TroublemakerExecutor,
 };

 pub struct StreamScanExecutorBuilder;
@@ -145,14 +145,6 @@ impl ExecutorBuilder for StreamScanExecutorBuilder {
             }
             StreamScanType::Unspecified => unreachable!(),
         };
-        let mut info = params.info.clone();
-        info.identity = format!("{} (flow controlled)", info.identity);
-
-        let exec = FlowControlExecutor::new(
-            (info, exec).into(),
-            params.actor_context,
-            node.rate_limit.map(|x| x as _),
-        );

         if crate::consistency::insane() {
             let mut info = params.info.clone();
diff --git a/src/tests/simulation/tests/integration_tests/backfill_tests.rs b/src/tests/simulation/tests/integration_tests/backfill_tests.rs
index bcc271bcb4dc..ba34f45e6af6 100644
--- a/src/tests/simulation/tests/integration_tests/backfill_tests.rs
+++ b/src/tests/simulation/tests/integration_tests/backfill_tests.rs
@@ -264,7 +264,7 @@ async fn test_arrangement_backfill_progress() -> Result<()> {
         let progress = progress.replace('%', "");
         let progress = progress.parse::<f64>().unwrap();
         assert!(
-            (1.0..2.0).contains(&progress),
+            (1.0..10.0).contains(&progress),
             "progress not within bounds {}",
            progress
        );
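
The source-side counterpart, `apply_rate_limit`, charges a whole chunk's cardinality against the limiter with `until_n_ready`, first splitting any chunk larger than the per-second quota so that `InsufficientCapacity` can never be returned; empty chunks pass through without consuming quota. A standalone sketch of that admission logic, using a hypothetical `admit` helper over plain row counts (assumes only the `governor` and `tokio` crates):

    use std::num::NonZeroU32;
    use std::time::Instant;

    use governor::clock::MonotonicClock;
    use governor::middleware::NoOpMiddleware;
    use governor::state::{InMemoryState, NotKeyed};
    use governor::{Quota, RateLimiter};

    type Limiter = RateLimiter<NotKeyed, InMemoryState, MonotonicClock, NoOpMiddleware<Instant>>;

    // Admit `cardinality` rows, waiting in quota-sized installments the same
    // way `apply_rate_limit` splits an oversized chunk before `until_n_ready`.
    async fn admit(limiter: &Limiter, limit: NonZeroU32, cardinality: u32) {
        let mut remaining = cardinality;
        while remaining > 0 {
            // Each installment is <= the quota, so `until_n_ready` cannot
            // fail with `InsufficientCapacity`.
            let step = remaining.min(limit.get());
            limiter.until_n_ready(NonZeroU32::new(step).unwrap()).await.unwrap();
            remaining -= step;
        }
    }

    #[tokio::main]
    async fn main() {
        let limit = NonZeroU32::new(100).unwrap(); // 100 rows/s, illustrative
        let limiter = RateLimiter::direct_with_clock(Quota::per_second(limit), &MonotonicClock);
        admit(&limiter, limit, 0).await; // empty chunks skip the limiter
        admit(&limiter, limit, 250).await; // waits in installments of <= 100 rows
    }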