diff --git a/Cargo.lock b/Cargo.lock
index 6e25bb2b5..12d740e53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1324,6 +1324,55 @@ version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 
+[[package]]
+name = "googleapis-tonic-google-api"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0fac8a714a790dacdbc09fd423079b9cfc85a754b225c6c4ece22ad4f277346"
+dependencies = [
+ "prost",
+ "prost-types",
+ "tonic",
+]
+
+[[package]]
+name = "googleapis-tonic-google-iam-v1"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa2de38058bcf2fcb512a441dd5f8ae0c5893475eb3cc4c9f2178ca3856451a6"
+dependencies = [
+ "googleapis-tonic-google-api",
+ "googleapis-tonic-google-type",
+ "prost",
+ "prost-types",
+ "tonic",
+]
+
+[[package]]
+name = "googleapis-tonic-google-storage-v2"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e93e7cde66413a2bcedf7c5aa9c1ea36a17d3bccaa680cc931434db295f1769"
+dependencies = [
+ "googleapis-tonic-google-api",
+ "googleapis-tonic-google-iam-v1",
+ "googleapis-tonic-google-type",
+ "prost",
+ "prost-types",
+ "tonic",
+]
+
+[[package]]
+name = "googleapis-tonic-google-type"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ba7a85cbf34af971cfcc2d15bc7de8935f33b7a675def0e520e567b64ce8c7f"
+dependencies = [
+ "prost",
+ "prost-types",
+ "tonic",
+]
+
 [[package]]
 name = "h2"
 version = "0.3.26"
@@ -2173,9 +2222,11 @@ dependencies = [
  "bytes",
  "bytes-utils",
  "const_format",
+ "crc32c",
  "filetime",
  "fred",
  "futures",
+ "googleapis-tonic-google-storage-v2",
  "hex",
  "http 1.2.0",
  "http-body 1.0.1",
@@ -4060,4 +4111,4 @@ dependencies = [
  "proc-macro2",
  "quote",
  "syn 2.0.90",
-]
+]
\ No newline at end of file
diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs
index 7e6e31821..879eda713 100644
--- a/nativelink-config/src/stores.rs
+++ b/nativelink-config/src/stores.rs
@@ -54,6 +54,9 @@ pub enum StoreSpec {
     ///
     memory(MemorySpec),
 
+    /// TODO(asr2003): Add documentation.
+    experimental_gcs_store(GCSSpec),
+
     /// S3 store will use Amazon's S3 service as a backend to store
     /// the files. This configuration can be used to share files
     /// across multiple instances.
@@ -724,6 +727,61 @@ pub struct EvictionPolicy {
     pub max_count: u64,
 }
 
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct GCSSpec {
+    /// GCS region or location. Example: US, US-CENTRAL1, EUROPE-WEST1.
+    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
+    pub location: String,
+
+    /// Bucket name to use as the backend.
+    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
+    pub bucket: String,
+
+    /// Optional prefix for object keys. If None, no prefix will be used.
+    #[serde(default)]
+    pub key_prefix: Option<String>,
+
+    /// Retry configuration to use when a network request fails.
+    #[serde(default)]
+    pub retry: Retry,
+
+    /// Time in seconds after which an object is considered "expired."
+    /// Allows external tools to clean up unused objects.
+    /// Default: 0. Zero means never consider an object expired.
+    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
+    pub consider_expired_after_s: u32,
+
+    /// Maximum buffer size to retain in case of a retryable error during upload.
+    /// Setting this to zero will disable upload buffering.
+    /// Default: 5MB.
+    pub max_retry_buffer_per_request: Option<usize>,
+
+    /// Enable resumable uploads for large objects.
+    /// Default: true.
+    #[serde(default)]
+    pub enable_resumable_uploads: bool,
+
+    /// The maximum size of chunks (in bytes) for resumable uploads.
+    /// Default: 8MB.
+    pub resumable_chunk_size: Option<usize>,
+
+    /// Allow unencrypted HTTP connections. Only use this for local testing.
+    /// Default: false
+    #[serde(default)]
+    pub insecure_allow_http: bool,
+
+    /// Disable http/2 connections and only use http/1.1.
+    /// Default: false
+    #[serde(default)]
+    pub disable_http2: bool,
+
+    /// Optional configuration for client authentication.
+    /// Example: Path to a service account JSON key file or environment-based authentication.
+    #[serde(default)]
+    pub auth_key_file: Option<String>,
+}
+
 #[derive(Serialize, Deserialize, Debug, Default, Clone)]
 #[serde(deny_unknown_fields)]
 pub struct S3Spec {
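For reviewers, here is a minimal sketch of what a GCSSpec entry could look like once deserialized from a store configuration. The field names come from the struct added above; the bucket name, prefix, and sizes are made-up values, and the use of serde_json here is only for illustration (any serde-based config loader would behave the same way):

    // Hypothetical example values; omitted fields fall back to their defaults.
    use nativelink_config::stores::GCSSpec;

    fn example_gcs_spec() -> Result<GCSSpec, serde_json::Error> {
        serde_json::from_str(
            r#"{
                "bucket": "my-build-cache",
                "key_prefix": "cas/",
                "max_retry_buffer_per_request": 5242880,
                "resumable_chunk_size": 8388608,
                "enable_resumable_uploads": true
            }"#,
        )
    }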
diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel
index 4f7691e7d..229d8c705 100644
--- a/nativelink-store/BUILD.bazel
+++ b/nativelink-store/BUILD.bazel
@@ -18,6 +18,7 @@ rust_library(
         "src/existence_cache_store.rs",
         "src/fast_slow_store.rs",
         "src/filesystem_store.rs",
+        "src/gcs_store.rs",
         "src/grpc_store.rs",
         "src/lib.rs",
         "src/memory_store.rs",
@@ -52,9 +53,11 @@ rust_library(
         "@crates//:bytes",
         "@crates//:bytes-utils",
         "@crates//:const_format",
+        "@crates//:crc32c",
         "@crates//:filetime",
         "@crates//:fred",
         "@crates//:futures",
+        "@crates//:googleapis-tonic-google-storage-v2",
         "@crates//:hex",
         "@crates//:http-body",
         "@crates//:hyper-0.14.31",
diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml
index d646c53e6..27d7351bf 100644
--- a/nativelink-store/Cargo.toml
+++ b/nativelink-store/Cargo.toml
@@ -57,6 +57,8 @@ tokio-util = { version = "0.7.13" }
 tonic = { version = "0.12.3", features = ["transport", "tls"], default-features = false }
 tracing = { version = "0.1.41", default-features = false }
 uuid = { version = "1.11.0", default-features = false, features = ["v4", "serde"] }
+googleapis-tonic-google-storage-v2 = "0.16.0"
+crc32c = "0.6.8"
 
 [dev-dependencies]
 nativelink-macro = { path = "../nativelink-macro" }
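The crc32c crate added above supplies the checksum GCS expects alongside every chunk of object data; the new store fills ChecksummedData.crc32c with it on each write. A quick sketch of the calls involved (input bytes are illustrative):

    // CRC32-Castagnoli over a payload, plus the incremental form for streamed chunks.
    let payload: &[u8] = b"example chunk";
    let checksum: u32 = crc32c::crc32c(payload);
    let extended: u32 = crc32c::crc32c_append(checksum, b" and more");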
diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs
index 506ef6752..3658d6e11 100644
--- a/nativelink-store/src/default_store_factory.rs
+++ b/nativelink-store/src/default_store_factory.rs
@@ -29,6 +29,7 @@ use crate::dedup_store::DedupStore;
 use crate::existence_cache_store::ExistenceCacheStore;
 use crate::fast_slow_store::FastSlowStore;
 use crate::filesystem_store::FilesystemStore;
+use crate::gcs_store::GCSStore;
 use crate::grpc_store::GrpcStore;
 use crate::memory_store::MemoryStore;
 use crate::noop_store::NoopStore;
@@ -51,6 +52,7 @@ pub fn store_factory<'a>(
     let store: Arc<dyn StoreDriver> = match backend {
         StoreSpec::memory(spec) => MemoryStore::new(spec),
         StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?,
+        StoreSpec::experimental_gcs_store(spec) => GCSStore::new(spec, SystemTime::now).await?,
         StoreSpec::redis_store(spec) => RedisStore::new(spec.clone())?,
         StoreSpec::verify(spec) => VerifyStore::new(
             spec,
diff --git a/nativelink-store/src/gcs_store.rs b/nativelink-store/src/gcs_store.rs
new file mode 100644
index 000000000..08230cbc5
--- /dev/null
+++ b/nativelink-store/src/gcs_store.rs
@@ -0,0 +1,553 @@
+// Copyright 2024 The NativeLink Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::borrow::Cow;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use futures::stream::{unfold, FuturesUnordered};
+use futures::{stream, StreamExt, TryStreamExt};
+// use tokio_stream::StreamExt;
+use googleapis_tonic_google_storage_v2::google::storage::v2::{
+    storage_client::StorageClient, write_object_request, ChecksummedData, Object,
+    QueryWriteStatusRequest, ReadObjectRequest, StartResumableWriteRequest, WriteObjectRequest,
+    WriteObjectSpec,
+};
+use nativelink_config::stores::GCSSpec;
+use nativelink_error::{make_err, Code, Error, ResultExt};
+use nativelink_metric::MetricsComponent;
+use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
+use nativelink_util::health_utils::{HealthStatus, HealthStatusIndicator};
+use nativelink_util::instant_wrapper::InstantWrapper;
+use nativelink_util::retry::{Retrier, RetryResult};
+use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
+use rand::rngs::OsRng;
+use rand::Rng;
+use tokio::time::sleep;
+use tonic::transport::Channel;
+
+// use tracing::{event, Level};
+use crate::cas_utils::is_zero_digest;
+
+// # How is this different from the S3 store implementation?
+//
+// The GCS store differs from the S3 store in several ways, reflecting differences in the
+// underlying APIs and service capabilities. This section summarizes the key differences
+// relevant to the store implementation, for maintainability and reviewability.
+
+// TODO: Add more reviewer-facing documentation here.
+/*
+---
+
+### **Rationale for Implementation Differences**
+
+The GCS store implementation follows the requirements and limitations of Google Cloud Storage's gRPC API.
+Sequential chunk uploads, explicit session handling, and per-chunk checksum validation reflect the service's design.
+In contrast, S3's multipart upload API simplifies concurrency, error handling, and session management.
+
+These differences call for tailored approaches per storage backend while keeping a consistent
+abstraction layer for higher-level operations.
+---
+*/
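To make the sequential-upload contract concrete before the implementation below: after the first message of a resumable session, every chunk carries the session's upload_id, the absolute write_offset of its payload, and finish_write only on the final chunk. A sketch of one such request built from the types imported above (the helper name and values are illustrative, not part of this change):

    fn chunk_request(upload_id: String, offset: u64, bytes: Vec<u8>, last: bool) -> WriteObjectRequest {
        WriteObjectRequest {
            first_message: Some(write_object_request::FirstMessage::UploadId(upload_id)),
            // Absolute byte offset of this chunk within the object being written.
            write_offset: offset as i64,
            // Only the final chunk of the session finalizes the object.
            finish_write: last,
            data: Some(write_object_request::Data::ChecksummedData(ChecksummedData {
                crc32c: Some(crc32c::crc32c(&bytes)),
                content: bytes,
            })),
            ..Default::default()
        }
    }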
+
+// Default buffer size for reading chunks of data, in bytes.
+// Note: If you change this, adjust the docs in the config.
+const DEFAULT_CHUNK_SIZE: u64 = 8 * 1024 * 1024;
+
+#[derive(MetricsComponent)]
+pub struct GCSStore<NowFn> {
+    // The gRPC client for GCS.
+    gcs_client: Arc<StorageClient<Channel>>,
+    now_fn: NowFn,
+    #[metric(help = "The bucket name for the GCS store")]
+    bucket: String,
+    #[metric(help = "The key prefix for the GCS store")]
+    key_prefix: String,
+    retrier: Retrier,
+    #[metric(help = "The number of seconds to consider an object expired")]
+    consider_expired_after_s: i64,
+    #[metric(help = "The number of bytes to buffer for retrying requests")]
+    max_retry_buffer_per_request: usize,
+    #[metric(help = "The size of chunks for resumable uploads")]
+    resumable_chunk_size: usize,
+    #[metric(help = "The number of concurrent uploads allowed for resumable uploads")]
+    resumable_max_concurrent_uploads: usize,
+}
+
+impl<I, NowFn> GCSStore<NowFn>
+where
+    I: InstantWrapper,
+    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
+{
+    pub async fn new(spec: &GCSSpec, now_fn: NowFn) -> Result<Arc<Self>, Error> {
+        let jitter_amt = spec.retry.jitter;
+        let jitter_fn = Arc::new(move |delay: Duration| {
+            if jitter_amt == 0. {
+                return delay;
+            }
+            let min = 1. - (jitter_amt / 2.);
+            let max = 1. + (jitter_amt / 2.);
+            delay.mul_f32(OsRng.gen_range(min..max))
+        });
+
+        let channel = tonic::transport::Channel::from_static("https://storage.googleapis.com")
+            .connect()
+            .await
+            .map_err(|e| make_err!(Code::Unavailable, "Failed to connect to GCS: {e:?}"))?;
+
+        let gcs_client = StorageClient::new(channel);
+
+        Self::new_with_client_and_jitter(spec, gcs_client, jitter_fn, now_fn)
+    }
+
+    pub fn new_with_client_and_jitter(
+        spec: &GCSSpec,
+        gcs_client: StorageClient<Channel>,
+        jitter_fn: Arc<dyn Fn(Duration) -> Duration + Send + Sync>,
+        now_fn: NowFn,
+    ) -> Result<Arc<Self>, Error> {
+        Ok(Arc::new(Self {
+            gcs_client: Arc::new(gcs_client),
+            now_fn,
+            bucket: spec.bucket.to_string(),
+            key_prefix: spec.key_prefix.as_ref().unwrap_or(&String::new()).clone(),
+            retrier: Retrier::new(
+                Arc::new(|duration| Box::pin(sleep(duration))),
+                jitter_fn,
+                spec.retry.clone(),
+            ),
+            consider_expired_after_s: i64::from(spec.consider_expired_after_s),
+            max_retry_buffer_per_request: spec
+                .max_retry_buffer_per_request
+                .unwrap_or(DEFAULT_CHUNK_SIZE as usize),
+            resumable_chunk_size: spec
+                .resumable_chunk_size
+                .unwrap_or(DEFAULT_CHUNK_SIZE as usize),
+            resumable_max_concurrent_uploads: 0,
+        }))
+    }
+
+    fn make_gcs_path(&self, key: &StoreKey<'_>) -> String {
+        format!("{}{}", self.key_prefix, key.as_str())
+    }
+
+    /// Check if the object exists and is not expired.
+    pub async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
+        let client = Arc::clone(&self.gcs_client);
+
+        self.retrier
+            .retry(unfold((), move |state| {
+                let mut client = (*client).clone();
+                async move {
+                    let object_path = self.make_gcs_path(digest);
+                    let request = ReadObjectRequest {
+                        bucket: self.bucket.clone(),
+                        object: object_path.clone(),
+                        ..Default::default()
+                    };
+
+                    let result = client.read_object(request).await;
+
+                    match result {
+                        Ok(response) => {
+                            let mut response_stream = response.into_inner();
+
+                            // The first message contains the metadata.
+                            if let Some(Ok(first_message)) = response_stream.next().await {
+                                if let Some(metadata) = first_message.metadata {
+                                    if self.consider_expired_after_s != 0 {
+                                        if let Some(last_modified) = metadata.update_time {
+                                            let now_s = (self.now_fn)().unix_timestamp() as i64;
+                                            if last_modified.seconds + self.consider_expired_after_s
+                                                <= now_s
+                                            {
+                                                return Some((RetryResult::Ok(None), state));
+                                            }
+                                        }
+                                    }
+                                    let length = metadata.size as u64;
+                                    return Some((RetryResult::Ok(Some(length)), state));
+                                }
+                            }
+                            Some((RetryResult::Ok(None), state))
+                        }
+                        Err(status) => match status.code() {
+                            tonic::Code::NotFound => Some((RetryResult::Ok(None), state)),
+                            _ => Some((
+                                RetryResult::Retry(make_err!(
+                                    Code::Unavailable,
+                                    "Unhandled ReadObject error: {status:?}"
+                                )),
+                                state,
+                            )),
+                        },
+                    }
+                }
+            }))
+            .await
+    }
+}
+
+#[async_trait]
+impl<I, NowFn> StoreDriver for GCSStore<NowFn>
+where
+    I: InstantWrapper,
+    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
+{
+    async fn has_with_results(
+        self: Pin<&Self>,
+        keys: &[StoreKey<'_>],
+        results: &mut [Option<u64>],
+    ) -> Result<(), Error> {
+        keys.iter()
+            .zip(results.iter_mut())
+            .map(|(key, result)| async move {
+                // Check for zero digest as a special case.
+                if is_zero_digest(key.borrow()) {
+                    *result = Some(0);
+                    return Ok::<_, Error>(());
+                }
+                *result = self.has(key).await?;
+                Ok::<_, Error>(())
+            })
+            .collect::<FuturesUnordered<_>>()
+            .try_collect()
+            .await
+    }
+
+    async fn update(
+        self: Pin<&Self>,
+        digest: StoreKey<'_>,
+        mut reader: DropCloserReadHalf,
+        upload_size: UploadSizeInfo,
+    ) -> Result<(), Error> {
+        let gcs_path = self.make_gcs_path(&digest.borrow());
+
+        let max_size = match upload_size {
+            UploadSizeInfo::ExactSize(sz) | UploadSizeInfo::MaxSize(sz) => sz,
+        };
+
+        // If the size is known and below the chunk threshold, use a single-shot upload
+        // (one request for the whole object).
+        if max_size < DEFAULT_CHUNK_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) {
+            let UploadSizeInfo::ExactSize(sz) = upload_size else {
+                unreachable!("upload_size must be UploadSizeInfo::ExactSize here");
+            };
+            reader.set_max_recent_data_size(
+                u64::try_from(self.max_retry_buffer_per_request)
+                    .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?,
+            );
+
+            // Read all data and upload it in one request.
+            let data = reader
+                .consume(Some(sz as usize))
+                .await
+                .err_tip(|| "Failed to read data for single upload")?;
+
+            return self
+                .retrier
+                .retry(unfold((), move |()| {
+                    let client = Arc::clone(&self.gcs_client);
+                    let mut client = (*client).clone();
+                    let gcs_path = gcs_path.clone();
+                    let data = data.clone();
+
+                    async move {
+                        let write_spec = WriteObjectSpec {
+                            resource: Some(Object {
+                                name: gcs_path.clone(),
+                                ..Default::default()
+                            }),
+                            object_size: Some(sz as i64),
+                            ..Default::default()
+                        };
+
+                        let request_stream = stream::iter(vec![WriteObjectRequest {
+                            first_message: Some(
+                                write_object_request::FirstMessage::WriteObjectSpec(write_spec),
+                            ),
+                            data: Some(write_object_request::Data::ChecksummedData(
+                                ChecksummedData {
+                                    content: data.to_vec(),
+                                    crc32c: Some(crc32c::crc32c(&data)),
+                                },
+                            )),
+                            finish_write: true,
+                            ..Default::default()
+                        }]);
+
+                        let result = client
+                            .write_object(request_stream)
+                            .await
+                            .map_err(|e| make_err!(Code::Aborted, "WriteObject failed: {e:?}"));
+
+                        match result {
+                            Ok(_) => Some((RetryResult::Ok(()), ())),
+                            Err(e) => Some((RetryResult::Retry(e), ())),
+                        }
+                    }
+                }))
+                .await;
+        }
+
+        // Start a resumable write session for larger files.
+        let upload_id = self
+            .retrier
+            .retry(unfold((), move |()| {
+                let client = Arc::clone(&self.gcs_client);
+                let mut client = (*client).clone();
+                let gcs_path = gcs_path.clone();
+                async move {
+                    let write_spec = WriteObjectSpec {
+                        resource: Some(Object {
+                            name: gcs_path.clone(),
+                            ..Default::default()
+                        }),
+                        object_size: Some(max_size as i64),
+                        ..Default::default()
+                    };
+
+                    let request = StartResumableWriteRequest {
+                        write_object_spec: Some(write_spec),
+                        ..Default::default()
+                    };
+
+                    let result = client.start_resumable_write(request).await.map_err(|e| {
+                        make_err!(Code::Unavailable, "Failed to start resumable upload: {e:?}")
+                    });
+
+                    match result {
+                        Ok(response) => {
+                            Some((RetryResult::Ok(response.into_inner().upload_id), ()))
+                        }
+                        Err(e) => Some((RetryResult::Retry(e), ())),
+                    }
+                }
+            }))
+            .await?;
+
+        // Chunked upload loop.
+        let mut offset = 0;
+        let chunk_size = self.resumable_chunk_size;
+        let upload_id = Arc::new(upload_id);
+
+        while offset < max_size {
+            let data = reader
+                .consume(Some(chunk_size))
+                .await
+                .err_tip(|| "Failed to read data for chunked upload")?;
+
+            let is_last_chunk = offset + chunk_size as u64 >= max_size;
+
+            let upload_id = Arc::clone(&upload_id);
+
+            self.retrier
+                .retry(unfold(data, move |data| {
+                    let client = Arc::clone(&self.gcs_client);
+                    let mut client = (*client).clone();
+                    let upload_id = Arc::clone(&upload_id);
+                    let data = data.clone();
+                    let offset = offset;
+
+                    async move {
+                        let request_stream = stream::iter(vec![WriteObjectRequest {
+                            first_message: Some(write_object_request::FirstMessage::UploadId(
+                                (*upload_id).clone(),
+                            )),
+                            write_offset: offset as i64,
+                            finish_write: is_last_chunk,
+                            data: Some(write_object_request::Data::ChecksummedData(
+                                ChecksummedData {
+                                    content: data.to_vec(),
+                                    crc32c: Some(crc32c::crc32c(&data)),
+                                },
+                            )),
+                            ..Default::default()
+                        }]);
+
+                        let result = client
+                            .write_object(request_stream)
+                            .await
+                            .map_err(|e| make_err!(Code::Aborted, "Failed to upload chunk: {e:?}"));
+
+                        match result {
+                            Ok(_) => Some((RetryResult::Ok(()), data)),
+                            Err(e) => Some((RetryResult::Retry(e), data)),
+                        }
+                    }
+                }))
+                .await?;
+
+            offset += chunk_size as u64;
+        }
+
+        // Verify the upload by querying its write status. The final chunk's finish_write
+        // flag is what actually finalizes the object.
+        self.retrier
+            .retry(unfold((), move |()| {
+                let client = Arc::clone(&self.gcs_client);
+                let mut client = (*client).clone();
+                let upload_id = Arc::clone(&upload_id);
+                async move {
+                    let request = QueryWriteStatusRequest {
+                        upload_id: (*upload_id).clone(),
+                        ..Default::default()
+                    };
+
+                    let result = client.query_write_status(request).await.map_err(|e| {
+                        make_err!(Code::Unavailable, "Failed to finalize upload: {e:?}")
+                    });
+
+                    match result {
+                        Ok(_) => Some((RetryResult::Ok(()), ())),
+                        Err(e) => Some((RetryResult::Retry(e), ())),
+                    }
+                }
+            }))
+            .await?;
+        Ok(())
+    }
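A worked example of the chunked-upload loop above, with illustrative sizes:

    // max_size = 20 MiB, resumable_chunk_size = 8 MiB:
    //   pass 1: offset = 0 MiB   is_last_chunk = false  (0 + 8 < 20)
    //   pass 2: offset = 8 MiB   is_last_chunk = false  (8 + 8 < 20)
    //   pass 3: offset = 16 MiB  is_last_chunk = true   (16 + 8 >= 20); the reader yields the final 4 MiB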
+
+    async fn get_part(
+        self: Pin<&Self>,
+        key: StoreKey<'_>,
+        writer: &mut DropCloserWriteHalf,
+        offset: u64,
+        length: Option<u64>,
+    ) -> Result<(), Error> {
+        if is_zero_digest(key.borrow()) {
+            writer
+                .send_eof()
+                .err_tip(|| "Failed to send zero EOF in GCS store get_part")?;
+            return Ok(());
+        }
+
+        let gcs_path = self.make_gcs_path(&key);
+
+        self.retrier
+            .retry(unfold(writer, move |writer| {
+                let path = gcs_path.clone();
+                async move {
+                    let request = ReadObjectRequest {
+                        bucket: self.bucket.clone(),
+                        object: path.clone(),
+                        read_offset: offset as i64,
+                        // A read_limit of zero means "read to the end of the object".
+                        read_limit: length.unwrap_or(0) as i64,
+                        ..Default::default()
+                    };
+
+                    let client = Arc::clone(&self.gcs_client);
+                    let mut cloned_client = (*client).clone();
+
+                    let result = cloned_client.read_object(request).await;
+
+                    let mut response_stream = match result {
+                        Ok(response) => response.into_inner(),
+                        Err(status) if status.code() == tonic::Code::NotFound => {
+                            return Some((
+                                RetryResult::Err(make_err!(
+                                    Code::NotFound,
+                                    "GCS object not found: {path}"
+                                )),
+                                writer,
+                            ));
+                        }
+                        Err(e) => {
+                            return Some((
+                                RetryResult::Retry(make_err!(
+                                    Code::Unavailable,
+                                    "Failed to initiate read for GCS object: {e:?}"
+                                )),
+                                writer,
+                            ));
+                        }
+                    };
+
+                    // Stream data from the GCS response to the writer.
+                    while let Some(chunk) = response_stream.next().await {
+                        match chunk {
+                            Ok(data) => {
+                                if let Some(checksummed_data) = data.checksummed_data {
+                                    if checksummed_data.content.is_empty() {
+                                        // Ignore empty chunks.
+                                        continue;
+                                    }
+                                    if let Err(e) =
+                                        writer.send(checksummed_data.content.into()).await
+                                    {
+                                        return Some((
+                                            RetryResult::Err(make_err!(
+                                                Code::Aborted,
+                                                "Failed to send bytes to writer in GCS: {e:?}"
+                                            )),
+                                            writer,
+                                        ));
+                                    }
+                                }
+                            }
+                            Err(e) => {
+                                return Some((
+                                    RetryResult::Retry(make_err!(
+                                        Code::Aborted,
+                                        "Error in GCS response stream: {e:?}"
+                                    )),
+                                    writer,
+                                ));
+                            }
+                        }
+                    }
+
+                    if let Err(e) = writer.send_eof() {
+                        return Some((
+                            RetryResult::Err(make_err!(
+                                Code::Aborted,
+                                "Failed to send EOF to writer in GCS: {e:?}"
+                            )),
+                            writer,
+                        ));
+                    }
+
+                    Some((RetryResult::Ok(()), writer))
+                }
+            }))
+            .await
+    }
+
+    fn inner_store(&self, _digest: Option<StoreKey>) -> &'_ dyn StoreDriver {
+        self
+    }
+
+    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
+        self
+    }
+
+    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
+        self
+    }
+}
+
+#[async_trait]
+impl<I, NowFn> HealthStatusIndicator for GCSStore<NowFn>
+where
+    I: InstantWrapper,
+    NowFn: Fn() -> I + Send + Sync + Unpin + 'static,
+{
+    fn get_name(&self) -> &'static str {
+        "GCSStore"
+    }
+
+    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
+        StoreDriver::check_health(Pin::new(self), namespace).await
+    }
+}
diff --git a/nativelink-store/src/lib.rs b/nativelink-store/src/lib.rs
index 04040fa5b..49b0105d0 100644
--- a/nativelink-store/src/lib.rs
+++ b/nativelink-store/src/lib.rs
@@ -21,6 +21,7 @@ pub mod default_store_factory;
 pub mod existence_cache_store;
 pub mod fast_slow_store;
 pub mod filesystem_store;
+pub mod gcs_store;
 pub mod grpc_store;
 pub mod memory_store;
 pub mod noop_store;
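The change ships without unit tests. As a starting point, here is a sketch of how the store could be constructed without any network traffic, mirroring how the factory above wires it up; connect_lazy, the spec values, and the pass-through jitter function are assumptions for illustration only:

    use std::sync::Arc;
    use std::time::{Duration, SystemTime};

    use googleapis_tonic_google_storage_v2::google::storage::v2::storage_client::StorageClient;
    use nativelink_config::stores::GCSSpec;
    use nativelink_store::gcs_store::GCSStore;

    fn main() {
        // connect_lazy() defers the TCP/TLS handshake until the first RPC is issued.
        let channel = tonic::transport::Channel::from_static("https://storage.googleapis.com")
            .connect_lazy();
        let spec = GCSSpec {
            bucket: "my-build-cache".to_string(),
            key_prefix: Some("cas/".to_string()),
            ..Default::default()
        };
        let _store = GCSStore::new_with_client_and_jitter(
            &spec,
            StorageClient::new(channel),
            Arc::new(|delay: Duration| delay), // no jitter
            SystemTime::now,
        )
        .expect("constructing the store from a static spec should not fail");
    }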