Skip to content

Commit

Permalink
add comments and add track for call_with_always_retry_on_network_error
Browse files Browse the repository at this point in the history
  • Loading branch information
yufansong committed Mar 7, 2024
1 parent 3a2e574 commit 3774877
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
10 changes: 9 additions & 1 deletion src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ pub struct UserDefinedFunction {
identifier: String,
span: await_tree::Span,
/// Number of remaining successful calls until retry is enabled.
/// This parameter is designed to prevent continuous retry on every call, which would increase delay.
/// Logic:
/// It resets to INITIAL_RETRY_COUNT after a single failure and then decrements with each call, enabling retry when it reaches zero.
/// If non-zero, we will not retry on connection errors to prevent blocking the stream.
/// On each connection error, the count will be reset to `INITIAL_RETRY_COUNT`.
/// On each successful call, the count will be decreased by 1.
/// Link:
/// See <https://github.com/risingwavelabs/risingwave/issues/13791>.
disable_retry_count: AtomicU8,
/// Always retry. Overrides `disable_retry_count`.
Expand Down Expand Up @@ -152,7 +156,11 @@ impl UserDefinedFunction {
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let result = if self.always_retry_on_network_error {
client
.call_with_always_retry_on_network_error(&self.identifier, arrow_input)
.call_with_always_retry_on_network_error(
&self.identifier,
arrow_input,
&fragment_id,
)
.instrument_await(self.span.clone())
.await
} else {
Expand Down
5 changes: 5 additions & 0 deletions src/expr/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use thiserror_ext::AsReport;
use tokio::time::Duration as TokioDuration;
use tonic::transport::Channel;

use crate::metrics::GLOBAL_METRICS;
use crate::{Error, Result};

// Interval between two successive probes of the UDF DNS.
Expand Down Expand Up @@ -182,8 +183,11 @@ impl ArrowFlightUdfClient {
&self,
id: &str,
input: RecordBatch,
fragment_id: &str,
) -> Result<RecordBatch> {
let mut backoff = Duration::from_millis(100);
let metrics = &*GLOBAL_METRICS;
let labels: &[&str; 4] = &[&self.addr, "external", &id, &fragment_id];
loop {
match self.call(id, input.clone()).await {
Err(err) if err.is_tonic_error() => {
Expand All @@ -196,6 +200,7 @@ impl ArrowFlightUdfClient {
return ret;
}
}
metrics.udf_failure_count.with_label_values(labels).inc();
tokio::time::sleep(backoff).await;
backoff *= 2;
}
Expand Down

0 comments on commit 3774877

Please sign in to comment.