diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 99bfbbad9d10..9d58465191e1 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -43,7 +43,7 @@ use arrow_schema::{ArrowError, Schema, SchemaRef}; use datafusion_common::{not_impl_err, DataFusionError, FileType, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; -use datafusion_physical_plan::insert::{DataSink, FileSinkExec}; +use datafusion_physical_plan::insert::{DataSink, DataSinkExec}; use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; @@ -129,7 +129,7 @@ impl FileFormat for ArrowFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(ArrowFileSink::new(conf)); - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, sink, sink_schema, diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index a7849258329b..84235cde0f5d 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -31,7 +31,7 @@ use crate::datasource::physical_plan::{ }; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::insert::{DataSink, FileSinkExec}; +use crate::physical_plan::insert::{DataSink, DataSinkExec}; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; @@ -267,7 +267,7 @@ impl FileFormat for CsvFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(CsvSink::new(conf, writer_options)); - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, sink, sink_schema, diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 0cc38bbb5554..efc0aa4328d8 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -31,7 +31,7 @@ use crate::datasource::physical_plan::FileGroupDisplay; use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec}; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::insert::{DataSink, FileSinkExec}; +use crate::physical_plan::insert::{DataSink, DataSinkExec}; use crate::physical_plan::{ DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics, }; @@ -177,7 +177,7 @@ impl FileFormat for JsonFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(JsonSink::new(conf, writer_options)); - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, sink, sink_schema, diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index bcf4e8a2c8e4..1d41f015121d 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -37,7 +37,7 @@ use crate::datasource::statistics::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; -use crate::physical_plan::insert::{DataSink, FileSinkExec}; +use crate::physical_plan::insert::{DataSink, DataSinkExec}; use crate::physical_plan::{ 
Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, Statistics, @@ -279,7 +279,7 @@ impl FileFormat for ParquetFormat { let sink_schema = conf.output_schema().clone(); let sink = Arc::new(ParquetSink::new(conf, self.options.clone())); - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, sink, sink_schema, diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 608a46144da3..42e05ebeb33f 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -26,7 +26,7 @@ use crate::datasource::{TableProvider, TableType}; use crate::error::Result; use crate::execution::context::SessionState; use crate::logical_expr::Expr; -use crate::physical_plan::insert::{DataSink, FileSinkExec}; +use crate::physical_plan::insert::{DataSink, DataSinkExec}; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::{ @@ -279,7 +279,7 @@ impl TableProvider for MemTable { return not_impl_err!("Overwrite not implemented for MemoryTable yet"); } let sink = Arc::new(MemSink::new(self.batches.clone())); - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, sink, self.schema.clone(), diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs index 100011952b3b..7c58aded3108 100644 --- a/datafusion/core/src/datasource/provider.rs +++ b/datafusion/core/src/datasource/provider.rs @@ -269,10 +269,10 @@ pub trait TableProvider: Sync + Send { /// /// # See Also /// - /// See [`FileSinkExec`] for the common pattern of inserting a + /// See [`DataSinkExec`] for the common pattern of inserting a /// streams of `RecordBatch`es as files to an ObjectStore. /// - /// [`FileSinkExec`]: crate::physical_plan::insert::FileSinkExec + /// [`DataSinkExec`]: crate::physical_plan::insert::DataSinkExec async fn insert_into( &self, _state: &SessionState, diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 079c1a891d14..2059a5ffcfe4 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -34,7 +34,7 @@ use datafusion_common::{plan_err, Constraints, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; -use datafusion_physical_plan::insert::{DataSink, FileSinkExec}; +use datafusion_physical_plan::insert::{DataSink, DataSinkExec}; use datafusion_physical_plan::metrics::MetricsSet; use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; @@ -277,7 +277,7 @@ impl TableProvider for StreamTable { None => None, }; - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, Arc::new(StreamWrite(self.0.clone())), self.0.schema.clone(), diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index ecdb03e97ee3..07185b4d6527 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -415,6 +415,18 @@ impl PartialEq for InListExpr { } } +/// Checks if two types are logically equal, dictionary types are compared by their value types. 
+fn is_logically_eq(lhs: &DataType, rhs: &DataType) -> bool { + match (lhs, rhs) { + (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => { + v1.as_ref().eq(v2.as_ref()) + } + (DataType::Dictionary(_, l), _) => l.as_ref().eq(rhs), + (_, DataType::Dictionary(_, r)) => lhs.eq(r.as_ref()), + _ => lhs.eq(rhs), + } +} + /// Creates a unary expression InList pub fn in_list( expr: Arc, @@ -426,7 +438,7 @@ pub fn in_list( let expr_data_type = expr.data_type(schema)?; for list_expr in list.iter() { let list_expr_data_type = list_expr.data_type(schema)?; - if !expr_data_type.eq(&list_expr_data_type) { + if !is_logically_eq(&expr_data_type, &list_expr_data_type) { return internal_err!( "The data type inlist should be same, the value type is {expr_data_type}, one of list expr type is {list_expr_data_type}" ); @@ -499,7 +511,21 @@ mod tests { macro_rules! in_list { ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{ let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?; - let expr = in_list(cast_expr, cast_list_exprs, $NEGATED, $SCHEMA).unwrap(); + in_list_raw!( + $BATCH, + cast_list_exprs, + $NEGATED, + $EXPECTED, + cast_expr, + $SCHEMA + ); + }}; + } + + // applies the in_list expr to an input batch and list without cast + macro_rules! in_list_raw { + ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{ + let expr = in_list($COL, $LIST, $NEGATED, $SCHEMA).unwrap(); let result = expr .evaluate(&$BATCH)? .into_array($BATCH.num_rows()) @@ -540,7 +566,7 @@ mod tests { &schema ); - // expression: "a not in ("a", "b")" + // expression: "a in ("a", "b", null)" let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))]; in_list!( batch, @@ -551,7 +577,7 @@ mod tests { &schema ); - // expression: "a not in ("a", "b")" + // expression: "a not in ("a", "b", null)" let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))]; in_list!( batch, @@ -1314,4 +1340,96 @@ mod tests { Ok(()) } + + #[test] + fn in_list_utf8_with_dict_types() -> Result<()> { + fn dict_lit(key_type: DataType, value: &str) -> Arc { + lit(ScalarValue::Dictionary( + Box::new(key_type), + Box::new(ScalarValue::new_utf8(value.to_string())), + )) + } + + fn null_dict_lit(key_type: DataType) -> Arc { + lit(ScalarValue::Dictionary( + Box::new(key_type), + Box::new(ScalarValue::Utf8(None)), + )) + } + + let schema = Schema::new(vec![Field::new( + "a", + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), + true, + )]); + let a: UInt16DictionaryArray = + vec![Some("a"), Some("d"), None].into_iter().collect(); + let col_a = col("a", &schema)?; + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?; + + // expression: "a in ("a", "b")" + let lists = [ + vec![lit("a"), lit("b")], + vec![ + dict_lit(DataType::Int8, "a"), + dict_lit(DataType::UInt16, "b"), + ], + ]; + for list in lists.iter() { + in_list_raw!( + batch, + list.clone(), + &false, + vec![Some(true), Some(false), None], + col_a.clone(), + &schema + ); + } + + // expression: "a not in ("a", "b")" + for list in lists.iter() { + in_list_raw!( + batch, + list.clone(), + &true, + vec![Some(false), Some(true), None], + col_a.clone(), + &schema + ); + } + + // expression: "a in ("a", "b", null)" + let lists = [ + vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))], + vec![ + dict_lit(DataType::Int8, "a"), + dict_lit(DataType::UInt16, "b"), + null_dict_lit(DataType::UInt16), + ], + ]; + for list in lists.iter() { + in_list_raw!( + batch, + 
list.clone(), + &false, + vec![Some(true), None, None], + col_a.clone(), + &schema + ); + } + + // expression: "a not in ("a", "b", null)" + for list in lists.iter() { + in_list_raw!( + batch, + list.clone(), + &true, + vec![Some(false), None, None], + col_a.clone(), + &schema + ); + } + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index f0233264f280..e3f9f2c76d31 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -74,10 +74,13 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync { ) -> Result; } +#[deprecated(since = "38.0.0", note = "Use [`DataSinkExec`] instead")] +pub type FileSinkExec = DataSinkExec; + /// Execution plan for writing record batches to a [`DataSink`] /// /// Returns a single row with the number of values written -pub struct FileSinkExec { +pub struct DataSinkExec { /// Input plan that produces the record batches to be written. input: Arc, /// Sink to which to write @@ -91,13 +94,13 @@ pub struct FileSinkExec { cache: PlanProperties, } -impl fmt::Debug for FileSinkExec { +impl fmt::Debug for DataSinkExec { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "FileSinkExec schema: {:?}", self.count_schema) + write!(f, "DataSinkExec schema: {:?}", self.count_schema) } } -impl FileSinkExec { +impl DataSinkExec { /// Create a plan to write to `sink` pub fn new( input: Arc, @@ -190,7 +193,7 @@ impl FileSinkExec { } } -impl DisplayAs for FileSinkExec { +impl DisplayAs for DataSinkExec { fn fmt_as( &self, t: DisplayFormatType, @@ -198,16 +201,16 @@ impl DisplayAs for FileSinkExec { ) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "FileSinkExec: sink=")?; + write!(f, "DataSinkExec: sink=")?; self.sink.fmt_as(t, f) } } } } -impl ExecutionPlan for FileSinkExec { +impl ExecutionPlan for DataSinkExec { fn name(&self) -> &'static str { - "FileSinkExec" + "DataSinkExec" } /// Return a reference to Any that can be used for downcasting @@ -269,7 +272,7 @@ impl ExecutionPlan for FileSinkExec { context: Arc, ) -> Result { if partition != 0 { - return internal_err!("FileSinkExec can only be called on partition 0!"); + return internal_err!("DataSinkExec can only be called on partition 0!"); } let data = self.execute_input_stream(0, context.clone())?; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 4d5d6cadad17..4d95c847bf99 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -58,7 +58,7 @@ use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::explain::ExplainExec; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::insert::FileSinkExec; +use datafusion::physical_plan::insert::DataSinkExec; use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use datafusion::physical_plan::joins::{ CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec, @@ -1033,7 +1033,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|item| PhysicalSortRequirement::from_sort_exprs(&item)) }) .transpose()?; - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, Arc::new(data_sink), Arc::new(sink_schema), @@ -1063,7 +1063,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|item| PhysicalSortRequirement::from_sort_exprs(&item)) }) .transpose()?; - 
Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, Arc::new(data_sink), Arc::new(sink_schema), @@ -1093,7 +1093,7 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|item| PhysicalSortRequirement::from_sort_exprs(&item)) }) .transpose()?; - Ok(Arc::new(FileSinkExec::new( + Ok(Arc::new(DataSinkExec::new( input, Arc::new(data_sink), Arc::new(sink_schema), @@ -1879,7 +1879,7 @@ impl AsExecutionPlan for PhysicalPlanNode { }); } - if let Some(exec) = plan.downcast_ref::() { + if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a74b1a38935b..f97cfea765bf 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -48,7 +48,7 @@ use datafusion::physical_plan::expressions::{ NotExpr, NthValue, PhysicalSortExpr, StringAgg, Sum, }; use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::insert::FileSinkExec; +use datafusion::physical_plan::insert::DataSinkExec; use datafusion::physical_plan::joins::{ HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode, }; @@ -861,7 +861,7 @@ fn roundtrip_json_sink() -> Result<()> { }), )]; - roundtrip_test(Arc::new(FileSinkExec::new( + roundtrip_test(Arc::new(DataSinkExec::new( input, data_sink, schema.clone(), @@ -896,7 +896,7 @@ fn roundtrip_csv_sink() -> Result<()> { }), )]; - let roundtrip_plan = roundtrip_test_and_return(Arc::new(FileSinkExec::new( + let roundtrip_plan = roundtrip_test_and_return(Arc::new(DataSinkExec::new( input, data_sink, schema.clone(), @@ -906,7 +906,7 @@ fn roundtrip_csv_sink() -> Result<()> { let roundtrip_plan = roundtrip_plan .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap(); let csv_sink = roundtrip_plan .sink() @@ -948,7 +948,7 @@ fn roundtrip_parquet_sink() -> Result<()> { }), )]; - roundtrip_test(Arc::new(FileSinkExec::new( + roundtrip_test(Arc::new(DataSinkExec::new( input, data_sink, schema.clone(), diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index c348f2cddc93..384c5b7153c3 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -48,7 +48,7 @@ object_store = { workspace = true } postgres-protocol = { version = "0.6.4", optional = true } postgres-types = { version = "0.2.4", optional = true } rust_decimal = { version = "1.27.0" } -sqllogictest = "0.19.0" +sqllogictest = "0.20.0" sqlparser = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 75f1ccb07aac..0991868cdf5d 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -149,7 +149,7 @@ logical_plan CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: (format.compression zstd(10)) --TableScan: source_table projection=[col1, col2] physical_plan -FileSinkExec: sink=ParquetSink(file_groups=[]) +DataSinkExec: sink=ParquetSink(file_groups=[]) --MemoryExec: partitions=1, partition_sizes=[1] # Error case @@ -163,7 +163,7 @@ logical_plan CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: () --TableScan: source_table projection=[col1, col2] physical_plan -FileSinkExec: 
sink=ParquetSink(file_groups=[]) +DataSinkExec: sink=ParquetSink(file_groups=[]) --MemoryExec: partitions=1, partition_sizes=[1] # Copy more files to directory via query diff --git a/datafusion/sqllogictest/test_files/dictionary.slt b/datafusion/sqllogictest/test_files/dictionary.slt index af7bf5cb16e8..891a09fbc177 100644 --- a/datafusion/sqllogictest/test_files/dictionary.slt +++ b/datafusion/sqllogictest/test_files/dictionary.slt @@ -87,6 +87,22 @@ f3 Utf8 YES f4 Float64 YES time Timestamp(Nanosecond, None) YES +# in list with dictionary input +query BBB +SELECT + tag_id in ('1000'), '1000' in (tag_id, null), arrow_cast('999','Dictionary(Int32, Utf8)') in (tag_id, null) +FROM m1 +---- +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL +true true NULL # Table m2 with a tag columns `tag_id` and `type`, a field column `f5`, and `time` statement ok @@ -165,6 +181,29 @@ order by date_bin('30 minutes', time) DESC 3 400 600 500 2023-12-04T00:30:00 3 100 300 200 2023-12-04T00:00:00 +# query with in list +query BBBBBBBB +SELECT + type in ('active', 'passive') + , 'active' in (type) + , 'active' in (type, null) + , arrow_cast('passive','Dictionary(Int8, Utf8)') in (type, null) + , tag_id in ('1000', '2000') + , tag_id in ('999') + , '1000' in (tag_id, null) + , arrow_cast('999','Dictionary(Int16, Utf8)') in (tag_id, null) +FROM m2 +---- +true true true NULL true false true NULL +true true true NULL true false true NULL +true true true NULL true false true NULL +true true true NULL true false true NULL +true true true NULL true false true NULL +true true true NULL true false true NULL +true false NULL true true false true NULL +true false NULL true true false true NULL +true false NULL true true false true NULL +true false NULL true true false true NULL # Reproducer for https://github.com/apache/arrow-datafusion/issues/8738 diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index b7ad36dace16..a38d254e051f 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -168,7 +168,7 @@ Dml: op=[Insert Into] table=[sink_table] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] physical_plan -FileSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. } +DataSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. 
} --SortExec: expr=[c1@0 ASC NULLS LAST] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index b3fbb33e68e7..b16a169598e7 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -64,7 +64,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=MemoryTable (partitions=1) +DataSinkExec: sink=MemoryTable (partitions=1) --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1] @@ -125,7 +125,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=MemoryTable (partitions=1) +DataSinkExec: sink=MemoryTable (partitions=1) --CoalescePartitionsExec ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] @@ -175,7 +175,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=MemoryTable (partitions=8) +DataSinkExec: sink=MemoryTable (partitions=8) --ProjectionExec: expr=[a1@0 as a1, a2@1 as a2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1] @@ -217,7 +217,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1] physical_plan -FileSinkExec: sink=MemoryTable (partitions=1) +DataSinkExec: sink=MemoryTable (partitions=1) --SortExec: expr=[c1@0 ASC NULLS LAST] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 5f100953aff4..0033b070ec1a 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -125,7 +125,7 @@ Dml: op=[Insert Into] table=[ordered_insert_test] --Projection: column1 AS a, column2 AS b ----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))... 
physical_plan -FileSinkExec: sink=CsvSink(file_groups=[]) +DataSinkExec: sink=CsvSink(file_groups=[]) --SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] ----ProjectionExec: expr=[column1@0 as a, column2@1 as b] ------ValuesExec @@ -353,7 +353,7 @@ Dml: op=[Insert Into] table=[table_without_values] --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ----------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=ParquetSink(file_groups=[]) +DataSinkExec: sink=ParquetSink(file_groups=[]) --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2] ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST] ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1] @@ -415,7 +415,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] ------TableScan: aggregate_test_100 projection=[c1, c4, c9] physical_plan -FileSinkExec: sink=ParquetSink(file_groups=[]) +DataSinkExec: sink=ParquetSink(file_groups=[]) --CoalescePartitionsExec ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2] ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] @@ -458,7 +458,7 @@ Dml: op=[Insert Into] table=[table_without_values] ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1] physical_plan -FileSinkExec: sink=ParquetSink(file_groups=[]) +DataSinkExec: sink=ParquetSink(file_groups=[]) --SortExec: expr=[c1@0 ASC NULLS LAST] ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
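
The FileSinkExec -> DataSinkExec rename above is kept source-compatible by a deprecated type alias (`pub type FileSinkExec = DataSinkExec;`), so downstream code keeps compiling and only sees a deprecation warning. A minimal migration sketch, assuming a downstream crate that names the plan node directly (the `into_new_name` helper is hypothetical and only demonstrates that both spellings are the same type):

    use std::sync::Arc;
    use datafusion_physical_plan::insert::DataSinkExec; // new name
    #[allow(deprecated)]
    use datafusion_physical_plan::insert::FileSinkExec; // old name, deprecated since 38.0.0

    // Hypothetical helper: because `FileSinkExec` is only an alias, an
    // `Arc<FileSinkExec>` is already an `Arc<DataSinkExec>`.
    #[allow(deprecated)]
    fn into_new_name(plan: Arc<FileSinkExec>) -> Arc<DataSinkExec> {
        plan
    }

The text rendered by `EXPLAIN` changes from `FileSinkExec: sink=...` to `DataSinkExec: sink=...`, which is what the sqllogictest plan updates in this diff reflect.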
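
The InList change relaxes the type check in `in_list` so that a dictionary-encoded column can be compared against plain literals, and against dictionary literals with a different key type: two types are treated as equal when their dictionary value types match. A standalone sketch of that comparison, assuming only the `arrow_schema` crate (`logically_eq` here is an illustrative free function mirroring the private `is_logically_eq` added above):

    use arrow_schema::DataType;

    // Dictionary types are compared by their value type; the key type is ignored.
    fn logically_eq(lhs: &DataType, rhs: &DataType) -> bool {
        match (lhs, rhs) {
            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => v1 == v2,
            (DataType::Dictionary(_, v), other)
            | (other, DataType::Dictionary(_, v)) => v.as_ref() == other,
            _ => lhs == rhs,
        }
    }

    fn main() {
        let dict_u16 =
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
        let dict_i8 =
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8));
        // A Dictionary(UInt16, Utf8) column may be tested against Utf8 literals...
        assert!(logically_eq(&dict_u16, &DataType::Utf8));
        // ...or against dictionary literals with a different key type.
        assert!(logically_eq(&dict_u16, &dict_i8));
        // Mismatched value types are still rejected.
        assert!(!logically_eq(&dict_u16, &DataType::Int64));
    }

This is why queries such as `tag_id in ('1000')` or `arrow_cast('999','Dictionary(Int32, Utf8)') in (tag_id, null)` in the new dictionary.slt cases now plan and run instead of failing with "The data type inlist should be same".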