diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b7f8b694..92e47fb92 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -158,4 +158,5 @@ jobs: rust-version: ${{ env.rust_msrv }} - name: Check MSRV run: | - cargo +${{ env.rust_msrv }} check --locked --workspace --exclude iceberg-datafusion --exclude iceberg-catalog-s3tables + cargo +${{ env.rust_msrv }} check --locked --workspace --exclude iceberg-datafusion \ + --exclude iceberg-catalog-s3tables --exclude iceberg-sqllogictest diff --git a/Cargo.lock b/Cargo.lock index ca6d792af..39a9d4d4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1361,6 +1361,46 @@ dependencies = [ "inout", ] +[[package]] +name = "clap" +version = "4.5.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8eb5e908ef3a6efbe1ed62520fb7287959888c88485abe072543190ecc66783" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b01801b5fc6a0a232407abc821660c9c6d25a1cafc0d4f85f29fb8d9afc121" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b755194d6389280185988721fffba69495eed5ee9feeee9a599b53db80318c" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.92", +] + +[[package]] +name = "clap_lex" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" + [[package]] name = "colorchoice" version = "1.0.3" @@ -1385,7 +1425,7 @@ checksum = "24f165e7b643266ea80cb858aed492ad9280e3e05ce24d4a99d7d7b889b6a4d9" dependencies = [ "strum", "strum_macros", - "unicode-width", + "unicode-width 0.2.0", ] [[package]] @@ -2185,6 +2225,18 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "educe" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7bc049e1bd8cdeb31b68bbd586a9464ecf9f3944af3958a7a9d0f8b9799417" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn 2.0.92", +] + [[package]] name = "either" version = "1.13.0" @@ -2194,6 +2246,26 @@ dependencies = [ "serde", ] +[[package]] +name = "enum-ordinalize" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea0dcfa4e54eeb516fe454635a95753ddd39acda650ce703031c6973e315dd5" +dependencies = [ + "enum-ordinalize-derive", +] + +[[package]] +name = "enum-ordinalize-derive" +version = "4.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.92", +] + [[package]] name = "env_filter" version = "0.1.3" @@ -2233,6 +2305,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "escape8259" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5692dd7b5a1978a5aeb0ce83b7655c58ca8efdcb79d21036ea249da95afec2c6" + [[package]] name = "etcetera" version = "0.8.0" @@ -2351,6 +2429,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs-err" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a41f105fe1d5b6b34b2055e3dc59bb79b46b48b2040b9e6c7b4b5de097aa41" +dependencies = [ + "autocfg", +] + [[package]] name = "funty" version = "2.0.0" @@ -2643,6 +2730,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "hermit-abi" version = "0.4.0" @@ -3101,6 +3194,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "iceberg-sqllogictest" +version = "0.4.0" +dependencies = [ + "anyhow", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-schema", + "async-trait", + "bigdecimal", + "datafusion", + "datafusion-common", + "half", + "itertools", + "rust_decimal", + "sqllogictest", + "tokio", + "toml", +] + [[package]] name = "iceberg_test_utils" version = "0.4.0" @@ -3508,6 +3622,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libtest-mimic" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc0bda45ed5b3a2904262c1bb91e526127aa70e7ef3758aba2ef93cf896b9b58" +dependencies = [ + "clap", + "escape8259", + "termcolor", + "threadpool", +] + [[package]] name = "linked-hash-map" version = "0.5.6" @@ -3900,6 +4026,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.9", + "libc", +] + [[package]] name = "num_enum" version = "0.7.3" @@ -4034,6 +4170,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb37767f6569cd834a413442455e0f066d0d522de8630436e2a1761d9726ba56" + [[package]] name = "parking" version = "2.2.1" @@ -4360,7 +4502,7 @@ checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f" dependencies = [ "cfg-if", "concurrent-queue", - "hermit-abi", + "hermit-abi 0.4.0", "pin-project-lite", "rustix", "tracing", @@ -5209,6 +5351,15 @@ dependencies = [ "syn 2.0.92", ] +[[package]] +name = "serde_spanned" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -5471,9 +5622,26 @@ dependencies = [ [[package]] name = "sqllogictest" -version = "0.4.0" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b6b8f606d3c4cdcaf2031c4320b79d7584e454b79562ba3d675f49701c160e" dependencies = [ - "anyhow", + "async-trait", + "educe", + "fs-err", + "futures", + "glob", + "humantime", + "itertools", + "libtest-mimic", + "md-5", + "owo-colors", + "regex", + "similar", + "subst", + "tempfile", + "thiserror 1.0.69", + "tracing", ] [[package]] @@ -5752,6 +5920,16 @@ dependencies = [ "syn 2.0.92", ] +[[package]] +name = "subst" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e7942675ea19db01ef8cf15a1e6443007208e6c74568bd64162da26d40160d" +dependencies = [ + "memchr", + "unicode-width 0.1.14", +] + [[package]] name = "subtle" version = "2.6.1" @@ -5848,6 +6026,15 @@ dependencies = [ "unic-segment", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -5898,6 +6085,15 @@ dependencies = [ "once_cell", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "thrift" version = "0.17.0" @@ -6047,11 +6243,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + [[package]] name = "toml_datetime" version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" @@ -6060,6 +6271,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" dependencies = [ "indexmap 2.7.0", + "serde", + "serde_spanned", "toml_datetime", "winnow", ] @@ -6310,6 +6523,12 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + [[package]] name = "unicode-width" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index e4b982e7e..42ce56b6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ apache-avro = "0.17" array-init = "2" arrow-arith = { version = "53.3.0" } arrow-array = { version = "53.3.0" } +arrow-buffer = { version = "53.3.0" } arrow-cast = { version = "53.3.0" } arrow-ord = { version = "53.3.0" } arrow-schema = { version = "53.3.0" } @@ -58,6 +59,8 @@ bitvec = "1.0.1" bytes = "1.6" chrono = "0.4.38" ctor = "0.2.8" +datafusion = { version = "44.0.0" } +datafusion-common = { version = "44.0.0" } derive_builder = "0.20" either = "1" env_logger = "0.11.0" @@ -102,3 +105,7 @@ hive_metastore = "0.1" tera = "1" zstd = "0.13.2" expect-test = "1" +toml = "0.8.19" +sqllogictest = "0.22" +bigdecimal = "0.4.7" +half = "2.4.1" diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 7a706d0e9..ccb9ca175 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -34,7 +34,7 @@ keywords = ["iceberg", "integrations", "datafusion"] [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } -datafusion = { version = "44" } +datafusion = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } tokio = { workspace = true } diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml index 7c596159f..737ce8b0b 100644 --- a/crates/sqllogictest/Cargo.toml +++ b/crates/sqllogictest/Cargo.toml @@ -16,7 +16,7 @@ # under the License. [package] -name = "sqllogictest" +name = "iceberg-sqllogictest" version = { workspace = true } edition = { workspace = true } homepage = { workspace = true } @@ -26,3 +26,17 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } +arrow-array = { workspace = true } +arrow-buffer = { workspace = true } +arrow-cast = { workspace = true } +arrow-schema = { workspace = true } +async-trait = { workspace = true } +bigdecimal = { workspace = true } +datafusion = { workspace = true, default-features = true } +datafusion-common = { workspace = true, default-features = true } +half = { workspace = true } +itertools = { workspace = true } +rust_decimal = { workspace = true } +sqllogictest = { workspace = true } +tokio = { workspace = true } +toml = { workspace = true } diff --git a/crates/sqllogictest/src/display/conversion.rs b/crates/sqllogictest/src/display/conversion.rs new file mode 100644 index 000000000..f5fe3decd --- /dev/null +++ b/crates/sqllogictest/src/display/conversion.rs @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::types::{Decimal128Type, Decimal256Type, DecimalType}; +use arrow_buffer::i256; +use bigdecimal::BigDecimal; +use half::f16; +use rust_decimal::prelude::*; + +/// Represents a constant for NULL string in your database. +pub const NULL_STR: &str = "NULL"; + +pub(crate) fn bool_to_str(value: bool) -> String { + if value { + "true".to_string() + } else { + "false".to_string() + } +} + +pub(crate) fn varchar_to_str(value: &str) -> String { + if value.is_empty() { + "(empty)".to_string() + } else { + value.trim_end_matches('\n').to_string() + } +} + +pub(crate) fn f16_to_str(value: f16) -> String { + if value.is_nan() { + // The sign of NaN can be different depending on platform. + // So the string representation of NaN ignores the sign. + "NaN".to_string() + } else if value == f16::INFINITY { + "Infinity".to_string() + } else if value == f16::NEG_INFINITY { + "-Infinity".to_string() + } else { + big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap()) + } +} + +pub(crate) fn f32_to_str(value: f32) -> String { + if value.is_nan() { + // The sign of NaN can be different depending on platform. + // So the string representation of NaN ignores the sign. + "NaN".to_string() + } else if value == f32::INFINITY { + "Infinity".to_string() + } else if value == f32::NEG_INFINITY { + "-Infinity".to_string() + } else { + big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap()) + } +} + +pub(crate) fn f64_to_str(value: f64) -> String { + if value.is_nan() { + // The sign of NaN can be different depending on platform. + // So the string representation of NaN ignores the sign. + "NaN".to_string() + } else if value == f64::INFINITY { + "Infinity".to_string() + } else if value == f64::NEG_INFINITY { + "-Infinity".to_string() + } else { + big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap()) + } +} + +pub(crate) fn i128_to_str(value: i128, precision: &u8, scale: &i8) -> String { + big_decimal_to_str( + BigDecimal::from_str(&Decimal128Type::format_decimal(value, *precision, *scale)).unwrap(), + ) +} + +pub(crate) fn i256_to_str(value: i256, precision: &u8, scale: &i8) -> String { + big_decimal_to_str( + BigDecimal::from_str(&Decimal256Type::format_decimal(value, *precision, *scale)).unwrap(), + ) +} + +pub(crate) fn big_decimal_to_str(value: BigDecimal) -> String { + value.round(12).normalized().to_string() +} diff --git a/crates/sqllogictest/src/display/mod.rs b/crates/sqllogictest/src/display/mod.rs new file mode 100644 index 000000000..115f67f1d --- /dev/null +++ b/crates/sqllogictest/src/display/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code in this module is copied from [Apache Datafusion](https://github.com/apache/datafusion) +pub mod conversion; +pub mod normalize; diff --git a/crates/sqllogictest/src/display/normalize.rs b/crates/sqllogictest/src/display/normalize.rs new file mode 100644 index 000000000..9013b6ee1 --- /dev/null +++ b/crates/sqllogictest/src/display/normalize.rs @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use anyhow::anyhow; +use arrow_array::{ + ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float16Array, Float32Array, + Float64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray, +}; +use arrow_cast::display::ArrayFormatter; +use arrow_schema::{DataType, Fields}; +use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; + +use crate::display::conversion::*; +use crate::engine::output::DFColumnType; + +/// Converts `batches` to a result as expected by sqllogicteset. +pub(crate) fn convert_batches(batches: Vec) -> anyhow::Result>> { + if batches.is_empty() { + Ok(vec![]) + } else { + let schema = batches[0].schema(); + let mut rows = vec![]; + for batch in batches { + // Verify schema + if !schema.contains(&batch.schema()) { + return Err(anyhow!( + "Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}", + &schema, + batch.schema() + )); + } + + let new_rows = convert_batch(batch)?.into_iter().flat_map(expand_row); + rows.extend(new_rows); + } + Ok(rows) + } +} + +/// special case rows that have newlines in them (like explain plans) +// +/// Transform inputs like: +/// ```text +/// [ +/// "logical_plan", +/// "Sort: d.b ASC NULLS LAST\n Projection: d.b, MAX(d.a) AS max_a", +/// ] +/// ``` +/// +/// Into one cell per line, adding lines if necessary +/// ```text +/// [ +/// "logical_plan", +/// ] +/// [ +/// "Sort: d.b ASC NULLS LAST", +/// ] +/// [ <--- newly added row +/// "|-- Projection: d.b, MAX(d.a) AS max_a", +/// ] +/// ``` +fn expand_row(mut row: Vec) -> impl Iterator> { + use std::iter::once; + + use itertools::Either; + + // check last cell + if let Some(cell) = row.pop() { + let lines: Vec<_> = cell.split('\n').collect(); + + // no newlines in last cell + if lines.len() < 2 { + row.push(cell); + return Either::Left(once(row)); + } + + // form new rows with each additional line + let new_lines: Vec<_> = lines + .into_iter() + .enumerate() + .map(|(idx, l)| { + // replace any leading spaces with '-' as + // `sqllogictests` ignores whitespace differences + // + // See https://github.com/apache/datafusion/issues/6328 + let content = l.trim_start(); + let new_prefix = "-".repeat(l.len() - content.len()); + // maintain for each line a number, so + // reviewing explain result changes is easier + let line_num = idx + 1; + vec![format!("{line_num:02}){new_prefix}{content}")] + }) + .collect(); + + Either::Right(once(row).chain(new_lines)) + } else { + Either::Left(once(row)) + } +} + +/// Convert a single batch to a `Vec>` for comparison +fn convert_batch(batch: RecordBatch) -> anyhow::Result>> { + (0..batch.num_rows()) + .map(|row| { + batch + .columns() + .iter() + .map(|col| cell_to_string(col, row)) + .collect::>>() + }) + .collect() +} + +macro_rules! get_row_value { + ($array_type:ty, $column: ident, $row: ident) => {{ + let array = $column.as_any().downcast_ref::<$array_type>().unwrap(); + + array.value($row) + }}; +} + +/// Normalizes the content of a single cell in RecordBatch prior to printing. +/// +/// This is to make the output comparable to the semi-standard .slt format +/// +/// Normalizations applied to [NULL Values and empty strings] +/// +/// [NULL Values and empty strings]: https://duckdb.org/dev/sqllogictest/result_verification#null-values-and-empty-strings +/// +/// Floating numbers are rounded to have a consistent representation with the Postgres runner. +pub fn cell_to_string(col: &ArrayRef, row: usize) -> anyhow::Result { + if !col.is_valid(row) { + // represent any null value with the string "NULL" + Ok(NULL_STR.to_string()) + } else { + match col.data_type() { + DataType::Null => Ok(NULL_STR.to_string()), + DataType::Boolean => Ok(bool_to_str(get_row_value!(BooleanArray, col, row))), + DataType::Float16 => Ok(f16_to_str(get_row_value!(Float16Array, col, row))), + DataType::Float32 => Ok(f32_to_str(get_row_value!(Float32Array, col, row))), + DataType::Float64 => Ok(f64_to_str(get_row_value!(Float64Array, col, row))), + DataType::Decimal128(precision, scale) => { + let value = get_row_value!(Decimal128Array, col, row); + Ok(i128_to_str(value, precision, scale)) + } + DataType::Decimal256(precision, scale) => { + let value = get_row_value!(Decimal256Array, col, row); + Ok(i256_to_str(value, precision, scale)) + } + DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!(LargeStringArray, col, row))), + DataType::Utf8 => Ok(varchar_to_str(get_row_value!(StringArray, col, row))), + DataType::Utf8View => Ok(varchar_to_str(get_row_value!(StringViewArray, col, row))), + _ => { + let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS); + Ok(f.unwrap().value(row).to_string()) + } + } + } +} + +/// Converts columns to a result as expected by sqllogicteset. +pub(crate) fn convert_schema_to_types(columns: &Fields) -> Vec { + columns + .iter() + .map(|f| f.data_type()) + .map(|data_type| match data_type { + DataType::Boolean => DFColumnType::Boolean, + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => DFColumnType::Integer, + DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) => DFColumnType::Float, + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => DFColumnType::Text, + DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_) => { + DFColumnType::DateTime + } + DataType::Timestamp(_, _) => DFColumnType::Timestamp, + _ => DFColumnType::Another, + }) + .collect() +} diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs new file mode 100644 index 000000000..1b6e65b00 --- /dev/null +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::{anyhow, Context}; +use arrow_array::RecordBatch; +use async_trait::async_trait; +use datafusion::catalog::CatalogProvider; +use datafusion::physical_plan::common::collect; +use datafusion::physical_plan::execute_stream; +use datafusion::prelude::{SessionConfig, SessionContext}; +use sqllogictest::{AsyncDB, DBOutput}; +use toml::Table as TomlTable; + +use crate::display::normalize; +use crate::engine::output::{DFColumnType, DFOutput}; +use crate::engine::Engine; +use crate::error::{Error, Result}; + +pub struct DataFusionEngine { + config: TomlTable, +} + +#[async_trait::async_trait] +impl Engine for DataFusionEngine { + async fn new(config: TomlTable) -> Result { + Ok(Self { config }) + } + + async fn run_slt_file(&self, path: &Path) -> Result<()> { + let content = std::fs::read_to_string(path) + .with_context(|| format!("Failed to read slt file {:?}", path)) + .map_err(|e| anyhow!(e))?; + let mut db = DataFusionDB::new(&self.config).await?; + + db.run(content.as_str()).await?; + + Ok(()) + } +} + +struct DataFusionDB { + ctx: SessionContext, +} + +impl Default for DataFusionDB { + fn default() -> Self { + let config = SessionConfig::new().with_target_partitions(4); + + let ctx = SessionContext::new_with_config(config); + + Self { ctx } + } +} + +#[async_trait] +impl AsyncDB for DataFusionDB { + type Error = Error; + type ColumnType = DFColumnType; + + async fn run(&mut self, sql: &str) -> Result { + Ok(run_query(&self.ctx, sql).await?) + } + + /// Engine name of current database. + fn engine_name(&self) -> &str { + "DataFusion" + } + + /// [`DataFusionDB`] calls this function to perform sleep. + /// + /// The default implementation is `std::thread::sleep`, which is universal to any async runtime + /// but would block the current thread. If you are running in tokio runtime, you should override + /// this by `tokio::time::sleep`. + async fn sleep(dur: Duration) { + tokio::time::sleep(dur).await; + } +} + +async fn run_query(ctx: &SessionContext, sql: impl Into) -> anyhow::Result { + let df = ctx.sql(sql.into().as_str()).await?; + let task_ctx = Arc::new(df.task_ctx()); + let plan = df.create_physical_plan().await?; + + let stream = execute_stream(plan, task_ctx)?; + let types = normalize::convert_schema_to_types(stream.schema().fields()); + let results: Vec = collect(stream).await?; + let rows = normalize::convert_batches(results)?; + + if rows.is_empty() && types.is_empty() { + Ok(DBOutput::StatementComplete(0)) + } else { + Ok(DBOutput::Rows { types, rows }) + } +} + +impl DataFusionDB { + pub async fn new(configs: &TomlTable) -> Result { + let config = SessionConfig::new().with_target_partitions(4); + + let ctx = SessionContext::new_with_config(config); + ctx.register_catalog("demo", Self::create_catalog(configs).await?); + + Ok(Self { ctx }) + } + + async fn create_catalog(_configs: &TomlTable) -> anyhow::Result> { + todo!() + } +} diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs new file mode 100644 index 000000000..0a116ae4d --- /dev/null +++ b/crates/sqllogictest/src/engine/mod.rs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod datafusion; +pub mod output; + +use std::path::Path; + +use toml::Table as TomlTable; + +use crate::error::Result; + +#[async_trait::async_trait] +pub trait Engine: Sized { + async fn new(config: TomlTable) -> Result; + async fn run_slt_file(&self, path: &Path) -> Result<()>; +} diff --git a/crates/sqllogictest/src/engine/output.rs b/crates/sqllogictest/src/engine/output.rs new file mode 100644 index 000000000..24299856e --- /dev/null +++ b/crates/sqllogictest/src/engine/output.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use sqllogictest::{ColumnType, DBOutput}; + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum DFColumnType { + Boolean, + DateTime, + Integer, + Float, + Text, + Timestamp, + Another, +} + +impl ColumnType for DFColumnType { + fn from_char(value: char) -> Option { + match value { + 'B' => Some(Self::Boolean), + 'D' => Some(Self::DateTime), + 'I' => Some(Self::Integer), + 'P' => Some(Self::Timestamp), + 'R' => Some(Self::Float), + 'T' => Some(Self::Text), + _ => Some(Self::Another), + } + } + + fn to_char(&self) -> char { + match self { + Self::Boolean => 'B', + Self::DateTime => 'D', + Self::Integer => 'I', + Self::Timestamp => 'P', + Self::Float => 'R', + Self::Text => 'T', + Self::Another => '?', + } + } +} + +pub(crate) type DFOutput = DBOutput; diff --git a/crates/sqllogictest/src/lib.rs b/crates/sqllogictest/src/lib.rs index 196d16c63..197b04279 100644 --- a/crates/sqllogictest/src/lib.rs +++ b/crates/sqllogictest/src/lib.rs @@ -18,5 +18,8 @@ // This lib contains codes copied from // [Apache Datafusion](https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest) +mod display; +#[allow(dead_code)] +mod engine; #[allow(dead_code)] mod error;