From e2226b06ec6227a6385f247b48cb8676a4c8e2f4 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Tue, 2 Jan 2024 16:57:32 +0800 Subject: [PATCH 1/2] Use the original output when creating a canonicalized GPU file scan. The original output should be used when creating a canonicalized GPU file scan, because the rule `prunePartitionForFileSourceScan` in Plugin may remove the partition columns that are not used by the first downstream ProjectExec for some patterns, leading to some partition columns not exist in the finalized output. Then the `AttributeReference`s in some filters but not in the output will not be canonicalized. --------- Signed-off-by: Firestarman --- .../apache/spark/sql/rapids/GpuFileSourceScanExec.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index 80acf2b0f9e..3e32399fb47 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -635,13 +635,13 @@ case class GpuFileSourceScanExec( override def doCanonicalize(): GpuFileSourceScanExec = { GpuFileSourceScanExec( relation, - output.map(QueryPlan.normalizeExpressions(_, output)), + originalOutput.map(QueryPlan.normalizeExpressions(_, originalOutput)), requiredSchema, QueryPlan.normalizePredicates( - filterUnusedDynamicPruningExpressions(partitionFilters), output), + filterUnusedDynamicPruningExpressions(partitionFilters), originalOutput), optionalBucketSet, optionalNumCoalescedBuckets, - QueryPlan.normalizePredicates(dataFilters, output), + QueryPlan.normalizePredicates(dataFilters, originalOutput), None, queryUsesInputFile, alluxioPathsMap = alluxioPathsMap)(rapidsConf) From 74b8aa041ca70aca71b38f6f60d12f9b1c61bbc1 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Tue, 2 Jan 2024 16:58:16 +0800 Subject: [PATCH 2/2] Add tests Signed-off-by: Firestarman --- .../GpuFileScanPrunePartitionSuite.scala | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuFileScanPrunePartitionSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuFileScanPrunePartitionSuite.scala index 41a52e01739..2cbb77234ae 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuFileScanPrunePartitionSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuFileScanPrunePartitionSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,4 +85,27 @@ class GpuFileScanPrunePartitionSuite extends SparkQueryCompareTestSuite { testGpuFileScanOutput(_.select("a").filter("a != 1"), conf, "b", "c") } } + + test("Canonicalized GpuFileSourceScans with partition columns pruned should be equal") { + withTempPath { file => + // Generate partitioned files. + withCpuSparkSession(spark => { + import spark.implicits._ + Seq((1, 11, "s1"), (2, 22, "s2"), (3, 33, "s3"), (4, 44, "s4"), (5, 55, "s5")) + .toDF("a", "b", "c").write.partitionBy("b", "c").parquet(file.getCanonicalPath) + }) + + def getDF: DataFrame = withGpuSparkSession(spark => + spark.read.parquet(file.toString).where("b == 33").select("a", "c"), + new SparkConf().set("spark.sql.sources.useV1SourceList", "parquet") + ) + + val plans = getDF.unionAll(getDF).queryExecution.executedPlan.collect { + case gfss: GpuFileSourceScanExec if gfss.requiredPartitionSchema.nonEmpty => gfss + } + assert(plans.length == 2, "Expect 2 GPU file scans") + assert(plans.head.canonicalized == plans.last.canonicalized, + "Canonicalized GPU file scans should be equal") + } + } }