feat: improve sink mview throughput metrics (#12622)
fuyufjh authored Oct 6, 2023
1 parent c847095 commit 3fbe1ab
Showing 15 changed files with 137 additions and 118 deletions.
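In short: the sink and materialized-view throughput panels previously derived their numbers from the generic `stream_executor_row_count` metric, filtered by `executor_identity` and joined on `actor_id`. This commit introduces two dedicated counters, `stream_sink_input_row_count` and `stream_mview_input_row_count`, labeled with `sink_id`/`table_id`, `actor_id`, and `fragment_id`, and rewrites the dashboard queries to join those counters against the `sink_info` and `table_info` gauges for human-readable names.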
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

40 changes: 27 additions & 13 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -641,11 +641,6 @@ def section_object_storage(outer_panels):
 
 def section_streaming(outer_panels):
     panels = outer_panels.sub_panel()
-    sink_filter = "executor_identity=~\".*SinkExecutor.*\""
-    mv_filter = "executor_identity=~\".*MaterializeExecutor.*\""
-    table_type_filter = "table_type=~\"MATERIALIZED_VIEW\""
-    mv_throughput_query = f'sum(rate({metric("stream_executor_row_count", filter=mv_filter)}[$__rate_interval]) * on(actor_id) group_left(materialized_view_id, table_name) (group({metric("table_info", filter=table_type_filter)}) by (actor_id, materialized_view_id, table_name))) by (materialized_view_id, table_name)'
-    sink_throughput_query = f'sum(rate({metric("stream_executor_row_count", filter=sink_filter)}[$__rate_interval]) * on(actor_id) group_left(sink_name) (group({metric("sink_info")}) by (actor_id, sink_name))) by (sink_name)'
     return [
         outer_panels.row_collapsed(
             "Streaming",
@@ -740,22 +735,41 @@ def section_streaming(outer_panels):
             ),
             panels.timeseries_rowsps(
                 "Sink Throughput(rows/s)",
-                "The figure shows the number of rows output by each sink per second.",
+                "The number of rows streamed into each sink per second.",
                 [
                     panels.target(
-                        sink_throughput_query,
-                        "sink {{sink_name}}",
+                        f"sum(rate({metric('stream_sink_input_row_count')}[$__rate_interval])) by (sink_id) * on(sink_id) group_left(sink_name) group({metric('sink_info')}) by (sink_id, sink_name)",
+                        "sink {{sink_id}} {{sink_name}}",
                     ),
                 ],
             ),
+            panels.timeseries_rowsps(
+                "Sink Throughput(rows/s) per Partition",
+                "The number of rows streamed into each sink per second.",
+                [
+                    panels.target(
+                        f"sum(rate({metric('stream_sink_input_row_count')}[$__rate_interval])) by (sink_id, actor_id) * on(actor_id) group_left(sink_name) {metric('sink_info')}",
+                        "sink {{sink_id}} {{sink_name}} - actor {{actor_id}}",
+                    ),
+                ],
+            ),
             panels.timeseries_rowsps(
                 "Materialized View Throughput(rows/s)",
                 "The figure shows the number of rows written into each materialized view per second.",
                 [
                     panels.target(
-                        mv_throughput_query,
-                        "materialized view {{table_name}} table_id {{materialized_view_id}}",
+                        f"sum(rate({metric('stream_mview_input_row_count')}[$__rate_interval])) by (table_id) * on(table_id) group_left(table_name) group({metric('table_info')}) by (table_id, table_name)",
+                        "mview {{table_id}} {{table_name}}",
                     ),
                 ],
             ),
+            panels.timeseries_rowsps(
+                "Materialized View Throughput(rows/s) per Partition",
+                "The figure shows the number of rows written into each materialized view per second.",
+                [
+                    panels.target(
+                        f"sum(rate({metric('stream_mview_input_row_count')}[$__rate_interval])) by (actor_id, table_id) * on(actor_id, table_id) group_left(table_name) {metric('table_info')}",
+                        "mview {{table_id}} {{table_name}} - actor {{actor_id}}",
+                    ),
+                ],
+            ),
@@ -899,8 +913,8 @@ def section_streaming_actors(outer_panels):
"When enabled, this metric shows the input throughput of each executor.",
[
panels.target(
f"rate({metric('stream_executor_row_count')}[$__rate_interval]) > 0",
"actor {{actor_id}}->{{executor_identity}}",
f"rate({metric('stream_executor_row_count')}[$__rate_interval])",
"{{executor_identity}} actor {{actor_id}}",
),
],
),
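The rewritten panels above lean on PromQL's info-metric join pattern. Restated outside the Python f-string, with `metric(...)` expanded to the bare metric name, the aggregated sink query reads:

    sum(rate(stream_sink_input_row_count[$__rate_interval])) by (sink_id)
      * on(sink_id) group_left(sink_name)
      group(sink_info) by (sink_id, sink_name)

`sink_info` is a gauge that is always set to 1 (see `src/meta/src/rpc/metrics.rs` below); its only job is to carry labels. `group(...) by (sink_id, sink_name)` collapses its per-actor series to one series per sink so the `on(sink_id)` match stays one-to-one, and `group_left(sink_name)` copies the name label onto the rate series. The per-partition variants keep `actor_id` in the aggregation and match `on(actor_id)` against `sink_info` directly, so no collapsing `group(...)` is needed.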
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -73,22 +73,22 @@ def section_overview(panels):
             ],
         ),
         panels.timeseries_rowsps(
-            "Aggregated Sink Throughput(rows/s)",
-            "The figure shows the number of rows output by each sink per second.",
+            "Sink Throughput(rows/s)",
+            "The number of rows streamed into each sink per second.",
             [
                 panels.target(
-                    f"sum(rate({metric('stream_executor_row_count', filter=sink_filter)}[$__rate_interval])) by (executor_identity)",
-                    "{{executor_identity}}",
+                    f"sum(rate({metric('stream_sink_input_row_count')}[$__rate_interval])) by (sink_id) * on(sink_id) group_left(sink_name) group({metric('sink_info')}) by (sink_id, sink_name)",
+                    "sink {{sink_id}} {{sink_name}}",
                 ),
             ],
         ),
         panels.timeseries_rowsps(
-            "Aggregated Materialized View Throughput(rows/s)",
-            "The figure shows the number of rows output by each materialized view per second.",
+            "Materialized View Throughput(rows/s)",
+            "The figure shows the number of rows written into each materialized view per second.",
             [
                 panels.target(
-                    f"sum(rate({metric('stream_executor_row_count', filter=mv_filter)}[$__rate_interval])) by (executor_identity)",
-                    "{{executor_identity}}",
+                    f"sum(rate({metric('stream_mview_input_row_count')}[$__rate_interval])) by (table_id) * on(table_id) group_left(table_name) group({metric('table_info')}) by (table_id, table_name)",
+                    "mview {{table_id}} {{table_name}}",
                 ),
             ],
         ),
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
@@ -52,6 +52,12 @@ impl SinkId {
     }
 }
 
+impl std::fmt::Display for SinkId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.sink_id)
+    }
+}
+
 impl From<u32> for SinkId {
     fn from(id: u32) -> Self {
         Self::new(id)
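This `Display` impl is what lets later code turn a `SinkId` into a Prometheus label with a plain `to_string()` call (see `sink_id.to_string()` in `src/stream/src/executor/sink.rs` below). A minimal self-contained sketch of the same pattern, with the type reduced to its essentials rather than the real catalog definition:

struct SinkId {
    sink_id: u32,
}

impl std::fmt::Display for SinkId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.sink_id)
    }
}

fn main() {
    let id = SinkId { sink_id: 42 };
    // `Display` provides `ToString` through a blanket impl, so the id can
    // be formatted once and reused as a metric label value.
    assert_eq!(id.to_string(), "42");
}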
11 changes: 6 additions & 5 deletions src/meta/src/rpc/metrics.rs
@@ -536,7 +536,7 @@ impl MetaMetrics {
         let sink_info = register_int_gauge_vec_with_registry!(
             "sink_info",
             "Mapping from actor id to (actor id, sink name)",
-            &["actor_id", "sink_name",],
+            &["actor_id", "sink_id", "sink_name",],
             registry
         )
         .unwrap();
@@ -810,13 +810,14 @@ pub async fn start_fragment_info_monitor(

             if let Some(stream_node) = &actor.nodes {
                 if let Some(Sink(sink_node)) = &stream_node.node_body {
-                    let sink_name = match &sink_node.sink_desc {
-                        Some(sink_desc) => &sink_desc.name,
-                        _ => "unknown",
+                    let (sink_id, sink_name) = match &sink_node.sink_desc {
+                        Some(sink_desc) => (sink_desc.id, sink_desc.name.as_str()),
+                        _ => (0, "unknown"), // unreachable
                     };
+                    let sink_id_str = sink_id.to_string();
                     meta_metrics
                         .sink_info
-                        .with_label_values(&[&actor_id_str, sink_name])
+                        .with_label_values(&[&actor_id_str, &sink_id_str, sink_name])
                         .set(1);
                 }
             }
29 changes: 28 additions & 1 deletion src/stream/src/executor/monitor/streaming_stats.rs
@@ -30,7 +30,10 @@ use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
 pub struct StreamingMetrics {
     pub level: MetricLevel,
 
+    // Executor metrics (disabled by default)
     pub executor_row_count: GenericCounterVec<AtomicU64>,
+
+    // Actor metrics
     pub actor_execution_time: GenericGaugeVec<AtomicF64>,
     pub actor_output_buffer_blocking_duration_ns: GenericCounterVec<AtomicU64>,
     pub actor_input_buffer_blocking_duration_ns: GenericCounterVec<AtomicU64>,
@@ -48,10 +51,16 @@ pub struct StreamingMetrics {
     pub actor_in_record_cnt: GenericCounterVec<AtomicU64>,
     pub actor_out_record_cnt: GenericCounterVec<AtomicU64>,
     pub actor_sampled_deserialize_duration_ns: GenericCounterVec<AtomicU64>,
 
+    // Source
     pub source_output_row_count: GenericCounterVec<AtomicU64>,
     pub source_row_per_barrier: GenericCounterVec<AtomicU64>,
     pub source_split_change_count: GenericCounterVec<AtomicU64>,
 
+    // Sink & materialized view
+    pub sink_input_row_count: GenericCounterVec<AtomicU64>,
+    pub mview_input_row_count: GenericCounterVec<AtomicU64>,
+
+    // Exchange (see also `compute::ExchangeServiceMetrics`)
     pub exchange_frag_recv_size: GenericCounterVec<AtomicU64>,

@@ -159,7 +168,7 @@ impl StreamingMetrics {
         let executor_row_count = register_int_counter_vec_with_registry!(
             "stream_executor_row_count",
             "Total number of rows that have been output from each executor",
-            &["actor_id", "executor_identity"],
+            &["actor_id", "fragment_id", "executor_identity"],
             registry
         )
         .unwrap();
@@ -188,6 +197,22 @@
         )
         .unwrap();
 
+        let sink_input_row_count = register_int_counter_vec_with_registry!(
+            "stream_sink_input_row_count",
+            "Total number of rows streamed into sink executors",
+            &["sink_id", "actor_id", "fragment_id"],
+            registry
+        )
+        .unwrap();
+
+        let mview_input_row_count = register_int_counter_vec_with_registry!(
+            "stream_mview_input_row_count",
+            "Total number of rows streamed into materialize executors",
+            &["table_id", "actor_id", "fragment_id"],
+            registry
+        )
+        .unwrap();
+
         let actor_execution_time = register_gauge_vec_with_registry!(
             "stream_actor_actor_execution_time",
             "Total execution time (s) of an actor",
@@ -825,6 +850,8 @@ impl StreamingMetrics {
             source_output_row_count,
             source_row_per_barrier,
             source_split_change_count,
+            sink_input_row_count,
+            mview_input_row_count,
             exchange_frag_recv_size,
             join_lookup_miss_count,
             join_total_lookup_count,
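For reference, a standalone sketch of the counter-vec pattern registered above, written directly against the `prometheus` crate (the registry wiring is simplified here; RisingWave routes registration through its own `GLOBAL_METRICS_REGISTRY` and `MetricLevel` machinery):

use prometheus::{register_int_counter_vec_with_registry, Registry};

fn main() {
    let registry = Registry::new();
    let sink_input_row_count = register_int_counter_vec_with_registry!(
        "stream_sink_input_row_count",
        "Total number of rows streamed into sink executors",
        &["sink_id", "actor_id", "fragment_id"],
        registry
    )
    .unwrap();

    // Each distinct label combination becomes its own time series; the
    // executor resolves the labels once and bumps the counter per chunk.
    let counter = sink_input_row_count.with_label_values(&["1", "7", "3"]);
    counter.inc_by(1024);
    assert_eq!(counter.get(), 1024);
}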
11 changes: 11 additions & 0 deletions src/stream/src/executor/mview/materialize.rs
@@ -119,6 +119,11 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {

     #[try_stream(ok = Message, error = StreamExecutorError)]
     async fn execute_inner(mut self) {
+        // for metrics
+        let table_id_str = self.state_table.table_id().to_string();
+        let actor_id_str = self.actor_context.id.to_string();
+        let fragment_id_str = self.actor_context.fragment_id.to_string();
+
         let data_types = self.schema().data_types().clone();
         let mut input = self.input.execute();

@@ -136,6 +141,12 @@
             yield match msg {
                 Message::Watermark(w) => Message::Watermark(w),
                 Message::Chunk(chunk) => {
+                    self.actor_context
+                        .streaming_metrics
+                        .mview_input_row_count
+                        .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
+                        .inc_by(chunk.cardinality() as u64);
+
                     match self.conflict_behavior {
                         ConflictBehavior::Overwrite | ConflictBehavior::IgnoreConflict => {
                             // create MaterializeBuffer from chunk
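One subtlety: the materialize path above counts `chunk.cardinality()`, while the sink path below counts `c.capacity()`. Assuming RisingWave's usual chunk semantics, cardinality is the number of rows visible under the chunk's visibility bitmap and capacity is the total number of slots, so the two counters can disagree on chunks that carry hidden rows. A toy illustration, not RisingWave's real `StreamChunk`:

// A stream chunk pairs row slots with a visibility bitmap; hidden rows
// still occupy slots but are skipped by downstream operators.
struct Chunk {
    visibility: Vec<bool>,
}

impl Chunk {
    /// Total number of slots, visible or not (what the sink counter adds).
    fn capacity(&self) -> usize {
        self.visibility.len()
    }

    /// Number of visible rows only (what the mview counter adds).
    fn cardinality(&self) -> usize {
        self.visibility.iter().filter(|v| **v).count()
    }
}

fn main() {
    let chunk = Chunk {
        visibility: vec![true, false, true],
    };
    assert_eq!(chunk.capacity(), 3);
    assert_eq!(chunk.cardinality(), 2);
}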
23 changes: 14 additions & 9 deletions src/stream/src/executor/sink.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::mem;
-use std::sync::Arc;
 
 use anyhow::anyhow;
 use futures::stream::select;
@@ -25,7 +24,7 @@ use risingwave_common::array::{merge_chunk_row, Op, StreamChunk, StreamChunkComp
 use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_connector::dispatch_sink;
-use risingwave_connector::sink::catalog::SinkType;
+use risingwave_connector::sink::catalog::{SinkId, SinkType};
 use risingwave_connector::sink::log_store::{
     LogReader, LogStoreFactory, LogStoreTransformChunkLogReader, LogWriter,
 };
@@ -35,12 +34,10 @@ use risingwave_connector::sink::{

 use super::error::{StreamExecutorError, StreamExecutorResult};
 use super::{BoxedExecutor, Executor, Message, PkIndices};
-use crate::executor::monitor::StreamingMetrics;
 use crate::executor::{expect_first_barrier, ActorContextRef, BoxedMessageStream};
 
 pub struct SinkExecutor<F: LogStoreFactory> {
     input: BoxedExecutor,
-    _metrics: Arc<StreamingMetrics>,
     sink: SinkImpl,
     identity: String,
     pk_indices: PkIndices,
@@ -83,7 +80,6 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
     #[allow(clippy::too_many_arguments)]
     pub async fn new(
         input: BoxedExecutor,
-        metrics: Arc<StreamingMetrics>,
         sink_writer_param: SinkWriterParam,
         sink_param: SinkParam,
         columns: Vec<ColumnCatalog>,
@@ -100,7 +96,6 @@
             .collect();
         Ok(Self {
             input,
-            _metrics: metrics,
             sink,
             identity: format!("SinkExecutor {:X?}", sink_writer_param.executor_id),
             pk_indices,
@@ -127,6 +122,7 @@
             self.input,
             stream_key,
             self.log_writer,
+            self.sink_param.sink_id,
             self.sink_param.sink_type,
             self.actor_context,
             stream_key_sink_pk_mismatch,
@@ -148,6 +144,7 @@
         input: BoxedExecutor,
         stream_key: PkIndices,
         mut log_writer: impl LogWriter,
+        sink_id: SinkId,
         sink_type: SinkType,
         actor_context: ActorContextRef,
         stream_key_sink_pk_mismatch: bool,
@@ -165,6 +162,11 @@
         // Propagate the first barrier
         yield Message::Barrier(barrier);
 
+        // for metrics
+        let sink_id_str = sink_id.to_string();
+        let actor_id_str = actor_context.id.to_string();
+        let fragment_id_str = actor_context.fragment_id.to_string();
+
         // When stream key is different from the user defined primary key columns for sinks. The operations could be out of order
         // stream key: a,b
         // sink pk: a
Expand Down Expand Up @@ -203,6 +205,12 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
             match msg? {
                 Message::Watermark(w) => watermark = Some(w),
                 Message::Chunk(c) => {
+                    actor_context
+                        .streaming_metrics
+                        .sink_input_row_count
+                        .with_label_values(&[&sink_id_str, &actor_id_str, &fragment_id_str])
+                        .inc_by(c.capacity() as u64);
+
                     chunk_buffer.push_chunk(c);
                 }
                 Message::Barrier(barrier) => {
Expand Down Expand Up @@ -415,7 +423,6 @@ mod test {

         let sink_executor = SinkExecutor::new(
             Box::new(mock),
-            Arc::new(StreamingMetrics::unused()),
             SinkWriterParam::for_test(),
             sink_param,
             columns.clone(),
@@ -537,7 +544,6 @@ mod test {

         let sink_executor = SinkExecutor::new(
             Box::new(mock),
-            Arc::new(StreamingMetrics::unused()),
             SinkWriterParam::for_test(),
             sink_param,
             columns.clone(),
@@ -656,7 +662,6 @@ mod test {

         let sink_executor = SinkExecutor::new(
             Box::new(mock),
-            Arc::new(StreamingMetrics::unused()),
             SinkWriterParam::for_test(),
             sink_param,
             columns,
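With the new counter reached through `actor_context.streaming_metrics`, the separately injected `Arc<StreamingMetrics>` handle (stored as the never-read `_metrics` field) had no remaining use, which is why the constructor parameter and the three test call sites above drop it.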
