Skip to content

Commit

Permalink
Upgrade the tree to be deltalake-v0.17 compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler committed Jan 6, 2024
1 parent a92571c commit cdb4dc8
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 112 deletions.
32 changes: 16 additions & 16 deletions src/coercions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use deltalake::{Schema as DeltaSchema, SchemaDataType as DeltaDataType};
use deltalake::kernel::Schema as DeltaSchema;
use deltalake::kernel::{DataType, PrimitiveType};

use chrono::prelude::*;
use serde_json::Value;
Expand Down Expand Up @@ -34,33 +35,32 @@ pub(crate) struct CoercionArray {
pub(crate) fn create_coercion_tree(schema: &DeltaSchema) -> CoercionTree {
let mut root = HashMap::new();

for field in schema.get_fields() {
if let Some(node) = build_coercion_node(field.get_type()) {
root.insert(field.get_name().to_string(), node);
for field in schema.fields() {
if let Some(node) = build_coercion_node(field.data_type()) {
root.insert(field.name().to_string(), node);
}
}

CoercionTree { root }
}

fn build_coercion_node(r#type: &DeltaDataType) -> Option<CoercionNode> {
match r#type {
DeltaDataType::primitive(r#type) if r#type == "string" => {
Some(CoercionNode::Coercion(Coercion::ToString))
}
DeltaDataType::primitive(r#type) if r#type == "timestamp" => {
Some(CoercionNode::Coercion(Coercion::ToTimestamp))
}
DeltaDataType::r#struct(schema) => {
let nested_context = create_coercion_tree(schema);
fn build_coercion_node(data_type: &DataType) -> Option<CoercionNode> {
match data_type {
DataType::Primitive(primitive) => match primitive {
PrimitiveType::String => Some(CoercionNode::Coercion(Coercion::ToString)),
PrimitiveType::Timestamp => Some(CoercionNode::Coercion(Coercion::ToTimestamp)),
_ => None,
},
DataType::Struct(st) => {
let nested_context = create_coercion_tree(st);
if !nested_context.root.is_empty() {
Some(CoercionNode::Tree(nested_context))
} else {
None
}
}
DeltaDataType::array(schema) => {
build_coercion_node(schema.get_element_type()).and_then(|node| match node {
DataType::Array(array) => {
build_coercion_node(array.element_type()).and_then(|node| match node {
CoercionNode::Coercion(c) => Some(CoercionNode::ArrayPrimitive(c)),
CoercionNode::Tree(t) => Some(CoercionNode::ArrayTree(t)),
_ => None,
Expand Down
6 changes: 3 additions & 3 deletions src/delta_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{DataTypeOffset, DataTypePartition};
use deltalake::protocol::{Action, Add, Txn};
use deltalake::kernel::{Action, Add, Txn};
use deltalake::{DeltaTable, DeltaTableError};
use std::collections::HashMap;

Expand All @@ -22,12 +22,12 @@ pub(crate) fn build_actions(
.map(|(partition, offset)| {
create_txn_action(txn_app_id_for_partition(app_id, *partition), *offset)
})
.chain(add.drain(..).map(Action::add))
.chain(add.drain(..).map(Action::Add))
.collect()
}

pub(crate) fn create_txn_action(txn_app_id: String, offset: DataTypeOffset) -> Action {
Action::txn(Txn {
Action::Txn(Txn {
app_id: txn_app_id,
version: offset,
last_updated: Some(
Expand Down
23 changes: 6 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ impl IngestProcessor {
let dlq = dead_letter_queue_from_options(&opts).await?;
let transformer = Transformer::from_transforms(&opts.transforms)?;
let table = delta_helpers::load_table(table_uri, HashMap::new()).await?;
let coercion_tree = coercions::create_coercion_tree(&table.get_metadata()?.schema);
let coercion_tree = coercions::create_coercion_tree(table.schema().unwrap());
let delta_writer = DataWriter::for_table(&table, HashMap::new())?;
let deserializer = match MessageDeserializerFactory::try_build(&opts.input_format) {
Ok(deserializer) => deserializer,
Expand Down Expand Up @@ -945,41 +945,30 @@ impl IngestProcessor {
return Err(IngestError::ConflictingOffsets);
}

/* XXX: update_schema has been removed because it just swaps the schema
if self
.delta_writer
.update_schema(self.table.get_metadata()?)?
.update_schema(self.table.metadata().unwrap())
{
info!("Table schema has been updated");
// Update the coercion tree to reflect the new schema
let coercion_tree = coercions::create_coercion_tree(&self.table.get_metadata()?.schema);
let coercion_tree = coercions::create_coercion_tree(self.table.schema().unwrap());
let _ = std::mem::replace(&mut self.coercion_tree, coercion_tree);
return Err(IngestError::DeltaSchemaChanged);
}
*/

// Try to commit
let mut attempt_number: u32 = 0;
let actions = build_actions(&partition_offsets, self.opts.app_id.as_str(), add);
loop {
/*let partition_columns = self.table.get_metadata().unwrap().partition_columns.clone();
match deltalake::operations::transaction::commit(
(self.table.object_store().storage_backend()).as_ref(),
&actions,
deltalake::action::DeltaOperation::Write {
mode: deltalake::action::SaveMode::Append,
partition_by: Some(partition_columns),
predicate: None,
},
&self.table.state,
None,
)*/

let epoch_id = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as i64;
match deltalake::operations::transaction::commit(
(self.table.object_store().storage_backend()).as_ref(),
self.table.log_store().clone().as_ref(),
&actions,
DeltaOperation::StreamingUpdate {
output_mode: OutputMode::Append,
Expand Down
4 changes: 2 additions & 2 deletions src/offsets.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::delta_helpers::*;
use crate::{DataTypeOffset, DataTypePartition};
use deltalake::protocol::Action;
use deltalake::kernel::Action;
use deltalake::protocol::DeltaOperation;
use deltalake::protocol::OutputMode;
use deltalake::{DeltaTable, DeltaTableError};
Expand Down Expand Up @@ -116,7 +116,7 @@ async fn commit_partition_offsets(

table.update().await?;
match deltalake::operations::transaction::commit(
(table.object_store().storage_backend()).as_ref(),
table.log_store().clone().as_ref(),
&actions,
DeltaOperation::StreamingUpdate {
output_mode: OutputMode::Complete,
Expand Down
60 changes: 12 additions & 48 deletions src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ use deltalake::parquet::{
use deltalake::protocol::DeltaOperation;
use deltalake::protocol::SaveMode;
use deltalake::{
protocol::{Action, Add, ColumnCountStat, ColumnValueStat, Stats},
storage::DeltaObjectStore,
kernel::{Action, Add, Schema},
protocol::{ColumnCountStat, ColumnValueStat, Stats},
storage::ObjectStoreRef,
table::DeltaTableMetaData,
DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, Schema,
DeltaTable, DeltaTableError, ObjectStoreError,
};
use log::{error, info, warn};
use serde_json::{Number, Value};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::io::Write;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, env};
use url::Url;
use uuid::Uuid;

use crate::cursor::InMemoryWriteableCursor;
Expand Down Expand Up @@ -172,7 +172,7 @@ impl From<DeltaTableError> for Box<DataWriterError> {

/// Writes messages to a delta lake table.
pub struct DataWriter {
storage: DeltaObjectStore,
storage: ObjectStoreRef,
arrow_schema_ref: Arc<deltalake::arrow::datatypes::Schema>,
writer_properties: WriterProperties,
partition_columns: Vec<String>,
Expand Down Expand Up @@ -339,13 +339,13 @@ impl DataWriter {
/// Creates a DataWriter to write to the given table
pub fn for_table(
table: &DeltaTable,
options: HashMap<String, String>,
_options: HashMap<String, String>, // XXX: figure out if this is necessary
) -> Result<DataWriter, Box<DataWriterError>> {
let storage = load_object_store_from_uri(table.table_uri().as_str(), Some(options))?;
let storage = table.object_store();

// Initialize an arrow schema ref from the delta table schema
let metadata = table.get_metadata()?;
let arrow_schema = ArrowSchema::try_from(&metadata.schema)?;
let metadata = table.metadata()?;
let arrow_schema = ArrowSchema::try_from(table.schema().unwrap())?;
let arrow_schema_ref = Arc::new(arrow_schema);
let partition_columns = metadata.partition_columns.clone();

Expand Down Expand Up @@ -462,7 +462,6 @@ impl DataWriter {
//

self.storage
.storage_backend()
.put(
&deltalake::Path::parse(&path).unwrap(),
bytes::Bytes::copy_from_slice(obj_bytes.as_slice()),
Expand Down Expand Up @@ -585,9 +584,9 @@ impl DataWriter {
) -> Result<i64, Box<DataWriterError>> {
self.write(values).await?;
let mut adds = self.write_parquet_files(&table.table_uri()).await?;
let actions = adds.drain(..).map(Action::add).collect();
let actions = adds.drain(..).map(Action::Add).collect();
let version = deltalake::operations::transaction::commit(
(table.object_store().storage_backend()).as_ref(),
table.log_store().clone().as_ref(),
&actions,
DeltaOperation::Write {
mode: SaveMode::Append,
Expand Down Expand Up @@ -615,41 +614,6 @@ pub fn record_batch_from_json(
.ok_or(Box::new(DataWriterError::EmptyRecordBatch))
}

/// Creates an object store from a uri while normalizing file system paths
pub fn load_object_store_from_uri(
path: &str,
options: Option<HashMap<String, String>>,
) -> DeltaResult<DeltaObjectStore> {
match Url::parse(path) {
Ok(table_uri) => DeltaObjectStore::try_new(table_uri, options.unwrap_or_default()),
Err(url::ParseError::RelativeUrlWithoutBase) => {
match std::path::Path::new(path).is_absolute() {
true => load_table_from_file_uri(path, options),
false => {
let result = env::current_dir()?
.join(path)
.to_str()
.unwrap_or(path)
.to_string();
load_table_from_file_uri(result.as_str(), options)
}
}
}
Err(e) => {
error!("unable to parse table uri: {}", e);
DeltaResult::Err(DeltaTableError::InvalidTableLocation(path.to_string()))
}
}
}

fn load_table_from_file_uri(
absolute_path: &str,
options: Option<HashMap<String, String>>,
) -> DeltaResult<DeltaObjectStore> {
let url = Url::from_file_path(absolute_path).unwrap();
DeltaObjectStore::try_new(url, options.unwrap_or_default())
}

type BadValue = (Value, ParquetError);

fn quarantine_failed_parquet_rows(
Expand Down
7 changes: 4 additions & 3 deletions tests/delta_partitions_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#[allow(dead_code)]
mod helpers;

use deltalake::protocol::{Action, Add, DeltaOperation, SaveMode};
use deltalake::kernel::{Action, Add};
use deltalake::protocol::{DeltaOperation, SaveMode};
use kafka_delta_ingest::writer::*;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
Expand Down Expand Up @@ -102,8 +103,8 @@ async fn test_delta_partitions() {
};

let version = deltalake::operations::transaction::commit(
&*table.object_store(),
&result.iter().cloned().map(Action::add).collect(),
table.log_store().clone().as_ref(),
&result.iter().cloned().map(Action::Add).collect(),
operation,
&table.state,
None,
Expand Down
Loading

0 comments on commit cdb4dc8

Please sign in to comment.