diff --git a/nativelink-service/src/bytestream_server.rs b/nativelink-service/src/bytestream_server.rs index 43daf96da..abddc858f 100644 --- a/nativelink-service/src/bytestream_server.rs +++ b/nativelink-service/src/bytestream_server.rs @@ -396,11 +396,11 @@ impl ByteStreamServer { // by counting the number of bytes sent from the client. If they send // less than the amount they said they were going to send and then // close the stream, we know there's a problem. - Ok(None) => return Err(make_input_err!("Client closed stream before sending all data")), + None => return Err(make_input_err!("Client closed stream before sending all data")), // Code path for client stream error. Probably client disconnect. - Err(err) => return Err(err), + Some(Err(err)) => return Err(err), // Code path for received chunk of data. - Ok(Some(write_request)) => write_request, + Some(Ok(write_request)) => write_request, }; if write_request.write_offset < 0 { diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index 23d3fbc46..f9e8084e1 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -235,7 +235,7 @@ pub mod write_tests { // Now disconnect our stream. drop(tx); let (result, _bs_server) = join_handle.await?; - assert!(result.is_ok(), "Expected success to be returned"); + result?; } { // Check to make sure our store recorded the data properly. diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index ed3b867e4..d242f0984 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -15,6 +15,7 @@ use std::marker::Send; use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::Duration; use async_trait::async_trait; @@ -35,6 +36,7 @@ use nativelink_proto::google::bytestream::{ }; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::resource_info::ResourceInfo; use nativelink_util::retry::{ExponentialBackoff, Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use nativelink_util::tls_utils; @@ -44,7 +46,6 @@ use prost::Message; use rand::rngs::OsRng; use rand::Rng; use tokio::time::sleep; -use tonic::transport::Channel; use tonic::{transport, IntoRequest, Request, Response, Status, Streaming}; use tracing::error; use uuid::Uuid; @@ -90,6 +91,120 @@ impl Stream for FirstStream { } } +/// This structure wraps all of the information required to perform a write +/// request on the GrpcStore. It stores the last message retrieved which allows +/// the write to resume since the UUID allows upload resume at the server. +struct WriteState +where + T: Stream> + Unpin + Send + 'static, + E: Into + 'static, +{ + instance_name: String, + read_stream_error: Option, + read_stream: WriteRequestStreamWrapper, + // Tonic doesn't appear to report an error until it has taken two messages, + // therefore we are required to buffer the last two messages. + last_message: Option, + message_before_last: Option, + // When resuming after an error, set this to 2 to play back + // message_before_last. Will decrement and play back last_message next and + // will then decrement to 0 and start streaming again. + resume_count: u32, +} + +impl WriteState +where + T: Stream> + Unpin + Send + 'static, + E: Into + 'static, +{ + fn new(instance_name: String, read_stream: WriteRequestStreamWrapper) -> Self { + Self { + instance_name, + read_stream_error: None, + read_stream, + last_message: None, + message_before_last: None, + resume_count: 0, + } + } + + fn can_resume(&self) -> bool { + self.read_stream_error.is_none() && (self.last_message.is_some() || self.read_stream.is_first_msg()) + } + + fn resume(&mut self) { + self.resume_count = 2 + } +} + +/// A wrapper around WriteState to allow it to be reclaimed from the underlying +/// write call in the case of failure. +struct WriteStateWrapper +where + T: Stream> + Unpin + Send + 'static, + E: Into + 'static, +{ + shared_state: Arc>>, +} + +impl Stream for WriteStateWrapper +where + T: Stream> + Unpin + Send + 'static, + E: Into + 'static, +{ + type Item = WriteRequest; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // This should be an uncontended lock since write was called. + let mut local_state = self.shared_state.lock(); + // If this is the first or second call after a failure and we have + // cached messages, then use the cached write requests. + if local_state.resume_count == 2 { + local_state.resume_count -= 1; + if let Some(message_before_last) = &local_state.message_before_last { + return Poll::Ready(Some(message_before_last.clone())); + } + } + if local_state.resume_count == 1 { + local_state.resume_count -= 1; + if let Some(last_message) = &local_state.last_message { + return Poll::Ready(Some(last_message.clone())); + } + } + // Read a new write request from the downstream. + let Poll::Ready(maybe_message) = Pin::new(&mut local_state.read_stream).poll_next(cx) else { + return Poll::Pending; + }; + // Update the instance name in the write request and forward it on. + const IS_UPLOAD_TRUE: bool = true; + let result = match maybe_message { + Some(Ok(mut message)) => match ResourceInfo::new(&message.resource_name, IS_UPLOAD_TRUE) { + Ok(mut resource_name) => { + if resource_name.instance_name != local_state.instance_name { + resource_name.instance_name = &local_state.instance_name; + message.resource_name = resource_name.to_string(IS_UPLOAD_TRUE); + } + // Cache the last request in case there is an error to allow + // the upload to be resumed. + local_state.message_before_last = local_state.last_message.take(); + local_state.last_message = Some(message.clone()); + Some(message) + } + Err(err) => { + error!("{err:?}"); + None + } + }, + Some(Err(err)) => { + local_state.read_stream_error = Some(err); + None + } + None => None, + }; + Poll::Ready(result) + } +} + impl GrpcStore { pub async fn new(config: &nativelink_config::stores::GrpcStore) -> Result { let jitter_amt = config.retry.jitter; @@ -247,16 +362,12 @@ impl GrpcStore { } fn get_read_request(&self, mut request: ReadRequest) -> Result { - // `resource_name` pattern is: "{instance_name}/blobs/{hash}/{size}". - let first_slash_pos = request - .resource_name - .find('/') - .err_tip(|| "Resource name expected to follow pattern {instance_name}/blobs/{hash}/{size}")?; - request.resource_name = format!( - "{}/{}", - self.instance_name, - request.resource_name.get((first_slash_pos + 1)..).unwrap() - ); + const IS_UPLOAD_FALSE: bool = false; + let mut resource_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_FALSE)?; + if resource_info.instance_name != self.instance_name { + resource_info.instance_name = &self.instance_name; + request.resource_name = resource_info.to_string(IS_UPLOAD_FALSE); + } Ok(request) } @@ -305,23 +416,7 @@ impl GrpcStore { "CAS operation on AC store" ); - struct LocalState - where - T: Stream> + Unpin + Send + 'static, - E: Into + 'static, - { - instance_name: String, - error: Mutex>, - read_stream: Mutex>>, - client: ByteStreamClient, - } - - let local_state = Arc::new(LocalState { - instance_name: self.instance_name.clone(), - error: Mutex::new(None), - read_stream: Mutex::new(Some(stream)), - client: self.bytestream_client.clone(), - }); + let local_state = Arc::new(Mutex::new(WriteState::new(self.instance_name.clone(), stream))); let retry_config = self.get_retry_config(); let result = self @@ -329,56 +424,42 @@ impl GrpcStore { .retry( retry_config, unfold(local_state, move |local_state| async move { - let stream = unfold((None, local_state.clone()), move |(stream, local_state)| async { - // Only consume the stream on the first request to read, - // then pass it for future requests in the unfold. - let mut stream = stream.or_else(|| local_state.read_stream.lock().take())?; - let maybe_message = stream.next().await; - if let Ok(maybe_message) = maybe_message { - if let Some(mut message) = maybe_message { - // `resource_name` pattern is: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}". - let first_slash_pos = match message.resource_name.find('/') { - Some(pos) => pos, - None => { - error!("{}", "Resource name should follow pattern {instance_name}/uploads/{uuid}/blobs/{hash}/{size}"); - return None; - } - }; - message.resource_name = format!( - "{}/{}", - &local_state.instance_name, - message.resource_name.get((first_slash_pos + 1)..).unwrap() - ); - return Some((message, (Some(stream), local_state))); - } - return None; - } - // TODO(allada) I'm sure there's a way to do this without a mutex, but rust can be super - // picky with borrowing through a stream await. - *local_state.error.lock() = Some(maybe_message.unwrap_err()); - None - }); - - let result = local_state.client.clone() - .write(stream) - .await - .err_tip(|| "in GrpcStore::write"); - - // If the stream has been consumed, don't retry, but - // otherwise it's ok to try again. - let result = if local_state.read_stream.lock().is_some() { - result.map_or_else(RetryResult::Retry, RetryResult::Ok) - } else { - result.map_or_else(RetryResult::Err, RetryResult::Ok) - }; - - // If there was an error with the stream, then don't retry. - let result = if let Some(err) = local_state.error.lock().take() { + let mut client = self.bytestream_client.clone(); + // The client write may occur on a separate thread and + // therefore in order to share the state with it we have to + // wrap it in a Mutex and retrieve it after the write + // has completed. There is no way to get the value back + // from the client. + let result = client + .write(WriteStateWrapper { + shared_state: local_state.clone(), + }) + .await; + + // Get the state back from StateWrapper, this should be + // uncontended since write has returned. + let mut local_state_locked = local_state.lock(); + + let result = if let Some(err) = local_state_locked.read_stream_error.take() { + // If there was an error with the stream, then don't + // retry. RetryResult::Err(err) } else { - result + // On error determine whether it is possible to retry. + match result.err_tip(|| "in GrpcStore::write") { + Err(err) => { + if local_state_locked.can_resume() { + local_state_locked.resume(); + RetryResult::Retry(err) + } else { + RetryResult::Err(err.append("Retry is not possible")) + } + } + Ok(response) => RetryResult::Ok(response), + } }; + drop(local_state_locked); Some((result, local_state)) }), ) @@ -397,15 +478,12 @@ impl GrpcStore { let mut request = grpc_request.into_inner(); - // `resource_name` pattern is: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}". - let first_slash_pos = request.resource_name.find('/').err_tip(|| { - "Resource name expected to follow pattern {instance_name}/uploads/{uuid}/blobs/{hash}/{size}" - })?; - request.resource_name = format!( - "{}/{}", - self.instance_name, - request.resource_name.get((first_slash_pos + 1)..).unwrap() - ); + const IS_UPLOAD_TRUE: bool = true; + let mut request_info = ResourceInfo::new(&request.resource_name, IS_UPLOAD_TRUE)?; + if request_info.instance_name != self.instance_name { + request_info.instance_name = &self.instance_name; + request.resource_name = request_info.to_string(IS_UPLOAD_TRUE); + } self.perform_request(request, |request| async move { self.bytestream_client diff --git a/nativelink-util/src/resource_info.rs b/nativelink-util/src/resource_info.rs index 994152e7b..20d832b22 100644 --- a/nativelink-util/src/resource_info.rs +++ b/nativelink-util/src/resource_info.rs @@ -84,6 +84,7 @@ pub struct ResourceInfo<'a> { pub compressor: Option<&'a str>, pub digest_function: Option<&'a str>, pub hash: &'a str, + size: &'a str, pub expected_size: usize, pub optional_metadata: Option<&'a str>, } @@ -129,6 +130,25 @@ impl<'a> ResourceInfo<'a> { } Ok(output) } + + pub fn to_string(&self, is_upload: bool) -> String { + [ + Some(self.instance_name), + is_upload.then_some("uploads"), + self.uuid, + Some(self.compressor.map_or("blobs", |_| "compressed-blobs")), + self.compressor, + self.digest_function, + Some(self.hash), + Some(self.size), + self.optional_metadata, + ] + .into_iter() + .flatten() + .filter(|part| !part.is_empty()) + .collect::>() + .join("/") + } } #[derive(Debug, PartialEq)] @@ -177,8 +197,9 @@ fn recursive_parse<'a>( output.compressor = Some(part); *bytes_processed += part.len() + SLASH_SIZE; return Ok(state); + } else { + return Err(make_input_err!("Expected compressor, got {part}")); } - continue; } State::DigestFunction => { state = State::Hash; @@ -196,6 +217,7 @@ fn recursive_parse<'a>( return Ok(State::Size); } State::Size => { + output.size = part; output.expected_size = part .parse::() .map_err(|_| make_input_err!("Digest size_bytes was not convertible to usize. Got: {}", part))?; diff --git a/nativelink-util/src/write_request_stream_wrapper.rs b/nativelink-util/src/write_request_stream_wrapper.rs index b9d91b5b5..bf27f0539 100644 --- a/nativelink-util/src/write_request_stream_wrapper.rs +++ b/nativelink-util/src/write_request_stream_wrapper.rs @@ -1,4 +1,4 @@ -// Copyright 2023 The Native Link Authors. All rights reserved. +// Copyright 2023-2024 The Native Link Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::pin::Pin; +use std::task::{Context, Poll}; + use futures::{Stream, StreamExt}; -use nativelink_error::{error_if, Error, ResultExt}; +use nativelink_error::{error_if, make_input_err, Error, ResultExt}; use nativelink_proto::google::bytestream::WriteRequest; use crate::resource_info::ResourceInfo; @@ -56,7 +59,6 @@ where let hash = resource_info.hash.to_string(); let expected_size = resource_info.expected_size; let uuid = resource_info.uuid.map(|v| v.to_string()); - let write_finished = first_msg.finish_write; Ok(WriteRequestStreamWrapper { instance_name, @@ -66,15 +68,29 @@ where bytes_received: 0, stream, first_msg: Some(first_msg), - write_finished, + write_finished: false, }) } - pub async fn next(&mut self) -> Result, Error> { - if let Some(first_msg) = self.first_msg.take() { - self.bytes_received += first_msg.data.len(); - return Ok(Some(first_msg)); - } + pub async fn next(&mut self) -> Option> { + futures::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await + } + + pub fn is_first_msg(&self) -> bool { + self.first_msg.is_some() + } +} + +impl Stream for WriteRequestStreamWrapper +where + E: Into, + T: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // If the stream said that the previous message was the last one, then + // return a stream EOF (i.e. None). if self.write_finished { error_if!( self.bytes_received != self.expected_size, @@ -82,23 +98,45 @@ where self.expected_size, self.bytes_received ); - return Ok(None); // Previous message said it was the last msg. + return Poll::Ready(None); } - error_if!( - self.bytes_received > self.expected_size, - "Sent too much data. Expected {}, but so far received {}", - self.expected_size, - self.bytes_received - ); - let next_msg = self - .stream - .next() - .await - .err_tip(|| format!("Stream error at byte {}", self.bytes_received))? - .err_tip(|| "Expected WriteRequest struct in stream")?; - self.write_finished = next_msg.finish_write; - self.bytes_received += next_msg.data.len(); - Ok(Some(next_msg)) + // Gets the next message, this is either the cached first or a + // subsequent message from the wrapped Stream. + let maybe_message = if let Some(first_msg) = self.first_msg.take() { + Ok(first_msg) + } else { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Some(maybe_message)) => { + maybe_message.err_tip(|| format!("Stream error at byte {}", self.bytes_received)) + } + Poll::Ready(None) => Err(make_input_err!("Expected WriteRequest struct in stream")), + } + }; + + // If we successfully got a message, update our internal state with the + // message meta data. + let maybe_message = match maybe_message { + Ok(message) => { + self.write_finished = message.finish_write; + self.bytes_received += message.data.len(); + + // Check that we haven't read past the expected end. + if self.bytes_received > self.expected_size { + Err(make_input_err!( + "Sent too much data. Expected {}, but so far received {}", + self.expected_size, + self.bytes_received + )) + } else { + Ok(message) + } + } + error => error, + }; + + // Return the message. + Poll::Ready(Some(maybe_message)) } } diff --git a/nativelink-util/tests/resource_info_test.rs b/nativelink-util/tests/resource_info_test.rs index 419719b94..cf52eace5 100644 --- a/nativelink-util/tests/resource_info_test.rs +++ b/nativelink-util/tests/resource_info_test.rs @@ -32,6 +32,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -47,6 +48,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -62,6 +64,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -76,6 +79,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -91,6 +95,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -120,6 +125,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -134,6 +140,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -149,6 +156,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -163,6 +171,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -177,6 +186,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -191,6 +201,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -206,6 +217,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -220,12 +232,13 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } #[tokio::test] async fn read_blobs_hash_size_optional_metadata_test() -> Result<(), Box> { - const RESOURCE_NAME: &str = "compressed-blobs/hash/12345/optional_metadata"; + const RESOURCE_NAME: &str = "blobs/hash/12345/optional_metadata"; let resource_info = ResourceInfo::new(RESOURCE_NAME, false)?; assert_eq!(resource_info.instance_name, ""); assert_eq!(resource_info.uuid, None); @@ -234,12 +247,13 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } #[tokio::test] async fn read_blobs_hash_size_test() -> Result<(), Box> { - const RESOURCE_NAME: &str = "compressed-blobs/hash/12345"; + const RESOURCE_NAME: &str = "blobs/hash/12345"; let resource_info = ResourceInfo::new(RESOURCE_NAME, false)?; assert_eq!(resource_info.instance_name, ""); assert_eq!(resource_info.uuid, None); @@ -248,6 +262,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -262,6 +277,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -276,6 +292,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(false)); Ok(()) } @@ -320,6 +337,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -335,6 +353,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -350,6 +369,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -365,6 +385,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -380,6 +401,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -395,6 +417,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -410,6 +433,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -424,6 +448,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -439,6 +464,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -454,6 +480,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -469,6 +496,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -483,6 +511,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -498,6 +527,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -512,12 +542,20 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); + Ok(()) + } + + #[tokio::test] + async fn compressed_blob_invalid_compression_format() -> Result<(), Box> { + const RESOURCE_NAME: &str = "uploads/uuid/compressed-blobs/INVALID/hash/12345"; + assert!(ResourceInfo::new(RESOURCE_NAME, true).is_err()); Ok(()) } #[tokio::test] async fn write_uploads_uuid_blobs_hash_size_optional_metadata_test() -> Result<(), Box> { - const RESOURCE_NAME: &str = "uploads/uuid/compressed-blobs/hash/12345/optional_metadata"; + const RESOURCE_NAME: &str = "uploads/uuid/blobs/hash/12345/optional_metadata"; let resource_info = ResourceInfo::new(RESOURCE_NAME, true)?; assert_eq!(resource_info.instance_name, ""); assert_eq!(resource_info.uuid, Some("uuid")); @@ -526,12 +564,13 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, Some("optional_metadata")); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } #[tokio::test] async fn write_uploads_uuid_blobs_hash_size_test() -> Result<(), Box> { - const RESOURCE_NAME: &str = "uploads/uuid/compressed-blobs/hash/12345"; + const RESOURCE_NAME: &str = "uploads/uuid/blobs/hash/12345"; let resource_info = ResourceInfo::new(RESOURCE_NAME, true)?; assert_eq!(resource_info.instance_name, ""); assert_eq!(resource_info.uuid, Some("uuid")); @@ -540,6 +579,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -554,6 +594,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) } @@ -568,6 +609,7 @@ mod resource_info_tests { assert_eq!(resource_info.hash, "hash"); assert_eq!(resource_info.expected_size, 12345); assert_eq!(resource_info.optional_metadata, None); + assert_eq!(RESOURCE_NAME, resource_info.to_string(true)); Ok(()) }