Switch to using deltalake_core instead of the meta package
rtyler committed Jan 6, 2024 · 1 parent cdb4dc8 · commit 40da522
Showing 16 changed files with 410 additions and 318 deletions.
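
At a glance, the change is a mechanical rename of module paths from the deltalake meta crate to deltalake_core, with the cloud backends split out into deltalake-aws and deltalake-azure and registered explicitly in src/main.rs. A representative before/after sketch of the rename; the table path is a placeholder, not one used by this repository:

// Before (meta crate):
// use deltalake::{DeltaTable, DeltaTableError};
// let table = deltalake::open_table("./path/to/table").await?;

// After (core crate):
use deltalake_core::{DeltaTable, DeltaTableError};

async fn open_example() -> Result<DeltaTable, DeltaTableError> {
    // Placeholder path for illustration only.
    deltalake_core::open_table("./path/to/table").await
}
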
578 changes: 330 additions & 248 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions Cargo.toml
@@ -32,7 +32,10 @@ tokio-util = "0.6.3"
uuid = { version = "0.8", features = ["serde", "v4"] }
url = "2.3"

deltalake = { version = "0.16.5", features = ["arrow", "json", "parquet"], optional = true }
#deltalake = { version = "0.16.5", features = ["arrow", "json", "parquet"], optional = true }
deltalake-core = { git = "https://github.com/delta-io/delta-rs", branch = "main", features = ["json"]}
deltalake-aws = { git = "https://github.com/delta-io/delta-rs", branch = "main", optional = true }
deltalake-azure = { git = "https://github.com/delta-io/delta-rs", branch = "main", optional = true }

# s3 feature enabled
dynamodb_lock = { version = "0.6.0", optional = true }
@@ -53,13 +56,13 @@ default = []
sentry-ext = ["sentry"]
dynamic-linking = [ "rdkafka/dynamic-linking" ]
azure = [
"deltalake/azure",
"deltalake-azure",
"azure_core",
"azure_storage",
"azure_storage_blobs"
]
s3 = [
"deltalake/s3",
"deltalake-aws",
"dynamodb_lock",
"rusoto_core",
"rusoto_credential",
4 changes: 2 additions & 2 deletions src/coercions.rs
@@ -1,5 +1,5 @@
use deltalake::kernel::Schema as DeltaSchema;
use deltalake::kernel::{DataType, PrimitiveType};
use deltalake_core::kernel::Schema as DeltaSchema;
use deltalake_core::kernel::{DataType, PrimitiveType};

use chrono::prelude::*;
use serde_json::Value;
4 changes: 2 additions & 2 deletions src/dead_letters.rs
@@ -2,8 +2,8 @@ use crate::transforms::Transformer;
use async_trait::async_trait;
use chrono::prelude::*;
use core::fmt::Debug;
use deltalake::parquet::errors::ParquetError;
use deltalake::{DeltaTable, DeltaTableError};
use deltalake_core::parquet::errors::ParquetError;
use deltalake_core::{DeltaTable, DeltaTableError};
#[cfg(feature = "s3")]
use dynamodb_lock::dynamo_lock_options;
use log::{error, info, warn};
10 changes: 5 additions & 5 deletions src/delta_helpers.rs
@@ -1,13 +1,13 @@
use crate::{DataTypeOffset, DataTypePartition};
use deltalake::kernel::{Action, Add, Txn};
use deltalake::{DeltaTable, DeltaTableError};
use deltalake_core::kernel::{Action, Add, Txn};
use deltalake_core::{DeltaTable, DeltaTableError};
use std::collections::HashMap;

pub(crate) async fn load_table(
table_uri: &str,
options: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
let mut table = deltalake::open_table_with_storage_options(table_uri, options).await?;
let mut table = deltalake_core::open_table_with_storage_options(table_uri, options).await?;
table.load().await?;
Ok(table)
}
@@ -52,10 +52,10 @@ pub(crate) async fn try_create_checkpoint(
table.load_version(version).await?;
}

deltalake::checkpoints::create_checkpoint(table).await?;
deltalake_core::checkpoints::create_checkpoint(table).await?;
log::info!("Created checkpoint version {}.", version);

let removed = deltalake::checkpoints::cleanup_metadata(table).await?;
let removed = deltalake_core::checkpoints::cleanup_metadata(table).await?;
if removed > 0 {
log::info!("Metadata cleanup, removed {} obsolete logs.", removed);
}
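
For context, a hypothetical call site for the updated load_table helper; the URI and the storage-option key below are illustrative placeholders, not values taken from this repository:

use std::collections::HashMap;

async fn load_example() -> Result<(), deltalake_core::DeltaTableError> {
    let mut options: HashMap<String, String> = HashMap::new();
    // Illustrative backend setting; real deployments supply their own options.
    options.insert("AWS_REGION".to_string(), "us-east-2".to_string());

    let table = load_table("s3://example-bucket/example-table", options).await?;
    log::info!("loaded table at version {}", table.version());
    Ok(())
}
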
12 changes: 5 additions & 7 deletions src/lib.rs
@@ -17,9 +17,9 @@ extern crate strum_macros;
extern crate serde_json;

use coercions::CoercionTree;
use deltalake::protocol::DeltaOperation;
use deltalake::protocol::OutputMode;
use deltalake::{DeltaTable, DeltaTableError};
use deltalake_core::protocol::DeltaOperation;
use deltalake_core::protocol::OutputMode;
use deltalake_core::{DeltaTable, DeltaTableError};
use futures::stream::StreamExt;
use log::{debug, error, info, warn};
use rdkafka::{
@@ -945,10 +945,9 @@ impl IngestProcessor {
return Err(IngestError::ConflictingOffsets);
}

/* XXX: update_schema has been removed because it just swaps the schema
if self
.delta_writer
.update_schema(self.table.metadata().unwrap())
.update_schema(self.table.state.delta_metadata().unwrap())?
{
info!("Table schema has been updated");
// Update the coercion tree to reflect the new schema
@@ -957,7 +956,6 @@

return Err(IngestError::DeltaSchemaChanged);
}
*/

// Try to commit
let mut attempt_number: u32 = 0;
@@ -967,7 +965,7 @@
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as i64;
match deltalake::operations::transaction::commit(
match deltalake_core::operations::transaction::commit(
self.table.log_store().clone().as_ref(),
&actions,
DeltaOperation::StreamingUpdate {
5 changes: 5 additions & 0 deletions src/main.rs
@@ -44,6 +44,11 @@ use std::str::FromStr;

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
#[cfg(feature = "s3")]
deltalake_aws::register_handlers(None);
#[cfg(feature = "azure")]
deltalake_azure::register_handlers(None);

#[cfg(feature = "sentry-ext")]
{
let _guard = std::env::var("SENTRY_DSN").ok().map(|dsn| {
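
Because the object-store backends now live in separate crates, their URL-scheme handlers have to be registered before any table is opened; a minimal sketch of that ordering, with a placeholder bucket URI:

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
    // Register the scheme handlers first; opening an s3:// or az:// URI before
    // this point would have no storage backend to resolve against.
    #[cfg(feature = "s3")]
    deltalake_aws::register_handlers(None);
    #[cfg(feature = "azure")]
    deltalake_azure::register_handlers(None);

    // Placeholder URI for illustration only.
    let table = deltalake_core::open_table("s3://example-bucket/example-table").await?;
    println!("table version: {}", table.version());
    Ok(())
}
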
12 changes: 6 additions & 6 deletions src/offsets.rs
@@ -1,9 +1,9 @@
use crate::delta_helpers::*;
use crate::{DataTypeOffset, DataTypePartition};
use deltalake::kernel::Action;
use deltalake::protocol::DeltaOperation;
use deltalake::protocol::OutputMode;
use deltalake::{DeltaTable, DeltaTableError};
use deltalake_core::kernel::Action;
use deltalake_core::protocol::DeltaOperation;
use deltalake_core::protocol::OutputMode;
use deltalake_core::{DeltaTable, DeltaTableError};
use log::{error, info};

/// Errors returned by `write_offsets_to_delta` function.
@@ -115,7 +115,7 @@ async fn commit_partition_offsets(
.as_millis() as i64;

table.update().await?;
match deltalake::operations::transaction::commit(
match deltalake_core::operations::transaction::commit(
table.log_store().clone().as_ref(),
&actions,
DeltaOperation::StreamingUpdate {
@@ -221,6 +221,6 @@ mod tests {
let v0_path = format!("{}/_delta_log/00000000000000000000.json", &table_path);
std::fs::create_dir_all(Path::new(&v0_path).parent().unwrap()).unwrap();
std::fs::write(&v0_path, VERSION_0).unwrap();
deltalake::open_table(&table_path).await.unwrap()
deltalake_core::open_table(&table_path).await.unwrap()
}
}
47 changes: 24 additions & 23 deletions src/writer.rs
@@ -1,6 +1,6 @@
//! High-level writer implementations for [`deltalake`].
#[allow(deprecated)]
use deltalake::arrow::{
use deltalake_core::arrow::{
array::{
as_boolean_array, as_primitive_array, as_struct_array, make_array, Array, ArrayData,
StructArray,
@@ -12,18 +12,18 @@ use deltalake::arrow::{
json::reader::ReaderBuilder,
record_batch::*,
};
use deltalake::parquet::format::FileMetaData;
use deltalake::parquet::{
use deltalake_core::parquet::format::FileMetaData;
use deltalake_core::parquet::{
arrow::ArrowWriter,
basic::{Compression, LogicalType},
errors::ParquetError,
file::{metadata::RowGroupMetaData, properties::WriterProperties, statistics::Statistics},
format::TimeUnit,
schema::types::{ColumnDescriptor, SchemaDescriptor},
};
use deltalake::protocol::DeltaOperation;
use deltalake::protocol::SaveMode;
use deltalake::{
use deltalake_core::protocol::DeltaOperation;
use deltalake_core::protocol::SaveMode;
use deltalake_core::{
kernel::{Action, Add, Schema},
protocol::{ColumnCountStat, ColumnValueStat, Stats},
storage::ObjectStoreRef,
@@ -63,7 +63,7 @@ pub enum DataWriterError {
/// The record batch schema.
record_batch_schema: SchemaRef,
/// The schema of the target delta table.
expected_schema: Arc<deltalake::arrow::datatypes::Schema>,
expected_schema: Arc<ArrowSchema>,
},

/// An Arrow RecordBatch could not be created from the JSON buffer.
@@ -173,7 +173,7 @@ impl From<DeltaTableError> for Box<DataWriterError> {
/// Writes messages to a delta lake table.
pub struct DataWriter {
storage: ObjectStoreRef,
arrow_schema_ref: Arc<deltalake::arrow::datatypes::Schema>,
arrow_schema_ref: Arc<ArrowSchema>,
writer_properties: WriterProperties,
partition_columns: Vec<String>,
arrow_writers: HashMap<String, DataArrowWriter>,
@@ -463,7 +463,7 @@ impl DataWriter {

self.storage
.put(
&deltalake::Path::parse(&path).unwrap(),
&deltalake_core::Path::parse(&path).unwrap(),
bytes::Bytes::copy_from_slice(obj_bytes.as_slice()),
)
.await?;
@@ -497,7 +497,7 @@ impl DataWriter {

/// Returns the arrow schema representation of the delta table schema defined for the wrapped
/// table.
pub fn arrow_schema(&self) -> Arc<deltalake::arrow::datatypes::Schema> {
pub fn arrow_schema(&self) -> Arc<ArrowSchema> {
self.arrow_schema_ref.clone()
}

@@ -585,7 +585,7 @@ impl DataWriter {
self.write(values).await?;
let mut adds = self.write_parquet_files(&table.table_uri()).await?;
let actions = adds.drain(..).map(Action::Add).collect();
let version = deltalake::operations::transaction::commit(
let version = deltalake_core::operations::transaction::commit(
table.log_store().clone().as_ref(),
&actions,
DeltaOperation::Write {
@@ -658,7 +658,8 @@ fn min_max_values_from_file_metadata(
partition_values: &HashMap<String, Option<String>>,
file_metadata: &FileMetaData,
) -> Result<MinAndMaxValues, ParquetError> {
let type_ptr = deltalake::parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
let type_ptr =
deltalake_core::parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?;

let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
@@ -853,7 +854,7 @@ fn min_and_max_from_parquet_statistics(
statistics: &[&Statistics],
column_descr: Arc<ColumnDescriptor>,
) -> Result<(Option<Value>, Option<Value>), ParquetError> {
use deltalake::arrow::compute::*;
use deltalake_core::arrow::compute::*;

let stats_with_min_max: Vec<&Statistics> = statistics
.iter()
@@ -914,23 +915,23 @@
}
DataType::Int32 => {
let min_array =
as_primitive_array::<deltalake::arrow::datatypes::Int32Type>(&min_array);
as_primitive_array::<deltalake_core::arrow::datatypes::Int32Type>(&min_array);
let min = min(min_array);
let min = min.map(|i| Value::Number(Number::from(i)));

let max_array =
as_primitive_array::<deltalake::arrow::datatypes::Int32Type>(&max_array);
as_primitive_array::<deltalake_core::arrow::datatypes::Int32Type>(&max_array);
let max = max(max_array);
let max = max.map(|i| Value::Number(Number::from(i)));

Ok((min, max))
}
DataType::Int64 => {
let min_array =
as_primitive_array::<deltalake::arrow::datatypes::Int64Type>(&min_array);
as_primitive_array::<deltalake_core::arrow::datatypes::Int64Type>(&min_array);
let min = min(min_array);
let max_array =
as_primitive_array::<deltalake::arrow::datatypes::Int64Type>(&max_array);
as_primitive_array::<deltalake_core::arrow::datatypes::Int64Type>(&max_array);
let max = max(max_array);

match column_descr.logical_type().as_ref() {
@@ -953,25 +954,25 @@
}
DataType::Float32 => {
let min_array =
as_primitive_array::<deltalake::arrow::datatypes::Float32Type>(&min_array);
as_primitive_array::<deltalake_core::arrow::datatypes::Float32Type>(&min_array);
let min = min(min_array);
let min = min.and_then(|f| Number::from_f64(f as f64).map(Value::Number));

let max_array =
as_primitive_array::<deltalake::arrow::datatypes::Float32Type>(&max_array);
as_primitive_array::<deltalake_core::arrow::datatypes::Float32Type>(&max_array);
let max = max(max_array);
let max = max.and_then(|f| Number::from_f64(f as f64).map(Value::Number));

Ok((min, max))
}
DataType::Float64 => {
let min_array =
as_primitive_array::<deltalake::arrow::datatypes::Float64Type>(&min_array);
as_primitive_array::<deltalake_core::arrow::datatypes::Float64Type>(&min_array);
let min = min(min_array);
let min = min.and_then(|f| Number::from_f64(f).map(Value::Number));

let max_array =
as_primitive_array::<deltalake::arrow::datatypes::Float64Type>(&max_array);
as_primitive_array::<deltalake_core::arrow::datatypes::Float64Type>(&max_array);
let max = max(max_array);
let max = max.and_then(|f| Number::from_f64(f).map(Value::Number));

@@ -1112,7 +1113,7 @@ fn stringified_partition_value(
DataType::UInt32 => as_primitive_array::<UInt32Type>(arr).value(0).to_string(),
DataType::UInt64 => as_primitive_array::<UInt64Type>(arr).value(0).to_string(),
DataType::Utf8 => {
let data = deltalake::arrow::array::as_string_array(arr);
let data = deltalake_core::arrow::array::as_string_array(arr);

data.value(0).to_string()
}
@@ -1127,7 +1128,7 @@

/// Vendored from delta-rs since it's no longer a public API
fn timestamp_to_delta_stats_string(n: i64, time_unit: &TimeUnit) -> Option<String> {
use deltalake::arrow::temporal_conversions;
use deltalake_core::arrow::temporal_conversions;

let dt = match time_unit {
TimeUnit::MILLIS(_) => temporal_conversions::timestamp_ms_to_datetime(n),
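
The Arc<ArrowSchema> signatures in this file presumably rely on a type alias imported in one of the collapsed hunks; an assumed form, not visible in this diff:

// Assumed alias; the actual import sits in a part of the diff that is not expanded here.
use deltalake_core::arrow::datatypes::Schema as ArrowSchema;
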
2 changes: 1 addition & 1 deletion tests/buffer_flush_tests.rs
@@ -97,7 +97,7 @@ async fn test_dont_write_an_empty_buffer() {
// verify that an empty version _was not_ created.
// i.e. we should still be at version 1

let t = deltalake::open_table(&table).await.unwrap();
let t = deltalake_core::open_table(&table).await.unwrap();

assert_eq!(1, t.version());

12 changes: 6 additions & 6 deletions tests/delta_partitions_tests.rs
@@ -1,8 +1,8 @@
#[allow(dead_code)]
mod helpers;

use deltalake::kernel::{Action, Add};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake_core::kernel::{Action, Add};
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use kafka_delta_ingest::writer::*;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
@@ -38,7 +38,7 @@ async fn test_delta_partitions() {
"test_delta_partitions",
);

let mut table = deltalake::open_table(&table_path).await.unwrap();
let mut table = deltalake_core::open_table(&table_path).await.unwrap();
let mut delta_writer = DataWriter::for_table(&table, HashMap::new()).unwrap();

let batch1 = vec![
@@ -102,7 +102,7 @@ async fn test_delta_partitions() {
predicate: None,
};

let version = deltalake::operations::transaction::commit(
let version = deltalake_core::operations::transaction::commit(
table.log_store().clone().as_ref(),
&result.iter().cloned().map(Action::Add).collect(),
operation,
@@ -112,11 +112,11 @@ async fn test_delta_partitions() {
.await
.expect("Failed to create transaction");

deltalake::checkpoints::create_checkpoint(&table)
deltalake_core::checkpoints::create_checkpoint(&table)
.await
.unwrap();

let table = deltalake::open_table(&table_path).await.unwrap();
let table = deltalake_core::open_table(&table_path).await.unwrap();
assert_eq!(table.version(), version);

std::fs::remove_dir_all(&table_path).unwrap();
1 change: 1 addition & 0 deletions tests/emails_azure_blob_tests.rs
@@ -30,6 +30,7 @@ async fn when_rebalance_happens_azure() {
}

async fn run_emails_s3_tests(initiate_rebalance: bool) {
deltalake_azure::register_handlers(None);
helpers::init_logger();
let topic = format!("emails_azure-{}", Uuid::new_v4());
let table = prepare_table(&topic).await;