Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add GCSStore #1527

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub enum StoreSpec {
///
memory(MemorySpec),

/// TODO(SchahinRohani): Add documentation.
experimental_gcs_store(GCSSpec),

/// S3 store will use Amazon's S3 service as a backend to store
/// the files. This configuration can be used to share files
/// across multiple instances.
Expand Down Expand Up @@ -724,6 +727,22 @@ pub struct EvictionPolicy {
pub max_count: u64,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(deny_unknown_fields)]
pub struct GCSSpec {
/// Google Cloud Storage region.
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
pub region: String,

/// If you wish to prefix the location on Google cloud storage. If None, no prefix will be used.
#[serde(default)]
pub key_prefix: Option<String>,

/// Bucket name to use as the backend.
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
pub bucket: String,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(deny_unknown_fields)]
pub struct S3Spec {
Expand Down
1 change: 1 addition & 0 deletions nativelink-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rust_library(
"src/existence_cache_store.rs",
"src/fast_slow_store.rs",
"src/filesystem_store.rs",
"src/gcs_store.rs",
"src/grpc_store.rs",
"src/lib.rs",
"src/memory_store.rs",
Expand Down
2 changes: 2 additions & 0 deletions nativelink-store/src/default_store_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::dedup_store::DedupStore;
use crate::existence_cache_store::ExistenceCacheStore;
use crate::fast_slow_store::FastSlowStore;
use crate::filesystem_store::FilesystemStore;
use crate::gcs_store::GCSStore;
use crate::grpc_store::GrpcStore;
use crate::memory_store::MemoryStore;
use crate::noop_store::NoopStore;
Expand All @@ -51,6 +52,7 @@ pub fn store_factory<'a>(
let store: Arc<dyn StoreDriver> = match backend {
StoreSpec::memory(spec) => MemoryStore::new(spec),
StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?,
StoreSpec::experimental_gcs_store(spec) => GCSStore::new(spec, SystemTime::now).await?,
StoreSpec::redis_store(spec) => RedisStore::new(spec.clone())?,
StoreSpec::verify(spec) => VerifyStore::new(
spec,
Expand Down
91 changes: 91 additions & 0 deletions nativelink-store/src/gcs_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;
use std::time::SystemTime;

use async_trait::async_trait;
// use googleapis_tonic_google_storage_v2::google::storage::v2::storage_client::StorageClient;
use nativelink_config::stores::GCSSpec;
use nativelink_error::{make_err, Code, Error}; //ResultExt
use nativelink_metric::{
MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
// use tonic::transport::Channel;

/// Represents a Google Cloud Platform (GCP) Store.
#[derive(Default)]
pub struct GCSStore {
// spec: GCPSpec,
}

impl MetricsComponent for GCSStore {
fn publish(
&self,
_kind: MetricKind,
_field_metadata: MetricFieldData,
) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
Ok(MetricPublishKnownKindData::Component)
}
}

impl GCSStore {
pub async fn new(spec: &GCSSpec, current_time: fn() -> SystemTime) -> Result<Arc<Self>, Error> {
// Print the spec
println!("GCSStore spec: {spec:?}");

// Get and print the current time
let now = current_time();
println!("Current time: {now:?}");

Ok(Arc::new(Self {}))
}
}

#[async_trait]
impl StoreDriver for GCSStore {
async fn has_with_results(
self: Pin<&Self>,
_keys: &[StoreKey<'_>],
_results: &mut [Option<u64>],
) -> Result<(), Error> {
// results.iter_mut().for_each(|r| *r = None);
Ok(())
}

async fn update(
self: Pin<&Self>,
_key: StoreKey<'_>,
mut _reader: DropCloserReadHalf,
_size_info: UploadSizeInfo,
) -> Result<(), Error> {
// reader.drain().await.err_tip(|| "In GCPStore::update")?;
Ok(())
}

async fn get_part(
self: Pin<&Self>,
_key: StoreKey<'_>,
_writer: &mut DropCloserWriteHalf,
_offset: u64,
_length: Option<u64>,
) -> Result<(), Error> {
Err(make_err!(Code::NotFound, "Not found in GCP store"))
}

fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}
}

default_health_status_indicator!(GCSStore);
1 change: 1 addition & 0 deletions nativelink-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod default_store_factory;
pub mod existence_cache_store;
pub mod fast_slow_store;
pub mod filesystem_store;
pub mod gcs_store;
pub mod grpc_store;
pub mod memory_store;
pub mod noop_store;
Expand Down
Loading