refactor: move rate limit inside executor (#15948)
Signed-off-by: tabVersion <[email protected]>
Co-authored-by: Noel Kwan <[email protected]>
tabVersion and kwannoel authored Apr 16, 2024
1 parent 211b9b9 commit c1ef666
Showing 18 changed files with 274 additions and 220 deletions.
17 changes: 13 additions & 4 deletions src/meta/src/controller/streaming_job.rs
@@ -951,20 +951,29 @@ impl CatalogController {
 
         fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
             let mut found = false;
-            if *fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0 {
-                visit_stream_node(stream_node, |node| {
-                    if let PbNodeBody::StreamScan(node) = node {
+            if (*fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0)
+                || (*fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0)
+            {
+                visit_stream_node(stream_node, |node| match node {
+                    PbNodeBody::StreamScan(node) => {
                         node.rate_limit = rate_limit;
                         found = true;
                     }
+                    PbNodeBody::Source(node) => {
+                        if let Some(inner) = node.source_inner.as_mut() {
+                            inner.rate_limit = rate_limit;
+                            found = true;
+                        }
+                    }
+                    _ => {}
                 });
             }
             found
         });
 
         if fragments.is_empty() {
             return Err(MetaError::invalid_parameter(format!(
-                "stream scan node not found in job id {job_id}"
+                "stream scan node or source node not found in job id {job_id}"
             )));
         }
         let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
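For context, a standalone sketch (not part of this commit, plain integers standing in for fragments): the `retain_mut` call above keeps only the fragments whose node tree actually contained a throttleable node, which is what makes the new "stream scan node or source node not found" error reachable.

fn main() {
    // Each entry stands in for (fragment name, fragment_type_mask).
    let mut fragments = vec![("scan", 1), ("agg", 0), ("source", 4)];
    // Keep only fragments whose mask matches, mutating them in place as we go,
    // mirroring the `found` flag set inside the visitor closure above.
    fragments.retain_mut(|(_, mask)| {
        let found = *mask != 0;
        if found {
            *mask |= 0x10; // stand-in for "patched node.rate_limit"
        }
        found
    });
    assert_eq!(fragments, vec![("scan", 0x11), ("source", 0x14)]);
    // An empty `fragments` at this point is what triggers the MetaError above.
}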
15 changes: 12 additions & 3 deletions src/meta/src/manager/catalog/fragment.rs
@@ -952,15 +952,24 @@ impl FragmentManager {
         let mut fragment_to_apply = HashMap::new();
 
         for fragment in fragment.fragments.values_mut() {
-            if (fragment.get_fragment_type_mask() & FragmentTypeFlag::StreamScan as u32) != 0 {
+            if (fragment.get_fragment_type_mask() & FragmentTypeFlag::StreamScan as u32) != 0
+                || (fragment.get_fragment_type_mask() & FragmentTypeFlag::Source as u32) != 0
+            {
                 let mut actor_to_apply = Vec::new();
                 for actor in &mut fragment.actors {
                     if let Some(node) = actor.nodes.as_mut() {
-                        visit_stream_node(node, |node_body| {
-                            if let NodeBody::StreamScan(ref mut node) = node_body {
+                        visit_stream_node(node, |node_body| match node_body {
+                            NodeBody::StreamScan(ref mut node) => {
                                 node.rate_limit = rate_limit;
                                 actor_to_apply.push(actor.actor_id);
                             }
+                            NodeBody::Source(ref mut node) => {
+                                if let Some(ref mut node_inner) = node.source_inner {
+                                    node_inner.rate_limit = rate_limit;
+                                    actor_to_apply.push(actor.actor_id);
+                                }
+                            }
+                            _ => {}
                         })
                     };
                 }
42 changes: 27 additions & 15 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -33,9 +33,9 @@ use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
 #[cfg(debug_assertions)]
 use crate::executor::backfill::utils::METADATA_STATE_LEN;
 use crate::executor::backfill::utils::{
-    compute_bounds, create_builder, get_progress_per_vnode, mapping_chunk, mapping_message,
-    mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode,
-    BackfillProgressPerVnode, BackfillState,
+    compute_bounds, create_builder, create_limiter, get_progress_per_vnode, mapping_chunk,
+    mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
+    update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState,
 };
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
@@ -215,6 +215,9 @@ where
         let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
         let mut pending_barrier: Option<Barrier> = None;
 
+        let rate_limiter = self.rate_limit.and_then(create_limiter);
+        let rate_limit = self.rate_limit;
+
         let backfill_snapshot_read_row_count_metric = self
             .metrics
             .backfill_snapshot_read_row_count
@@ -243,10 +246,14 @@ where
             {
                 let left_upstream = upstream.by_ref().map(Either::Left);
 
+                // Check if stream paused
+                let paused = paused || matches!(rate_limit, Some(0));
+                // Create the snapshot stream
                 let right_snapshot = pin!(Self::make_snapshot_stream(
                     &upstream_table,
                     backfill_state.clone(), // FIXME: Use mutable reference instead.
                     paused,
+                    &rate_limiter,
                 )
                 .map(Either::Right));
 
@@ -290,15 +297,15 @@ where
                 // Consume remaining rows in the builder.
                 for (vnode, builder) in &mut builders {
                     if let Some(data_chunk) = builder.consume_all() {
-                        yield Self::handle_snapshot_chunk(
+                        yield Message::Chunk(Self::handle_snapshot_chunk(
                             data_chunk,
                             *vnode,
                             &pk_in_output_indices,
                             &mut backfill_state,
                             &mut cur_barrier_snapshot_processed_rows,
                             &mut total_snapshot_processed_rows,
                             &self.output_indices,
-                        )?;
+                        )?);
                     }
                 }
 
@@ -326,15 +333,15 @@ where
                         Some((vnode, row)) => {
                             let builder = builders.get_mut(&vnode).unwrap();
                             if let Some(chunk) = builder.append_one_row(row) {
-                                yield Self::handle_snapshot_chunk(
+                                yield Message::Chunk(Self::handle_snapshot_chunk(
                                     chunk,
                                     vnode,
                                     &pk_in_output_indices,
                                     &mut backfill_state,
                                     &mut cur_barrier_snapshot_processed_rows,
                                     &mut total_snapshot_processed_rows,
                                     &self.output_indices,
-                                )?;
+                                )?);
                             }
                         }
                     }
@@ -366,15 +373,15 @@ where
                         Some((vnode, row)) => {
                             let builder = builders.get_mut(&vnode).unwrap();
                             if let Some(chunk) = builder.append_one_row(row) {
-                                yield Self::handle_snapshot_chunk(
+                                yield Message::Chunk(Self::handle_snapshot_chunk(
                                     chunk,
                                     vnode,
                                     &pk_in_output_indices,
                                     &mut backfill_state,
                                     &mut cur_barrier_snapshot_processed_rows,
                                     &mut total_snapshot_processed_rows,
                                     &self.output_indices,
-                                )?;
+                                )?);
                             }
 
                             break;
@@ -585,20 +592,26 @@ where
     }
 
     #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
-    async fn make_snapshot_stream(
-        upstream_table: &ReplicatedStateTable<S, SD>,
+    async fn make_snapshot_stream<'a>(
+        upstream_table: &'a ReplicatedStateTable<S, SD>,
         backfill_state: BackfillState,
         paused: bool,
+        rate_limiter: &'a Option<BackfillRateLimiter>,
     ) {
         if paused {
             #[for_await]
             for _ in tokio_stream::pending() {
                 yield None;
             }
         } else {
+            // Checked the rate limit is not zero.
            #[for_await]
            for r in Self::snapshot_read_per_vnode(upstream_table, backfill_state) {
-                yield r?;
+                let r = r?;
+                if let Some(rate_limit) = rate_limiter {
+                    rate_limit.until_ready().await;
+                }
+                yield r;
            }
        }
    }
@@ -611,7 +624,7 @@ where
         cur_barrier_snapshot_processed_rows: &mut u64,
         total_snapshot_processed_rows: &mut u64,
         output_indices: &[usize],
-    ) -> StreamExecutorResult<Message> {
+    ) -> StreamExecutorResult<StreamChunk> {
         let chunk = StreamChunk::from_parts(vec![Op::Insert; chunk.capacity()], chunk);
         // Raise the current position.
         // As snapshot read streams are ordered by pk, so we can
@@ -628,8 +641,7 @@
         let chunk_cardinality = chunk.cardinality() as u64;
         *cur_barrier_snapshot_processed_rows += chunk_cardinality;
         *total_snapshot_processed_rows += chunk_cardinality;
-        let chunk = Message::Chunk(mapping_chunk(chunk, output_indices));
-        Ok(chunk)
+        Ok(mapping_chunk(chunk, output_indices))
     }
 
     /// Read snapshot per vnode.
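A standalone sketch of the per-row throttling added in `make_snapshot_stream` above (not from this commit; it assumes the `governor` and `futures` crates, and `forward_rows` is an invented name): when a limiter is configured, one permit is acquired per row before the row is yielded; when it is `None`, rows pass through unthrottled.

use std::num::NonZeroU32;

use governor::{Quota, RateLimiter};

// Hypothetical mirror of the loop above: optional rows-per-second throttling.
async fn forward_rows(rows: Vec<u64>, rows_per_sec: Option<u32>) {
    // A limit of zero cannot be expressed as a `Quota`; the executor above treats
    // `Some(0)` as "paused" instead and never reaches the limiter.
    let limiter = rows_per_sec
        .and_then(NonZeroU32::new)
        .map(|n| RateLimiter::direct(Quota::per_second(n)));

    for row in rows {
        if let Some(limiter) = &limiter {
            // Wait for one permit before yielding this row.
            limiter.until_ready().await;
        }
        println!("yield row {row}");
    }
}

fn main() {
    // Roughly two rows per second; `None` would disable throttling entirely.
    futures::executor::block_on(forward_rows(vec![1, 2, 3, 4], Some(2)));
}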
11 changes: 11 additions & 0 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -272,6 +272,17 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
                             paused = false;
                             valve.resume();
                         }
+                        Mutation::Throttle(some) => {
+                            if let Some(rate_limit) =
+                                some.get(&self.actor_ctx.id)
+                            {
+                                self.chunk_size = rate_limit
+                                    .map(|x| x as usize)
+                                    .unwrap_or(self.chunk_size);
+                                // rebuild the new reader stream with new chunk size
+                                continue 'backfill_loop;
+                            }
+                        }
                         _ => (),
                     }
                 }
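For the CDC path, the hunk above applies the rate limit by shrinking the snapshot read batch size and restarting the read loop rather than by a token bucket. A minimal, hypothetical sketch of that reaction to a throttle mutation (struct and method names invented, not from this commit):

use std::collections::HashMap;

// Stand-in for the executor state that matters here.
struct CdcBackfill {
    actor_id: u32,
    chunk_size: usize,
}

impl CdcBackfill {
    /// Apply a throttle mutation keyed by actor id. A `None` limit leaves the current
    /// size unchanged in this sketch, mirroring `.unwrap_or(self.chunk_size)` above.
    /// Returns whether the read loop should restart so the new chunk size takes effect.
    fn apply_throttle(&mut self, throttle: &HashMap<u32, Option<u32>>) -> bool {
        match throttle.get(&self.actor_id) {
            Some(rate_limit) => {
                self.chunk_size = rate_limit.map(|x| x as usize).unwrap_or(self.chunk_size);
                true // caller does `continue 'backfill_loop` to rebuild the reader
            }
            None => false, // mutation does not target this actor
        }
    }
}

fn main() {
    let mut exec = CdcBackfill { actor_id: 7, chunk_size: 1024 };
    let throttle = HashMap::from([(7, Some(256))]);
    assert!(exec.apply_throttle(&throttle));
    assert_eq!(exec.chunk_size, 256);
}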
24 changes: 23 additions & 1 deletion src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs
@@ -13,9 +13,12 @@
 // limitations under the License.
 
 use std::future::Future;
+use std::num::NonZeroU32;
 
 use futures::{pin_mut, Stream};
 use futures_async_stream::try_stream;
+use governor::clock::MonotonicClock;
+use governor::{Quota, RateLimiter};
 use itertools::Itertools;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::row::OwnedRow;
@@ -104,9 +107,28 @@ impl UpstreamTableRead for UpstreamTableReader<ExternalStorageTable> {
 
         let mut builder = DataChunkBuilder::new(self.inner.schema().data_types(), args.chunk_size);
         let chunk_stream = iter_chunks(row_stream, &mut builder);
+
+        if args.chunk_size == 0 {
+            // If limit is 0, we should not read any data from the upstream table.
+            // Keep waiting util the stream is rebuilt.
+            let future = futures::future::pending::<()>();
+            future.await;
+        }
+        let limiter = {
+            let quota = Quota::per_second(NonZeroU32::new(args.chunk_size as u32).unwrap());
+            let clock = MonotonicClock;
+            RateLimiter::direct_with_clock(quota, &clock)
+        };
         #[for_await]
         for chunk in chunk_stream {
-            yield Some(chunk?);
+            let chunk = chunk?;
+            if chunk.cardinality() != 0 {
+                limiter
+                    .until_n_ready(NonZeroU32::new(chunk.cardinality() as u32).unwrap())
+                    .await
+                    .unwrap();
+            }
+            yield Some(chunk);
         }
         yield None;
     }
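The hunk above combines two behaviours that the following standalone sketch separates out (not from this commit; it assumes only the `governor` and `futures` crates, and `emit_chunks` is an invented name): a limit of 0 parks the reader until the stream is rebuilt with a new limit, and a non-zero limit charges one permit per row of each chunk via `until_n_ready`.

use std::num::NonZeroU32;

use governor::{Quota, RateLimiter};

// Emit chunks (represented here by their row counts) under a rows-per-second budget.
async fn emit_chunks(chunk_cardinalities: Vec<u32>, rows_per_sec: u32) {
    if rows_per_sec == 0 {
        // Limit 0 means "do not read at all": park here until the caller drops this
        // future and rebuilds the stream with a new limit, as the reader above does.
        futures::future::pending::<()>().await;
    }
    let limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(rows_per_sec).unwrap()));
    for rows in chunk_cardinalities {
        if rows != 0 {
            // Charge one permit per row; this errors only if a single chunk exceeds the
            // burst size (the per-second quota), which the caller is assumed to avoid.
            limiter
                .until_n_ready(NonZeroU32::new(rows).unwrap())
                .await
                .unwrap();
        }
        println!("emit chunk with {rows} rows");
    }
}

fn main() {
    futures::executor::block_on(emit_chunks(vec![100, 100, 50], 256));
}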