Translate PPL dedup command Part 2: allowedDuplication>1 #543
Conversation
Signed-off-by: Lantao Jin <[email protected]>
```java
// Build Window
visitFieldList(node.getFields(), context);
Seq<Expression> partitionSpec = context.retainAllNamedParseExpressions(exp -> exp);
```
Is the order of the dedup fields kept? For instance, with `dedup 2, b, a`, will the output be `a, b`?
Yes, the original order will be kept. I added several tests in the IT suite to verify this case.
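The order-preservation behavior discussed above can be sketched outside Spark in a few lines of plain Python (the `dedup_on` helper is hypothetical, not the actual Catalyst translation): each row is a dict whose key order is the original column order, and the dedup field list only selects the key, never the output layout.

```python
def dedup_on(rows, fields, allowed_duplication=1):
    """Keep at most `allowed_duplication` rows per dedup key.

    `fields` (in any order) only chooses the key; each surviving row is
    emitted with its columns in their original order.
    """
    counts = {}
    out = []
    for row in rows:  # each row is a dict preserving original column order
        key = tuple(row[f] for f in fields)
        counts[key] = counts.get(key, 0) + 1
        if counts[key] <= allowed_duplication:
            out.append(dict(row))  # dict copy keeps original insertion order
    return out
```

So deduping on `["b", "a"]` still yields rows whose columns read `a, b`, matching what the IT cases verify.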
Curious: if we enforce using a window function for dedupe = 1, can we benchmark against #521?
`CatalystQueryPlanVisitor` needs refactoring, see comments above.
Hi, `visitDedup(...)` is too large (>136 lines) and needs to be extracted into a dedicated `DedupUtils` class or similar. We should try to reduce `CatalystQueryPlanVisitor` to be only the command-pattern consolidator and move the actual business code into dedicated strategy (command) classes.
done
Signed-off-by: Lantao Jin <[email protected]>
I ran a micro-benchmark on my laptop (OpenJDK 64-Bit Server VM 21.0.4+7-LTS, Mac OS X 13.6.9, Apple M1) with numRows = 100, 1000, 10000, 100000, and 1000000. (The per-run result tables are not reproduced here.) I observed an interesting pattern:
The conclusion is: the larger the data volume, the better the performance of Window compared to Aggregation.

Appendix:

```scala
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

case class Data2(value1: Float, value2: String, value3: String, value4: String,
    value5: String, value6: String, value7: String, value8: String, value9: String)

object DedupeBenchmark extends SqlBasedBenchmark {
  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("Dedupe benchmark") {
      import spark.implicits._
      val numRecords = 10000
      val benchmark = new Benchmark("Dedupe Benchmark", numRecords, output = output)
      // Note: ${x % 100} / ${x % 10} (not $x%100) so the dedup keys actually
      // repeat and the deduplication has duplicates to remove
      val df = spark.range(numRecords).map {
        x => Data2(x.toFloat, s"value2(${x % 100})", s"value3(${x % 10})", s"value4$x",
          s"value5$x", s"value6$x", s"value7$x", s"value8$x", s"value9$x")
      }.select($"value1", $"value2", $"value3", $"value4",
        $"value5", $"value6", $"value7", $"value8", $"value9")
      val dedupeColumns = Seq("value2", "value3")
      // Filter out rows with any null values in the deduplication columns
      val nonNullDF = df.filter(dedupeColumns.map(c => col(c).isNotNull).reduce(_ && _))
      nonNullDF.cache()
      nonNullDF.count() // force cache

      benchmark.addCase("Dedupe by Window", numIters = 5) { _ =>
        // Define a window specification
        val windowSpec = Window.partitionBy(dedupeColumns.map(col): _*)
          .orderBy(dedupeColumns.map(col): _*)
        // Add row number to each row in the partition
        val rankedDF = nonNullDF.withColumn("row_number", row_number().over(windowSpec))
        // Filter to keep the first N rows for each partition
        val deduplicatedDF = rankedDF.filter(col("row_number") <= 1).drop("row_number")
        deduplicatedDF.noop()
      }

      benchmark.addCase("Dedupe by Aggregate", numIters = 5) { _ =>
        // Deduplicate the non-null DataFrame based on multiple columns
        val deduplicatedDF = nonNullDF.dropDuplicates(dedupeColumns)
        deduplicatedDF.noop()
      }

      benchmark.run()
    }
  }
}
```
What are your thoughts, @penghuo? Do you think we should use Dedupe by Window only for …
IMO let's plan for the larger-volume use case scenario. @penghuo, your thoughts?
@LantaoJin can you please resolve the conflict (after the latest merge)?
Signed-off-by: Lantao Jin <[email protected]>
* Translate PPL Command Part 2: allowedDuplication>1
* update document
* remove the CatalystQueryPlanVisitor.java.orig
* refactor
* add IT for reordering field list
* remove useless code

Signed-off-by: Lantao Jin <[email protected]>
Co-authored-by: YANGDB <[email protected]>
(cherry picked from commit cc52b1c)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
The backport to `0.5-nexus` failed. To backport manually, run these commands in your terminal:

```shell
# Navigate to the root of your repository
cd $(git rev-parse --show-toplevel)

# Fetch latest updates from GitHub
git fetch

# Create a new working tree
git worktree add ../.worktrees/opensearch-spark/backport-0.5-nexus 0.5-nexus

# Navigate to the new working tree
pushd ../.worktrees/opensearch-spark/backport-0.5-nexus

# Create a new branch
git switch --create backport/backport-543-to-0.5-nexus

# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 cc52b1c3e68b87dfb735c3fcd055b5e93d015720

# Push it to GitHub
git push --set-upstream origin backport/backport-543-to-0.5-nexus

# Go back to the original working tree
popd

# Delete the working tree
git worktree remove ../.worktrees/opensearch-spark/backport-0.5-nexus
```

Then, create a pull request where the base branch is `0.5-nexus` and the head branch is `backport/backport-543-to-0.5-nexus`.
Description

To translate the `dedup` command with `allowedDuplication > 1`, such as `| dedup 2 a,b`, to a Spark plan, the solution is to translate it into a plan with a Window function (e.g. `row_number`) and a new column `row_number_col` used in a Filter. For example:

`| dedup 2 a, b keepempty=false`
`| dedup 2 a, b keepempty=true`
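The intended semantics of the two variants above can be modeled in a few lines of plain Python (a minimal sketch, not the actual Window/row_number translation; the `dedup` helper name is hypothetical): count occurrences per dedup key, keep at most `allowedDuplication` rows per key, and either keep or drop rows with a null in any dedup field depending on `keepempty`.

```python
def dedup(rows, fields, allowed_duplication=1, keepempty=False):
    """Model of PPL dedup: keep at most `allowed_duplication` rows per key.

    Rows with a null (None) in any dedup field are kept unconditionally
    when keepempty=true, and dropped when keepempty=false.
    """
    counts = {}
    out = []
    for row in rows:
        key = tuple(row[f] for f in fields)
        if any(v is None for v in key):
            if keepempty:
                out.append(row)  # null key: keep as-is, never counted
            continue
        counts[key] = counts.get(key, 0) + 1
        if counts[key] <= allowed_duplication:
            out.append(row)  # within the allowed duplication budget
    return out
```

In the actual Spark plan this counting is done by `row_number()` over a window partitioned by the dedup fields, followed by a filter `row_number_col <= allowedDuplication`.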
Issues Resolved
Resolves #524, #421
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.