Skip to content

Commit

Permalink
feat(streaming): memory-size-based back-pressure in exchange (risingw…
Browse files Browse the repository at this point in the history
…avelabs#13775)

Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Dec 5, 2023
1 parent 5454f6e commit 51c76aa
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 36 deletions.
2 changes: 1 addition & 1 deletion proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ message GetDataRequest {
// The structure for permit-based back-pressure.
message Permits {
oneof value {
// The permits required for a chunk, i.e. the cardinality of the chunk.
// The permits required for a chunk, i.e. the total buffer size of the chunk.
uint32 record = 1;
// The permits required for a barrier, typically 1.
uint32 barrier = 2;
Expand Down
22 changes: 10 additions & 12 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,15 +763,13 @@ pub struct StreamingDeveloperConfig {
#[serde(default = "default::developer::stream_chunk_size")]
pub chunk_size: usize,

/// The initial permits that a channel holds, i.e., the maximum row count can be buffered in
/// the channel.
#[serde(default = "default::developer::stream_exchange_initial_permits")]
pub exchange_initial_permits: usize,
/// The maximum size of bytes can be buffered in the exchange channel.
#[serde(default = "default::developer::stream_exchange_max_bytes")]
pub exchange_max_bytes: usize,

/// The permits that are batched to add back, for reducing the backward `AddPermits` messages
/// in remote exchange.
#[serde(default = "default::developer::stream_exchange_batched_permits")]
pub exchange_batched_permits: usize,
/// The threshold of bytes that triggers a backward `AddPermits` message in the remote exchange.
#[serde(default = "default::developer::stream_exchange_ack_bytes")]
pub exchange_ack_bytes: usize,

/// The maximum number of concurrent barriers in an exchange channel.
#[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
Expand Down Expand Up @@ -1314,12 +1312,12 @@ pub mod default {
256
}

pub fn stream_exchange_initial_permits() -> usize {
2048
pub fn stream_exchange_max_bytes() -> usize {
1 << 20 // 1MB
}

pub fn stream_exchange_batched_permits() -> usize {
256
pub fn stream_exchange_ack_bytes() -> usize {
32 << 10 // 32KB
}

pub fn stream_exchange_concurrent_barriers() -> usize {
Expand Down
4 changes: 2 additions & 2 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ stream_enable_executor_row_count = false
stream_connector_message_buffer_size = 16
stream_unsafe_extreme_cache_size = 10
stream_chunk_size = 256
stream_exchange_initial_permits = 2048
stream_exchange_batched_permits = 256
stream_exchange_max_bytes = 1048576
stream_exchange_ack_bytes = 32768
stream_exchange_concurrent_barriers = 1
stream_exchange_concurrent_dispatchers = 0
stream_dml_channel_initial_permits = 32768
Expand Down
4 changes: 2 additions & 2 deletions src/config/full-iceberg-bench.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ stream_enable_executor_row_count = false
stream_connector_message_buffer_size = 16
stream_unsafe_extreme_cache_size = 10
stream_chunk_size = 256
stream_exchange_initial_permits = 2048
stream_exchange_batched_permits = 256
stream_exchange_max_bytes = 1048576
stream_exchange_ack_bytes = 32768
stream_exchange_concurrent_barriers = 1
stream_dml_channel_initial_permits = 32768
stream_hash_agg_max_dirty_groups_heap_size = 67108864
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/exchange/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ pub(crate) fn new_input(
(upstream_actor_id, actor_id),
(upstream_fragment_id, fragment_id),
metrics,
context.config.developer.exchange_batched_permits,
context.config.developer.exchange_ack_bytes,
)
.boxed_input()
};
Expand Down
29 changes: 15 additions & 14 deletions src/stream/src/executor/exchange/permit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use std::sync::Arc;

use risingwave_common::estimate_size::EstimateSize;
use risingwave_pb::task_service::permits;
use tokio::sync::{mpsc, AcquireError, Semaphore, SemaphorePermit};

Expand All @@ -33,18 +34,18 @@ pub struct MessageWithPermits {

/// Create a channel for the exchange service.
pub fn channel(
initial_permits: usize,
batched_permits: usize,
max_bytes: usize,
ack_bytes: usize,
concurrent_barriers: usize,
) -> (Sender, Receiver) {
// Use an unbounded channel since we manage the permits manually.
let (tx, rx) = mpsc::unbounded_channel();

let records = Semaphore::new(initial_permits);
let records = Semaphore::new(max_bytes);
let barriers = Semaphore::new(concurrent_barriers);
let permits = Arc::new(Permits { records, barriers });

let max_chunk_permits: usize = initial_permits - batched_permits;
let max_chunk_permits: usize = max_bytes - ack_bytes;

(
Sender {
Expand All @@ -58,15 +59,15 @@ pub fn channel(

/// The configuration for tests.
pub mod for_test {
pub const INITIAL_PERMITS: usize = (u32::MAX / 2) as _;
pub const BATCHED_PERMITS: usize = 1;
pub const MAX_BYTES: usize = (u32::MAX / 2) as _;
pub const ACK_BYTES: usize = 1;
pub const CONCURRENT_BARRIERS: usize = (u32::MAX / 2) as _;
}

pub fn channel_for_test() -> (Sender, Receiver) {
use for_test::*;

channel(INITIAL_PERMITS, BATCHED_PERMITS, CONCURRENT_BARRIERS)
channel(MAX_BYTES, ACK_BYTES, CONCURRENT_BARRIERS)
}

/// Semaphore-based permits to control the back-pressure.
Expand Down Expand Up @@ -112,9 +113,8 @@ pub struct Sender {
tx: mpsc::UnboundedSender<MessageWithPermits>,
permits: Arc<Permits>,

/// The maximum permits required by a chunk. If there're too many rows in a chunk, we only
/// acquire these permits. `BATCHED_PERMITS` is subtracted to avoid deadlock with
/// batching.
/// The maximum permits required by a chunk. If the chunk size is too large, we only
/// acquire these permits. `ack_bytes` is subtracted to avoid deadlock with batching.
max_chunk_permits: usize,
}

Expand All @@ -126,11 +126,12 @@ impl Sender {
// The semaphores should never be closed.
let permits = match &message {
Message::Chunk(c) => {
let card = c.cardinality().clamp(1, self.max_chunk_permits);
if card == self.max_chunk_permits {
tracing::warn!(cardinality = c.cardinality(), "large chunk in exchange")
let size = c.estimated_size();
let permits = size.min(self.max_chunk_permits);
if permits == self.max_chunk_permits {
tracing::warn!(size, "large chunk in exchange")
}
Some(permits::Value::Record(card as _))
Some(permits::Value::Record(permits as _))
}
Message::Barrier(_) => Some(permits::Value::Barrier(1)),
Message::Watermark(_) => None,
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ impl SharedContext {
))),
config: StreamingConfig {
developer: StreamingDeveloperConfig {
exchange_initial_permits: permit::for_test::INITIAL_PERMITS,
exchange_batched_permits: permit::for_test::BATCHED_PERMITS,
exchange_max_bytes: permit::for_test::MAX_BYTES,
exchange_ack_bytes: permit::for_test::ACK_BYTES,
exchange_concurrent_barriers: permit::for_test::CONCURRENT_BARRIERS,
..Default::default()
},
Expand All @@ -139,8 +139,8 @@ impl SharedContext {
MutexGuard::map(self.channel_map.lock(), |map| {
map.entry(ids).or_insert_with(|| {
let (tx, rx) = permit::channel(
self.config.developer.exchange_initial_permits,
self.config.developer.exchange_batched_permits,
self.config.developer.exchange_max_bytes,
self.config.developer.exchange_ack_bytes,
self.config.developer.exchange_concurrent_barriers,
);
(Some(tx), Some(rx))
Expand Down

0 comments on commit 51c76aa

Please sign in to comment.