Skip to content

Commit

Permalink
Fix the canonicalizing for GPU file scan (#10137)
Browse files Browse the repository at this point in the history
The original output should be used when creating a canonicalized GPU file scan.

For some plan patterns, the rule prunePartitionForFileSourceScan removes partition columns that are
not used by the first downstream ProjectExec, so those partition columns no longer exist in the
finalized output. As a result, any AttributeReferences that appear in the partitionFilters but are
excluded from the finalized output would not be canonicalized.

---------

Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman authored Jan 3, 2024
1 parent 2919d80 commit ace4870
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -635,13 +635,13 @@ case class GpuFileSourceScanExec(
override def doCanonicalize(): GpuFileSourceScanExec = {
GpuFileSourceScanExec(
relation,
output.map(QueryPlan.normalizeExpressions(_, output)),
originalOutput.map(QueryPlan.normalizeExpressions(_, originalOutput)),
requiredSchema,
QueryPlan.normalizePredicates(
filterUnusedDynamicPruningExpressions(partitionFilters), output),
filterUnusedDynamicPruningExpressions(partitionFilters), originalOutput),
optionalBucketSet,
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
QueryPlan.normalizePredicates(dataFilters, originalOutput),
None,
queryUsesInputFile,
alluxioPathsMap = alluxioPathsMap)(rapidsConf)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,4 +85,27 @@ class GpuFileScanPrunePartitionSuite extends SparkQueryCompareTestSuite {
testGpuFileScanOutput(_.select("a").filter("a != 1"), conf, "b", "c")
}
}

// Two identical GPU file source scans that filter on a partition column which is not part of
// the selected output must compare equal once canonicalized.
test("Canonicalized GpuFileSourceScans with partition columns pruned should be equal") {
  withTempPath { tmpDir =>
    // Write a small Parquet data set partitioned by "b" and "c" using the CPU session.
    withCpuSparkSession(cpuSpark => {
      import cpuSpark.implicits._
      val rows =
        Seq((1, 11, "s1"), (2, 22, "s2"), (3, 33, "s3"), (4, 44, "s4"), (5, 55, "s5"))
      rows.toDF("a", "b", "c")
        .write
        .partitionBy("b", "c")
        .parquet(tmpDir.getCanonicalPath)
    })

    // Every call builds a fresh DataFrame: it filters on partition column "b" but selects
    // only "a" and "c", with parquet listed in spark.sql.sources.useV1SourceList.
    def prunedScanDF: DataFrame = {
      val v1ParquetConf = new SparkConf().set("spark.sql.sources.useV1SourceList", "parquet")
      withGpuSparkSession(
        gpuSpark => gpuSpark.read.parquet(tmpDir.toString).where("b == 33").select("a", "c"),
        v1ParquetConf)
    }

    // Union the query with itself and gather the GPU scans that still read partition columns.
    val unionPlan = prunedScanDF.unionAll(prunedScanDF).queryExecution.executedPlan
    val gpuScans = unionPlan.collect {
      case scan: GpuFileSourceScanExec if scan.requiredPartitionSchema.nonEmpty => scan
    }
    assert(gpuScans.length == 2, "Expect 2 GPU file scans")

    val firstScan = gpuScans.head
    val secondScan = gpuScans.last
    assert(firstScan.canonicalized == secondScan.canonicalized,
      "Canonicalized GPU file scans should be equal")
  }
}
}

0 comments on commit ace4870

Please sign in to comment.