Commit dd15932

Merge branch 'main' into feat/lakefs_integration

rtyler authored Jan 10, 2025
2 parents 514c664 + fb75965

Showing 72 changed files with 188 additions and 82 deletions.
crates/aws/Cargo.toml (2 changes: 1 addition & 1 deletion)

@@ -1,6 +1,6 @@
 [package]
 name = "deltalake-aws"
-version = "0.6.0"
+version = "0.6.1"
 authors.workspace = true
 keywords.workspace = true
 readme.workspace = true
crates/aws/src/lib.rs (10 changes: 7 additions & 3 deletions)

@@ -38,6 +38,7 @@ use std::{
     sync::Arc,
     time::{Duration, SystemTime},
 };
+use storage::S3StorageOptionsConversion;
 use storage::{S3ObjectStoreFactory, S3StorageOptions};
 use tracing::debug;
 use tracing::warn;
@@ -46,6 +47,8 @@ use url::Url;
 #[derive(Clone, Debug, Default)]
 pub struct S3LogStoreFactory {}
 
+impl S3StorageOptionsConversion for S3LogStoreFactory {}
+
 impl LogStoreFactory for S3LogStoreFactory {
     fn with_options(
         &self,
@@ -54,7 +57,7 @@ impl LogStoreFactory for S3LogStoreFactory {
         options: &StorageOptions,
     ) -> DeltaResult<Arc<dyn LogStore>> {
         let store = url_prefix_handler(store, Path::parse(location.path())?);
-
+        let options = self.with_env_s3(options);
         if options.0.keys().any(|key| {
             let key = key.to_ascii_lowercase();
             [
@@ -65,7 +68,7 @@ impl LogStoreFactory for S3LogStoreFactory {
         }) {
             debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
             warn!("Most S3 object stores support conditional put; remove the copy_if_not_exists parameter to use the more performant conditional put.");
-            return Ok(logstore::default_s3_logstore(store, location, options));
+            return Ok(logstore::default_s3_logstore(store, location, &options));
         }
 
         let s3_options = S3StorageOptions::from_map(&options.0)?;
@@ -78,7 +81,7 @@ impl LogStoreFactory for S3LogStoreFactory {
                 store,
             )?));
         }
-        Ok(default_logstore(store, location, options))
+        Ok(default_logstore(store, location, &options))
     }
 }
 
@@ -777,6 +780,7 @@ mod tests {
         unsafe {
             std::env::set_var(crate::constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
         }
+
         let logstore = factory
             .with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
             .unwrap();
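For context on the branch above: when a copy-if-not-exists key is present, the object store itself guarantees atomic commits, so the factory returns the plain S3 log store instead of wiring up the DynamoDB locking provider. Below is a minimal, std-only sketch of that detection; `has_copy_if_not_exists` is a hypothetical stand-in for the inline closure in `with_options`, and the key list and value format are illustrative rather than exhaustive.

    use std::collections::HashMap;

    // Stand-in for the check in S3LogStoreFactory::with_options: any
    // case-insensitive match on a copy-if-not-exists key means the store
    // handles atomic "put if absent" itself, so no locking provider is needed.
    fn has_copy_if_not_exists(options: &HashMap<String, String>) -> bool {
        options.keys().any(|key| {
            let key = key.to_ascii_lowercase();
            ["copy_if_not_exists", "aws_copy_if_not_exists"].contains(&key.as_str())
        })
    }

    fn main() {
        let mut options = HashMap::new();
        assert!(!has_copy_if_not_exists(&options));

        // The value format is backend-specific; this one is purely illustrative.
        options.insert(
            "AWS_COPY_IF_NOT_EXISTS".to_string(),
            "header: cf-copy-destination-if-none-match: *".to_string(),
        );
        assert!(has_copy_if_not_exists(&options));
    }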
crates/aws/src/storage.rs (120 changes: 62 additions & 58 deletions)

@@ -32,52 +32,7 @@ const STORE_NAME: &str = "DeltaS3ObjectStore";
 #[derive(Clone, Default, Debug)]
 pub struct S3ObjectStoreFactory {}
 
-impl S3ObjectStoreFactory {
-    fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions {
-        let mut options = StorageOptions(
-            options
-                .0
-                .clone()
-                .into_iter()
-                .map(|(k, v)| {
-                    if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) {
-                        (config_key.as_ref().to_string(), v)
-                    } else {
-                        (k, v)
-                    }
-                })
-                .collect(),
-        );
-
-        for (os_key, os_value) in std::env::vars_os() {
-            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
-                if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
-                    if !options.0.contains_key(config_key.as_ref()) {
-                        options
-                            .0
-                            .insert(config_key.as_ref().to_string(), value.to_string());
-                    }
-                }
-            }
-        }
-
-        // All S3-like object stores support conditional put; the object_store crate still
-        // requires explicitly enabling it. When no locking provider or copy-if-not-exists
-        // keys are provided, we assume PutIfAbsent is supported. With conditional put in an
-        // S3-like API we can use the default deltalake logstore, which uses PutIfAbsent.
-        if !options.0.keys().any(|key| {
-            let key = key.to_ascii_lowercase();
-            [
-                AmazonS3ConfigKey::ConditionalPut.as_ref(),
-                "conditional_put",
-            ]
-            .contains(&key.as_str())
-        }) {
-            options.0.insert("conditional_put".into(), "etag".into());
-        }
-        options
-    }
-}
+impl S3StorageOptionsConversion for S3ObjectStoreFactory {}
 
 impl ObjectStoreFactory for S3ObjectStoreFactory {
     fn parse_url_opts(

@@ -102,19 +57,17 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
         })?;
         let prefix = Path::parse(path)?;
 
-        if is_aws(storage_options) {
-            debug!("Detected AWS S3, resolving credentials");
-            let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
-                storage_options.clone(),
-            ))??;
+        let s3_options: S3StorageOptions = S3StorageOptions::from_map(&options.0)?;
+
+        if let Some(ref sdk_config) = s3_options.sdk_config {
             builder = builder.with_credentials(Arc::new(
-                crate::credentials::AWSForObjectStore::new(sdk_config),
+                crate::credentials::AWSForObjectStore::new(sdk_config.clone()),
             ));
         }
 
         let inner = builder.build()?;
 
-        let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;
+        let store = aws_storage_handler(limit_store_handler(inner, &options), &s3_options)?;
         debug!("Initialized the object store: {store:?}");
 
         Ok((store, prefix))
@@ -123,9 +76,8 @@
 
 fn aws_storage_handler(
     store: ObjectStoreRef,
-    options: &StorageOptions,
+    s3_options: &S3StorageOptions,
 ) -> DeltaResult<ObjectStoreRef> {
-    let s3_options = S3StorageOptions::from_map(&options.0)?;
     // Nearly all S3 Object stores support conditional put, so we change the default to always returning an S3 Object store
     // unless explicitly passing a locking provider key or allow_unsafe_rename. Then we will pass it to the old S3StorageBackend.
     if s3_options.locking_provider.as_deref() == Some("dynamodb") || s3_options.allow_unsafe_rename
@@ -146,12 +98,18 @@
 // This function will return true in the default case since it's most likely that the absence of
 // options will mean default/S3 configuration
 fn is_aws(options: &StorageOptions) -> bool {
-    if options.0.contains_key(constants::AWS_FORCE_CREDENTIAL_LOAD) {
+    // Checks the storage options first, then the env var, for the force-credential-load flag.
+    // `.from_s3_env` never inserts these into the options because they are delta-rs specific.
+    if str_option(&options.0, constants::AWS_FORCE_CREDENTIAL_LOAD).is_some() {
         return true;
     }
-    if options.0.contains_key(constants::AWS_S3_LOCKING_PROVIDER) {
+
+    // Checks the storage options first, then the env var, for a locking provider.
+    // `.from_s3_env` never inserts these into the options because they are delta-rs specific.
+    if str_option(&options.0, constants::AWS_S3_LOCKING_PROVIDER).is_some() {
         return true;
     }
+
     // Options at this stage should only contain 'aws_endpoint' in lowercase
     // due to with_env_s3
     !(options.0.contains_key("aws_endpoint") || options.0.contains_key(constants::AWS_ENDPOINT_URL))
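The switch from `contains_key` to `str_option` above means `is_aws` now honors environment variables as well as explicitly passed options. A simplified, std-only mirror of the helper's fallback order follows; note the real `str_option` in storage.rs may also probe a lowercased variant of the key.

    use std::collections::HashMap;

    // Simplified mirror of delta-rs's str_option: prefer the explicit storage
    // option, then fall back to the process environment.
    fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String> {
        map.get(key).cloned().or_else(|| std::env::var(key).ok())
    }

    fn main() {
        let mut options = HashMap::new();
        options.insert("AWS_S3_LOCKING_PROVIDER".to_string(), "dynamodb".to_string());

        // Explicit option wins, so is_aws() treats this as "definitely AWS".
        assert_eq!(
            str_option(&options, "AWS_S3_LOCKING_PROVIDER").as_deref(),
            Some("dynamodb")
        );

        // Absent from both the map and the environment (assuming the variable is
        // unset in this process) yields None, so is_aws() falls through to the
        // endpoint check.
        assert_eq!(str_option(&options, "AWS_FORCE_CREDENTIAL_LOAD"), None);
    }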
@@ -240,7 +198,7 @@ impl S3StorageOptions {
         let sdk_config = match is_aws(&storage_options) {
             false => None,
             true => {
-                debug!("Detected AWS S3, resolving credentials");
+                debug!("Detected AWS S3 Storage options, resolving AWS credentials");
                 Some(execute_sdk_future(
                     crate::credentials::resolve_credentials(storage_options.clone()),
                 )??)
@@ -477,6 +435,52 @@ pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String> {
     std::env::var(key).ok()
 }
 
+pub(crate) trait S3StorageOptionsConversion {
+    fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions {
+        let mut options = StorageOptions(
+            options
+                .0
+                .clone()
+                .into_iter()
+                .map(|(k, v)| {
+                    if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) {
+                        (config_key.as_ref().to_string(), v)
+                    } else {
+                        (k, v)
+                    }
+                })
+                .collect(),
+        );
+
+        for (os_key, os_value) in std::env::vars_os() {
+            if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
+                if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
+                    options
+                        .0
+                        .entry(config_key.as_ref().to_string())
+                        .or_insert(value.to_string());
+                }
+            }
+        }
+
+        // All S3-like object stores support conditional put; the object_store crate still
+        // requires explicitly enabling it. When no locking provider or copy-if-not-exists
+        // keys are provided, we assume PutIfAbsent is supported. With conditional put in an
+        // S3-like API we can use the default deltalake logstore, which uses PutIfAbsent.
+        if !options.0.keys().any(|key| {
+            let key = key.to_ascii_lowercase();
+            [
+                AmazonS3ConfigKey::ConditionalPut.as_ref(),
+                "conditional_put",
+            ]
+            .contains(&key.as_str())
+        }) {
+            options.0.insert("conditional_put".into(), "etag".into());
+        }
+        options
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
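The extracted `with_env_s3` gives explicitly passed options precedence over environment variables via `Entry::or_insert`, then defaults `conditional_put` to `etag` when no conditional-put key was supplied. Here is a std-only sketch of that merge order; `merge_with_env` is a hypothetical stand-in, and a plain map substitutes for `std::env::vars_os()`.

    use std::collections::HashMap;

    // Sketch of the with_env_s3 merge order: explicit options win, environment
    // variables only fill gaps, and conditional_put defaults to "etag".
    fn merge_with_env(
        explicit: &HashMap<String, String>,
        env: &HashMap<String, String>, // stands in for std::env::vars_os()
    ) -> HashMap<String, String> {
        let mut merged = explicit.clone();
        for (key, value) in env {
            merged.entry(key.clone()).or_insert_with(|| value.clone());
        }
        merged
            .entry("conditional_put".to_string())
            .or_insert_with(|| "etag".to_string());
        merged
    }

    fn main() {
        let explicit = HashMap::from([("aws_region".to_string(), "us-west-2".to_string())]);
        let env = HashMap::from([
            ("aws_region".to_string(), "eu-central-1".to_string()),
            ("aws_endpoint".to_string(), "http://localhost:4566".to_string()),
        ]);

        let merged = merge_with_env(&explicit, &env);
        assert_eq!(merged["aws_region"], "us-west-2"); // explicit option wins
        assert_eq!(merged["aws_endpoint"], "http://localhost:4566"); // env fills the gap
        assert_eq!(merged["conditional_put"], "etag"); // default applied last
    }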
crates/core/Cargo.toml (4 changes: 2 additions & 2 deletions)

@@ -1,6 +1,6 @@
 [package]
 name = "deltalake-core"
-version = "0.23.0"
+version = "0.23.1"
 authors.workspace = true
 keywords.workspace = true
 readme.workspace = true

@@ -82,7 +82,7 @@ errno = "0.3"
 either = "1.8"
 fix-hidden-lifetime-bug = "0.2"
 indexmap = "2.2.1"
-itertools = "0.13"
+itertools = "0.14"
 lazy_static = "1"
 libc = ">=0.2.90, <1"
 num-bigint = "0.4"
crates/core/src/errors.rs (3 changes: 3 additions & 0 deletions)

@@ -236,6 +236,9 @@ pub enum DeltaTableError {
 
     #[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
     ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime<Utc> },
+
+    #[error("No starting version or timestamp provided for CDC")]
+    NoStartingVersionOrTimestamp,
 }
 
 impl From<object_store::path::Error> for DeltaTableError {
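The new variant slots into the existing `thiserror`-derived enum, so callers get the `Display` message for free. A minimal sketch of that mechanism, reduced to just the added variant (this assumes the `thiserror` crate; the real `DeltaTableError` has many more variants):

    use thiserror::Error;

    // Local sketch only: mirrors the added variant of delta-rs's DeltaTableError.
    #[derive(Debug, Error)]
    enum DeltaTableError {
        #[error("No starting version or timestamp provided for CDC")]
        NoStartingVersionOrTimestamp,
    }

    fn main() {
        let err = DeltaTableError::NoStartingVersionOrTimestamp;
        // thiserror generates the Display impl from the #[error] attribute.
        assert_eq!(
            err.to_string(),
            "No starting version or timestamp provided for CDC"
        );
    }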
(Diff truncated; the remaining changed files are not shown here.)