Skip to content

Commit

Permalink
Upgrade datafusion 37
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Apr 7, 2024
1 parent b3f9693 commit ba4012a
Show file tree
Hide file tree
Showing 28 changed files with 1,205 additions and 317 deletions.
313 changes: 215 additions & 98 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,6 @@ debug = 1
# Use this section to test or apply emergency ovverides to dependencies
# See: https://doc.rust-lang.org/cargo/reference/overriding-dependencies.html
[patch.crates-io]
# datafusion = { git = 'https://github.com/apache/arrow-datafusion.git', tag = '36.0.0-rc1' }
# datafusion-common = { git = 'https://github.com/apache/arrow-datafusion.git', tag = '36.0.0-rc1' }
# datafusion = { git = 'https://github.com/apache/arrow-datafusion.git', tag = '37.0.0-rc1' }
# datafusion-common = { git = 'https://github.com/apache/arrow-datafusion.git', tag = '37.0.0-rc1' }
# datafusion-odata = { git = 'https://github.com/kamu-data/datafusion-odata.git', tag = '37.0.0-rc1' }
4 changes: 2 additions & 2 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ license-files = [
unknown-git = "deny"
unknown-registry = "deny"
allow-git = [
# TODO: Remove after we switch to datafusion 33.0.0 release
"https://github.com/apache/arrow-datafusion"
"https://github.com/apache/arrow-datafusion",
"https://github.com/kamu-data/datafusion-odata"
]


Expand Down
6 changes: 3 additions & 3 deletions src/adapter/flight-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ doctest = false


[dependencies]
arrow-flight = { version = "50", features = ["flight-sql-experimental"] }
arrow-flight = { version = "51", features = ["flight-sql-experimental"] }
async-trait = { version = "0.1", default-features = false }
base64 = { version = "0.21", default-features = false }
dashmap = { version = "5", default-features = false }
datafusion = { version = "36", default-features = false }
datafusion = { version = "37", default-features = false }
futures = "0.3"
like = { version = "0.3", default-features = false }
prost = { version = "0.12", default-features = false }
tonic = { version = "0.10", default-features = false }
tonic = { version = "0.11", default-features = false }
tracing = { version = "0.1", default-features = false }
uuid = { version = "1", default-features = false }

Expand Down
9 changes: 8 additions & 1 deletion src/adapter/flight-sql/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchema;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::{DataFrame, SessionContext};
use prost::bytes::Bytes;
use prost::Message;
use tonic::codegen::tokio_stream::Stream;
use tonic::metadata::MetadataValue;
Expand Down Expand Up @@ -274,7 +275,7 @@ impl KamuFlightSqlService {
continue;
}

let table = schema.table(&table_name).await.unwrap();
let table = schema.table(&table_name).await.unwrap().unwrap();

col_catalog_name.push(catalog_name.clone());
col_db_schema_name.push(schema_name.clone());
Expand Down Expand Up @@ -543,6 +544,8 @@ impl KamuFlightSqlService {
let fieps = vec![FlightEndpoint {
ticket: Some(Ticket { ticket }),
location: vec![],
expiration_time: None,
app_metadata: Bytes::new(),
}];

let flight_desc = FlightDescriptor {
Expand All @@ -557,6 +560,7 @@ impl KamuFlightSqlService {
total_records,
total_bytes,
ordered: false,
app_metadata: Bytes::new(),
};
tracing::debug!(
schema = ?schema.as_ref(),
Expand Down Expand Up @@ -585,6 +589,8 @@ impl KamuFlightSqlService {
let fieps = vec![FlightEndpoint {
ticket: Some(Ticket { ticket }),
location: vec![],
expiration_time: None,
app_metadata: Bytes::new(),
}];

let flight_desc = FlightDescriptor {
Expand All @@ -599,6 +605,7 @@ impl KamuFlightSqlService {
total_records: -1,
total_bytes: -1,
ordered: false,
app_metadata: Bytes::new(),
};
tracing::debug!(
schema = ?df.schema(),
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/graphql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async-graphql = { version = "6", features = ["chrono", "url", "apollo_tracing"]
async-trait = { version = "0.1", default-features = false }
cron = { version = "0.12.0", default-features = false }
chrono = "0.4"
datafusion = { version = "36", default-features = false, features = ["serde"] } # TODO: Currently needed for type conversions but ideally should be encapsulated by kamu-core
datafusion = { version = "37", default-features = false, features = ["serde"] } # TODO: Currently needed for type conversions but ideally should be encapsulated by kamu-core
dill = "0.8"
futures = "0.3"
serde = {version = "1", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ axum-extra = { version = "0.8", features = ["async-read-body"] }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4", features = ["serde"] }
datafusion = { version = "36", default-features = false } # TODO: Currently needed for type conversions but ideally should be encapsulated by kamu-core
datafusion = { version = "37", default-features = false } # TODO: Currently needed for type conversions but ideally should be encapsulated by kamu-core
dill = "0.8"
flate2 = "1" # GZip decoder
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/odata/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ kamu-core = { workspace = true }

axum = { version = "0.6", features = ["headers"] }
chrono = { version = "0.4", default-features = false }
datafusion = { version = "36", default-features = false }
datafusion-odata = { version = "0.4", default-features = false }
datafusion = { version = "37", default-features = false }
datafusion-odata = { version = "37", default-features = false }
dill = { version = "0.8" }
futures = { version = "0.3", default-features = false }
http = "0.2"
Expand Down
6 changes: 3 additions & 3 deletions src/app/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ read_input = "0.8" # Basic user input
webbrowser = "0.8" # For opening URLs in default system browser

# APIs
arrow-flight = { version = "50", features = ["flight-sql-experimental"] }
arrow-flight = { version = "51", features = ["flight-sql-experimental"] }
async-graphql = { version = "6", features = ["chrono", "url", "apollo_tracing"] }
async-graphql-axum = "6"
axum = { version = "0.6", features = ["ws"] }
Expand All @@ -78,7 +78,7 @@ http = "0.2"
hyper = "0.14"
reqwest = { version = "0.11", default-features = false, features = [] }
serde_json = "1"
tonic = { version = "0.10", default-features = false }
tonic = { version = "0.11", default-features = false }
tower = "0.4"
tower-http = { version = "0.4", features = ["trace", "cors"] }

Expand Down Expand Up @@ -106,7 +106,7 @@ tracing-bunyan-formatter = "0.3"
async-trait = "0.1"
chrono = "0.4"
cfg-if = "1" # Conditional compilation
datafusion = { version = "36", default-features = false, features = ["crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] }
datafusion = { version = "37", default-features = false, features = ["crypto_expressions", "encoding_expressions", "parquet", "regex_expressions", "unicode_expressions", "compression"] }
dill = "0.8"
dirs = "5"
fs_extra = "1.3"
Expand Down
2 changes: 1 addition & 1 deletion src/domain/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ tracing = { version = "0.1", default-features = false }
url = { version = "2", default-features = false, features = ["serde"] }

# TODO: Avoid this dependency or depend on sub-crates
datafusion = { version = "36", default-features = false, features = ["parquet"] }
datafusion = { version = "37", default-features = false, features = ["parquet"] }
object_store = { version = "0.9", default-features = false }

# TODO: Make serde optional
Expand Down
2 changes: 1 addition & 1 deletion src/domain/flow-system/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ serde = { version = "1", default-features = false, features = ["derive"] }
serde_with = { version = "3", default-features = false }

[dev-dependencies]
datafusion = { version = "36", default-features = false }
datafusion = { version = "37", default-features = false }
4 changes: 2 additions & 2 deletions src/domain/opendatafabric/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ serde_yaml = "0.9"

# gRPC
prost = "0.12"
tonic = "0.10"
tonic = "0.11"

# Optional
arrow = { optional = true, version = "50", default-features = false, features = ["ipc"] }
arrow = { optional = true, version = "51", default-features = false, features = ["ipc"] }

[dev-dependencies]
indoc = "2"
2 changes: 1 addition & 1 deletion src/infra/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ ringbuf = "0.3"
zip = "0.6"

# Data
datafusion = { version = "36", default-features = false }
datafusion = { version = "37", default-features = false }
object_store = { version = "0.9", features = ["aws"] }
digest = "0.10"
sha3 = "0.10"
Expand Down
18 changes: 10 additions & 8 deletions src/infra/core/src/engine/engine_odf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;

use container_runtime::*;
use datafusion::config::{ParquetOptions, TableParquetOptions};
use kamu_core::engine::*;
use kamu_core::*;
use odf::engine::{EngineGrpcClient, ExecuteRawQueryError, ExecuteTransformError};
Expand Down Expand Up @@ -274,14 +276,14 @@ impl Engine for ODFEngine {
.write_parquet(
host_input_data_path.as_os_str().to_str().unwrap(),
datafusion::dataframe::DataFrameWriteOptions::new().with_single_file_output(true),
Some(
datafusion::parquet::file::properties::WriterProperties::builder()
.set_writer_version(
datafusion::parquet::file::properties::WriterVersion::PARQUET_1_0,
)
.set_compression(datafusion::parquet::basic::Compression::SNAPPY)
.build(),
),
Some(TableParquetOptions {
global: ParquetOptions {
writer_version: "1.0".into(),
compression: Some("snappy".into()),
..Default::default()
},
column_specific_options: HashMap::new(),
}),
)
.await
.int_err()?;
Expand Down
24 changes: 17 additions & 7 deletions src/infra/core/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use datafusion::common::{Constraints, Statistics};
use datafusion::datasource::empty::EmptyTable;
use datafusion::datasource::listing::{ListingTable, ListingTableConfig};
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::DataFusionError;
use datafusion::execution::context::{DataFilePaths, SessionState};
use datafusion::execution::options::ReadOptions;
use datafusion::logical_expr::LogicalPlan;
Expand Down Expand Up @@ -242,23 +243,31 @@ impl SchemaProvider for KamuSchema {
.unwrap_or(false)
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
let Ok(alias) = DatasetAlias::try_from(name) else {
return None;
return Ok(None);
};

let table = {
let cache = self.ensure_cache().await.unwrap();
let cache = self
.ensure_cache()
.await
.map_err(|e| DataFusionError::External(e.into()))?;

cache.tables.as_ref().unwrap().get(&alias).cloned()
};

if let Some(table) = table {
// HACK: We pre-initialize the schema here because `TableProvider::schema()` is
// not async
table.get_table_schema().await.unwrap();
Some(table)
table
.get_table_schema()
.await
.map_err(|e| DataFusionError::External(e.into()))?;

Ok(Some(table))
} else {
None
Ok(None)
}
}
}
Expand Down Expand Up @@ -363,7 +372,8 @@ impl KamuTable {

let table_paths = file_urls.to_urls().int_err()?;
let session_config = self.session_context.copied_config();
let listing_options = options.to_listing_options(&session_config);
let listing_options = options
.to_listing_options(&session_config, self.session_context.copied_table_options());

let resolved_schema = options
.get_resolved_schema(
Expand Down
2 changes: 1 addition & 1 deletion src/infra/ingest-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ opendatafabric = { workspace = true, features = ["arrow"] }
kamu-core = { workspace = true }
kamu-data-utils = { workspace = true }

datafusion = { version = "36", default-features = false }
datafusion = { version = "37", default-features = false }
digest = "0.10"
geo-types = { version = "0.7", default-features = false, features = [] }
geojson ={ version = "0.24", default-features = false, features = ["geo-types"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl MergeStrategySnapshot {
partition_by: self.primary_key.iter().map(col).collect(),
order_by: vec![col(&self.vocab.offset_column).sort(false, false)],
window_frame: datafusion::logical_expr::WindowFrame::new(Some(false)),
null_treatment: None,
},
)
.alias(rank_col)])
Expand Down
42 changes: 29 additions & 13 deletions src/infra/ingest-datafusion/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::HashMap;
use std::convert::TryFrom;
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand All @@ -15,8 +16,8 @@ use chrono::{DateTime, TimeZone, Utc};
use datafusion::arrow::array::Array;
use datafusion::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
use datafusion::common::DFSchema;
use datafusion::config::{ColumnOptions, ParquetOptions, TableParquetOptions};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::prelude::*;
use internal_error::*;
use kamu_core::ingest::*;
Expand Down Expand Up @@ -338,6 +339,7 @@ impl DataWriterDataFusion {
partition_by: vec![],
order_by: self.merge_strategy.sort_order(),
window_frame: expr::WindowFrame::new(Some(false)),
null_treatment: None,
}),
)
.int_err()?;
Expand Down Expand Up @@ -401,21 +403,35 @@ impl DataWriterDataFusion {
}

// TODO: Externalize configuration
fn get_write_properties(&self) -> WriterProperties {
fn get_write_properties(&self) -> TableParquetOptions {
// TODO: `offset` column is sorted integers so we could use delta encoding, but
// Flink does not support it.
// See: https://github.com/kamu-data/kamu-engine-flink/issues/3
WriterProperties::builder()
.set_writer_version(datafusion::parquet::file::properties::WriterVersion::PARQUET_1_0)
.set_compression(datafusion::parquet::basic::Compression::SNAPPY)
// op column is low cardinality and best encoded as RLE_DICTIONARY
.set_column_dictionary_enabled(
self.meta.vocab.operation_type_column.as_str().into(),
true,
)
// system_time value will be the same for all rows in a batch
.set_column_dictionary_enabled(self.meta.vocab.system_time_column.as_str().into(), true)
.build()
TableParquetOptions {
global: ParquetOptions {
writer_version: "1.0".into(),
compression: Some("snappy".into()),
..Default::default()
},
column_specific_options: HashMap::from([
(
// op column is low cardinality and best encoded as RLE_DICTIONARY
self.meta.vocab.operation_type_column.clone(),
ColumnOptions {
dictionary_enabled: Some(true),
..Default::default()
},
),
(
self.meta.vocab.system_time_column.clone(),
ColumnOptions {
// system_time value will be the same for all rows in a batch
dictionary_enabled: Some(true),
..Default::default()
},
),
]),
}
}

#[tracing::instrument(level = "debug", skip_all, fields(?path))]
Expand Down
8 changes: 4 additions & 4 deletions src/utils/data-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ doctest = false
[dependencies]
opendatafabric = { workspace = true }

arrow = { version = "50", default-features = false }
arrow-json = { version = "50", default-features = false }
arrow-digest = { version = "50", default-features = false }
datafusion = { version = "36", default-features = false, features = ["parquet", "serde"] }
arrow = { version = "51", default-features = false }
arrow-json = { version = "51", default-features = false }
arrow-digest = { version = "51", default-features = false }
datafusion = { version = "37", default-features = false, features = ["parquet", "serde"] }
tracing = { version = "0.1", default-features = false }

async-trait = "0.1"
Expand Down
Loading

0 comments on commit ba4012a

Please sign in to comment.