Skip to content

Commit

Permalink
feat(udf): add track for call_with_always_retry_on_network_error and …
Browse files Browse the repository at this point in the history
…add comments (#15513)
  • Loading branch information
yufansong authored Mar 8, 2024
1 parent ef7eb1d commit a79404d
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 3 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -4068,6 +4068,10 @@ def section_udf(outer_panels):
f"sum(rate({metric('udf_failure_count')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_failure_count - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(rate({metric('udf_retry_count')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_retry_count - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(rate({metric('udf_success_count')}[$__rate_interval])) by (link, name, fragment_id)",
"udf_success_count - {{link}} {{name}} {{fragment_id}}",
Expand All @@ -4076,6 +4080,10 @@ def section_udf(outer_panels):
f"sum(rate({metric('udf_failure_count')}[$__rate_interval])) by (link, name, fragment_id)",
"udf_failure_count - {{link}} {{name}} {{fragment_id}}",
),
# Per-function retry rate. Grouped by (link, name, fragment_id) like the
# success/failure targets next to it, so this is not a duplicate of the
# per-node udf_retry_count series already added earlier in this panel.
panels.target(
    f"sum(rate({metric('udf_retry_count')}[$__rate_interval])) by (link, name, fragment_id)",
    "udf_retry_count - {{link}} {{name}} {{fragment_id}}",
),
],
),
panels.timeseries_count(
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ pub struct UserDefinedFunction {
identifier: String,
span: await_tree::Span,
/// Number of remaining successful calls until retry is enabled.
/// This parameter is designed to prevent continuous retry on every call, which would increase delay.
/// Logic:
/// It resets to `INITIAL_RETRY_COUNT` after a connection error and then decrements with each successful call, enabling retry when it reaches zero.
/// If non-zero, we will not retry on connection errors to prevent blocking the stream.
/// On each connection error, the count will be reset to `INITIAL_RETRY_COUNT`.
/// On each successful call, the count will be decreased by 1.
/// Link:
/// See <https://github.com/risingwavelabs/risingwave/issues/13791>.
disable_retry_count: AtomicU8,
/// Always retry. Overrides `disable_retry_count`.
Expand Down Expand Up @@ -152,7 +156,11 @@ impl UserDefinedFunction {
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let result = if self.always_retry_on_network_error {
client
.call_with_always_retry_on_network_error(&self.identifier, arrow_input)
.call_with_always_retry_on_network_error(
&self.identifier,
arrow_input,
&fragment_id,
)
.instrument_await(self.span.clone())
.await
} else {
Expand Down
5 changes: 5 additions & 0 deletions src/expr/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use thiserror_ext::AsReport;
use tokio::time::Duration as TokioDuration;
use tonic::transport::Channel;

use crate::metrics::GLOBAL_METRICS;
use crate::{Error, Result};

// Interval between two successive probes of the UDF DNS.
Expand Down Expand Up @@ -182,8 +183,11 @@ impl ArrowFlightUdfClient {
&self,
id: &str,
input: RecordBatch,
fragment_id: &str,
) -> Result<RecordBatch> {
let mut backoff = Duration::from_millis(100);
let metrics = &*GLOBAL_METRICS;
let labels: &[&str; 4] = &[&self.addr, "external", &id, &fragment_id];
loop {
match self.call(id, input.clone()).await {
Err(err) if err.is_tonic_error() => {
Expand All @@ -196,6 +200,7 @@ impl ArrowFlightUdfClient {
return ret;
}
}
metrics.udf_retry_count.with_label_values(labels).inc();
tokio::time::sleep(backoff).await;
backoff *= 2;
}
Expand Down
10 changes: 10 additions & 0 deletions src/expr/udf/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct Metrics {
pub udf_success_count: IntCounterVec,
/// Number of failed UDF calls.
pub udf_failure_count: IntCounterVec,
/// Total number of retried UDF calls.
pub udf_retry_count: IntCounterVec,
/// Input chunk rows of UDF calls.
pub udf_input_chunk_rows: HistogramVec,
/// The latency of UDF calls in seconds.
Expand Down Expand Up @@ -58,6 +60,13 @@ impl Metrics {
registry
)
.unwrap();
let udf_retry_count = register_int_counter_vec_with_registry!(
"udf_retry_count",
"Total number of retried UDF calls",
labels,
registry
)
.unwrap();
let udf_input_chunk_rows = register_histogram_vec_with_registry!(
"udf_input_chunk_rows",
"Input chunk rows of UDF calls",
Expand Down Expand Up @@ -92,6 +101,7 @@ impl Metrics {
Metrics {
udf_success_count,
udf_failure_count,
udf_retry_count,
udf_input_chunk_rows,
udf_latency,
udf_input_rows,
Expand Down

0 comments on commit a79404d

Please sign in to comment.