Add Azure Blob Storage store #1553

Draft
wants to merge 1 commit into
base: main
58 changes: 58 additions & 0 deletions README.md
@@ -123,6 +123,64 @@

See the [contribution docs](https://nativelink.com/docs/contribute/nix) for further information.

### Azure Blob Storage Configuration

To configure the Azure Blob Storage (Azure Store) for NativeLink, follow these steps:

1. **Add Azure Store Configuration**:
- Update your configuration file (for example, `basic_cas.json5`) to include the Azure store configuration:

Check failure on line 131 in README.md

View workflow job for this annotation

GitHub Actions / vale

[vale] reported by reviewdog 🐶 [Microsoft.Foreign] Use 'for example' instead of 'e.g.,'. Raw Output: {"message": "[Microsoft.Foreign] Use 'for example' instead of 'e.g.,'.", "location": {"path": "README.md", "range": {"start": {"line": 131, "column": 38}}}, "severity": "ERROR"}

```json
{
  "stores": {
    "AZURE_STORE": {
      "azure_store": {
        "account_name": "your_account_name",
        "account_key": "your_account_key",
        "container_name": "your_container_name",
        "key_prefix": "your_key_prefix",
        "max_retry_buffer_per_request": 5242880
      }
    }
  }
}
```

2. **Set Environment Variables**:
- Ensure that the necessary credentials are set as environment variables or stored securely in a configuration file. For example:

```bash
export AZURE_STORAGE_ACCOUNT="your_account_name"
export AZURE_STORAGE_KEY="your_account_key"
```

3. **Run NativeLink with Azure Store**:
- Start NativeLink with the updated configuration file that includes the Azure store configuration.

```bash
docker run \
    -v "$(pwd)/basic_cas.json5:/config" \
    -p 50051:50051 \
    ghcr.io/tracemachina/nativelink:v0.5.3 \
    config
```
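
The optional fields in the example configuration can be sketched in Rust as follows. This is a minimal illustration, not NativeLink's actual `AzureSpec`: the `AzureStoreConfig` struct, its methods, and the `DEFAULT_MAX_RETRY_BUFFER` constant are hypothetical names. The defaults mirror `AzureStore::new` in this change, which falls back to an empty `key_prefix` and a 5 MiB `max_retry_buffer_per_request` when those fields are omitted.

```rust
/// Hypothetical mirror of the `azure_store` fields in the example
/// configuration. The real type in this change is `AzureSpec`.
struct AzureStoreConfig {
    account_name: String,
    account_key: String,
    container_name: String,
    key_prefix: Option<String>,
    max_retry_buffer_per_request: Option<usize>,
}

/// Fallback used when `max_retry_buffer_per_request` is omitted: 5 MiB.
const DEFAULT_MAX_RETRY_BUFFER: usize = 5 * 1024 * 1024;

impl AzureStoreConfig {
    /// Returns the configured prefix, or an empty string if none is set.
    fn key_prefix(&self) -> &str {
        self.key_prefix.as_deref().unwrap_or("")
    }

    /// Returns the configured buffer size, or the 5 MiB default.
    fn max_retry_buffer(&self) -> usize {
        self.max_retry_buffer_per_request
            .unwrap_or(DEFAULT_MAX_RETRY_BUFFER)
    }

    /// Builds the blob name by prepending the prefix to the key.
    fn blob_path(&self, key: &str) -> String {
        format!("{}{}", self.key_prefix(), key)
    }
}

fn main() {
    let config = AzureStoreConfig {
        account_name: "your_account_name".to_string(),
        account_key: "your_account_key".to_string(),
        container_name: "your_container_name".to_string(),
        key_prefix: None,
        max_retry_buffer_per_request: None,
    };
    println!(
        "account={} container={} key_len={}",
        config.account_name,
        config.container_name,
        config.account_key.len()
    );
    println!("retry buffer: {} bytes", config.max_retry_buffer());
    println!("blob path: {}", config.blob_path("example-key"));
}
```

With both optional fields omitted, a key maps to a blob of the same name and chunks are read in pieces of at most 5242880 bytes, the value shown in the example configuration.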

### Supported Blob Types

Azure Blob Storage provides the following blob types. Uploads through the Azure store in this change use `put_block` and `put_block_list`, so they create block blobs:

1. **Block Blobs**:
- Used for storing text and binary data, such as documents and media files.
- Supports large uploads by staging chunks with `put_block` and committing them with `put_block_list`.

2. **Append Blobs**:
- Optimized for append operations, making them suitable for logging scenarios.
- Blocks can only be appended to the end of the blob; existing blocks cannot be modified or deleted.

3. **Page Blobs**:
- Designed for random read/write operations.
- Used for scenarios like virtual hard disk (VHD) files for Azure virtual machines.
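
The chunked block blob upload described above can be modeled in memory as follows. This is a sketch under stated assumptions, not the store's implementation: `stage_blocks` and `commit_block_list` are hypothetical helpers that stand in for the `put_block` and `put_block_list` calls, and no Azure SDK is involved. The zero-padded IDs mirror the `{:032}` format used by the store's `update` method; fixed-width IDs matter because Azure requires every block ID within a blob to have the same length.

```rust
/// Splits `data` into blocks of at most `block_size` bytes and pairs each
/// block with a fixed-width, zero-padded ID (hypothetical stand-in for
/// staging blocks with `put_block`).
fn stage_blocks(data: &[u8], block_size: usize) -> Vec<(String, &[u8])> {
    assert!(block_size > 0, "block_size must be non-zero");
    data.chunks(block_size)
        .enumerate()
        .map(|(index, block)| (format!("{index:032}"), block))
        .collect()
}

/// Reassembles the blob contents from the block list, in list order
/// (hypothetical stand-in for committing with `put_block_list`).
fn commit_block_list(blocks: &[(String, &[u8])]) -> Vec<u8> {
    blocks
        .iter()
        .flat_map(|(_, block)| block.iter().copied())
        .collect()
}

fn main() {
    let data = b"hello azure blob storage";
    let blocks = stage_blocks(data, 10);
    for (id, block) in &blocks {
        println!("{id}: {} bytes", block.len());
    }
    // The committed blob is the concatenation of the staged blocks.
    assert_eq!(commit_block_list(&blocks), data.to_vec());
}
```

Staged blocks are not part of the blob until the block list is committed, which is why the store commits once after the read loop reaches end of input.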

## ✍️ Contributors

<a href="https://github.com/tracemachina/nativelink/graphs/contributors" aria-label="View contributors of the NativeLink project on GitHub">
3 changes: 3 additions & 0 deletions nativelink-store/Cargo.toml
@@ -57,6 +57,9 @@ tokio-util = { version = "0.7.13" }
tonic = { version = "0.12.3", features = ["transport", "tls"], default-features = false }
tracing = { version = "0.1.41", default-features = false }
uuid = { version = "1.11.0", default-features = false, features = ["v4", "serde"] }
azure_core = "0.1.0"
azure_identity = "0.1.0"
azure_storage = "0.1.0"

[dev-dependencies]
nativelink-macro = { path = "../nativelink-macro" }
180 changes: 180 additions & 0 deletions nativelink-store/src/azure_store.rs
@@ -0,0 +1,180 @@
use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use azure_core::auth::TokenCredential;
use azure_core::prelude::*;
use azure_identity::DefaultAzureCredential;
use azure_storage::blob::prelude::*;
use azure_storage::core::prelude::*;
use bytes::Bytes;
// `TryStreamExt` provides `try_collect`, which `has_with_results` calls.
use futures::stream::{StreamExt, TryStreamExt};
use nativelink_config::stores::AzureSpec;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};

#[derive(MetricsComponent)]
pub struct AzureStore {
    container_client: Arc<ContainerClient>,
    credential: Arc<dyn TokenCredential>,
    #[metric(help = "The container name for the Azure store")]
    container_name: String,
    #[metric(help = "The key prefix for the Azure store")]
    key_prefix: String,
    #[metric(help = "The number of bytes to buffer for retrying requests")]
    max_retry_buffer_per_request: usize,
}

impl AzureStore {
    pub async fn new(spec: &AzureSpec) -> Result<Arc<Self>, Error> {
        // NOTE: this credential is stored but never used for requests; the
        // container client below authenticates with the account access key.
        let credential = Arc::new(DefaultAzureCredential::default());
        let container_client = Arc::new(
            StorageAccountClient::new_access_key(
                &spec.account_name,
                &spec.account_key,
            )
            .as_container_client(&spec.container_name),
        );

        Ok(Arc::new(Self {
            container_client,
            credential,
            container_name: spec.container_name.clone(),
            // An omitted prefix means blobs are stored under the bare key.
            key_prefix: spec.key_prefix.clone().unwrap_or_default(),
            max_retry_buffer_per_request: spec
                .max_retry_buffer_per_request
                .unwrap_or(5 * 1024 * 1024), // 5 MiB
        }))
    }

    /// Builds the blob name by prepending the configured prefix to the key.
    fn make_azure_path(&self, key: &StoreKey<'_>) -> String {
        format!("{}{}", self.key_prefix, key.as_str())
    }

    /// Returns the blob size if the blob exists, or `None` if it does not.
    async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result<Option<u64>, Error> {
        let blob_client = self
            .container_client
            .as_blob_client(&self.make_azure_path(digest));

        match blob_client.get_properties().await {
            Ok(properties) => Ok(Some(properties.blob.properties.content_length)),
            Err(e) => match e.kind() {
                azure_core::error::ErrorKind::ResourceNotFound => Ok(None),
                _ => Err(make_err!(Code::Unavailable, "Azure error: {e:?}")),
            },
        }
    }
}

#[async_trait]
impl StoreDriver for AzureStore {
    async fn has_with_results(
        self: Pin<&Self>,
        keys: &[StoreKey<'_>],
        results: &mut [Option<u64>],
    ) -> Result<(), Error> {
        keys.iter()
            .zip(results.iter_mut())
            .map(|(key, result)| async move {
                *result = self.has(key).await?;
                Ok::<_, Error>(())
            })
            .collect::<futures::stream::FuturesUnordered<_>>()
            .try_collect()
            .await
    }

    async fn update(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        mut reader: DropCloserReadHalf,
        _upload_size: UploadSizeInfo,
    ) -> Result<(), Error> {
        let blob_client = self
            .container_client
            .as_blob_client(&self.make_azure_path(&key));

        let mut block_list = Vec::new();
        let mut block_id: u64 = 0;

        // Stage the input as a sequence of blocks, then commit them in order.
        loop {
            let chunk = reader
                .consume(Some(self.max_retry_buffer_per_request))
                .await
                .err_tip(|| "Failed to read chunk in azure_store")?;
            if chunk.is_empty() {
                break; // EOF
            }

            // Zero-padded so that every block ID has the same length.
            let block_id_str = format!("{block_id:032}");
            block_list.push(BlockId::new(block_id_str.clone()));

            blob_client
                .put_block(block_id_str, chunk)
                .await
                .err_tip(|| "Failed to upload block to Azure Blob Storage")?;

            block_id += 1;
        }

        blob_client
            .put_block_list(block_list)
            .await
            .err_tip(|| "Failed to commit block list to Azure Blob Storage")?;

        Ok(())
    }

    async fn get_part(
        self: Pin<&Self>,
        key: StoreKey<'_>,
        writer: &mut DropCloserWriteHalf,
        offset: u64,
        length: Option<u64>,
    ) -> Result<(), Error> {
        let blob_client = self
            .container_client
            .as_blob_client(&self.make_azure_path(&key));

        // `saturating_add` keeps `offset + length` from overflowing `u64`.
        let end = length.map_or(u64::MAX, |l| offset.saturating_add(l));
        let mut stream = blob_client.get().range(offset..end).stream();

        while let Some(chunk) = stream.next().await {
            let chunk = chunk.err_tip(|| "Failed to read chunk from Azure Blob Storage")?;
            writer
                .send(Bytes::from(chunk))
                .await
                .err_tip(|| "Failed to send chunk to writer in azure_store")?;
        }

        writer
            .send_eof()
            .err_tip(|| "Failed to send EOF in azure_store get_part")?;

        Ok(())
    }

    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }

    fn as_any(&self) -> &(dyn std::any::Any + Sync + Send + 'static) {
        self
    }

    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
        self
    }
}

default_health_status_indicator!(AzureStore);
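
The range arithmetic in `get_part` can be checked in isolation with a small sketch. The `byte_range` helper is a hypothetical name introduced for illustration and is not part of this change. It assumes the same half-open `offset..end` convention as the `.range(...)` call in `get_part`, and uses `saturating_add` so that a large `offset` plus `length` cannot overflow `u64`.

```rust
use std::ops::Range;

/// Computes the half-open byte range for a partial read. When `length` is
/// `None`, the range extends to `u64::MAX`, meaning "read to the end".
fn byte_range(offset: u64, length: Option<u64>) -> Range<u64> {
    let end = length.map_or(u64::MAX, |len| offset.saturating_add(len));
    offset..end
}

fn main() {
    println!("{:?}", byte_range(10, Some(5)));
    println!("{:?}", byte_range(10, None));
    // Without saturation, this addition would overflow.
    println!("{:?}", byte_range(u64::MAX - 1, Some(10)));
}
```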
2 changes: 2 additions & 0 deletions nativelink-store/src/default_store_factory.rs
@@ -39,6 +39,7 @@ use crate::shard_store::ShardStore;
use crate::size_partitioning_store::SizePartitioningStore;
use crate::store_manager::StoreManager;
use crate::verify_store::VerifyStore;
use crate::azure_store::AzureStore;

type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + 'a>;

@@ -97,6 +98,7 @@ pub fn store_factory<'a>(
.await?;
ShardStore::new(spec, stores)?
}
StoreSpec::azure_store(spec) => AzureStore::new(spec).await?,
};

if let Some(health_registry_builder) = maybe_health_registry_builder {