Commit

feat(metrics): add fragment_id into udf metrics via task local variable (#15391)

Signed-off-by: Runji Wang <[email protected]>
Co-authored-by: Runji Wang <[email protected]>
yufansong and wangrunji0408 authored Mar 5, 2024
1 parent 8dd158c commit 3694459
Showing 11 changed files with 62 additions and 38 deletions.
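
The mechanism in brief: the streaming actor stores its fragment id in a task-local variable, and the UDF client reads it back when recording metrics, so no intermediate call signature has to carry the id. Below is a minimal, self-contained sketch of the pattern using tokio's task_local! directly; it assumes RisingWave's define_context! macro generates a comparable scope/try_with API (the real code uses FRAGMENT_ID::scope and FRAGMENT_ID::try_with).

    // Declares a per-task variable, as define_context! is assumed to do.
    tokio::task_local! {
        static FRAGMENT_ID: u32;
    }

    // Read side (expr_udf.rs): batch queries run outside any scope,
    // so the lookup falls back to 0.
    async fn current_fragment_id() -> u32 {
        FRAGMENT_ID.try_with(|id| *id).unwrap_or(0)
    }

    #[tokio::main]
    async fn main() {
        // Outside a scope: the fallback applies.
        assert_eq!(current_fragment_id().await, 0);
        // Write side (actor.rs): the actor wraps its main future in a
        // scope carrying its fragment id.
        let id = FRAGMENT_ID.scope(42, current_fragment_id()).await;
        assert_eq!(id, 42);
    }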
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

28 changes: 14 additions & 14 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -4082,12 +4082,12 @@ def section_udf(outer_panels):
                         "udf_failure_count - {{%s}}" % NODE_LABEL,
                     ),
                     panels.target(
-                        f"sum(rate({metric('udf_success_count')}[$__rate_interval])) by (link, name)",
-                        "udf_success_count - {{link}} {{name}}",
+                        f"sum(rate({metric('udf_success_count')}[$__rate_interval])) by (link, name, fragment_id)",
+                        "udf_success_count - {{link}} {{name}} {{fragment_id}}",
                     ),
                     panels.target(
-                        f"sum(rate({metric('udf_failure_count')}[$__rate_interval])) by (link, name)",
-                        "udf_failure_count - {{link}} {{name}}",
+                        f"sum(rate({metric('udf_failure_count')}[$__rate_interval])) by (link, name, fragment_id)",
+                        "udf_failure_count - {{link}} {{name}} {{fragment_id}}",
                     ),
                 ],
             ),
@@ -4096,8 +4096,8 @@ def section_udf(outer_panels):
                 "",
                 [
                     panels.target(
-                        f"sum(irate({metric('udf_input_chunk_rows_sum')}[$__rate_interval])) by (link, name) / sum(irate({metric('udf_input_chunk_rows_count')}[$__rate_interval])) by (link, name)",
-                        "udf_input_chunk_rows_avg - {{link}} {{name}}",
+                        f"sum(irate({metric('udf_input_chunk_rows_sum')}[$__rate_interval])) by (link, name, fragment_id) / sum(irate({metric('udf_input_chunk_rows_count')}[$__rate_interval])) by (link, name, fragment_id)",
+                        "udf_input_chunk_rows_avg - {{link}} {{name}} {{fragment_id}}",
                     ),
                 ],
             ),
@@ -4122,12 +4122,12 @@ def section_udf(outer_panels):
                         "udf_latency_avg - {{%s}}" % NODE_LABEL,
                     ),
                     panels.target(
-                        f"histogram_quantile(0.99, sum(irate({metric('udf_latency_bucket')}[$__rate_interval])) by (le, link, name))",
-                        "udf_latency_p99_by_name - {{link}} {{name}}",
+                        f"histogram_quantile(0.99, sum(irate({metric('udf_latency_bucket')}[$__rate_interval])) by (le, link, name, fragment_id))",
+                        "udf_latency_p99_by_name - {{link}} {{name}} {{fragment_id}}",
                     ),
                     panels.target(
-                        f"sum(irate({metric('udf_latency_sum')}[$__rate_interval])) by (link, name) / sum(irate({metric('udf_latency_count')}[$__rate_interval])) by (link, name)",
-                        "udf_latency_avg_by_name - {{link}} {{name}}",
+                        f"sum(irate({metric('udf_latency_sum')}[$__rate_interval])) by (link, name, fragment_id) / sum(irate({metric('udf_latency_count')}[$__rate_interval])) by (link, name, fragment_id)",
+                        "udf_latency_avg_by_name - {{link}} {{name}} {{fragment_id}}",
                     ),
                 ],
             ),
@@ -4140,8 +4140,8 @@ def section_udf(outer_panels):
                         "udf_throughput_rows - {{%s}}" % NODE_LABEL,
                     ),
                     panels.target(
-                        f"sum(rate({metric('udf_input_rows')}[$__rate_interval])) by (link, name)",
-                        "udf_throughput_rows - {{link}} {{name}}",
+                        f"sum(rate({metric('udf_input_rows')}[$__rate_interval])) by (link, name, fragment_id)",
+                        "udf_throughput_rows - {{link}} {{name}} {{fragment_id}}",
                     ),
                 ],
             ),
@@ -4154,8 +4154,8 @@ def section_udf(outer_panels):
                         "udf_throughput_bytes - {{%s}}" % NODE_LABEL,
                     ),
                     panels.target(
-                        f"sum(rate({metric('udf_input_bytes')}[$__rate_interval])) by (link, name) / (1024*1024)",
-                        "udf_throughput_bytes - {{link}} {{name}}",
+                        f"sum(rate({metric('udf_input_bytes')}[$__rate_interval])) by (link, name, fragment_id) / (1024*1024)",
+                        "udf_throughput_bytes - {{link}} {{name}} {{fragment_id}}",
                     ),
                 ],
             ),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions src/expr/core/src/expr/expr_udf.rs
@@ -30,6 +30,7 @@ use moka::sync::Cache;
 use risingwave_common::array::{ArrayError, ArrayRef, DataChunk};
 use risingwave_common::row::OwnedRow;
 use risingwave_common::types::{DataType, Datum};
+use risingwave_expr::expr_context::FRAGMENT_ID;
 use risingwave_pb::expr::ExprNode;
 use risingwave_udf::ArrowFlightUdfClient;
 use thiserror_ext::AsReport;
@@ -115,21 +116,28 @@ impl UserDefinedFunction {
             #[cfg(feature = "embedded-python-udf")]
             UdfImpl::Python(runtime) => runtime.call(&self.identifier, &arrow_input)?,
             UdfImpl::External(client) => {
+                // batch query does not have a fragment_id
+                let fragment_id = FRAGMENT_ID::try_with(ToOwned::to_owned).unwrap_or(0);
+
                 let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
                 let result = if self.always_retry_on_network_error {
                     client
-                        .call_with_always_retry_on_network_error(&self.identifier, arrow_input)
+                        .call_with_always_retry_on_network_error(
+                            &self.identifier,
+                            arrow_input,
+                            fragment_id,
+                        )
                         .instrument_await(self.span.clone())
                         .await
                 } else {
                     let result = if disable_retry_count != 0 {
                         client
-                            .call(&self.identifier, arrow_input)
+                            .call(&self.identifier, arrow_input, fragment_id)
                             .instrument_await(self.span.clone())
                             .await
                     } else {
                         client
-                            .call_with_retry(&self.identifier, arrow_input)
+                            .call_with_retry(&self.identifier, arrow_input, fragment_id)
                            .instrument_await(self.span.clone())
                            .await
                    };
1 change: 1 addition & 0 deletions src/expr/core/src/expr_context.rs
@@ -20,6 +20,7 @@ use risingwave_pb::plan_common::ExprContext;
 // For all execution mode.
 define_context! {
     pub TIME_ZONE: String,
+    pub FRAGMENT_ID: u32,
 }

 pub fn capture_expr_context() -> ExprResult<ExprContext> {
4 changes: 2 additions & 2 deletions src/expr/udf/examples/client.rs
@@ -55,7 +55,7 @@ async fn main() {
         .unwrap();

     let output = client
-        .call("gcd", input2)
+        .call("gcd", input2, 0)
         .await
         .expect("failed to call function");

@@ -68,7 +68,7 @@ async fn main() {
         .unwrap();

     let output = client
-        .call("gcd3", input3)
+        .call("gcd3", input3, 0)
         .await
         .expect("failed to call function");
22 changes: 17 additions & 5 deletions src/expr/udf/src/external.rs
@@ -144,9 +144,15 @@ impl ArrowFlightUdfClient {
     }

     /// Call a function.
-    pub async fn call(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
+    pub async fn call(
+        &self,
+        id: &str,
+        input: RecordBatch,
+        fragment_id: u32,
+    ) -> Result<RecordBatch> {
         let metrics = &*GLOBAL_METRICS;
-        let labels = &[self.addr.as_str(), id];
+        let fragment_id_str = fragment_id.to_string();
+        let labels = &[self.addr.as_str(), id, fragment_id_str.as_str()];
         metrics
             .udf_input_chunk_rows
             .with_label_values(labels)
@@ -189,10 +195,15 @@ impl ArrowFlightUdfClient {
     }

     /// Call a function, retry up to 5 times / 3s if connection is broken.
-    pub async fn call_with_retry(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
+    pub async fn call_with_retry(
+        &self,
+        id: &str,
+        input: RecordBatch,
+        fragment_id: u32,
+    ) -> Result<RecordBatch> {
         let mut backoff = Duration::from_millis(100);
         for i in 0..5 {
-            match self.call(id, input.clone()).await {
+            match self.call(id, input.clone(), fragment_id).await {
                 Err(err) if err.is_connection_error() && i != 4 => {
                     tracing::error!(error = %err.as_report(), "UDF connection error. retry...");
                 }
@@ -209,10 +220,11 @@ impl ArrowFlightUdfClient {
         &self,
         id: &str,
         input: RecordBatch,
+        fragment_id: u32,
     ) -> Result<RecordBatch> {
         let mut backoff = Duration::from_millis(100);
         loop {
-            match self.call(id, input.clone()).await {
+            match self.call(id, input.clone(), fragment_id).await {
                 Err(err) if err.is_connection_error() => {
                     tracing::error!(error = %err.as_report(), "UDF connection error. retry...");
                 }
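
For orientation, the retry wrapper around call has this rough shape: up to five attempts, sleeping with a doubling backoff after each connection error. The sleep-and-double step is not visible in the hunk above, so the helper below is an assumption about the surrounding code rather than a copy of it, and it omits the is_connection_error() check the real client applies before retrying.

    use std::time::Duration;

    // Generic stand-in for call_with_retry: retry up to 5 times,
    // doubling the pause between attempts (100ms, 200ms, 400ms, 800ms).
    async fn with_retry<F, Fut, T, E>(mut call: F) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        let mut backoff = Duration::from_millis(100);
        for i in 0..5 {
            match call().await {
                Err(_) if i != 4 => {
                    tokio::time::sleep(backoff).await;
                    backoff *= 2;
                }
                // Success, or the final attempt's error, is returned as-is.
                ret => return ret,
            }
        }
        unreachable!("the match above always returns on the fifth attempt")
    }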
2 changes: 1 addition & 1 deletion src/expr/udf/src/metrics.rs
@@ -43,7 +43,7 @@ pub static GLOBAL_METRICS: LazyLock<Metrics> =

 impl Metrics {
     fn new(registry: &Registry) -> Self {
-        let labels = &["link", "name"];
+        let labels = &["link", "name", "fragment_id"];
         let udf_success_count = register_int_counter_vec_with_registry!(
             "udf_success_count",
             "Total number of successful UDF calls",
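
Because Prometheus label sets are positional, adding "fragment_id" at registration time means every with_label_values call site must now supply three string values in the registered order. A standalone sketch with the prometheus crate; the endpoint address and UDF name here are illustrative values, not taken from the diff.

    use prometheus::{register_int_counter_vec_with_registry, Registry};

    fn main() {
        let registry = Registry::new();
        // Mirrors metrics.rs after this commit: a third label joins
        // "link" and "name".
        let udf_success_count = register_int_counter_vec_with_registry!(
            "udf_success_count",
            "Total number of successful UDF calls",
            &["link", "name", "fragment_id"],
            registry
        )
        .unwrap();

        // Label values must be &str, which is why external.rs formats the
        // numeric id once and borrows it when assembling the label slice.
        let fragment_id: u32 = 42;
        let fragment_id_str = fragment_id.to_string();
        udf_success_count
            .with_label_values(&["localhost:8815", "gcd", fragment_id_str.as_str()])
            .inc();
    }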
21 changes: 12 additions & 9 deletions src/stream/src/executor/actor.rs
@@ -24,7 +24,7 @@ use risingwave_common::error::ErrorSuppressor;
 use risingwave_common::log::LogSuppresser;
 use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
 use risingwave_common::util::epoch::EpochPair;
-use risingwave_expr::expr_context::expr_context_scope;
+use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID};
 use risingwave_expr::ExprError;
 use risingwave_pb::plan_common::ExprContext;
 use risingwave_pb::stream_plan::PbStreamActor;
@@ -171,14 +171,17 @@

     #[inline(always)]
     pub async fn run(mut self) -> StreamResult<()> {
-        expr_context_scope(self.expr_context.clone(), async move {
-            tokio::join!(
-                // Drive the subtasks concurrently.
-                join_all(std::mem::take(&mut self.subtasks)),
-                self.run_consumer(),
-            )
-            .1
-        })
+        FRAGMENT_ID::scope(
+            self.actor_context.fragment_id,
+            expr_context_scope(self.expr_context.clone(), async move {
+                tokio::join!(
+                    // Drive the subtasks concurrently.
+                    join_all(std::mem::take(&mut self.subtasks)),
+                    self.run_consumer(),
+                )
+                .1
+            }),
+        )
         .await
     }
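
Note that FRAGMENT_ID::scope wraps the actor's entire consumer future, including the joined subtasks, so every expression the actor evaluates sees the id. A sketch of the nesting, again written against a tokio-style task-local API; TIME_ZONE stands in for the expression context that expr_context_scope carries in the real code.

    tokio::task_local! {
        static FRAGMENT_ID: u32;
        static TIME_ZONE: String;
    }

    // The outer scope carries the fragment id; the inner one carries the
    // expression context. Both are visible inside the innermost future.
    async fn run_scoped(fragment_id: u32, time_zone: String) {
        FRAGMENT_ID
            .scope(
                fragment_id,
                TIME_ZONE.scope(time_zone, async {
                    let fid = FRAGMENT_ID.try_with(|id| *id).unwrap_or(0);
                    let tz = TIME_ZONE
                        .try_with(Clone::clone)
                        .unwrap_or_else(|_| "UTC".into());
                    println!("fragment {fid} evaluating in {tz}");
                }),
            )
            .await
    }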
