diff --git a/README.md b/README.md
index 785c654d8..2a2a9c680 100644
--- a/README.md
+++ b/README.md
@@ -123,6 +123,64 @@ nix run github:TraceMachina/nativelink ./basic_cas.json5
 
 See the [contribution docs](https://nativelink.com/docs/contribute/nix) for further information.
 
+### Azure Blob Storage Configuration
+
+To configure Azure Blob Storage (the Azure store) for NativeLink, follow these steps:
+
+1. **Add the Azure store configuration**:
+   - Update your configuration file (e.g., `basic_cas.json5`) to include the Azure store. For example:
+
+   ```json5
+   {
+     "stores": {
+       "AZURE_STORE": {
+         "azure_store": {
+           "account_name": "your_account_name",
+           "account_key": "your_account_key",
+           "container_name": "your_container_name",
+           "key_prefix": "your_key_prefix",
+           "max_retry_buffer_per_request": 5242880
+         }
+       }
+     }
+   }
+   ```
+
+2. **Set environment variables**:
+   - Ensure the necessary credentials are set as environment variables or stored securely in a configuration file. For example:
+
+   ```bash
+   export AZURE_STORAGE_ACCOUNT="your_account_name"
+   export AZURE_STORAGE_KEY="your_account_key"
+   ```
+
+3. **Run NativeLink with the Azure store**:
+   - Start NativeLink with the updated configuration file that includes the Azure store:
+
+   ```bash
+   docker run \
+     -v $(pwd)/basic_cas.json5:/config \
+     -p 50051:50051 \
+     ghcr.io/tracemachina/nativelink:v0.5.3 \
+     config
+   ```
+
+### Supported Blob Types
+
+The Azure store in NativeLink builds on the following Azure blob types:
+
+1. **Block blobs**:
+   - Store text and binary data, such as documents and media files.
+   - Support large uploads in chunks via the `put_block` and `put_block_list` operations.
+
+2. **Append blobs**:
+   - Optimized for append operations, making them suitable for logging scenarios.
+   - Data can only be appended to an existing blob, not modified or deleted in place.
+
+3. **Page blobs**:
+   - Designed for random read/write operations.
+   - Used for scenarios such as virtual hard disk (VHD) files for Azure virtual machines.
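+
+Note that NativeLink's Azure store writes CAS objects as block blobs. As a
+quick sanity check (assuming the Azure CLI is installed and the variables from
+step 2 are exported; the container, prefix, and blob name below are
+placeholders), you can confirm the type of an uploaded blob:
+
+```bash
+az storage blob show \
+  --account-name "$AZURE_STORAGE_ACCOUNT" \
+  --account-key "$AZURE_STORAGE_KEY" \
+  --container-name "your_container_name" \
+  --name "your_key_prefix/<blob-name>" \
+  --query "properties.blobType"
+```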
+
 
 ## ✍️ Contributors
diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml
index d646c53e6..a1cd21db1 100644
--- a/nativelink-store/Cargo.toml
+++ b/nativelink-store/Cargo.toml
@@ -57,6 +57,9 @@ tokio-util = { version = "0.7.13" }
 tonic = { version = "0.12.3", features = ["transport", "tls"], default-features = false }
 tracing = { version = "0.1.41", default-features = false }
 uuid = { version = "1.11.0", default-features = false, features = ["v4", "serde"] }
+azure_core = "0.1.0"
+azure_identity = "0.1.0"
+azure_storage = "0.1.0"
 
 [dev-dependencies]
 nativelink-macro = { path = "../nativelink-macro" }
diff --git a/nativelink-store/src/azure_store.rs b/nativelink-store/src/azure_store.rs
new file mode 100644
index 000000000..84a3ff790
--- /dev/null
+++ b/nativelink-store/src/azure_store.rs
@@ -0,0 +1,180 @@
+use std::pin::Pin;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use azure_core::prelude::*;
+use azure_identity::DefaultAzureCredential;
+use azure_storage::blob::prelude::*;
+use azure_storage::core::prelude::*;
+use bytes::Bytes;
+use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
+use nativelink_config::stores::AzureSpec;
+use nativelink_error::{make_err, Code, Error, ResultExt};
+use nativelink_metric::MetricsComponent;
+use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
+use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
+use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
+
+#[derive(MetricsComponent)]
+pub struct AzureStore {
+    container_client: Arc<ContainerClient>,
+    credential: Arc<DefaultAzureCredential>,
+    #[metric(help = "The container name for the Azure store")]
+    container_name: String,
+    #[metric(help = "The key prefix for the Azure store")]
+    key_prefix: String,
+    #[metric(help = "The number of bytes to buffer for retrying requests")]
+    max_retry_buffer_per_request: usize,
+}
+
+impl AzureStore {
+    pub async fn new(spec: &AzureSpec) -> Result<Arc<Self>, Error> {
+        let credential = Arc::new(DefaultAzureCredential::default());
+        let container_client = Arc::new(
+            StorageAccountClient::new_access_key(&spec.account_name, &spec.account_key)
+                .as_container_client(&spec.container_name),
+        );
+
+        Ok(Arc::new(Self {
+            container_client,
+            credential,
+            container_name: spec.container_name.clone(),
+            key_prefix: spec.key_prefix.clone().unwrap_or_default(),
+            max_retry_buffer_per_request: spec
+                .max_retry_buffer_per_request
+                .unwrap_or(5 * 1024 * 1024), // Default to a 5MB retry buffer.
+        }))
+    }
+
+    fn make_azure_path(&self, key: &StoreKey<'_>) -> String {
+        format!("{}{}", self.key_prefix, key.as_str())
+    }
+
+    async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
+        let blob_client = self
+            .container_client
+            .as_blob_client(&self.make_azure_path(digest));
+
+        match blob_client.get_properties().await {
+            Ok(properties) => Ok(Some(properties.blob.properties.content_length)),
+            Err(e) => match e.kind() {
+                azure_core::error::ErrorKind::ResourceNotFound => Ok(None),
+                _ => Err(make_err!(Code::Unavailable, "Azure error: {e:?}")),
+            },
+        }
+    }
+}
+
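+// `has_with_results` below issues one `get_properties` request per key and
+// polls them concurrently via `FuturesUnordered`, so a batch existence check
+// costs roughly as much as its slowest lookup rather than the sum of all of
+// them.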
+#[async_trait]
+impl StoreDriver for AzureStore {
+    async fn has_with_results(
+        self: Pin<&Self>,
+        keys: &[StoreKey<'_>],
+        results: &mut [Option<u64>],
+    ) -> Result<(), Error> {
+        keys.iter()
+            .zip(results.iter_mut())
+            .map(|(key, result)| async move {
+                *result = self.has(key).await?;
+                Ok::<_, Error>(())
+            })
+            .collect::<FuturesUnordered<_>>()
+            .try_collect()
+            .await
+    }
+
+    async fn update(
+        self: Pin<&Self>,
+        key: StoreKey<'_>,
+        mut reader: DropCloserReadHalf,
+        _upload_size: UploadSizeInfo,
+    ) -> Result<(), Error> {
+        let blob_client = self
+            .container_client
+            .as_blob_client(&self.make_azure_path(&key));
+
+        let mut block_list = Vec::new();
+        let mut block_id = 0;
+
+        loop {
+            let chunk = reader
+                .consume(Some(self.max_retry_buffer_per_request))
+                .await
+                .err_tip(|| "Failed to read chunk in azure_store")?;
+            if chunk.is_empty() {
+                break; // EOF.
+            }
+
+            // Azure requires all block IDs in a blob to have the same length,
+            // so use a fixed-width, zero-padded counter.
+            let block_id_str = format!("{:032}", block_id);
+            block_list.push(BlockId::new(block_id_str.clone()));
+
+            blob_client
+                .put_block(block_id_str, chunk.clone())
+                .await
+                .err_tip(|| "Failed to upload block to Azure Blob Storage")?;
+
+            block_id += 1;
+        }
+
+        // Committing the block list publishes the staged blocks as one blob.
+        blob_client
+            .put_block_list(block_list)
+            .await
+            .err_tip(|| "Failed to commit block list to Azure Blob Storage")?;
+
+        Ok(())
+    }
+
+    async fn get_part(
+        self: Pin<&Self>,
+        key: StoreKey<'_>,
+        writer: &mut DropCloserWriteHalf,
+        offset: u64,
+        length: Option<u64>,
+    ) -> Result<(), Error> {
+        let blob_client = self
+            .container_client
+            .as_blob_client(&self.make_azure_path(&key));
+
+        // A `None` length means "read to the end of the blob".
+        let mut stream = blob_client
+            .get()
+            .range(offset..length.map_or(u64::MAX, |l| offset + l))
+            .stream();
+
+        while let Some(chunk) = stream.next().await {
+            let chunk = chunk.err_tip(|| "Failed to read chunk from Azure Blob Storage")?;
+            writer
+                .send(Bytes::from(chunk))
+                .await
+                .err_tip(|| "Failed to send chunk to writer in azure_store")?;
+        }
+
+        writer
+            .send_eof()
+            .err_tip(|| "Failed to send EOF in azure_store get_part")?;
+
+        Ok(())
+    }
+
+    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver {
+        self
+    }
+
+    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) {
+        self
+    }
+
+    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
+        self
+    }
+}
+
+default_health_status_indicator!(AzureStore);
diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs
index 506ef6752..0ba65e691 100644
--- a/nativelink-store/src/default_store_factory.rs
+++ b/nativelink-store/src/default_store_factory.rs
@@ -39,6 +39,7 @@ use crate::shard_store::ShardStore;
 use crate::size_partitioning_store::SizePartitioningStore;
 use crate::store_manager::StoreManager;
 use crate::verify_store::VerifyStore;
+use crate::azure_store::AzureStore;
 
 type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + 'a>;
 
@@ -97,6 +98,7 @@ pub fn store_factory<'a>(
             .await?;
             ShardStore::new(spec, stores)?
         }
+        StoreSpec::azure_store(spec) => AzureStore::new(spec).await?,
     };
 
     if let Some(health_registry_builder) = maybe_health_registry_builder {
diff --git a/nativelink-store/tests/azure_store_test.rs b/nativelink-store/tests/azure_store_test.rs
new file mode 100644
index 000000000..4d0e2d52a
--- /dev/null
+++ b/nativelink-store/tests/azure_store_test.rs
@@ -0,0 +1,307 @@
+// NOTE: This file intentionally mirrors `nativelink-store/src/azure_store.rs`
+// so the tests can exercise internals (`has`, `credential`) that the crate
+// does not export.
+use std::pin::Pin;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use azure_core::auth::TokenCredential;
+use azure_core::prelude::*;
+use azure_identity::DefaultAzureCredential;
+use azure_storage::blob::prelude::*;
+use azure_storage::core::prelude::*;
+use bytes::Bytes;
+use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
+use nativelink_config::stores::AzureSpec;
+use nativelink_error::{make_err, Code, Error, ResultExt};
+use nativelink_metric::MetricsComponent;
+use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
+use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
+use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
+
+#[derive(MetricsComponent)]
+pub struct AzureStore {
+    container_client: Arc<ContainerClient>,
+    credential: Arc<DefaultAzureCredential>,
+    #[metric(help = "The container name for the Azure store")]
+    container_name: String,
+    #[metric(help = "The key prefix for the Azure store")]
+    key_prefix: String,
+    #[metric(help = "The number of bytes to buffer for retrying requests")]
+    max_retry_buffer_per_request: usize,
+}
+
+impl AzureStore {
+    pub async fn new(spec: &AzureSpec) -> Result<Arc<Self>, Error> {
+        let credential = Arc::new(DefaultAzureCredential::default());
+        let container_client = Arc::new(
+            StorageAccountClient::new_access_key(&spec.account_name, &spec.account_key)
+                .as_container_client(&spec.container_name),
+        );
+
+        Ok(Arc::new(Self {
+            container_client,
+            credential,
+            container_name: spec.container_name.clone(),
+            key_prefix: spec.key_prefix.clone().unwrap_or_default(),
+            max_retry_buffer_per_request: spec
+                .max_retry_buffer_per_request
+                .unwrap_or(5 * 1024 * 1024), // Default to a 5MB retry buffer.
+        }))
+    }
+
+    fn make_azure_path(&self, key: &StoreKey<'_>) -> String {
+        format!("{}{}", self.key_prefix, key.as_str())
+    }
+
+    async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
+        let blob_client = self
+            .container_client
+            .as_blob_client(&self.make_azure_path(digest));
+
+        match blob_client.get_properties().await {
+            Ok(properties) => Ok(Some(properties.blob.properties.content_length)),
+            Err(e) => match e.kind() {
+                azure_core::error::ErrorKind::ResourceNotFound => Ok(None),
+                _ => Err(make_err!(Code::Unavailable, "Azure error: {e:?}")),
+            },
+        }
+    }
+}
+
+#[async_trait]
+impl StoreDriver for AzureStore {
+    async fn has_with_results(
+        self: Pin<&Self>,
+        keys: &[StoreKey<'_>],
+        results: &mut [Option<u64>],
+    ) -> Result<(), Error> {
+        keys.iter()
+            .zip(results.iter_mut())
+            .map(|(key, result)| async move {
+                *result = self.has(key).await?;
+                Ok::<_, Error>(())
+            })
+            .collect::<FuturesUnordered<_>>()
+            .try_collect()
+            .await
+    }
+
+    async fn update(
+        self: Pin<&Self>,
+        key: StoreKey<'_>,
+        mut reader: DropCloserReadHalf,
+        _upload_size: UploadSizeInfo,
+    ) -> Result<(), Error> {
+        let blob_client = self
+            .container_client
+            .as_blob_client(&self.make_azure_path(&key));
+
+        let mut block_list = Vec::new();
+        let mut block_id = 0;
+
+        loop {
+            let chunk = reader
+                .consume(Some(self.max_retry_buffer_per_request))
+                .await
+                .err_tip(|| "Failed to read chunk in azure_store")?;
+            if chunk.is_empty() {
+                break; // EOF.
+            }
+
+            let block_id_str = format!("{:032}", block_id);
+            block_list.push(BlockId::new(block_id_str.clone()));
+
+            blob_client
+                .put_block(block_id_str, chunk.clone())
+                .await
+                .err_tip(|| "Failed to upload block to Azure Blob Storage")?;
+
+            block_id += 1;
+        }
+
+        blob_client
+            .put_block_list(block_list)
+            .await
+            .err_tip(|| "Failed to commit block list to Azure Blob Storage")?;
+
+        Ok(())
+    }
+
+    async fn get_part(
+        self: Pin<&Self>,
+        key: StoreKey<'_>,
+        writer: &mut DropCloserWriteHalf,
+        offset: u64,
+        length: Option<u64>,
+    ) -> Result<(), Error> {
+        let blob_client = self
+            .container_client
+            .as_blob_client(&self.make_azure_path(&key));
+
+        let mut stream = blob_client
+            .get()
+            .range(offset..length.map_or(u64::MAX, |l| offset + l))
+            .stream();
+
+        while let Some(chunk) = stream.next().await {
+            let chunk = chunk.err_tip(|| "Failed to read chunk from Azure Blob Storage")?;
+            writer
+                .send(Bytes::from(chunk))
+                .await
+                .err_tip(|| "Failed to send chunk to writer in azure_store")?;
+        }
+
+        writer
+            .send_eof()
+            .err_tip(|| "Failed to send EOF in azure_store get_part")?;
+
+        Ok(())
+    }
+
+    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver {
+        self
+    }
+
+    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) {
+        self
+    }
+
+    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
+        self
+    }
+}
+
+default_health_status_indicator!(AzureStore);
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use pretty_assertions::assert_eq;
+
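+    // These tests are integration-style: they talk to whatever account
+    // `AzureSpec` points at, so they need a reachable storage account (or a
+    // local emulator such as Azurite) and valid credentials to pass. The
+    // values below are placeholders.
+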
+    #[tokio::test]
+    async fn test_azure_store_has() -> Result<(), Error> {
+        let spec = AzureSpec {
+            account_name: "test_account".to_string(),
+            account_key: "test_key".to_string(),
+            container_name: "test_container".to_string(),
+            key_prefix: Some("test_prefix/".to_string()),
+            max_retry_buffer_per_request: Some(5 * 1024 * 1024),
+        };
+
+        let store = AzureStore::new(&spec).await?;
+        let key = StoreKey::from("test_key");
+        let result = Pin::new(store.as_ref()).has(&key).await?;
+        assert_eq!(result, None);
+        Ok(())
+    }
+
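+    // `update` consumes one half of a buffered channel while the test writes
+    // into the other, so both futures are driven together with `tokio::join!`;
+    // sending everything before starting `update` could deadlock once the
+    // channel's buffer fills.
+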
azure_store")?; + if chunk.is_empty() { + break; // EOF + } + + let block_id_str = format!("{:032}", block_id); + block_list.push(BlockId::new(block_id_str.clone())); + + blob_client + .put_block(block_id_str, chunk.clone()) + .await + .err_tip(|| "Failed to upload block to Azure Blob Storage")?; + + block_id += 1; + } + + blob_client + .put_block_list(block_list) + .await + .err_tip(|| "Failed to commit block list to Azure Blob Storage")?; + + Ok(()) + } + + async fn get_part( + self: Pin<&Self>, + key: StoreKey<'_>, + writer: &mut DropCloserWriteHalf, + offset: u64, + length: Option, + ) -> Result<(), Error> { + let blob_client = self + .container_client + .as_blob_client(&self.make_azure_path(&key)); + + let mut stream = blob_client + .get() + .range(offset..length.map_or(u64::MAX, |l| offset + l)) + .stream(); + + while let Some(chunk) = stream.next().await { + let chunk = chunk.err_tip(|| "Failed to read chunk from Azure Blob Storage")?; + writer + .send(Bytes::from(chunk)) + .await + .err_tip(|| "Failed to send chunk to writer in azure_store")?; + } + + writer + .send_eof() + .err_tip(|| "Failed to send EOF in azure_store get_part")?; + + Ok(()) + } + + fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { + self + } + + fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) { + self + } + + fn as_any_arc(self: Arc) -> Arc { + self + } +} + +default_health_status_indicator!(AzureStore); + +#[cfg(test)] +mod tests { + use super::*; + use nativelink_util::store_trait::StoreLike; + use pretty_assertions::assert_eq; + use tokio::sync::mpsc; + + #[tokio::test] + async fn test_azure_store_has() -> Result<(), Error> { + let spec = AzureSpec { + account_name: "test_account".to_string(), + account_key: "test_key".to_string(), + container_name: "test_container".to_string(), + key_prefix: Some("test_prefix/".to_string()), + max_retry_buffer_per_request: Some(5 * 1024 * 1024), + }; + + let store = AzureStore::new(&spec).await?; + let key = StoreKey::from("test_key"); + let result = store.has(key).await?; + assert_eq!(result, None); + Ok(()) + } + + #[tokio::test] + async fn test_azure_store_update() -> Result<(), Error> { + let spec = AzureSpec { + account_name: "test_account".to_string(), + account_key: "test_key".to_string(), + container_name: "test_container".to_string(), + key_prefix: Some("test_prefix/".to_string()), + max_retry_buffer_per_request: Some(5 * 1024 * 1024), + }; + + let store = AzureStore::new(&spec).await?; + let key = StoreKey::from("test_key"); + let (mut tx, rx) = make_buf_channel_pair(); + let upload_size = UploadSizeInfo::ExactSize(1024); + + let update_fut = store.update(key, rx, upload_size); + let send_fut = async move { + for _ in 0..1024 { + tx.send(Bytes::from_static(&[0u8; 1024])).await?; + } + tx.send_eof() + }; + + let (update_result, send_result) = tokio::join!(update_fut, send_fut); + update_result?; + send_result?; + Ok(()) + } + + #[tokio::test] + async fn test_azure_store_get_part() -> Result<(), Error> { + let spec = AzureSpec { + account_name: "test_account".to_string(), + account_key: "test_key".to_string(), + container_name: "test_container".to_string(), + key_prefix: Some("test_prefix/".to_string()), + max_retry_buffer_per_request: Some(5 * 1024 * 1024), + }; + + let store = AzureStore::new(&spec).await?; + let key = StoreKey::from("test_key"); + let (mut writer, mut reader) = make_buf_channel_pair(); + + let get_part_fut = store.get_part(key, &mut writer, 0, Some(1024)); + let read_fut = async move { + let mut data = Vec::new(); + 
+    #[tokio::test]
+    async fn test_azure_store_chunked_upload() -> Result<(), Error> {
+        let spec = AzureSpec {
+            account_name: "test_account".to_string(),
+            account_key: "test_key".to_string(),
+            container_name: "test_container".to_string(),
+            key_prefix: Some("test_prefix/".to_string()),
+            max_retry_buffer_per_request: Some(5 * 1024 * 1024),
+        };
+
+        let store = AzureStore::new(&spec).await?;
+        let key = StoreKey::from("test_key");
+        let (mut tx, rx) = make_buf_channel_pair();
+        let upload_size = UploadSizeInfo::ExactSize(10 * 1024 * 1024);
+
+        let update_fut = Pin::new(store.as_ref()).update(key, rx, upload_size);
+        let send_fut = async move {
+            for _ in 0..10 {
+                tx.send(Bytes::from_static(&[0u8; 1024 * 1024])).await?;
+            }
+            tx.send_eof()
+        };
+
+        let (update_result, send_result) = tokio::join!(update_fut, send_fut);
+        update_result?;
+        send_result?;
+        Ok(())
+    }
+}