From 9065495aa17c198c8caa610e69441380b5725942 Mon Sep 17 00:00:00 2001
From: Schahin Rouhanizadeh
Date: Sat, 7 Dec 2024 06:09:53 +0100
Subject: [PATCH] [WIP] Add gcs store

---
 nativelink-config/src/stores.rs               |  24 ++++
 nativelink-store/BUILD.bazel                  |   1 +
 nativelink-store/src/default_store_factory.rs |   2 +
 nativelink-store/src/gcs_store.rs             | 106 ++++++++++++++++++
 nativelink-store/src/lib.rs                   |   1 +
 5 files changed, 134 insertions(+)
 create mode 100644 nativelink-store/src/gcs_store.rs

diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs
index 7e6e31821..036c0c2c5 100644
--- a/nativelink-config/src/stores.rs
+++ b/nativelink-config/src/stores.rs
@@ -54,6 +54,12 @@ pub enum StoreSpec {
     ///
     memory(MemorySpec),
 
+    /// Experimental Google Cloud Storage (GCS) store.
+    ///
+    /// Work in progress: the backing store is currently a placeholder that
+    /// does not persist data.
+    experimental_gcs_store(GCSSpec),
+
     /// S3 store will use Amazon's S3 service as a backend to store
     /// the files. This configuration can be used to share files
     /// across multiple instances.
@@ -724,6 +730,24 @@ pub struct EvictionPolicy {
     pub max_count: u64,
 }
 
+/// Configuration for the experimental Google Cloud Storage (GCS) store.
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct GCSSpec {
+    /// Google Cloud Storage region.
+    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
+    pub region: String,
+
+    /// If you wish to prefix the location on Google Cloud Storage. If None,
+    /// no prefix will be used.
+    #[serde(default)]
+    pub key_prefix: Option<String>,
+
+    /// Bucket name to use as the backend.
+    #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
+    pub bucket: String,
+}
+
 #[derive(Serialize, Deserialize, Debug, Default, Clone)]
 #[serde(deny_unknown_fields)]
 pub struct S3Spec {
diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel
index 4f7691e7d..afad1f167 100644
--- a/nativelink-store/BUILD.bazel
+++ b/nativelink-store/BUILD.bazel
@@ -18,6 +18,7 @@ rust_library(
         "src/existence_cache_store.rs",
         "src/fast_slow_store.rs",
         "src/filesystem_store.rs",
+        "src/gcs_store.rs",
         "src/grpc_store.rs",
         "src/lib.rs",
         "src/memory_store.rs",
diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs
index 506ef6752..3658d6e11 100644
--- a/nativelink-store/src/default_store_factory.rs
+++ b/nativelink-store/src/default_store_factory.rs
@@ -29,6 +29,7 @@ use crate::dedup_store::DedupStore;
 use crate::existence_cache_store::ExistenceCacheStore;
 use crate::fast_slow_store::FastSlowStore;
 use crate::filesystem_store::FilesystemStore;
+use crate::gcs_store::GCSStore;
 use crate::grpc_store::GrpcStore;
 use crate::memory_store::MemoryStore;
 use crate::noop_store::NoopStore;
@@ -51,6 +52,7 @@ pub fn store_factory<'a>(
         let store: Arc<dyn StoreDriver> = match backend {
             StoreSpec::memory(spec) => MemoryStore::new(spec),
             StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?,
+            StoreSpec::experimental_gcs_store(spec) => GCSStore::new(spec, SystemTime::now).await?,
             StoreSpec::redis_store(spec) => RedisStore::new(spec.clone())?,
             StoreSpec::verify(spec) => VerifyStore::new(
                 spec,
diff --git a/nativelink-store/src/gcs_store.rs b/nativelink-store/src/gcs_store.rs
new file mode 100644
index 000000000..5f7a967ab
--- /dev/null
+++ b/nativelink-store/src/gcs_store.rs
@@ -0,0 +1,106 @@
+use std::pin::Pin;
+use std::sync::Arc;
+use std::time::SystemTime;
+
+use async_trait::async_trait;
+use nativelink_config::stores::GCSSpec;
+use nativelink_error::{make_err, Code, Error, ResultExt};
+use nativelink_metric::{
+    MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
+};
+use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
+use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
+use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
+
+// TODO: back this store with a real client, e.g. the `StorageClient` from
+// `googleapis_tonic_google_storage_v2` over a `tonic::transport::Channel`
+// (the imports the first draft left commented out).
+
+/// Placeholder store for a Google Cloud Storage (GCS) backend.
+///
+/// No GCS client is wired up yet: uploads are drained and discarded,
+/// existence checks report every key as missing, and reads fail with
+/// `Code::NotFound`.
+#[derive(Default)]
+pub struct GCSStore {}
+
+impl MetricsComponent for GCSStore {
+    /// Publishes this store as a plain component; it has no metrics of its own.
+    fn publish(
+        &self,
+        _kind: MetricKind,
+        _field_metadata: MetricFieldData,
+    ) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
+        Ok(MetricPublishKnownKindData::Component)
+    }
+}
+
+impl GCSStore {
+    /// Creates the placeholder store.
+    ///
+    /// `_spec` and `_current_time` are accepted so the signature matches the
+    /// call made from `store_factory`; neither is used until a real GCS
+    /// client exists.
+    ///
+    /// # Errors
+    ///
+    /// Currently never fails.
+    pub async fn new(
+        _spec: &GCSSpec,
+        _current_time: fn() -> SystemTime,
+    ) -> Result<Arc<Self>, Error> {
+        Ok(Arc::new(Self {}))
+    }
+}
+
+#[async_trait]
+impl StoreDriver for GCSStore {
+    /// Reports every requested key as missing.
+    async fn has_with_results(
+        self: Pin<&Self>,
+        _keys: &[StoreKey<'_>],
+        results: &mut [Option<u64>],
+    ) -> Result<(), Error> {
+        // Nothing is ever stored, so overwrite whatever the caller passed in
+        // rather than leaving stale entries that could read as hits.
+        results.fill(None);
+        Ok(())
+    }
+
+    /// Consumes the upload and discards it.
+    async fn update(
+        self: Pin<&Self>,
+        _key: StoreKey<'_>,
+        mut reader: DropCloserReadHalf,
+        _size_info: UploadSizeInfo,
+    ) -> Result<(), Error> {
+        // Read the stream to EOF rather than dropping the reader while data
+        // may still be pending; the bytes themselves are discarded.
+        reader.drain().await.err_tip(|| "In GCSStore::update")
+    }
+
+    /// Always fails with `Code::NotFound` because nothing is ever stored.
+    async fn get_part(
+        self: Pin<&Self>,
+        _key: StoreKey<'_>,
+        _writer: &mut DropCloserWriteHalf,
+        _offset: u64,
+        _length: Option<u64>,
+    ) -> Result<(), Error> {
+        Err(make_err!(Code::NotFound, "Not found in GCS store"))
+    }
+
+    fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
+        self
+    }
+
+    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
+        self
+    }
+
+    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
+        self
+    }
+}
+
+default_health_status_indicator!(GCSStore);
diff --git a/nativelink-store/src/lib.rs b/nativelink-store/src/lib.rs
index 04040fa5b..49b0105d0 100644
--- a/nativelink-store/src/lib.rs
+++ b/nativelink-store/src/lib.rs
@@ -21,6 +21,7 @@ pub mod default_store_factory;
 pub mod existence_cache_store;
 pub mod fast_slow_store;
 pub mod filesystem_store;
+pub mod gcs_store;
 pub mod grpc_store;
 pub mod memory_store;
 pub mod noop_store;