Skip to content

Commit

Permalink
Disable scheam inference when schema evolution is disabled
Browse files Browse the repository at this point in the history
This will ensure the non-evolution case stands relatively speedy!

Sponsored-by: Raft LLC
  • Loading branch information
rtyler committed Jan 8, 2024
1 parent 8e77428 commit e4a1ade
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 12 deletions.
6 changes: 1 addition & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ serde_json = "1"
strum_macros = "0.20"
thiserror = "1"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.6.3"
tokio-util = "0.7.10"
uuid = { version = "1.0", features = ["serde", "v4"] }
url = "2.3"

Expand All @@ -35,8 +35,6 @@ deltalake-azure = { git = "https://github.com/delta-io/delta-rs", branch = "main

# s3 feature enabled
dynamodb_lock = { version = "0.6.0", optional = true }
rusoto_core = { version = "0.47", default-features = false, features = ["rustls"], optional = true }
rusoto_credential = { version = "0.47", optional = true }

# sentry
sentry = { version = "0.23.0", optional = true }
Expand Down Expand Up @@ -68,8 +66,6 @@ azure = [
s3 = [
"deltalake-aws",
"dynamodb_lock",
"rusoto_core",
"rusoto_credential",
]

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/dead_letters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl DeltaSinkDeadLetterQueue {
dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE)
.unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()),
};
#[cfg(all(feature = "azure", not(feature="s3")))]
#[cfg(all(feature = "azure", not(feature = "s3")))]
let opts = HashMap::default();

let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#![deny(warnings)]
#![deny(missing_docs)]
#![allow(unused)]

#[macro_use]
extern crate lazy_static;
Expand Down
32 changes: 26 additions & 6 deletions src/serialization/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use serde_json::Value;
use log::*;
use serde_json::Value;

use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};

Expand All @@ -24,6 +24,13 @@ pub struct DeserializedMessage {
}

impl DeserializedMessage {
fn new(message: Value) -> Self {
Self {
message,
..Default::default()
}
}

pub fn schema(&self) -> &Option<ArrowSchema> {
&self.schema
}
Expand All @@ -41,9 +48,7 @@ impl DeserializedMessage {
/// Allow for `.into()` on [Value] for ease of use
impl From<Value> for DeserializedMessage {
fn from(message: Value) -> Self {
// XXX: This seems wasteful, this function should go away, and the deserializers should
// infer straight from the buffer stream
let iter = vec![message.clone()].into_iter().map(Ok);
let iter = std::iter::once(&message).map(Ok);
let schema =
match deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) {
Ok(schema) => Some(schema),
Expand Down Expand Up @@ -169,7 +174,10 @@ impl MessageDeserializer for DefaultDeserializer {
}
};

Ok(value.into())
match self.can_evolve_schema() {
true => Ok(value.into()),
false => Ok(DeserializedMessage::new(value)),
}
}
}

Expand All @@ -183,8 +191,20 @@ mod default_tests {
}

#[tokio::test]
async fn deserialize_with_schema() {
async fn deserializer_default_without_evolution() {
let mut deser = DefaultDeserializer::default();
let dm = deser
.deserialize(r#"{"hello" : "world"}"#.as_bytes())
.await
.unwrap();
assert_eq!(true, dm.schema().is_none());
}

#[tokio::test]
async fn deserialize_with_schema() {
let mut deser = DefaultDeserializer {
schema_evolution: true,
};
let message = deser
.deserialize(r#"{"hello" : "world"}"#.as_bytes())
.await
Expand Down

0 comments on commit e4a1ade

Please sign in to comment.