Skip to content

Commit

Permalink
Merge branch 'main' into feat/support-register-deregister-listing-sch…
Browse files Browse the repository at this point in the history
…ema-provider
  • Loading branch information
Nordalf committed Jan 16, 2025
2 parents 6d048f6 + af3102e commit 8b96bfe
Show file tree
Hide file tree
Showing 41 changed files with 1,054 additions and 259 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ arrow-ord = { version = "53" }
arrow-row = { version = "53" }
arrow-schema = { version = "53" }
arrow-select = { version = "53" }
object_store = { version = "0.11.2" }
object_store = { version = "0.11.2" , features = ["cloud"]}
parquet = { version = "53" }

# datafusion
Expand Down
4 changes: 2 additions & 2 deletions crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.6.1"
version = "0.7.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.23.0", path = "../core" }
deltalake-core = { version = "0.24.0", path = "../core" }
aws-smithy-runtime-api = { version="1.7" }
aws-smithy-runtime = { version="1.7", optional = true}
aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]}
Expand Down
2 changes: 2 additions & 0 deletions crates/aws/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ mod tests {
}

#[tokio::test]
#[serial]
async fn test_object_store_credential_provider() -> DeltaResult<()> {
let options = StorageOptions(hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
Expand All @@ -388,6 +389,7 @@ mod tests {
/// API calls in the scenarios where the delta-rs process is performing a large number of S3
/// operations.
#[tokio::test]
#[serial]
async fn test_object_store_credential_provider_consistency() -> DeltaResult<()> {
let options = StorageOptions(hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
Expand Down
11 changes: 8 additions & 3 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use deltalake_core::storage::object_store::{
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
};
use deltalake_core::storage::{
limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, RetryConfigParse,
StorageOptions,
};
use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
use futures::stream::BoxStream;
Expand All @@ -34,6 +35,8 @@ pub struct S3ObjectStoreFactory {}

impl S3StorageOptionsConversion for S3ObjectStoreFactory {}

impl RetryConfigParse for S3ObjectStoreFactory {}

impl ObjectStoreFactory for S3ObjectStoreFactory {
fn parse_url_opts(
&self,
Expand All @@ -51,7 +54,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
}
}

let (_scheme, path) =
let (_, path) =
ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError {
source: Box::new(e),
})?;
Expand All @@ -65,7 +68,9 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
));
}

let inner = builder.build()?;
let inner = builder
.with_retry(self.parse_retry_config(&options)?)
.build()?;

let store = aws_storage_handler(limit_store_handler(inner, &options), &s3_options)?;
debug!("Initialized the object store: {store:?}");
Expand Down
4 changes: 2 additions & 2 deletions crates/azure/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-azure"
version = "0.6.0"
version = "0.7.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.23.0", path = "../core", features = [
deltalake-core = { version = "0.24.0", path = "../core", features = [
"datafusion",
]}
lazy_static = "1"
Expand Down
28 changes: 23 additions & 5 deletions crates/azure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::sync::Arc;
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{
factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef,
StorageOptions,
RetryConfigParse, StorageOptions,
};
use deltalake_core::{DeltaResult, Path};
use object_store::azure::AzureConfigKey;
use object_store::parse_url_opts;
use deltalake_core::{DeltaResult, DeltaTableError, Path};
use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
use object_store::ObjectStoreScheme;
use url::Url;

mod config;
Expand All @@ -36,14 +36,32 @@ impl AzureOptions for StorageOptions {
#[derive(Clone, Default, Debug)]
pub struct AzureFactory {}

impl RetryConfigParse for AzureFactory {}

impl ObjectStoreFactory for AzureFactory {
fn parse_url_opts(
&self,
url: &Url,
options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let config = config::AzureConfigHelper::try_new(options.as_azure_options())?.build()?;
let (inner, prefix) = parse_url_opts(url, config)?;

let (_, path) =
ObjectStoreScheme::parse(&url).map_err(|e| DeltaTableError::GenericError {
source: Box::new(e),
})?;
let prefix = Path::parse(path)?;

let mut builder = MicrosoftAzureBuilder::new().with_url(url.to_string());

for (key, value) in config.iter() {
builder = builder.with_config(key.clone(), value.clone());
}

let inner = builder
.with_retry(self.parse_retry_config(&options)?)
.build()?;

let store = limit_store_handler(url_prefix_handler(inner, prefix.clone()), options);
Ok((store, prefix))
}
Expand Down
4 changes: 2 additions & 2 deletions crates/catalog-glue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-catalog-glue"
version = "0.7.0"
version = "0.8.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -15,7 +15,7 @@ rust-version.workspace = true
async-trait = { workspace = true }
aws-config = "1"
aws-sdk-glue = "1"
deltalake-core = { version = "0.23.0", path = "../core" }
deltalake-core = { version = "0.24.0", path = "../core" }
thiserror = { workspace = true }

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions crates/catalog-unity/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-catalog-unity"
version = "0.7.0"
version = "0.8.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -17,7 +17,7 @@ tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
deltalake-core = { version = "0.23", path = "../core", features = [
deltalake-core = { version = "0.24.0", path = "../core", features = [
"datafusion",
]}
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] }
Expand Down
3 changes: 2 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-core"
version = "0.23.1"
version = "0.24.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand Down Expand Up @@ -98,6 +98,7 @@ rand = "0.8"
z85 = "3.0.5"
maplit = "1"
sqlparser = { version = "0.53.0" }
humantime = { version = "2.1.0" }

[dev-dependencies]
criterion = "0.5"
Expand Down
13 changes: 8 additions & 5 deletions crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,13 +539,16 @@ impl fmt::Display for ScalarValueFormat<'_> {
},
None => write!(f, "NULL")?,
},
ScalarValue::Utf8(e) | ScalarValue::LargeUtf8(e) => match e {
Some(e) => write!(f, "'{}'", escape_quoted_string(e, '\''))?,
None => write!(f, "NULL")?,
},
ScalarValue::Utf8(e) | ScalarValue::LargeUtf8(e) | ScalarValue::Utf8View(e) => {
match e {
Some(e) => write!(f, "'{}'", escape_quoted_string(e, '\''))?,
None => write!(f, "NULL")?,
}
}
ScalarValue::Binary(e)
| ScalarValue::FixedSizeBinary(_, e)
| ScalarValue::LargeBinary(e) => match e {
| ScalarValue::LargeBinary(e)
| ScalarValue::BinaryView(e) => match e {
Some(l) => write!(
f,
"decode('{}', 'hex')",
Expand Down
26 changes: 24 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTy
use crate::logstore::LogStoreRef;
use crate::table::builder::ensure_table_uri;
use crate::table::state::DeltaTableState;
use crate::table::Constraint;
use crate::table::{Constraint, GeneratedColumn};
use crate::{open_table, open_table_with_storage_options, DeltaTable};

pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
Expand Down Expand Up @@ -1159,6 +1159,7 @@ pub(crate) async fn execute_plan_to_batch(
pub struct DeltaDataChecker {
constraints: Vec<Constraint>,
invariants: Vec<Invariant>,
generated_columns: Vec<GeneratedColumn>,
non_nullable_columns: Vec<String>,
ctx: SessionContext,
}
Expand All @@ -1169,6 +1170,7 @@ impl DeltaDataChecker {
Self {
invariants: vec![],
constraints: vec![],
generated_columns: vec![],
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
Expand All @@ -1179,6 +1181,7 @@ impl DeltaDataChecker {
Self {
invariants,
constraints: vec![],
generated_columns: vec![],
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
Expand All @@ -1189,6 +1192,18 @@ impl DeltaDataChecker {
Self {
constraints,
invariants: vec![],
generated_columns: vec![],
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
}

/// Create a new DeltaDataChecker with a specified set of generated columns
pub fn new_with_generated_columns(generated_columns: Vec<GeneratedColumn>) -> Self {
Self {
constraints: vec![],
invariants: vec![],
generated_columns,
non_nullable_columns: vec![],
ctx: DeltaSessionContext::default().into(),
}
Expand All @@ -1209,6 +1224,10 @@ impl DeltaDataChecker {
/// Create a new DeltaDataChecker
pub fn new(snapshot: &DeltaTableState) -> Self {
let invariants = snapshot.schema().get_invariants().unwrap_or_default();
let generated_columns = snapshot
.schema()
.get_generated_columns()
.unwrap_or_default();
let constraints = snapshot.table_config().get_constraints();
let non_nullable_columns = snapshot
.schema()
Expand All @@ -1224,6 +1243,7 @@ impl DeltaDataChecker {
Self {
invariants,
constraints,
generated_columns,
non_nullable_columns,
ctx: DeltaSessionContext::default().into(),
}
Expand All @@ -1236,7 +1256,9 @@ impl DeltaDataChecker {
pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
self.check_nullability(record_batch)?;
self.enforce_checks(record_batch, &self.invariants).await?;
self.enforce_checks(record_batch, &self.constraints).await
self.enforce_checks(record_batch, &self.constraints).await?;
self.enforce_checks(record_batch, &self.generated_columns)
.await
}

/// Return true if all the nullability checks are valid
Expand Down
9 changes: 9 additions & 0 deletions crates/core/src/kernel/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ pub enum Error {
line: String,
},

/// Error returned when a generation expression contains invalid JSON.
#[error("Invalid JSON in generation expression, line=`{line}`, err=`{json_err}`")]
InvalidGenerationExpressionJson {
/// JSON error details returned when parsing the generation expression JSON.
json_err: serde_json::error::Error,
/// Generation expression.
line: String,
},

#[error("Table metadata is invalid: {0}")]
MetadataError(String),

Expand Down
Loading

0 comments on commit 8b96bfe

Please sign in to comment.