feat(streaming): add memory size limit to exchange #16195

Closed · wants to merge 16 commits
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions e2e_test/streaming/aggregate/count_star.slt
@@ -1,3 +1,6 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (v int);

@@ -7,9 +10,6 @@ insert into t values (114), (514);
statement ok
create materialized view mv as select * from t;

statement ok
flush;

query I
select count(*) from t;
----
84 changes: 59 additions & 25 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -31,14 +31,29 @@ def section_actor_info(outer_panels):
panels.table_info(
"Actor Info",
"Information about actors",
[panels.table_target(f"group({metric('actor_info')}) by (actor_id, fragment_id, compute_node)")],
[
panels.table_target(
f"group({metric('actor_info')}) by (actor_id, fragment_id, compute_node)"
)
],
["actor_id", "fragment_id", "compute_node"],
),
panels.table_info(
"State Table Info",
"Information about state tables. Column `materialized_view_id` is the id of the materialized view that this state table belongs to.",
[panels.table_target(f"group({metric('table_info')}) by (table_id, table_name, table_type, materialized_view_id, fragment_id, compaction_group_id)")],
["table_id", "table_name", "table_type", "materialized_view_id", "fragment_id", "compaction_group_id"],
[
panels.table_target(
f"group({metric('table_info')}) by (table_id, table_name, table_type, materialized_view_id, fragment_id, compaction_group_id)"
)
],
[
"table_id",
"table_name",
"table_type",
"materialized_view_id",
"fragment_id",
"compaction_group_id",
],
),
],
)
@@ -267,7 +282,6 @@ def section_compaction(outer_panels):
"compactor_task_count - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),

panels.target(
f"avg({metric('storage_compact_task_pending_parallelism')}) by({COMPONENT_LABEL}, {NODE_LABEL})",
"compactor_task_pending_parallelism - {{%s}} @ {{%s}}"
@@ -1643,7 +1657,7 @@ def section_streaming_exchange(outer_panels):
[
panels.target(
f"rate({metric('stream_exchange_frag_send_size')}[$__rate_interval])",
"{{up_fragment_id}}->{{down_fragment_id}}",
"fragment {{up_fragment_id}} -> fragment {{down_fragment_id}}",
),
],
),
@@ -1653,7 +1667,27 @@
[
panels.target(
f"rate({metric('stream_exchange_frag_recv_size')}[$__rate_interval])",
"{{up_fragment_id}}->{{down_fragment_id}}",
"fragment {{up_fragment_id}} -> fragment {{down_fragment_id}}",
),
],
),
panels.timeseries_bytes(
"Fragment-level Local Exchange Memory Size",
"",
[
panels.target(
f"{metric('stream_exchange_memory_size')}",
"fragment {{up_fragment_id}} -> fragment {{down_fragment_id}}",
),
],
),
panels.timeseries_count(
"Fragment-level Local Exchange Number of Rows",
"",
[
panels.target(
f"{metric('stream_exchange_num_rows')}",
"fragment {{up_fragment_id}} -> fragment {{down_fragment_id}}",
),
],
),
@@ -2239,20 +2273,23 @@ def section_hummock_write(outer_panels):
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('state_store_sync_duration_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
f"p{legend}"
+ " Sync duration - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
+ " Sync duration - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
[50, 99, "max"],
),
panels.target(
f"sum by(le, {COMPONENT_LABEL}, {NODE_LABEL}) (rate({metric('state_store_sync_duration_sum')}[$__rate_interval])) / sum by(le, {COMPONENT_LABEL}, {NODE_LABEL}) (rate({metric('state_store_sync_duration_count')}[$__rate_interval]))",
"avg Sync duration - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
"avg Sync duration - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('state_store_uploader_upload_task_latency_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
f"p{legend}"
+ " upload task duration - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
),
+ " upload task duration - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
[50, 99, "max"],
),
],
@@ -2292,7 +2329,7 @@ def section_hummock_write(outer_panels):
f"sum({metric('state_store_uploader_syncing_epoch_count')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"syncing epoch count - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
),
],
),
panels.timeseries_bytes(
@@ -2436,8 +2473,7 @@ def section_hummock_write(outer_panels):
[
panels.target(
f"sum({metric('state_store_event_handler_pending_event')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"{{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
"{{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
),
],
),
@@ -2449,16 +2485,18 @@
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('state_store_event_handler_latency_bucket')}[$__rate_interval])) by (le, event_type, {COMPONENT_LABEL}, {NODE_LABEL}))",
f"p{legend}"
+ " {{event_type}} {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
),
+ " {{event_type}} {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
[50, 99, "max"],
),
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('state_store_uploader_wait_poll_latency_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
f"p{legend}"
+ " finished_task_wait_poll {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
),
+ " finished_task_wait_poll {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
[50, 99, "max"],
),
],
@@ -2553,8 +2591,7 @@ def section_hummock_tiered_cache(outer_panels):
[
panels.target(
f"sum(rate({metric('refill_bytes')}[$__rate_interval])) by (foyer, op, {NODE_LABEL})",
"{{type}} file cache - {{op}} @ {{%s}}"
% NODE_LABEL,
"{{type}} file cache - {{op}} @ {{%s}}" % NODE_LABEL,
),
],
),
@@ -2640,8 +2677,7 @@ def section_hummock_tiered_cache(outer_panels):
[
panels.target(
f"sum(rate({metric('refill_total', inheritance_parent_lookup_filter)}[$__rate_interval])) by (op, {NODE_LABEL})",
"parent meta lookup {{op}} @ {{%s}}"
% NODE_LABEL,
"parent meta lookup {{op}} @ {{%s}}" % NODE_LABEL,
),
],
),
@@ -2661,8 +2697,7 @@
[
panels.target(
f"sum(rate({metric('refill_total', unit_inheritance_filter)}[$__rate_interval])) by (op, {NODE_LABEL})",
"unit inheritance {{op}} @ {{%s}}"
% NODE_LABEL,
"unit inheritance {{op}} @ {{%s}}" % NODE_LABEL,
),
],
),
@@ -2682,8 +2717,7 @@
[
panels.target(
f"sum(rate({metric('refill_total', block_refill_filter)}[$__rate_interval])) by (op, {NODE_LABEL})",
"block refill {{op}} @ {{%s}}"
% NODE_LABEL,
"block refill {{op}} @ {{%s}}" % NODE_LABEL,
),
],
),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions proto/task_service.proto
@@ -81,12 +81,12 @@ message GetDataRequest {

// The structure for permit-based back-pressure.
message Permits {
oneof value {
// The permits required for a chunk, i.e. the cardinality of the chunk.
uint32 record = 1;
// The permits required for a barrier, typically 1.
uint32 barrier = 2;
}
// The cardinality of the chunk.
uint32 records = 1;
// The total bytes of the chunk.
uint64 bytes = 3;
// The number of barriers, typically 1.
uint32 barriers = 2;
}

message GetStreamRequest {
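The reworked `Permits` message above drops the old `oneof` in favor of three independent counters, so a single chunk is accounted for by both its row count and its byte size, while barriers keep their own budget. As a rough illustration only (not code from this PR; `Message` and `StreamChunk` below are simplified stand-ins, not the actual stream executor types), the sending side could derive the permits a message consumes like this:

// Hypothetical sketch: deriving the three permit budgets for one message.
// Field names mirror the `Permits` proto above.
#[derive(Debug, Default)]
struct Permits {
    records: u32,
    bytes: u64,
    barriers: u32,
}

struct StreamChunk {
    cardinality: usize,
    estimated_size_bytes: usize,
}

enum Message {
    Chunk(StreamChunk),
    Barrier,
}

fn permits_for(msg: &Message) -> Permits {
    match msg {
        // A chunk consumes row and byte budget, but no barrier budget.
        Message::Chunk(c) => Permits {
            records: c.cardinality as u32,
            bytes: c.estimated_size_bytes as u64,
            ..Default::default()
        },
        // A barrier consumes exactly one barrier permit.
        Message::Barrier => Permits {
            barriers: 1,
            ..Default::default()
        },
    }
}

fn main() {
    let chunk = Message::Chunk(StreamChunk { cardinality: 256, estimated_size_bytes: 32 * 1024 });
    println!("{:?}", permits_for(&chunk));
}
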
28 changes: 13 additions & 15 deletions src/common/src/config.rs
@@ -883,19 +883,17 @@ pub struct StreamingDeveloperConfig {
#[serde(default = "default::developer::stream_chunk_size")]
pub chunk_size: usize,

/// The initial permits that a channel holds, i.e., the maximum row count can be buffered in
/// the channel.
#[serde(default = "default::developer::stream_exchange_initial_permits")]
pub exchange_initial_permits: usize,
/// The maximum number of rows that can be buffered in an exchange channel.
#[serde(default = "default::developer::stream_exchange_max_records")]
pub exchange_max_records: usize,

/// The permits that are batched to add back, for reducing the backward `AddPermits` messages
/// in remote exchange.
#[serde(default = "default::developer::stream_exchange_batched_permits")]
pub exchange_batched_permits: usize,
/// The maximum number of bytes that can be buffered in an exchange channel.
#[serde(default = "default::developer::stream_exchange_max_bytes")]
pub exchange_max_bytes: usize,

/// The maximum number of concurrent barriers in an exchange channel.
#[serde(default = "default::developer::stream_exchange_concurrent_barriers")]
pub exchange_concurrent_barriers: usize,
/// The maximum number of barriers in an exchange channel.
#[serde(default = "default::developer::stream_exchange_max_barriers")]
pub exchange_max_barriers: usize,

/// The concurrency for dispatching messages to different downstream jobs.
///
@@ -1534,15 +1532,15 @@ pub mod default {
256
}

pub fn stream_exchange_initial_permits() -> usize {
pub fn stream_exchange_max_records() -> usize {
2048
}

pub fn stream_exchange_batched_permits() -> usize {
256
pub fn stream_exchange_max_bytes() -> usize {
1024 * 1024 // 1MB
}

pub fn stream_exchange_concurrent_barriers() -> usize {
pub fn stream_exchange_max_barriers() -> usize {
1
}

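The renamed knobs turn the old row-only permit count into three budgets: by default an exchange channel may buffer up to 2048 rows, 1 MiB of chunk data, and 1 barrier. A minimal sketch of that semantics, assuming the limits behave like independent semaphores; this is illustrative only, and the PR's actual bounded channel is the `exchange::permit` module imported in the exchange service diff below:

use tokio::sync::Semaphore;

// Illustrative three-budget limiter for an exchange channel (assumption,
// not the PR's implementation).
struct ExchangeLimits {
    records: Semaphore,
    bytes: Semaphore,
    barriers: Semaphore,
}

impl ExchangeLimits {
    fn new(max_records: usize, max_bytes: usize, max_barriers: usize) -> Self {
        Self {
            records: Semaphore::new(max_records),
            bytes: Semaphore::new(max_bytes),
            barriers: Semaphore::new(max_barriers),
        }
    }

    // Block until the message's footprint fits into every budget; the permits
    // stay consumed until the receiver acknowledges the message. A real
    // implementation also has to cap a single oversized chunk at the byte
    // budget, otherwise it could never acquire enough permits.
    async fn acquire(&self, records: u32, bytes: u32, barriers: u32) {
        self.records.acquire_many(records).await.unwrap().forget();
        self.bytes.acquire_many(bytes).await.unwrap().forget();
        self.barriers.acquire_many(barriers).await.unwrap().forget();
    }

    // Called once the message is consumed (the `AddPermits` feedback path
    // for remote exchange).
    fn release(&self, records: u32, bytes: u32, barriers: u32) {
        self.records.add_permits(records as usize);
        self.bytes.add_permits(bytes as usize);
        self.barriers.add_permits(barriers as usize);
    }
}

#[tokio::main]
async fn main() {
    // Defaults from the config above: 2048 rows, 1 MiB, 1 barrier.
    let limits = ExchangeLimits::new(2048, 1024 * 1024, 1);
    limits.acquire(256, 32 * 1024, 0).await; // a 256-row, 32 KiB chunk
    limits.release(256, 32 * 1024, 0); // the downstream consumed it
}
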
13 changes: 8 additions & 5 deletions src/compute/src/rpc/service/exchange_service.rs
@@ -21,7 +21,7 @@ use futures_async_stream::try_stream;
use risingwave_batch::task::BatchManager;
use risingwave_pb::task_service::exchange_service_server::ExchangeService;
use risingwave_pb::task_service::{
permits, GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits,
GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits,
};
use risingwave_stream::executor::exchange::permit::{MessageWithPermits, Receiver};
use risingwave_stream::executor::Message;
@@ -106,13 +106,16 @@ impl ExchangeService for ExchangeServiceImpl {

let receiver = self
.stream_mgr
.take_receiver((up_actor_id, down_actor_id))
.take_receiver(
(up_actor_id, down_actor_id),
(up_fragment_id, down_fragment_id),
)
.await?;

// Map the remaining stream to add-permits.
let add_permits_stream = request_stream.map_ok(|req| match req.value.unwrap() {
Value::Get(_) => unreachable!("the following messages must be `AddPermits`"),
Value::AddPermits(add_permits) => add_permits.value.unwrap(),
Value::AddPermits(add_permits) => add_permits,
});

Ok(Response::new(Self::get_stream_impl(
@@ -143,7 +146,7 @@ impl ExchangeServiceImpl {
metrics: Arc<ExchangeServiceMetrics>,
peer_addr: SocketAddr,
mut receiver: Receiver,
add_permits_stream: impl Stream<Item = std::result::Result<permits::Value, tonic::Status>>,
add_permits_stream: impl Stream<Item = std::result::Result<PbPermits, tonic::Status>>,
up_down_fragment_ids: (u32, u32),
) {
tracing::debug!(target: "events::compute::exchange", peer_addr = %peer_addr, "serve stream exchange RPC");
@@ -181,7 +184,7 @@
// forward the acquired permit to the downstream
let response = GetStreamResponse {
message: Some(proto),
permits: Some(PbPermits { value: permits }),
permits,
};
let bytes = Message::get_encoded_len(&response);

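With this change, each `GetStreamResponse` carries the full `Permits` struct that the message consumed, and the remote side is expected to echo the same struct back in an `AddPermits` request once it has passed the message on. A self-contained, hypothetical simulation of that echo loop, with tokio channels standing in for the bidirectional `GetStream` RPC and simplified stand-ins for the generated protobuf types:

use tokio::sync::mpsc;

// Simplified stand-in for the `Permits` proto message.
#[derive(Debug, Default)]
struct Permits {
    records: u32,
    bytes: u64,
    barriers: u32,
}

// Simplified stand-in for `GetStreamResponse`.
#[derive(Debug)]
struct GetStreamResponse {
    message: String,  // stand-in for the encoded stream message
    permits: Permits, // budget the message consumed upstream
}

#[tokio::main]
async fn main() {
    let (resp_tx, mut resp_rx) = mpsc::channel::<GetStreamResponse>(16);
    let (add_permits_tx, mut add_permits_rx) = mpsc::unbounded_channel::<Permits>();

    // Downstream side: consume a response, then echo its permits back so the
    // upstream channel regains exactly the capacity this message held.
    tokio::spawn(async move {
        while let Some(resp) = resp_rx.recv().await {
            println!("received {}", resp.message);
            add_permits_tx.send(resp.permits).ok();
        }
    });

    // Upstream side (`get_stream_impl`): forward a message together with its
    // permits, then add the echoed permits back to the local channel.
    let permits = Permits { records: 256, bytes: 32 * 1024, barriers: 0 };
    resp_tx
        .send(GetStreamResponse { message: "chunk".into(), permits })
        .await
        .unwrap();
    let returned = add_permits_rx.recv().await.unwrap();
    println!("add back {} rows / {} bytes", returned.records, returned.bytes);
}
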
6 changes: 3 additions & 3 deletions src/config/example.toml
@@ -98,9 +98,9 @@ stream_enable_executor_row_count = false
stream_connector_message_buffer_size = 16
stream_unsafe_extreme_cache_size = 10
stream_chunk_size = 256
stream_exchange_initial_permits = 2048
stream_exchange_batched_permits = 256
stream_exchange_concurrent_barriers = 1
stream_exchange_max_records = 2048
stream_exchange_max_bytes = 1048576
stream_exchange_max_barriers = 1
stream_exchange_concurrent_dispatchers = 0
stream_dml_channel_initial_permits = 32768
stream_hash_agg_max_dirty_groups_heap_size = 67108864
6 changes: 3 additions & 3 deletions src/config/full-iceberg-bench.toml
@@ -71,9 +71,9 @@ stream_enable_executor_row_count = false
stream_connector_message_buffer_size = 16
stream_unsafe_extreme_cache_size = 10
stream_chunk_size = 256
stream_exchange_initial_permits = 2048
stream_exchange_batched_permits = 256
stream_exchange_concurrent_barriers = 1
stream_exchange_max_records = 2048
stream_exchange_max_bytes = 1048576
stream_exchange_max_barriers = 1
stream_dml_channel_initial_permits = 32768
stream_hash_agg_max_dirty_groups_heap_size = 67108864

11 changes: 4 additions & 7 deletions src/rpc_client/src/compute_client.rs
@@ -36,9 +36,8 @@ use risingwave_pb::plan_common::ExprContext;
use risingwave_pb::task_service::exchange_service_client::ExchangeServiceClient;
use risingwave_pb::task_service::task_service_client::TaskServiceClient;
use risingwave_pb::task_service::{
permits, CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, ExecuteRequest,
GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits,
TaskInfoResponse,
CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, ExecuteRequest, GetDataRequest,
GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits, TaskInfoResponse,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -111,7 +110,7 @@ impl ComputeClient {
down_fragment_id: u32,
) -> Result<(
Streaming<GetStreamResponse>,
mpsc::UnboundedSender<permits::Value>,
mpsc::UnboundedSender<PbPermits>,
)> {
use risingwave_pb::task_service::get_stream_request::*;

@@ -132,9 +131,7 @@
.chain(
// The following requests are `AddPermits`.
UnboundedReceiverStream::new(permits_rx).map(|permits| GetStreamRequest {
value: Some(Value::AddPermits(PbPermits {
value: Some(permits),
})),
value: Some(Value::AddPermits(permits)),
}),
);
