diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs index ccd7a6ed70fd3..0c67dc3c58df1 100644 --- a/src/expr/core/src/expr/expr_udf.rs +++ b/src/expr/core/src/expr/expr_udf.rs @@ -51,9 +51,13 @@ pub struct UserDefinedFunction { identifier: String, span: await_tree::Span, /// Number of remaining successful calls until retry is enabled. + /// This parameter is designed to prevent continuous retry on every call, which would increase delay. + /// Logic: + /// It resets to INITIAL_RETRY_COUNT after a single failure and then decrements with each call, enabling retry when it reaches zero. /// If non-zero, we will not retry on connection errors to prevent blocking the stream. /// On each connection error, the count will be reset to `INITIAL_RETRY_COUNT`. /// On each successful call, the count will be decreased by 1. + /// Link: /// See . disable_retry_count: AtomicU8, /// Always retry. Overrides `disable_retry_count`. @@ -152,7 +156,11 @@ impl UserDefinedFunction { let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed); let result = if self.always_retry_on_network_error { client - .call_with_always_retry_on_network_error(&self.identifier, arrow_input) + .call_with_always_retry_on_network_error( + &self.identifier, + arrow_input, + &fragment_id, + ) .instrument_await(self.span.clone()) .await } else { diff --git a/src/expr/udf/src/external.rs b/src/expr/udf/src/external.rs index cb33d289c35b0..b7439f07f86b8 100644 --- a/src/expr/udf/src/external.rs +++ b/src/expr/udf/src/external.rs @@ -30,6 +30,7 @@ use thiserror_ext::AsReport; use tokio::time::Duration as TokioDuration; use tonic::transport::Channel; +use crate::metrics::GLOBAL_METRICS; use crate::{Error, Result}; // Interval between two successive probes of the UDF DNS. @@ -182,8 +183,11 @@ impl ArrowFlightUdfClient { &self, id: &str, input: RecordBatch, + fragment_id: &str, ) -> Result { let mut backoff = Duration::from_millis(100); + let metrics = &*GLOBAL_METRICS; + let labels: &[&str; 4] = &[&self.addr, "external", &id, &fragment_id]; loop { match self.call(id, input.clone()).await { Err(err) if err.is_tonic_error() => { @@ -196,6 +200,7 @@ impl ArrowFlightUdfClient { return ret; } } + metrics.udf_failure_count.with_label_values(labels).inc(); tokio::time::sleep(backoff).await; backoff *= 2; }