diff --git a/proto/task_service.proto b/proto/task_service.proto index d39a43ee5ce2a..ec72be346750f 100644 --- a/proto/task_service.proto +++ b/proto/task_service.proto @@ -82,7 +82,7 @@ message GetDataRequest { // The structure for permit-based back-pressure. message Permits { oneof value { - // The permits required for a chunk, i.e. the total buffer size of the chunk. + // The permits required for a chunk, i.e. the cardinality of the chunk. uint32 record = 1; // The permits required for a barrier, typically 1. uint32 barrier = 2; diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 2dabbcbb5ecf0..8faf1fdcf5e5a 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -766,13 +766,15 @@ pub struct StreamingDeveloperConfig { #[serde(default = "default::developer::stream_chunk_size")] pub chunk_size: usize, - /// The maximum size of bytes can be buffered in the exchange channel. - #[serde(default = "default::developer::stream_exchange_max_bytes")] - pub exchange_max_bytes: usize, + /// The initial permits that a channel holds, i.e., the maximum row count can be buffered in + /// the channel. + #[serde(default = "default::developer::stream_exchange_initial_permits")] + pub exchange_initial_permits: usize, - /// The threshold of bytes that triggers a backward `AddPermits` message in the remote exchange. - #[serde(default = "default::developer::stream_exchange_ack_bytes")] - pub exchange_ack_bytes: usize, + /// The permits that are batched to add back, for reducing the backward `AddPermits` messages + /// in remote exchange. + #[serde(default = "default::developer::stream_exchange_batched_permits")] + pub exchange_batched_permits: usize, /// The maximum number of concurrent barriers in an exchange channel. #[serde(default = "default::developer::stream_exchange_concurrent_barriers")] @@ -1315,12 +1317,12 @@ pub mod default { 256 } - pub fn stream_exchange_max_bytes() -> usize { - 1 << 20 // 1MB + pub fn stream_exchange_initial_permits() -> usize { + 2048 } - pub fn stream_exchange_ack_bytes() -> usize { - 32 << 10 // 32KB + pub fn stream_exchange_batched_permits() -> usize { + 256 } pub fn stream_exchange_concurrent_barriers() -> usize { diff --git a/src/config/example.toml b/src/config/example.toml index 64be834425b54..fddd8f34bb79a 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -82,8 +82,8 @@ stream_enable_executor_row_count = false stream_connector_message_buffer_size = 16 stream_unsafe_extreme_cache_size = 10 stream_chunk_size = 256 -stream_exchange_max_bytes = 1048576 -stream_exchange_ack_bytes = 32768 +stream_exchange_initial_permits = 2048 +stream_exchange_batched_permits = 256 stream_exchange_concurrent_barriers = 1 stream_exchange_concurrent_dispatchers = 0 stream_dml_channel_initial_permits = 32768 diff --git a/src/config/full-iceberg-bench.toml b/src/config/full-iceberg-bench.toml index 45cc252209721..8c7cae6cd090a 100644 --- a/src/config/full-iceberg-bench.toml +++ b/src/config/full-iceberg-bench.toml @@ -71,8 +71,8 @@ stream_enable_executor_row_count = false stream_connector_message_buffer_size = 16 stream_unsafe_extreme_cache_size = 10 stream_chunk_size = 256 -stream_exchange_max_bytes = 1048576 -stream_exchange_ack_bytes = 32768 +stream_exchange_initial_permits = 2048 +stream_exchange_batched_permits = 256 stream_exchange_concurrent_barriers = 1 stream_dml_channel_initial_permits = 32768 stream_hash_agg_max_dirty_groups_heap_size = 67108864 diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs index 419081c871940..6257d2302afd7 100644 --- a/src/stream/src/executor/exchange/input.rs +++ b/src/stream/src/executor/exchange/input.rs @@ -247,7 +247,7 @@ pub(crate) fn new_input( (upstream_actor_id, actor_id), (upstream_fragment_id, fragment_id), metrics, - context.config.developer.exchange_ack_bytes, + context.config.developer.exchange_batched_permits, ) .boxed_input() }; diff --git a/src/stream/src/executor/exchange/permit.rs b/src/stream/src/executor/exchange/permit.rs index 75a73b34fb02f..9c333ddf4b361 100644 --- a/src/stream/src/executor/exchange/permit.rs +++ b/src/stream/src/executor/exchange/permit.rs @@ -16,7 +16,6 @@ use std::sync::Arc; -use risingwave_common::estimate_size::EstimateSize; use risingwave_pb::task_service::permits; use tokio::sync::{mpsc, AcquireError, Semaphore, SemaphorePermit}; @@ -34,18 +33,18 @@ pub struct MessageWithPermits { /// Create a channel for the exchange service. pub fn channel( - max_bytes: usize, - ack_bytes: usize, + initial_permits: usize, + batched_permits: usize, concurrent_barriers: usize, ) -> (Sender, Receiver) { // Use an unbounded channel since we manage the permits manually. let (tx, rx) = mpsc::unbounded_channel(); - let records = Semaphore::new(max_bytes); + let records = Semaphore::new(initial_permits); let barriers = Semaphore::new(concurrent_barriers); let permits = Arc::new(Permits { records, barriers }); - let max_chunk_permits: usize = max_bytes - ack_bytes; + let max_chunk_permits: usize = initial_permits - batched_permits; ( Sender { @@ -59,15 +58,15 @@ pub fn channel( /// The configuration for tests. pub mod for_test { - pub const MAX_BYTES: usize = (u32::MAX / 2) as _; - pub const ACK_BYTES: usize = 1; + pub const INITIAL_PERMITS: usize = (u32::MAX / 2) as _; + pub const BATCHED_PERMITS: usize = 1; pub const CONCURRENT_BARRIERS: usize = (u32::MAX / 2) as _; } pub fn channel_for_test() -> (Sender, Receiver) { use for_test::*; - channel(MAX_BYTES, ACK_BYTES, CONCURRENT_BARRIERS) + channel(INITIAL_PERMITS, BATCHED_PERMITS, CONCURRENT_BARRIERS) } /// Semaphore-based permits to control the back-pressure. @@ -113,8 +112,9 @@ pub struct Sender { tx: mpsc::UnboundedSender, permits: Arc, - /// The maximum permits required by a chunk. If the chunk size is too large, we only - /// acquire these permits. `ack_bytes` is subtracted to avoid deadlock with batching. + /// The maximum permits required by a chunk. If there're too many rows in a chunk, we only + /// acquire these permits. `BATCHED_PERMITS` is subtracted to avoid deadlock with + /// batching. max_chunk_permits: usize, } @@ -126,12 +126,11 @@ impl Sender { // The semaphores should never be closed. let permits = match &message { Message::Chunk(c) => { - let size = c.estimated_size(); - let permits = size.min(self.max_chunk_permits); - if permits == self.max_chunk_permits { - tracing::warn!(size, "large chunk in exchange") + let card = c.cardinality().clamp(1, self.max_chunk_permits); + if card == self.max_chunk_permits { + tracing::warn!(cardinality = c.cardinality(), "large chunk in exchange") } - Some(permits::Value::Record(permits as _)) + Some(permits::Value::Record(card as _)) } Message::Barrier(_) => Some(permits::Value::Barrier(1)), Message::Watermark(_) => None, diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs index 9732b3a747e29..109d81e0b771d 100644 --- a/src/stream/src/task/mod.rs +++ b/src/stream/src/task/mod.rs @@ -116,8 +116,8 @@ impl SharedContext { ))), config: StreamingConfig { developer: StreamingDeveloperConfig { - exchange_max_bytes: permit::for_test::MAX_BYTES, - exchange_ack_bytes: permit::for_test::ACK_BYTES, + exchange_initial_permits: permit::for_test::INITIAL_PERMITS, + exchange_batched_permits: permit::for_test::BATCHED_PERMITS, exchange_concurrent_barriers: permit::for_test::CONCURRENT_BARRIERS, ..Default::default() }, @@ -139,8 +139,8 @@ impl SharedContext { MutexGuard::map(self.channel_map.lock(), |map| { map.entry(ids).or_insert_with(|| { let (tx, rx) = permit::channel( - self.config.developer.exchange_max_bytes, - self.config.developer.exchange_ack_bytes, + self.config.developer.exchange_initial_permits, + self.config.developer.exchange_batched_permits, self.config.developer.exchange_concurrent_barriers, ); (Some(tx), Some(rx))