Skip to content

Commit

Permalink
Don't retry permanent failures (#634)
Browse files Browse the repository at this point in the history
The retrier always retries on error.  However, some errors are permanent, for
example NotFound.  This means that if a request is made to get something from
the CAS (or GrpcStore AC) which is not there, it keeps trying even though it's
still not there.  This defines the set of errors which should be retried and
fails otherwise.
  • Loading branch information
chrisstaite-menlo authored Feb 2, 2024
1 parent 0cba4fa commit 81b64f7
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 329 deletions.
36 changes: 36 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,28 @@ pub struct GrpcStore {
pub retry: Retry,
}

/// The possible error codes that might occur on an upstream request.
///
/// Each variant carries an explicit discriminant (1 through 16); there is no
/// variant for the value 0. The list is maintained by hand as a copy of the
/// `Code` enum in nativelink-error/src/lib.rs, so the two must be updated
/// together. `Retry::retry_on_errors` uses these values to select which
/// failures are retried.
///
/// The enum is fieldless, so it also derives `Copy` and `Eq`: values can be
/// passed around by value and compared with full equality semantics.
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)]
pub enum ErrorCode {
    Cancelled = 1,
    Unknown = 2,
    InvalidArgument = 3,
    DeadlineExceeded = 4,
    NotFound = 5,
    AlreadyExists = 6,
    PermissionDenied = 7,
    ResourceExhausted = 8,
    FailedPrecondition = 9,
    Aborted = 10,
    OutOfRange = 11,
    Unimplemented = 12,
    Internal = 13,
    Unavailable = 14,
    DataLoss = 15,
    Unauthenticated = 16,
    // Note: This list is duplicated from nativelink-error/src/lib.rs.
}

/// Retry configuration. The delay grows exponentially and, on each iteration,
/// a jitter (as a percentage of the calculated delay) is applied. For example:
/// ```rust,ignore
Expand Down Expand Up @@ -591,4 +613,18 @@ pub struct Retry {
/// ```
#[serde(default)]
pub jitter: f32,

/// A list of error codes to retry on. If this is not set, the default
/// error codes to retry on are used. These default codes are the ones most
/// likely to be non-permanent:
/// - Unknown
/// - Cancelled
/// - DeadlineExceeded
/// - ResourceExhausted
/// - Aborted
/// - Internal
/// - Unavailable
/// - DataLoss
#[serde(default)]
pub retry_on_errors: Option<Vec<ErrorCode>>,
}
2 changes: 2 additions & 0 deletions nativelink-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ pub enum Code {
Unavailable = 14,
DataLoss = 15,
Unauthenticated = 16,
// NOTE: Additional codes must also be added to `ErrorCode` in
// nativelink-config/src/stores.rs and to both match statements in retry.rs.
}

impl From<i32> for Code {
Expand Down
36 changes: 15 additions & 21 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
};
use nativelink_proto::google::longrunning::Operation;
use nativelink_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionState, DEFAULT_EXECUTION_PRIORITY};
use nativelink_util::retry::{ExponentialBackoff, Retrier, RetryResult};
use nativelink_util::retry::{Retrier, RetryResult};
use nativelink_util::tls_utils;
use parking_lot::Mutex;
use rand::rngs::OsRng;
Expand All @@ -46,8 +46,6 @@ pub struct GrpcScheduler {
capabilities_client: CapabilitiesClient<transport::Channel>,
execution_client: ExecutionClient<transport::Channel>,
platform_property_managers: Mutex<HashMap<String, Arc<PlatformPropertyManager>>>,
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
retry: nativelink_config::stores::Retry,
retrier: Retrier,
}

Expand Down Expand Up @@ -76,9 +74,11 @@ impl GrpcScheduler {
capabilities_client: CapabilitiesClient::new(channel.clone()),
execution_client: ExecutionClient::new(channel),
platform_property_managers: Mutex::new(HashMap::new()),
jitter_fn,
retry: config.retry.clone(),
retrier: Retrier::new(Arc::new(|duration| Box::pin(sleep(duration)))),
retrier: Retrier::new(
Arc::new(|duration| Box::pin(sleep(duration))),
Arc::new(jitter_fn),
config.retry.to_owned(),
),
})
}

Expand All @@ -89,22 +89,16 @@ impl GrpcScheduler {
R: Send,
I: Send + Clone,
{
let retry_config = ExponentialBackoff::new(Duration::from_millis(self.retry.delay as u64))
.map(|d| (self.jitter_fn)(d))
.take(self.retry.max_retries); // Remember this is number of retries, so will run max_retries + 1.
self.retrier
.retry(
retry_config,
unfold(input, move |input| async move {
let input_clone = input.clone();
Some((
request(input_clone)
.await
.map_or_else(RetryResult::Retry, RetryResult::Ok),
input,
))
}),
)
.retry(unfold(input, move |input| async move {
let input_clone = input.clone();
Some((
request(input_clone)
.await
.map_or_else(RetryResult::Retry, RetryResult::Ok),
input,
))
}))
.await
}

Expand Down
138 changes: 61 additions & 77 deletions nativelink-store/src/grpc_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 The Native Link Authors. All rights reserved.
// Copyright 2023-2024 The Native Link Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,7 +35,7 @@ use nativelink_proto::google::bytestream::{
};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::retry::{ExponentialBackoff, Retrier, RetryResult};
use nativelink_util::retry::{Retrier, RetryResult};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use nativelink_util::tls_utils;
use nativelink_util::write_request_stream_wrapper::WriteRequestStreamWrapper;
Expand All @@ -60,8 +60,6 @@ pub struct GrpcStore {
bytestream_client: ByteStreamClient<transport::Channel>,
ac_client: ActionCacheClient<transport::Channel>,
store_type: nativelink_config::stores::StoreType,
jitter_fn: Box<dyn Fn(Duration) -> Duration + Send + Sync>,
retry: nativelink_config::stores::Retry,
retrier: Retrier,
}

