From 1aaf33361a0142b1f83afb80df2d1ff4fa0d8100 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 5 Dec 2023 13:13:28 +0800 Subject: [PATCH] feat(streaming): memory-size-based back-pressure in exchange (#13775) Signed-off-by: Runji Wang --- proto/task_service.proto | 2 +- src/common/src/config.rs | 22 ++++++++-------- src/config/example.toml | 4 +-- src/config/full-iceberg-bench.toml | 4 +-- src/stream/src/executor/exchange/input.rs | 2 +- src/stream/src/executor/exchange/permit.rs | 29 +++++++++++----------- src/stream/src/task/mod.rs | 8 +++--- 7 files changed, 35 insertions(+), 36 deletions(-) diff --git a/proto/task_service.proto b/proto/task_service.proto index 121d189c923df..aec4a1704ff7f 100644 --- a/proto/task_service.proto +++ b/proto/task_service.proto @@ -82,7 +82,7 @@ message GetDataRequest { // The structure for permit-based back-pressure. message Permits { oneof value { - // The permits required for a chunk, i.e. the cardinality of the chunk. + // The permits required for a chunk, i.e. the total buffer size of the chunk. uint32 record = 1; // The permits required for a barrier, typically 1. uint32 barrier = 2; diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 7f7cb500d1f47..145f5e7f6d803 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -824,15 +824,13 @@ pub struct StreamingDeveloperConfig { #[serde(default = "default::developer::stream_chunk_size")] pub chunk_size: usize, - /// The initial permits that a channel holds, i.e., the maximum row count can be buffered in - /// the channel. - #[serde(default = "default::developer::stream_exchange_initial_permits")] - pub exchange_initial_permits: usize, + /// The maximum number of bytes that can be buffered in the exchange channel. + #[serde(default = "default::developer::stream_exchange_max_bytes")] + pub exchange_max_bytes: usize, - /// The permits that are batched to add back, for reducing the backward `AddPermits` messages - /// in remote exchange. 
- #[serde(default = "default::developer::stream_exchange_batched_permits")] - pub exchange_batched_permits: usize, + /// The threshold of bytes that triggers a backward `AddPermits` message in the remote exchange. + #[serde(default = "default::developer::stream_exchange_ack_bytes")] + pub exchange_ack_bytes: usize, /// The maximum number of concurrent barriers in an exchange channel. #[serde(default = "default::developer::stream_exchange_concurrent_barriers")] @@ -1405,12 +1403,12 @@ pub mod default { 256 } - pub fn stream_exchange_initial_permits() -> usize { - 2048 + pub fn stream_exchange_max_bytes() -> usize { + 1 << 20 // 1MB } - pub fn stream_exchange_batched_permits() -> usize { - 256 + pub fn stream_exchange_ack_bytes() -> usize { + 32 << 10 // 32KB } pub fn stream_exchange_concurrent_barriers() -> usize { diff --git a/src/config/example.toml b/src/config/example.toml index 34937858db526..914188a53bd16 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -93,8 +93,8 @@ stream_enable_executor_row_count = false stream_connector_message_buffer_size = 16 stream_unsafe_extreme_cache_size = 10 stream_chunk_size = 256 -stream_exchange_initial_permits = 2048 -stream_exchange_batched_permits = 256 +stream_exchange_max_bytes = 1048576 +stream_exchange_ack_bytes = 32768 stream_exchange_concurrent_barriers = 1 stream_exchange_concurrent_dispatchers = 0 stream_dml_channel_initial_permits = 32768 diff --git a/src/config/full-iceberg-bench.toml b/src/config/full-iceberg-bench.toml index 8c7cae6cd090a..45cc252209721 100644 --- a/src/config/full-iceberg-bench.toml +++ b/src/config/full-iceberg-bench.toml @@ -71,8 +71,8 @@ stream_enable_executor_row_count = false stream_connector_message_buffer_size = 16 stream_unsafe_extreme_cache_size = 10 stream_chunk_size = 256 -stream_exchange_initial_permits = 2048 -stream_exchange_batched_permits = 256 +stream_exchange_max_bytes = 1048576 +stream_exchange_ack_bytes = 32768 stream_exchange_concurrent_barriers = 1 
stream_dml_channel_initial_permits = 32768 stream_hash_agg_max_dirty_groups_heap_size = 67108864 diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs index 5eb583a1caddd..e7ce81a4c44aa 100644 --- a/src/stream/src/executor/exchange/input.rs +++ b/src/stream/src/executor/exchange/input.rs @@ -247,7 +247,7 @@ pub(crate) fn new_input( (upstream_actor_id, actor_id), (upstream_fragment_id, fragment_id), metrics, - context.config.developer.exchange_batched_permits, + context.config.developer.exchange_ack_bytes, ) .boxed_input() }; diff --git a/src/stream/src/executor/exchange/permit.rs b/src/stream/src/executor/exchange/permit.rs index 159494355cff2..39640269baece 100644 --- a/src/stream/src/executor/exchange/permit.rs +++ b/src/stream/src/executor/exchange/permit.rs @@ -16,6 +16,7 @@ use std::sync::Arc; +use risingwave_common::estimate_size::EstimateSize; use risingwave_pb::task_service::permits; use tokio::sync::{mpsc, AcquireError, Semaphore, SemaphorePermit}; @@ -33,18 +34,18 @@ pub struct MessageWithPermits { /// Create a channel for the exchange service. pub fn channel( - initial_permits: usize, - batched_permits: usize, + max_bytes: usize, + ack_bytes: usize, concurrent_barriers: usize, ) -> (Sender, Receiver) { // Use an unbounded channel since we manage the permits manually. let (tx, rx) = mpsc::unbounded_channel(); - let records = Semaphore::new(initial_permits); + let records = Semaphore::new(max_bytes); let barriers = Semaphore::new(concurrent_barriers); let permits = Arc::new(Permits { records, barriers }); - let max_chunk_permits: usize = initial_permits - batched_permits; + let max_chunk_permits: usize = max_bytes - ack_bytes; ( Sender { @@ -58,15 +59,15 @@ pub fn channel( /// The configuration for tests. 
pub mod for_test { - pub const INITIAL_PERMITS: usize = (u32::MAX / 2) as _; - pub const BATCHED_PERMITS: usize = 1; + pub const MAX_BYTES: usize = (u32::MAX / 2) as _; + pub const ACK_BYTES: usize = 1; pub const CONCURRENT_BARRIERS: usize = (u32::MAX / 2) as _; } pub fn channel_for_test() -> (Sender, Receiver) { use for_test::*; - channel(INITIAL_PERMITS, BATCHED_PERMITS, CONCURRENT_BARRIERS) + channel(MAX_BYTES, ACK_BYTES, CONCURRENT_BARRIERS) } /// Semaphore-based permits to control the back-pressure. @@ -112,9 +113,8 @@ pub struct Sender { tx: mpsc::UnboundedSender, permits: Arc, - /// The maximum permits required by a chunk. If there're too many rows in a chunk, we only - /// acquire these permits. `BATCHED_PERMITS` is subtracted to avoid deadlock with - /// batching. + /// The maximum permits required by a chunk. If the chunk size is too large, we only + /// acquire these permits. `ack_bytes` is subtracted to avoid deadlock with batching. max_chunk_permits: usize, } @@ -126,11 +126,12 @@ impl Sender { // The semaphores should never be closed. 
let permits = match &message { Message::Chunk(c) => { - let card = c.cardinality().clamp(1, self.max_chunk_permits); - if card == self.max_chunk_permits { - tracing::warn!(cardinality = c.cardinality(), "large chunk in exchange") + let size = c.estimated_size(); + let permits = size.min(self.max_chunk_permits); + if permits == self.max_chunk_permits { + tracing::warn!(size, "large chunk in exchange") } - Some(permits::Value::Record(card as _)) + Some(permits::Value::Record(permits as _)) } Message::Barrier(_) => Some(permits::Value::Barrier(1)), Message::Watermark(_) => None, diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs index 61ce575e5b187..68d587846683d 100644 --- a/src/stream/src/task/mod.rs +++ b/src/stream/src/task/mod.rs @@ -108,8 +108,8 @@ impl SharedContext { compute_client_pool: ComputeClientPool::default(), config: StreamingConfig { developer: StreamingDeveloperConfig { - exchange_initial_permits: permit::for_test::INITIAL_PERMITS, - exchange_batched_permits: permit::for_test::BATCHED_PERMITS, + exchange_max_bytes: permit::for_test::MAX_BYTES, + exchange_ack_bytes: permit::for_test::ACK_BYTES, exchange_concurrent_barriers: permit::for_test::CONCURRENT_BARRIERS, ..Default::default() }, @@ -127,8 +127,8 @@ impl SharedContext { MutexGuard::map(self.channel_map.lock(), |map| { map.entry(ids).or_insert_with(|| { let (tx, rx) = permit::channel( - self.config.developer.exchange_initial_permits, - self.config.developer.exchange_batched_permits, + self.config.developer.exchange_max_bytes, + self.config.developer.exchange_ack_bytes, self.config.developer.exchange_concurrent_barriers, ); (Some(tx), Some(rx))