Skip to content

Commit

Permalink
Revert "Fix the canonicalizing for GPU file scan (NVIDIA#10137)"
Browse files Browse the repository at this point in the history
This reverts commit ace4870.
  • Loading branch information
firestarman committed Jan 18, 2024
1 parent 6419da6 commit d84e8a8
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -635,13 +635,13 @@ case class GpuFileSourceScanExec(
override def doCanonicalize(): GpuFileSourceScanExec = {
GpuFileSourceScanExec(
relation,
originalOutput.map(QueryPlan.normalizeExpressions(_, originalOutput)),
output.map(QueryPlan.normalizeExpressions(_, output)),
requiredSchema,
QueryPlan.normalizePredicates(
filterUnusedDynamicPruningExpressions(partitionFilters), originalOutput),
filterUnusedDynamicPruningExpressions(partitionFilters), output),
optionalBucketSet,
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, originalOutput),
QueryPlan.normalizePredicates(dataFilters, output),
None,
queryUsesInputFile,
alluxioPathsMap = alluxioPathsMap)(rapidsConf)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,27 +85,4 @@ class GpuFileScanPrunePartitionSuite extends SparkQueryCompareTestSuite {
testGpuFileScanOutput(_.select("a").filter("a != 1"), conf, "b", "c")
}
}

test("Canonicalized GpuFileSourceScans with partition columns pruned should be equal") {
withTempPath { file =>
// Generate partitioned files.
withCpuSparkSession(spark => {
import spark.implicits._
Seq((1, 11, "s1"), (2, 22, "s2"), (3, 33, "s3"), (4, 44, "s4"), (5, 55, "s5"))
.toDF("a", "b", "c").write.partitionBy("b", "c").parquet(file.getCanonicalPath)
})

def getDF: DataFrame = withGpuSparkSession(spark =>
spark.read.parquet(file.toString).where("b == 33").select("a", "c"),
new SparkConf().set("spark.sql.sources.useV1SourceList", "parquet")
)

val plans = getDF.unionAll(getDF).queryExecution.executedPlan.collect {
case gfss: GpuFileSourceScanExec if gfss.requiredPartitionSchema.nonEmpty => gfss
}
assert(plans.length == 2, "Expect 2 GPU file scans")
assert(plans.head.canonicalized == plans.last.canonicalized,
"Canonicalized GPU file scans should be equal")
}
}
}

0 comments on commit d84e8a8

Please sign in to comment.