Expand Down Expand Up @@ -126,39 +124,31 @@ impl GrpcStore {
bytestream_client: ByteStreamClient::new(conn.clone()),
ac_client: ActionCacheClient::new(conn),
store_type: config.store_type,
jitter_fn,
retry: config.retry.to_owned(),
retrier: Retrier::new(Arc::new(|duration| Box::pin(sleep(duration)))),
retrier: Retrier::new(
Arc::new(|duration| Box::pin(sleep(duration))),
Arc::new(jitter_fn),
config.retry.to_owned(),
),
})
}

fn get_retry_config(&self) -> impl Iterator<Item = Duration> + '_ {
ExponentialBackoff::new(Duration::from_millis(self.retry.delay as u64))
.map(|d| (self.jitter_fn)(d))
.take(self.retry.max_retries) // Remember this is number of retries, so will run max_retries + 1.
}

async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error>
where
F: FnMut(I) -> Fut + Send + Copy,
Fut: Future<Output = Result<R, Error>> + Send,
R: Send,
I: Send + Clone,
{
let retry_config = self.get_retry_config();
self.retrier
.retry(
retry_config,
unfold(input, move |input| async move {
let input_clone = input.clone();
Some((
request(input_clone)
.await
.map_or_else(RetryResult::Retry, RetryResult::Ok),
input,
))
}),
)
.retry(unfold(input, move |input| async move {
let input_clone = input.clone();
Some((
request(input_clone)
.await
.map_or_else(RetryResult::Retry, RetryResult::Ok),
input,
))
}))
.await
}

Expand Down Expand Up @@ -323,11 +313,9 @@ impl GrpcStore {
client: self.bytestream_client.clone(),
});

let retry_config = self.get_retry_config();
let result = self
.retrier
.retry(
retry_config,
unfold(local_state, move |local_state| async move {
let stream = unfold((None, local_state.clone()), move |(stream, local_state)| async {
// Only consume the stream on the first request to read,
Expand Down Expand Up @@ -680,60 +668,56 @@ impl Store for GrpcStore {
read_limit: length.unwrap_or(0) as i64,
};

let retry_config = self.get_retry_config();
self.retrier
.retry(
retry_config,
unfold(local_state, move |mut local_state| async move {
let request = ReadRequest {
resource_name: local_state.resource_name.clone(),
read_offset: local_state.read_offset,
read_limit: local_state.read_limit,
};
let mut stream = match self.read_internal(request).await.err_tip(|| "in GrpcStore::get_part()") {
Ok(stream) => stream,
Err(err) => return Some((RetryResult::Retry(err), local_state)),
};

loop {
let data = match stream.next().await {
// Create an empty response to represent EOF.
None => bytes::Bytes::new(),
Some(Ok(message)) => message.data,
Some(Err(status)) => {
return Some((
RetryResult::Retry(
Into::<Error>::into(status)
.append("While fetching message in GrpcStore::get_part()"),
),
local_state,
))
}
};
let length = data.len() as i64;
// This is the usual exit from the loop at EOF.
if length == 0 {
let eof_result = local_state
.writer
.send_eof()
.await
.err_tip(|| "Could not send eof in GrpcStore::get_part()")
.map_or_else(RetryResult::Err, RetryResult::Ok);
return Some((eof_result, local_state));
.retry(unfold(local_state, move |mut local_state| async move {
let request = ReadRequest {
resource_name: local_state.resource_name.clone(),
read_offset: local_state.read_offset,
read_limit: local_state.read_limit,
};
let mut stream = match self.read_internal(request).await.err_tip(|| "in GrpcStore::get_part()") {
Ok(stream) => stream,
Err(err) => return Some((RetryResult::Retry(err), local_state)),
};

loop {
let data = match stream.next().await {
// Create an empty response to represent EOF.
None => bytes::Bytes::new(),
Some(Ok(message)) => message.data,
Some(Err(status)) => {
return Some((
RetryResult::Retry(
Into::<Error>::into(status)
.append("While fetching message in GrpcStore::get_part()"),
),
local_state,
))
}
// Forward the data upstream.
if let Err(err) = local_state
};
let length = data.len() as i64;
// This is the usual exit from the loop at EOF.
if length == 0 {
let eof_result = local_state
.writer
.send(data)
.send_eof()
.await
.err_tip(|| "While sending in GrpcStore::get_part()")
{
return Some((RetryResult::Err(err), local_state));
}
local_state.read_offset += length;
.err_tip(|| "Could not send eof in GrpcStore::get_part()")
.map_or_else(RetryResult::Err, RetryResult::Ok);
return Some((eof_result, local_state));
}
}),
)
// Forward the data upstream.
if let Err(err) = local_state
.writer
.send(data)
.await
.err_tip(|| "While sending in GrpcStore::get_part()")
{
return Some((RetryResult::Err(err), local_state));
}
local_state.read_offset += length;
}
}))
.await
}

Expand Down
Loading

0 comments on commit 81b64f7

Please sign in to comment.