Skip to content

Commit

Permalink
upgrade to delta-rs 0.18.1 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed Jul 1, 2024
1 parent 9f30c35 commit 32d3c3a
Show file tree
Hide file tree
Showing 9 changed files with 1,651 additions and 948 deletions.
2,521 changes: 1,607 additions & 914 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 5 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,12 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
url = "2.3"

# datafusion feature is required for writer version 2
deltalake-core = { version = "~0.17.3", features = ["json", "datafusion"]}
deltalake-aws = { version = "~0.1.1", optional = true }
deltalake-core = { version = "~0.18.1", features = ["json", "datafusion"]}
deltalake-aws = { version = "~0.1.2", optional = true }
deltalake-azure = { version = "~0.1.2", optional = true }

# s3 feature enabled
dynamodb_lock = { version = "0.6.0", optional = true }
rusoto_core = { version = "0.47", default-features = false, features = ["rustls"], optional = true }
rusoto_credential = { version = "0.47", optional = true }
rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"], optional = true }

# sentry
sentry = { version = "0.23.0", optional = true }

Expand All @@ -64,16 +60,16 @@ azure = [
s3 = [
"deltalake-aws",
"dynamodb_lock",
"rusoto_core",
"rusoto_credential",
"rusoto_s3",
]

[dev-dependencies]
utime = "0.3"
serial_test = "*"
tempfile = "3"
time = "0.3.20"
rusoto_core = { version = "0.47", default-features = false, features = ["rustls"]}
rusoto_credential = { version = "0.47"}
rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"]}

[profile.release]
lto = true
6 changes: 3 additions & 3 deletions src/delta_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{DataTypeOffset, DataTypePartition};
use deltalake_core::kernel::{Action, Add, Txn};
use deltalake_core::kernel::{Action, Add, Transaction};
use deltalake_core::{DeltaTable, DeltaTableError};
use std::collections::HashMap;

Expand Down Expand Up @@ -27,7 +27,7 @@ pub(crate) fn build_actions(
}

pub(crate) fn create_txn_action(txn_app_id: String, offset: DataTypeOffset) -> Action {
Action::Txn(Txn {
Action::Txn(Transaction {
app_id: txn_app_id,
version: offset,
last_updated: Some(
Expand Down Expand Up @@ -76,5 +76,5 @@ pub(crate) fn last_txn_version(table: &DeltaTable, txn_id: &str) -> Option<i64>
table
.get_app_transaction_version()
.get(txn_id)
.map(|v| v.to_owned())
.map(|t| t.version)
}
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,8 +974,8 @@ impl IngestProcessor {
epoch_id,
},
)
.map_err(DeltaTableError::from)?
.await;
.await
.map_err(DeltaTableError::from);
match commit {
Ok(v) => {
/*if v != version {
Expand Down
24 changes: 16 additions & 8 deletions src/offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ pub(crate) async fn write_offsets_to_delta(

for (txn_app_id, offset) in mapped_offsets {
match table.get_app_transaction_version().get(&txn_app_id) {
Some(stored_offset) if *stored_offset < offset => {
conflict_offsets.push((txn_app_id, *stored_offset, offset));
Some(stored_offset) if stored_offset.version < offset => {
conflict_offsets.push((txn_app_id, stored_offset.version, offset));
}
_ => (),
}
Expand Down Expand Up @@ -127,8 +127,8 @@ async fn commit_partition_offsets(
epoch_id,
},
)
.map_err(DeltaTableError::from)?
.await;
.await
.map_err(DeltaTableError::from);
match commit {
Ok(v) => {
info!(
Expand Down Expand Up @@ -185,12 +185,20 @@ mod tests {
table.update().await.unwrap();
assert_eq!(table.version(), 1);
assert_eq!(
table.get_app_transaction_version().get("test-0").unwrap(),
&5
table
.get_app_transaction_version()
.get("test-0")
.unwrap()
.version,
5
);
assert_eq!(
table.get_app_transaction_version().get("test-1").unwrap(),
&10
table
.get_app_transaction_version()
.get("test-1")
.unwrap()
.version,
10
);

// Test ignored write
Expand Down
6 changes: 3 additions & 3 deletions src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ impl DataWriter {
self.storage
.put(
&deltalake_core::Path::parse(&path).unwrap(),
bytes::Bytes::copy_from_slice(obj_bytes.as_slice()),
bytes::Bytes::copy_from_slice(obj_bytes.as_slice()).into(),
)
.await?;

Expand Down Expand Up @@ -594,8 +594,8 @@ impl DataWriter {
predicate: None,
},
)
.map_err(DeltaTableError::from)?
.await?;
.await
.map_err(DeltaTableError::from)?;
Ok(commit.version)
}
}
Expand Down
3 changes: 1 addition & 2 deletions tests/delta_partitions_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,8 @@ async fn test_delta_partitions() {
table.log_store().clone(),
operation,
)
.map_err(DeltaTableError::from)
.unwrap()
.await
.map_err(DeltaTableError::from)
.expect("Failed to create transaction")
.version;

Expand Down
8 changes: 4 additions & 4 deletions tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::Duration;

use bytes::Buf;
use chrono::prelude::*;
use deltalake_core::kernel::{Action, Add, Metadata, Protocol, Remove, Txn};
use deltalake_core::kernel::{Action, Add, Metadata, Protocol, Remove, Transaction};
use deltalake_core::parquet::{
file::reader::{FileReader, SerializedFileReader},
record::RowAccessor,
Expand Down Expand Up @@ -452,7 +452,7 @@ pub async fn inspect_table(path: &str) {
let table = deltalake_core::open_table(path).await.unwrap();
println!("Inspecting table {}", path);
for (k, v) in table.get_app_transaction_version().iter() {
println!(" {}: {}", k, v);
println!(" {}: {}", k, v.version);
}
let store = table.object_store().clone();

Expand Down Expand Up @@ -516,7 +516,7 @@ pub async fn inspect_table(path: &str) {
i, p.min_reader_version, p.min_writer_version
);
}
if let Some(t) = parse_json_field::<Txn>(&json, "txn") {
if let Some(t) = parse_json_field::<Transaction>(&json, "txn") {
println!(" {}. txn: appId={}, version={}", i, t.app_id, t.version);
}
if let Some(r) = parse_json_field::<Remove>(&json, "remove") {
Expand Down Expand Up @@ -630,7 +630,7 @@ impl TestScope {
total += table
.get_app_transaction_version()
.get(key)
.copied()
.map(|txn| txn.version)
.unwrap_or(0);
}

Expand Down
13 changes: 10 additions & 3 deletions tests/offset_tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use deltalake_core::protocol::Stats;
use deltalake_core::DeltaTable;
use log::{debug, info};
use log::*;
use rdkafka::{producer::Producer, util::Timeout};
use serde::{Deserialize, Serialize};
use serde_json::json;
use serial_test::serial;
use std::path::Path;
use uuid::Uuid;

use std::path::Path;

use kafka_delta_ingest::{AutoOffsetReset, IngestOptions};
#[allow(dead_code)]
mod helpers;
Expand Down Expand Up @@ -90,7 +92,12 @@ fn count_records(table: DeltaTable) -> i64 {

if let Ok(adds) = table.state.unwrap().file_actions() {
for add in adds.iter() {
count += add.get_stats_parsed().unwrap().unwrap().num_records;
if let Some(stats) = add.stats.as_ref() {
// as of deltalake-core 0.18.0 get_stats_parsed() only returns data when loaded
// from checkpoints so manual parsing is necessary
let stats: Stats = serde_json::from_str(stats).unwrap_or(Stats::default());
count += stats.num_records;
}
}
}
count
Expand Down

0 comments on commit 32d3c3a

Please sign in to comment.