From 140e84491823a3c8ce6ee08ce0ec3eb155c7ea89 Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Fri, 8 Dec 2023 11:29:44 +0800
Subject: [PATCH] feat(stream): apply rate_limit as chunk_size to snapshot read (#13837)

---
 .../rate_limit/snapshot_amplification.slt     | 46 +++++++++++++
 .../executor/backfill/no_shuffle_backfill.rs  | 64 +++++++++++++++++--
 src/stream/src/from_proto/stream_scan.rs      |  1 +
 3 files changed, 107 insertions(+), 4 deletions(-)
 create mode 100644 e2e_test/streaming/rate_limit/snapshot_amplification.slt

diff --git a/e2e_test/streaming/rate_limit/snapshot_amplification.slt b/e2e_test/streaming/rate_limit/snapshot_amplification.slt
new file mode 100644
index 0000000000000..231bc9b0eb94f
--- /dev/null
+++ b/e2e_test/streaming/rate_limit/snapshot_amplification.slt
@@ -0,0 +1,46 @@
+# This test checks that barrier latency does not spike
+# when a rate limit is set.
+# We test the case where the snapshot read is amplified.
+
+statement ok
+SET STREAMING_PARALLELISM=2;
+
+statement ok
+SET STREAMING_RATE_LIMIT=1;
+
+statement ok
+CREATE TABLE table (i1 int);
+
+statement ok
+INSERT INTO table select 1 from generate_series(1, 100000);
+
+statement ok
+flush;
+
+statement ok
+CREATE SINK sink AS
+  SELECT x.i1 as i1 FROM table x
+  JOIN table s1 ON x.i1 = s1.i1
+  JOIN table s2 ON x.i1 = s2.i1
+  WITH (connector = 'blackhole');
+
+# Let the sink amplify...
+skipif in-memory
+sleep 1s
+
+# The following sequence of FLUSH statements should be fast, since the barrier should be able to bypass the sink.
+# Otherwise, these FLUSH statements will take a long time to complete and trigger a timeout.
+statement ok
+flush;
+
+statement ok
+flush;
+
+statement ok
+flush;
+
+statement ok
+drop sink sink;
+
+statement ok
+drop table table;
\ No newline at end of file
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 1d5d2db42404b..1977687f3caf8 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -23,7 +23,7 @@ use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::catalog::Schema;
 use risingwave_common::hash::VnodeBitmapExt;
 use risingwave_common::row::{OwnedRow, Row};
-use risingwave_common::types::Datum;
+use risingwave_common::types::{DataType, Datum};
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_common::{bail, row};
@@ -41,7 +41,7 @@ use crate::executor::backfill::utils::{
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{
     expect_first_barrier, Barrier, BoxedExecutor, BoxedMessageStream, Executor, ExecutorInfo,
-    Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult,
+    Message, Mutation, PkIndicesRef, StreamExecutorError, StreamExecutorResult,
 };
 use crate::task::{ActorId, CreateMviewProgress};
 
@@ -101,6 +101,11 @@ pub struct BackfillExecutor<S: StateStore> {
     metrics: Arc<StreamingMetrics>,
 
     chunk_size: usize,
+
+    /// Rate limit, used only to initialize the chunk size for
+    /// the snapshot read side.
+    /// If smaller than `chunk_size`, it takes precedence.
+    rate_limit: Option<usize>,
 }
 
 impl<S> BackfillExecutor<S>
@@ -117,6 +122,7 @@ where
         progress: CreateMviewProgress,
         metrics: Arc<StreamingMetrics>,
         chunk_size: usize,
+        rate_limit: Option<usize>,
     ) -> Self {
         let actor_id = progress.actor_id();
         Self {
@@ -129,6 +135,7 @@ where
             actor_id,
             metrics,
             chunk_size,
+            rate_limit,
         }
     }
 
@@ -137,6 +144,8 @@ where
         // The primary key columns, in the output columns of the upstream_table scan.
         let pk_in_output_indices = self.upstream_table.pk_in_output_indices().unwrap();
+        let mut rate_limit = self.rate_limit;
+
         let state_len = pk_in_output_indices.len() + METADATA_STATE_LEN;
 
         let pk_order = self.upstream_table.pk_serializer().get_order_types();
@@ -161,8 +170,11 @@ where
         .await?;
         tracing::trace!(is_finished, row_count, "backfill state recovered");
 
-        let mut builder =
-            DataChunkBuilder::new(self.upstream_table.schema().data_types(), self.chunk_size);
+        let mut builder = Self::create_builder(
+            rate_limit,
+            self.chunk_size,
+            self.upstream_table.schema().data_types(),
+        );
 
         // Use this buffer to construct state,
         // which will then be persisted.
@@ -433,6 +445,27 @@ where
                             "Backfill state persisted"
                         );
 
+                    // Update the snapshot read chunk builder.
+                    if let Some(mutation) = barrier.mutation.as_ref() {
+                        if let Mutation::Throttle(actor_to_apply) = mutation.as_ref() {
+                            let new_rate_limit_entry = actor_to_apply.get(&self.actor_id);
+                            if let Some(new_rate_limit) = new_rate_limit_entry {
+                                rate_limit = new_rate_limit.as_ref().map(|x| *x as _);
+                                tracing::info!(
+                                    id = self.actor_id,
+                                    new_rate_limit = ?rate_limit,
+                                    "actor rate limit changed",
+                                );
+                                assert!(builder.is_empty());
+                                builder = Self::create_builder(
+                                    rate_limit,
+                                    self.chunk_size,
+                                    self.upstream_table.schema().data_types(),
+                                );
+                            }
+                        }
+                    }
+
                     yield Message::Barrier(barrier);
 
                     if snapshot_read_complete {
@@ -644,6 +677,29 @@ where
         )
         .await
     }
+
+    /// Creates a data chunk builder for snapshot read.
+    /// If `rate_limit` is smaller than `chunk_size`, it takes precedence.
+    /// This lets us partition the snapshot read into chunks smaller than `chunk_size`.
+    fn create_builder(
+        rate_limit: Option<usize>,
+        chunk_size: usize,
+        data_types: Vec<DataType>,
+    ) -> DataChunkBuilder {
+        if let Some(rate_limit) = rate_limit
+            && rate_limit < chunk_size
+        {
+            DataChunkBuilder::new(
+                data_types,
+                rate_limit,
+            )
+        } else {
+            DataChunkBuilder::new(
+                data_types,
+                chunk_size,
+            )
+        }
+    }
 }
 
 impl<S> Executor for BackfillExecutor<S>
diff --git a/src/stream/src/from_proto/stream_scan.rs b/src/stream/src/from_proto/stream_scan.rs
index 8349652ce644f..f6f35b33f601b 100644
--- a/src/stream/src/from_proto/stream_scan.rs
+++ b/src/stream/src/from_proto/stream_scan.rs
@@ -145,6 +145,7 @@ impl ExecutorBuilder for StreamScanExecutorBuilder {
                 progress,
                 stream.streaming_metrics.clone(),
                 params.env.config().developer.chunk_size,
+                node.rate_limit.map(|x| x as _),
             )
             .boxed()
         }
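
Illustrative sketch (not part of the patch): the core idea is that the rate limit, when smaller than the configured chunk size, becomes the snapshot-read chunk size, so a barrier never has to wait behind a large snapshot chunk. The selection logic applied by `create_builder` can be distilled into a standalone, compilable example; `snapshot_chunk_size` is a hypothetical helper introduced here for illustration, and 256 is only an assumed default chunk size.

// Sketch only: mirrors the chunk-size selection in `create_builder` above,
// without depending on RisingWave types.
fn snapshot_chunk_size(rate_limit: Option<usize>, chunk_size: usize) -> usize {
    match rate_limit {
        // A smaller rate limit caps the chunk size.
        Some(limit) if limit < chunk_size => limit,
        // Otherwise keep the configured chunk size.
        _ => chunk_size,
    }
}

fn main() {
    // With STREAMING_RATE_LIMIT=1 (as in the test above) and an assumed
    // default chunk size of 256, each snapshot chunk carries a single row.
    assert_eq!(snapshot_chunk_size(Some(1), 256), 1);
    // No rate limit, or a limit above the chunk size: keep chunk_size.
    assert_eq!(snapshot_chunk_size(None, 256), 256);
    assert_eq!(snapshot_chunk_size(Some(1024), 256), 256);
    println!("chunk size selection behaves as expected");
}

Capping the builder size, rather than only pausing between chunks, bounds how much snapshot work sits between two barriers, which is what keeps the FLUSH statements in the test above fast.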
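
A second sketch, under stated assumptions, of how a rate-limit change carried by a barrier is applied: the builder is rebuilt with the new cap, and only once it holds no pending rows. `ChunkBuilder` and `apply_rate_limit` below are stand-ins invented for illustration; they are not the real `DataChunkBuilder` API.

// Sketch only: rebuilding the builder when a new rate limit arrives,
// mirroring the Mutation::Throttle handling in the hunk above.
struct ChunkBuilder {
    capacity: usize,
    buffered: usize,
}

impl ChunkBuilder {
    fn new(capacity: usize) -> Self {
        Self { capacity, buffered: 0 }
    }
    fn is_empty(&self) -> bool {
        self.buffered == 0
    }
}

/// Swap the builder for one sized by the new rate limit; the executor asserts
/// the builder is drained before doing this.
fn apply_rate_limit(builder: &mut ChunkBuilder, new_rate_limit: Option<usize>, chunk_size: usize) {
    assert!(builder.is_empty(), "builder must be drained before resizing");
    let capacity = match new_rate_limit {
        Some(limit) if limit < chunk_size => limit,
        _ => chunk_size,
    };
    *builder = ChunkBuilder::new(capacity);
}

fn main() {
    let mut builder = ChunkBuilder::new(256);
    apply_rate_limit(&mut builder, Some(8), 256);
    assert_eq!(builder.capacity, 8);
    apply_rate_limit(&mut builder, None, 256);
    assert_eq!(builder.capacity, 256);
}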