From e4ae613846f2ebf78138cd472a8043e18844e632 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 13 Apr 2024 09:16:31 -0400 Subject: [PATCH 1/4] chore(deps): update sqllogictest requirement from 0.19.0 to 0.20.0 (#10057) Updates the requirements on [sqllogictest](https://github.com/risinglightdb/sqllogictest-rs) to permit the latest version. - [Release notes](https://github.com/risinglightdb/sqllogictest-rs/releases) - [Changelog](https://github.com/risinglightdb/sqllogictest-rs/blob/main/CHANGELOG.md) - [Commits](https://github.com/risinglightdb/sqllogictest-rs/commits) --- updated-dependencies: - dependency-name: sqllogictest dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- datafusion/sqllogictest/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index c348f2cddc93..384c5b7153c3 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -48,7 +48,7 @@ object_store = { workspace = true } postgres-protocol = { version = "0.6.4", optional = true } postgres-types = { version = "0.2.4", optional = true } rust_decimal = { version = "1.27.0" } -sqllogictest = "0.19.0" +sqllogictest = "0.20.0" sqlparser = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } From 694d4b838439224f5bd51d68ad6254b1b46c06ed Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Sat, 13 Apr 2024 22:23:03 +0900 Subject: [PATCH 2/4] Rename `FileSinkExec` to `DataSinkExec` (#10065) * Rename `FileSinkExec` to `DataSinkExec` * Add deprecation notice * Fix clippy * Make typedef public --- .../core/src/datasource/file_format/arrow.rs | 4 ++-- .../core/src/datasource/file_format/csv.rs | 4 ++-- .../core/src/datasource/file_format/json.rs | 4 ++-- .../src/datasource/file_format/parquet.rs | 4 ++-- datafusion/core/src/datasource/memory.rs | 4 ++-- datafusion/core/src/datasource/provider.rs | 4 ++-- datafusion/core/src/datasource/stream.rs | 4 ++-- datafusion/physical-plan/src/insert.rs | 21 +++++++++++-------- datafusion/proto/src/physical_plan/mod.rs | 10 ++++----- .../tests/cases/roundtrip_physical_plan.rs | 10 ++++----- datafusion/sqllogictest/test_files/copy.slt | 4 ++-- .../sqllogictest/test_files/explain.slt | 2 +- datafusion/sqllogictest/test_files/insert.slt | 8 +++---- .../test_files/insert_to_external.slt | 8 +++---- 14 files changed, 47 insertions(+), 44 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 99bfbbad9d10..9d58465191e1 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -43,7 +43,7 @@ use arrow_schema::{ArrowError, Schema, SchemaRef}; use datafusion_common::{not_impl_err, DataFusionError, FileType, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; -use datafusion_physical_plan::insert::{DataSink, FileSinkExec}; +use datafusion_physical_plan::insert::{DataSink, DataSinkExec}; use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; @@ -129,7 +129,7 @@ impl FileFormat for ArrowFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(ArrowFileSink::new(conf)); - 
Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, sink, sink_schema, diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index a7849258329b..84235cde0f5d 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -31,7 +31,7 @@ use crate::datasource::physical_plan::{ }; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::insert::{DataSink, FileSinkExec}; +use crate::physical_plan::insert::{DataSink, DataSinkExec}; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; @@ -267,7 +267,7 @@ impl FileFormat for CsvFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(CsvSink::new(conf, writer_options)); - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, sink, sink_schema, diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 0cc38bbb5554..efc0aa4328d8 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -31,7 +31,7 @@ use crate::datasource::physical_plan::FileGroupDisplay; use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec}; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::insert::{DataSink, FileSinkExec}; +use crate::physical_plan::insert::{DataSink, DataSinkExec}; use crate::physical_plan::{ DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics, }; @@ -177,7 +177,7 @@ impl FileFormat for JsonFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(JsonSink::new(conf, writer_options)); - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, sink, sink_schema, diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index bcf4e8a2c8e4..1d41f015121d 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -37,7 +37,7 @@ use crate::datasource::statistics::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; -use crate::physical_plan::insert::{DataSink, FileSinkExec}; +use crate::physical_plan::insert::{DataSink, DataSinkExec}; use crate::physical_plan::{ Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Statistics, @@ -279,7 +279,7 @@ impl FileFormat for ParquetFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(ParquetSink::new(conf, self.options.clone())); - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, sink, sink_schema, diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 608a46144da3..42e05ebeb33f 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -26,7 +26,7 @@ use crate::datasource::{TableProvider, TableType}; use crate::error::Result; use crate::execution::context::SessionState; use crate::logical_expr::Expr; -use crate::physical_plan::insert::{DataSink, FileSinkExec}; +use crate::physical_plan::insert::{DataSink, DataSinkExec}; use crate::physical_plan::memory::MemoryExec; 
use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::{ @@ -279,7 +279,7 @@ impl TableProvider for MemTable { return not_impl_err!("Overwrite not implemented for MemoryTable yet"); } let sink = Arc::new(MemSink::new(self.batches.clone())); - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, sink, self.schema.clone(), diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs index 100011952b3b..7c58aded3108 100644 --- a/datafusion/core/src/datasource/provider.rs +++ b/datafusion/core/src/datasource/provider.rs @@ -269,10 +269,10 @@ pub trait TableProvider: Sync + Send { /// /// # See Also /// - /// See [`FileSinkExec`] for the common pattern of inserting a + /// See [`DataSinkExec`] for the common pattern of inserting a /// streams of `RecordBatch`es as files to an ObjectStore. /// - /// [`FileSinkExec`]: crate::physical_plan::insert::FileSinkExec + /// [`DataSinkExec`]: crate::physical_plan::insert::DataSinkExec async fn insert_into( &self, _state: &SessionState, diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 079c1a891d14..2059a5ffcfe4 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -34,7 +34,7 @@ use datafusion_common::{plan_err, Constraints, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; -use datafusion_physical_plan::insert::{DataSink, FileSinkExec}; +use datafusion_physical_plan::insert::{DataSink, DataSinkExec}; use datafusion_physical_plan::metrics::MetricsSet; use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; @@ -277,7 +277,7 @@ impl TableProvider for StreamTable { None => None, }; - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, Arc::new(StreamWrite(self.0.clone())), self.0.schema.clone(), diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index f0233264f280..e3f9f2c76d31 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -74,10 +74,13 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync { ) -> Result; } +#[deprecated(since = "38.0.0", note = "Use [`DataSinkExec`] instead")] +pub type FileSinkExec = DataSinkExec; + /// Execution plan for writing record batches to a [`DataSink`] /// /// Returns a single row with the number of values written -pub struct FileSinkExec { +pub struct DataSinkExec { /// Input plan that produces the record batches to be written. 
input: Arc, /// Sink to which to write @@ -91,13 +94,13 @@ pub struct FileSinkExec { cache: PlanProperties, } -impl fmt::Debug for FileSinkExec { +impl fmt::Debug for DataSinkExec { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "FileSinkExec schema: {:?}", self.count_schema) + write!(f, "DataSinkExec schema: {:?}", self.count_schema) } } -impl FileSinkExec { +impl DataSinkExec { /// Create a plan to write to `sink` pub fn new( input: Arc, @@ -190,7 +193,7 @@ impl FileSinkExec { } } -impl DisplayAs for FileSinkExec { +impl DisplayAs for DataSinkExec { fn fmt_as( &self, t: DisplayFormatType, @@ -198,16 +201,16 @@ impl DisplayAs for FileSinkExec { ) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "FileSinkExec: sink=")?; + write!(f, "DataSinkExec: sink=")?; self.sink.fmt_as(t, f) } } } } -impl ExecutionPlan for FileSinkExec { +impl ExecutionPlan for DataSinkExec { fn name(&self) -> &'static str { - "FileSinkExec" + "DataSinkExec" } /// Return a reference to Any that can be used for downcasting @@ -269,7 +272,7 @@ impl ExecutionPlan for FileSinkExec { context: Arc, ) -> Result { if partition != 0 { - return internal_err!("FileSinkExec can only be called on partition 0!"); + return internal_err!("DataSinkExec can only be called on partition 0!"); } let data = self.execute_input_stream(0, context.clone())?; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 4d5d6cadad17..4d95c847bf99 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -58,7 +58,7 @@ use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::explain::ExplainExec; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::insert::FileSinkExec; +use datafusion::physical_plan::insert::DataSinkExec; use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use datafusion::physical_plan::joins::{ CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec, @@ -1033,7 +1033,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|item| PhysicalSortRequirement::from_sort_exprs(&item)) }) .transpose()?; - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, Arc::new(data_sink), Arc::new(sink_schema), @@ -1063,7 +1063,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|item| PhysicalSortRequirement::from_sort_exprs(&item)) }) .transpose()?; - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, Arc::new(data_sink), Arc::new(sink_schema), @@ -1093,7 +1093,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|item| PhysicalSortRequirement::from_sort_exprs(&item)) }) .transpose()?; - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, Arc::new(data_sink), Arc::new(sink_schema), @@ -1879,7 +1879,7 @@ impl AsExecutionPlan for PhysicalPlanNode { }); } - if let Some(exec) = plan.downcast_ref::() { + if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a74b1a38935b..f97cfea765bf 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -48,7 +48,7 @@ use 
datafusion::physical_plan::expressions::{ NotExpr, NthValue, PhysicalSortExpr, StringAgg, Sum, }; use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::insert::FileSinkExec; +use datafusion::physical_plan::insert::DataSinkExec; use datafusion::physical_plan::joins::{ HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode, }; @@ -861,7 +861,7 @@ fn roundtrip_json_sink() -> Result<()> { }), )]; - roundtrip_test(Arc::new(FileSinkExec::new( + roundtrip_test(Arc::new(DataSinkExec::new( input, data_sink, schema.clone(), @@ -896,7 +896,7 @@ fn roundtrip_csv_sink() -> Result<()> { }), )]; - let roundtrip_plan = roundtrip_test_and_return(Arc::new(FileSinkExec::new( + let roundtrip_plan = roundtrip_test_and_return(Arc::new(DataSinkExec::new( input, data_sink, schema.clone(), @@ -906,7 +906,7 @@ fn roundtrip_csv_sink() -> Result<()> { let roundtrip_plan = roundtrip_plan .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap(); let csv_sink = roundtrip_plan .sink() @@ -948,7 +948,7 @@ fn roundtrip_parquet_sink() -> Result<()> { }), )]; - roundtrip_test(Arc::new(FileSinkExec::new( + roundtrip_test(Arc::new(DataSinkExec::new( input, data_sink, schema.clone(), diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 75f1ccb07aac..0991868cdf5d 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -149,7 +149,7 @@ logical_plan CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: (format.compression zstd(10)) --TableScan: source_table projection=[col1, col2] physical_plan -FileSinkExec: sink=ParquetSink(file_groups=[]) +DataSinkExec: sink=ParquetSink(file_groups=[]) --MemoryExec: partitions=1, partition_sizes=[1] # Error case @@ -163,7 +163,7 @@ logical_plan CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: () --TableScan: source_table projection=[col1, col2] physical_plan -FileSinkExec: sink=ParquetSink(file_groups=[]) +DataSinkExec: sink=ParquetSink(file_groups=[]) --MemoryExec: partitions=1, partition_sizes=[1] # Copy more files to directory via query diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index b7ad36dace16..a38d254e051f 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -168,7 +168,7 @@ Dml: op=[Insert Into] table=[sink_table] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] physical_plan -FileSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. } +DataSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. 
} --SortExec: expr=[c1@0 ASC NULLS LAST] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index b3fbb33e68e7..b16a169598e7 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -64,7 +64,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=MemoryTable (partitions=1) +DataSinkExec: sink=MemoryTable (partitions=1) --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1] @@ -125,7 +125,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=MemoryTable (partitions=1) +DataSinkExec: sink=MemoryTable (partitions=1) --CoalescePartitionsExec ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] @@ -175,7 +175,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=MemoryTable (partitions=8) +DataSinkExec: sink=MemoryTable (partitions=8) --ProjectionExec: expr=[a1@0 as a1, a2@1 as a2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1] @@ -217,7 +217,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1] physical_plan -FileSinkExec: sink=MemoryTable (partitions=1) +DataSinkExec: sink=MemoryTable (partitions=1) --SortExec: expr=[c1@0 ASC NULLS LAST] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 5f100953aff4..0033b070ec1a 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -125,7 +125,7 @@ Dml: op=[Insert Into] table=[ordered_insert_test] --Projection: column1 AS a, column2 AS b ----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))... 
physical_plan -FileSinkExec: sink=CsvSink(file_groups=[]) +DataSinkExec: sink=CsvSink(file_groups=[]) --SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] ----ProjectionExec: expr=[column1@0 as a, column2@1 as b] ------ValuesExec @@ -353,7 +353,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=ParquetSink(file_groups=[]) +DataSinkExec: sink=ParquetSink(file_groups=[]) --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1] @@ -415,7 +415,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=ParquetSink(file_groups=[]) +DataSinkExec: sink=ParquetSink(file_groups=[]) --CoalescePartitionsExec ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] @@ -458,7 +458,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1] physical_plan -FileSinkExec: sink=ParquetSink(file_groups=[]) +DataSinkExec: sink=ParquetSink(file_groups=[]) --SortExec: expr=[c1@0 ASC NULLS LAST] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true From d698d9deb6d06214ea36cc59739993ddf6441b6a Mon Sep 17 00:00:00 2001 From: advancedxy Date: Sun, 14 Apr 2024 04:05:38 +0800 Subject: [PATCH 3/4] fix: Support Dict types in `in_list` physical plans (#10031) * fix: Relax type check with dict types in in_list * refine comments * fix style, refine comments and address reviewer's comments * refine comments * address comments --- .../physical-expr/src/expressions/in_list.rs | 126 +++++++++++++++++- .../sqllogictest/test_files/dictionary.slt | 39 ++++++ 2 files changed, 161 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index ecdb03e97ee3..07185b4d6527 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -415,6 +415,18 @@ impl PartialEq for InListExpr { } } +/// Checks if two types are logically equal, dictionary types are compared by their value types. +fn is_logically_eq(lhs: &DataType, rhs: &DataType) -> bool { + match (lhs, rhs) { + (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => { + v1.as_ref().eq(v2.as_ref()) + } + (DataType::Dictionary(_, l), _) => l.as_ref().eq(rhs), + (_, DataType::Dictionary(_, r)) => lhs.eq(r.as_ref()), + _ => lhs.eq(rhs), + } +} + /// Creates a unary expression InList pub fn in_list( expr: Arc, @@ -426,7 +438,7 @@ pub fn in_list( let expr_data_type = expr.data_type(schema)?; for list_expr in list.iter() { let list_expr_data_type = list_expr.data_type(schema)?; - if !expr_data_type.eq(&list_expr_data_type) { + if !is_logically_eq(&expr_data_type, &list_expr_data_type) { return internal_err!( "The data type inlist should be same, the value type is {expr_data_type}, one of list expr type is {list_expr_data_type}" ); @@ -499,7 +511,21 @@ mod tests { macro_rules! in_list { ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{ let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?; - let expr = in_list(cast_expr, cast_list_exprs, $NEGATED, $SCHEMA).unwrap(); + in_list_raw!( + $BATCH, + cast_list_exprs, + $NEGATED, + $EXPECTED, + cast_expr, + $SCHEMA + ); + }}; + } + + // applies the in_list expr to an input batch and list without cast + macro_rules! in_list_raw { + ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{ + let expr = in_list($COL, $LIST, $NEGATED, $SCHEMA).unwrap(); let result = expr .evaluate(&$BATCH)? 
.into_array($BATCH.num_rows()) @@ -540,7 +566,7 @@ mod tests { &schema ); - // expression: "a not in ("a", "b")" + // expression: "a in ("a", "b", null)" let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))]; in_list!( batch, @@ -551,7 +577,7 @@ mod tests { &schema ); - // expression: "a not in ("a", "b")" + // expression: "a not in ("a", "b", null)" let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))]; in_list!( batch, @@ -1314,4 +1340,96 @@ mod tests { Ok(()) } + + #[test] + fn in_list_utf8_with_dict_types() -> Result<()> { + fn dict_lit(key_type: DataType, value: &str) -> Arc { + lit(ScalarValue::Dictionary( + Box::new(key_type), + Box::new(ScalarValue::new_utf8(value.to_string())), + )) + } + + fn null_dict_lit(key_type: DataType) -> Arc { + lit(ScalarValue::Dictionary( + Box::new(key_type), + Box::new(ScalarValue::Utf8(None)), + )) + } + + let schema = Schema::new(vec![Field::new( + "a", + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), + true, + )]); + let a: UInt16DictionaryArray = + vec![Some("a"), Some("d"), None].into_iter().collect(); + let col_a = col("a", &schema)?; + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; + + // expression: "a in ("a", "b")" + let lists = [ + vec![lit("a"), lit("b")], + vec![ + dict_lit(DataType::Int8, "a"), + dict_lit(DataType::UInt16, "b"), + ], + ]; + for list in lists.iter() { + in_list_raw!( + batch, + list.clone(), + &false, + vec![Some(true), Some(false), None], + col_a.clone(), + &schema + ); + } + + // expression: "a not in ("a", "b")" + for list in lists.iter() { + in_list_raw!( + batch, + list.clone(), + &true, + vec![Some(false), Some(true), None], + col_a.clone(), + &schema + ); + } + + // expression: "a in ("a", "b", null)" + let lists = [ + vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))], + vec![ + dict_lit(DataType::Int8, "a"), + dict_lit(DataType::UInt16, "b"), + null_dict_lit(DataType::UInt16), + ], + ]; + for list in lists.iter() { + in_list_raw!( + batch, + list.clone(), + &false, + vec![Some(true), None, None], + col_a.clone(), + &schema + ); + } + + // expression: "a not in ("a", "b", null)" + for list in lists.iter() { + in_list_raw!( + batch, + list.clone(), + &true, + vec![Some(false), None, None], + col_a.clone(), + &schema + ); + } + + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/dictionary.slt b/datafusion/sqllogictest/test_files/dictionary.slt index af7bf5cb16e8..891a09fbc177 100644 --- a/datafusion/sqllogictest/test_files/dictionary.slt +++ b/datafusion/sqllogictest/test_files/dictionary.slt @@ -87,6 +87,22 @@ f3 Utf8 YES f4 Float64 YES time Timestamp(Nanosecond, None) YES +# in list with dictionary input +query BBB +SELECT + tag_id in ('1000'), '1000' in (tag_id, null), arrow_cast('999','Dictionary(Int32, Utf8)') in (tag_id, null) +FROM m1 +---- +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL # Table m2 with a tag columns `tag_id` and `type`, a field column `f5`, and `time` statement ok @@ -165,6 +181,29 @@ order by date_bin('30 minutes', time) DESC 3 400 600 500 2023-12-04T00:30:00 3 100 300 200 2023-12-04T00:00:00 +# query with in list +query BBBBBBBB +SELECT + type in ('active', 'passive') + , 'active' in (type) + , 'active' in (type, null) + , arrow_cast('passive','Dictionary(Int8, Utf8)') in (type, null) + , tag_id in ('1000', '2000') + , tag_id in ('999') + , '1000' in (tag_id, null) + , 
arrow_cast('999','Dictionary(Int16, Utf8)') in (tag_id, null) +FROM m2 +---- +true true true NULL true false true NULL +true true true NULL true false true NULL +true true true NULL true false true NULL +true true true NULL true false true NULL +true true true NULL true false true NULL +true true true NULL true false true NULL +true false NULL true true false true NULL +true false NULL true true false true NULL +true false NULL true true false true NULL +true false NULL true true false true NULL # Reproducer for https://github.com/apache/arrow-datafusion/issues/8738 From 671cef85c550969ab2c86d644968a048cb181c0c Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Sun, 14 Apr 2024 11:26:49 +0800 Subject: [PATCH 4/4] Prune pages are all null in ParquetExec by row_counts and fix NOT NULL prune (#10051) * Prune pages are all null in ParquetExec by row_counts and fix NOT NULL prune * fix clippy * Update datafusion/core/src/physical_optimizer/pruning.rs Co-authored-by: Andrew Lamb * Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb * Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb * Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb * Update datafusion/core/tests/parquet/page_pruning.rs Co-authored-by: Andrew Lamb * remove allocate vec * better way avoid allocate vec * simply expr --------- Co-authored-by: Andrew Lamb --- .../physical_plan/parquet/page_filter.rs | 16 ++++- .../core/src/physical_optimizer/pruning.rs | 49 ++++++++------ datafusion/core/tests/parquet/mod.rs | 65 ++++++++++++++++--- datafusion/core/tests/parquet/page_pruning.rs | 51 +++++++++++++++ .../core/tests/parquet/row_group_pruning.rs | 2 +- 5 files changed, 153 insertions(+), 30 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index c7706f3458d0..402cc106492e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -314,6 +314,7 @@ fn prune_pages_in_one_row_group( col_page_indexes, col_offset_indexes, target_type: &target_type, + num_rows_in_row_group: group.num_rows(), }; match predicate.prune(&pruning_stats) { @@ -385,6 +386,7 @@ struct PagesPruningStatistics<'a> { // target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the // real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY` target_type: &'a Option, + num_rows_in_row_group: i64, } // Extract the min or max value calling `func` from page idex @@ -548,7 +550,19 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> { } fn row_counts(&self, _column: &datafusion_common::Column) -> Option { - None + // see https://github.com/apache/arrow-rs/blob/91f0b1771308609ca27db0fb1d2d49571b3980d8/parquet/src/file/metadata.rs#L979-L982 + + let row_count_per_page = self.col_offset_indexes.windows(2).map(|location| { + Some(location[1].first_row_index - location[0].first_row_index) + }); + + // append the last page row count + let row_count_per_page = row_count_per_page.chain(std::iter::once(Some( + self.num_rows_in_row_group + - self.col_offset_indexes.last().unwrap().first_row_index, + ))); + + Some(Arc::new(Int64Array::from_iter(row_count_per_page))) } fn contained( diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index ebb811408fb3..d8a3814d77e1 100644 --- 
a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -335,7 +335,7 @@ pub trait PruningStatistics { /// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END` /// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END` /// `x IS NULL` | `x_null_count > 0` -/// `x IS NOT NULL` | `x_null_count = 0` +/// `x IS NOT NULL` | `x_null_count != row_count` /// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END` /// /// ## Predicate Evaluation @@ -1240,10 +1240,10 @@ fn build_single_column_expr( /// returns a pruning expression in terms of IsNull that will evaluate to true /// if the column may contain null, and false if definitely does not /// contain null. -/// If set `with_not` to true: which means is not null -/// Given an expression reference to `expr`, if `expr` is a column expression, -/// returns a pruning expression in terms of IsNotNull that will evaluate to true -/// if the column not contain any null, and false if definitely contain null. +/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count` +/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS +/// at least one NULL value. In this case we can know that `IS NOT NULL` can not be true and +/// thus can prune the row group / value fn build_is_null_column_expr( expr: &Arc, schema: &Schema, @@ -1254,26 +1254,37 @@ fn build_is_null_column_expr( let field = schema.field_with_name(col.name()).ok()?; let null_count_field = &Field::new(field.name(), DataType::UInt64, true); - required_columns - .null_count_column_expr(col, expr, null_count_field) - .map(|null_count_column_expr| { - if with_not { - // IsNotNull(column) => null_count = 0 - Arc::new(phys_expr::BinaryExpr::new( - null_count_column_expr, - Operator::Eq, - Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), - )) as _ - } else { + if with_not { + if let Ok(row_count_expr) = + required_columns.row_count_column_expr(col, expr, null_count_field) + { + required_columns + .null_count_column_expr(col, expr, null_count_field) + .map(|null_count_column_expr| { + // IsNotNull(column) => null_count != row_count + Arc::new(phys_expr::BinaryExpr::new( + null_count_column_expr, + Operator::NotEq, + row_count_expr, + )) as _ + }) + .ok() + } else { + None + } + } else { + required_columns + .null_count_column_expr(col, expr, null_count_field) + .map(|null_count_column_expr| { // IsNull(column) => null_count > 0 Arc::new(phys_expr::BinaryExpr::new( null_count_column_expr, Operator::Gt, Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), )) as _ - } - }) - .ok() + }) + .ok() + } } else { None } diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index f36afe1976b1..f90d0e8afb4c 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -28,7 +28,7 @@ use arrow::{ record_batch::RecordBatch, util::pretty::pretty_format_batches, }; -use arrow_array::new_null_array; +use arrow_array::make_array; use chrono::{Datelike, Duration, TimeDelta}; use datafusion::{ datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider}, @@ -77,6 +77,7 @@ enum Scenario { ByteArray, PeriodsInColumnNames, WithNullValues, + 
WithNullValuesPageLevel, } enum Unit { @@ -632,8 +633,13 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch { RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap() } -/// Return record batch with i8, i16, i32, and i64 sequences with all Null values -fn make_all_null_values() -> RecordBatch { +/// Return record batch with i8, i16, i32, and i64 sequences with Null values +/// here 5 rows in page when using Unit::Page +fn make_int_batches_with_null( + null_values: usize, + no_null_values_start: usize, + no_null_values_end: usize, +) -> RecordBatch { let schema = Arc::new(Schema::new(vec![ Field::new("i8", DataType::Int8, true), Field::new("i16", DataType::Int16, true), @@ -641,13 +647,46 @@ fn make_all_null_values() -> RecordBatch { Field::new("i64", DataType::Int64, true), ])); + let v8: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + let v16: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + let v32: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + let v64: Vec = (no_null_values_start as _..no_null_values_end as _).collect(); + RecordBatch::try_new( schema, vec![ - new_null_array(&DataType::Int8, 5), - new_null_array(&DataType::Int16, 5), - new_null_array(&DataType::Int32, 5), - new_null_array(&DataType::Int64, 5), + make_array( + Int8Array::from_iter( + v8.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), + make_array( + Int16Array::from_iter( + v16.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), + make_array( + Int32Array::from_iter( + v32.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), + make_array( + Int64Array::from_iter( + v64.into_iter() + .map(Some) + .chain(std::iter::repeat(None).take(null_values)), + ) + .to_data(), + ), ], ) .unwrap() @@ -824,9 +863,17 @@ fn create_data_batch(scenario: Scenario) -> Vec { } Scenario::WithNullValues => { vec![ - make_all_null_values(), + make_int_batches_with_null(5, 0, 0), make_int_batches(1, 6), - make_all_null_values(), + make_int_batches_with_null(5, 0, 0), + ] + } + Scenario::WithNullValuesPageLevel => { + vec![ + make_int_batches_with_null(5, 1, 6), + make_int_batches(1, 11), + make_int_batches_with_null(1, 1, 10), + make_int_batches_with_null(5, 1, 6), ] } } diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index da9617f13ee9..1615a1c5766a 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -871,6 +871,57 @@ async fn without_pushdown_filter() { assert!(bytes_scanned_with_filter > bytes_scanned_without_filter); } +#[tokio::test] +// Data layout like this: +// row_group1: page1(1~5), page2(All Null) +// row_group2: page1(1~5), page2(6~10) +// row_group3: page1(1~5), page2(6~9 + Null) +// row_group4: page1(1~5), page2(All Null) +// total 40 rows +async fn test_pages_with_null_values() { + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where i8 <= 6", + Some(0), + // expect prune pages with all null or pages that have only values greater than 6 + // (row_group1, page2), (row_group4, page2) + Some(10), + 22, + ) + .await; + + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where \"i16\" is not null", + Some(0), + // expect prune (row_group1, page2) and (row_group4, page2) = 10 rows + Some(10), + 29, + ) + 
.await; + + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where \"i32\" is null", + Some(0), + // expect prune (row_group1, page1), (row_group2, page1+2), (row_group3, page1), (row_group3, page1) = 25 rows + Some(25), + 11, + ) + .await; + + test_prune( + Scenario::WithNullValuesPageLevel, + "SELECT * FROM t where \"i64\" > 6", + Some(0), + // expect to prune pages where i is all null, or where always <= 5 + // (row_group1, page1+2), (row_group2, page1), (row_group3, page1) (row_group4, page1+2) = 30 rows + Some(30), + 7, + ) + .await; +} + fn cast_count_metric(metric: MetricValue) -> Option { match metric { MetricValue::Count { count, .. } => Some(count.value()), diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 29bf1ef0a8d4..b3f1fec1753b 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -1296,7 +1296,7 @@ async fn test_row_group_with_null_values() { .test_row_group_prune() .await; - // After pruning, only row group 2should be selected + // After pruning, only row group 2 should be selected RowGroupPruningTest::new() .with_scenario(Scenario::WithNullValues) .with_query("SELECT * FROM t WHERE \"i16\" is Not Null")
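
A minimal, standalone sketch of the two computations introduced by the last patch ("Prune pages are all null in ParquetExec by row_counts and fix NOT NULL prune"), included here only for illustration. It uses plain Rust types instead of DataFusion's PruningStatistics trait and the parquet offset-index structures, and the helper names (page_row_counts, may_contain_non_null) are hypothetical, not functions that exist in the codebase.

// Illustrative sketch only; assumes per-page starting row indexes are already
// extracted from the parquet offset index as plain i64 values.

/// Derive the number of rows in each data page of one row group.
/// The offset index records only the first row index of every page, so page
/// lengths come from the difference between consecutive starts, and the row
/// group's total row count closes the last page.
fn page_row_counts(first_row_indexes: &[i64], rows_in_row_group: i64) -> Vec<i64> {
    let mut counts: Vec<i64> = first_row_indexes
        .windows(2)
        .map(|w| w[1] - w[0])
        .collect();
    if let Some(&last_start) = first_row_indexes.last() {
        counts.push(rows_in_row_group - last_start);
    }
    counts
}

/// Pruning logic for `col IS NOT NULL`: a container (page or row group) can
/// only be skipped when every value in it is NULL, i.e. when the null count
/// equals the row count. Keeping the container therefore requires
/// `null_count != row_count`, not `null_count = 0` as before the fix.
fn may_contain_non_null(null_count: u64, row_count: u64) -> bool {
    null_count != row_count
}

fn main() {
    // Pages starting at rows 0, 5, 10 in a 14-row row group -> 5, 5, 4 rows.
    assert_eq!(page_row_counts(&[0, 5, 10], 14), vec![5, 5, 4]);
    // An all-null page of 5 rows can be pruned for an IS NOT NULL predicate.
    assert!(!may_contain_non_null(5, 5));
    assert!(may_contain_non_null(2, 5));
    println!("ok");
}

The essential point of the NOT NULL change is that a container is only guaranteed to produce no rows for `col IS NOT NULL` when it is entirely null, so the pruning predicate must compare the null count against the row count rather than against zero; the per-page row counts make that comparison possible at the page level as well as the row-group level.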