Skip to content

Commit

Permalink
support spill metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Jun 4, 2024
1 parent e179f3d commit c3ab0e2
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,20 @@ def section_batch(outer_panels):
),
],
),
panels.timeseries_bytes_per_sec(
"Batch Spill Throughput",
"",
[
panels.target(
f"sum(rate({metric('batch_spill_read_bytes')}[$__rate_interval]))by({COMPONENT_LABEL}, {NODE_LABEL})",
"read - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum(rate({metric('batch_spill_write_bytes')}[$__rate_interval]))by({COMPONENT_LABEL}, {NODE_LABEL})",
"write - {{%s}} @ {{%s}}" % (COMPONENT_LABEL, NODE_LABEL),
),
],
),
],
),
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

38 changes: 36 additions & 2 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
WrapStreamExecutor,
};
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::{
SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
Expand All @@ -64,6 +65,7 @@ impl HashKeyDispatcher for HashAggExecutorBuilder {
self.chunk_size,
self.mem_context,
self.enable_spill,
self.spill_metrics,
self.shutdown_rx,
))
}
Expand All @@ -84,6 +86,7 @@ pub struct HashAggExecutorBuilder {
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_metrics: Arc<BatchSpillMetrics>,
shutdown_rx: ShutdownToken,
}

Expand All @@ -96,6 +99,7 @@ impl HashAggExecutorBuilder {
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_metrics: Arc<BatchSpillMetrics>,
shutdown_rx: ShutdownToken,
) -> Result<BoxedExecutor> {
let aggs: Vec<_> = hash_agg_node
Expand Down Expand Up @@ -135,6 +139,7 @@ impl HashAggExecutorBuilder {
chunk_size,
mem_context,
enable_spill,
spill_metrics,
shutdown_rx,
};

Expand All @@ -157,6 +162,8 @@ impl BoxedExecutorBuilder for HashAggExecutorBuilder {

let identity = source.plan_node().get_identity();

let spill_metrics = source.context.spill_metrics();

Self::deserialize(
hash_agg_node,
child,
Expand All @@ -165,6 +172,7 @@ impl BoxedExecutorBuilder for HashAggExecutorBuilder {
source.context.get_config().developer.chunk_size,
source.context.create_executor_mem_context(identity),
source.context.get_config().enable_spill,
spill_metrics,
source.shutdown_rx.clone(),
)
}
Expand All @@ -187,13 +195,15 @@ pub struct HashAggExecutor<K> {
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_metrics: Arc<BatchSpillMetrics>,
/// The upper bound of memory usage for this executor.
memory_upper_bound: Option<u64>,
shutdown_rx: ShutdownToken,
_phantom: PhantomData<K>,
}

impl<K> HashAggExecutor<K> {
#[allow(clippy::too_many_arguments)]
pub fn new(
aggs: Arc<Vec<BoxedAggregateFunction>>,
group_key_columns: Vec<usize>,
Expand All @@ -204,6 +214,7 @@ impl<K> HashAggExecutor<K> {
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_metrics: Arc<BatchSpillMetrics>,
shutdown_rx: ShutdownToken,
) -> Self {
Self::new_inner(
Expand All @@ -217,6 +228,7 @@ impl<K> HashAggExecutor<K> {
chunk_size,
mem_context,
enable_spill,
spill_metrics,
None,
shutdown_rx,
)
Expand All @@ -234,6 +246,7 @@ impl<K> HashAggExecutor<K> {
chunk_size: usize,
mem_context: MemoryContext,
enable_spill: bool,
spill_metrics: Arc<BatchSpillMetrics>,
memory_upper_bound: Option<u64>,
shutdown_rx: ShutdownToken,
) -> Self {
Expand All @@ -248,6 +261,7 @@ impl<K> HashAggExecutor<K> {
chunk_size,
mem_context,
enable_spill,
spill_metrics,
memory_upper_bound,
shutdown_rx,
_phantom: PhantomData,
Expand Down Expand Up @@ -297,6 +311,7 @@ pub struct AggSpillManager {
child_data_types: Vec<DataType>,
agg_data_types: Vec<DataType>,
spill_chunk_size: usize,
spill_metrics: Arc<BatchSpillMetrics>,
}

impl AggSpillManager {
Expand All @@ -307,6 +322,7 @@ impl AggSpillManager {
agg_data_types: Vec<DataType>,
child_data_types: Vec<DataType>,
spill_chunk_size: usize,
spill_metrics: Arc<BatchSpillMetrics>,
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", agg_identity, suffix_uuid);
Expand All @@ -333,6 +349,7 @@ impl AggSpillManager {
child_data_types,
agg_data_types,
spill_chunk_size,
spill_metrics,
})
}

Expand Down Expand Up @@ -367,6 +384,9 @@ impl AggSpillManager {
let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
let buf = Message::encode_to_vec(&chunk_pb);
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(buf.len() as u64 + 4);
self.agg_state_writers[partition].write(len_bytes).await?;
self.agg_state_writers[partition].write(buf).await?;
}
Expand All @@ -387,6 +407,9 @@ impl AggSpillManager {
let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
let buf = Message::encode_to_vec(&chunk_pb);
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(buf.len() as u64 + 4);
self.input_writers[partition].write(len_bytes).await?;
self.input_writers[partition].write(buf).await?;
}
Expand All @@ -400,6 +423,9 @@ impl AggSpillManager {
let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
let buf = Message::encode_to_vec(&chunk_pb);
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(buf.len() as u64 + 4);
self.agg_state_writers[partition].write(len_bytes).await?;
self.agg_state_writers[partition].write(buf).await?;
}
Expand All @@ -408,6 +434,9 @@ impl AggSpillManager {
let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
let buf = Message::encode_to_vec(&chunk_pb);
let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
self.spill_metrics
.batch_spill_write_bytes
.inc_by(buf.len() as u64 + 4);
self.input_writers[partition].write(len_bytes).await?;
self.input_writers[partition].write(buf).await?;
}
Expand All @@ -425,13 +454,13 @@ impl AggSpillManager {
async fn read_agg_state_partition(&mut self, partition: usize) -> Result<BoxedDataChunkStream> {
let agg_state_partition_file_name = format!("agg-state-p{}", partition);
let r = self.op.reader_with(&agg_state_partition_file_name).await?;
Ok(SpillOp::read_stream(r))
Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
}

async fn read_input_partition(&mut self, partition: usize) -> Result<BoxedDataChunkStream> {
let input_partition_file_name = format!("input-chunks-p{}", partition);
let r = self.op.reader_with(&input_partition_file_name).await?;
Ok(SpillOp::read_stream(r))
Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
}

async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
Expand Down Expand Up @@ -572,6 +601,7 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
self.aggs.iter().map(|agg| agg.return_type()).collect(),
child_schema.data_types(),
self.chunk_size,
self.spill_metrics.clone(),
)?;
agg_spill_manager.init_writers().await?;

Expand Down Expand Up @@ -637,6 +667,7 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
self.chunk_size,
self.mem_context.clone(),
self.enable_spill,
self.spill_metrics.clone(),
Some(partition_size),
self.shutdown_rx.clone(),
);
Expand Down Expand Up @@ -785,6 +816,7 @@ mod tests {
CHUNK_SIZE,
mem_context.clone(),
false,
BatchSpillMetrics::for_test(),
ShutdownToken::empty(),
)
.unwrap();
Expand Down Expand Up @@ -858,6 +890,7 @@ mod tests {
CHUNK_SIZE,
MemoryContext::none(),
false,
BatchSpillMetrics::for_test(),
ShutdownToken::empty(),
)
.unwrap();
Expand Down Expand Up @@ -974,6 +1007,7 @@ mod tests {
CHUNK_SIZE,
MemoryContext::none(),
false,
BatchSpillMetrics::for_test(),
shutdown_rx,
)
.unwrap();
Expand Down
Loading

0 comments on commit c3ab0e2

Please sign in to comment.