
Translate PPL dedup Command Part 2: allowedDuplication>1 #543

Merged: 9 commits merged into main on Aug 30, 2024

Conversation

@LantaoJin (Member) commented Aug 9, 2024

Description

To translate a dedup command with allowedDuplication > 1, such as | dedup 2 a,b, to a Spark plan, the solution is to translate it into a plan with a Window function (e.g. row_number) plus a Filter on the generated row-number column. A DataFrame-level sketch of the same shape follows the two plans below.

  • For | dedup 2 a, b keepempty=false
DataFrameDropColumns('_row_number_)
+- Filter ('_row_number_ <= 2) // allowed duplication = 2
   +- Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_], ['a, 'b], ['a ASC NULLS FIRST, 'b ASC NULLS FIRST]
       +- Filter (isnotnull('a) AND isnotnull('b)) // keepempty=false
          +- Project
             +- UnresolvedRelation
  • For | dedup 2 a, b keepempty=true
Union
:- DataFrameDropColumns('_row_number_)
:  +- Filter ('_row_number_ <= 2)
:     +- Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_], ['a, 'b], ['a ASC NULLS FIRST, 'b ASC NULLS FIRST]
:        +- Filter (isnotnull('a) AND isnotnull('b))
:           +- Project
:              +- UnresolvedRelation
+- Filter (isnull('a) OR isnull('b))
   +- Project
      +- UnresolvedRelation
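
For illustration, here is a minimal DataFrame-level sketch of the same shape. This is not the translator's actual code; the helper name dedupN and the hard-coded keys a/b are assumptions for the example:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Sketch only: mirrors the plan shape above with the DataFrame API
def dedupN(df: DataFrame, n: Int, keepEmpty: Boolean): DataFrame = {
  val keys = Seq("a", "b")
  val spec = Window.partitionBy(keys.map(col): _*).orderBy(keys.map(col): _*)
  // keepempty=false branch: drop rows with null keys, then keep at most n rows per key
  val nonNull = df.filter(keys.map(c => col(c).isNotNull).reduce(_ && _))
  val kept = nonNull
    .withColumn("_row_number_", row_number().over(spec))
    .filter(col("_row_number_") <= n) // allowed duplication = n
    .drop("_row_number_")
  // keepempty=true: union back all rows whose keys contain a null
  if (keepEmpty) kept.union(df.filter(keys.map(c => col(c).isNull).reduce(_ || _)))
  else kept
}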

Issues Resolved

Resolves #524, #421

Check List

  • Updated documentation (ppl-spark-integration/README.md)
  • Implemented unit tests
  • Implemented tests for combination with other commands
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@LantaoJin LantaoJin marked this pull request as ready for review August 9, 2024 02:03

// Build Window
visitFieldList(node.getFields(), context);
Seq<Expression> partitionSpec = context.retainAllNamedParseExpressions(exp -> exp);
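// For context: visitFieldList collects the dedup key fields as named parse expressions
// on the context; retainAllNamedParseExpressions then retrieves the accumulated
// expressions, here used unchanged (exp -> exp) as the window partition spec.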
Collaborator:
Is the order of the dedup fields kept? For instance, with dedup 2 b, a, will the output be a, b?

@LantaoJin (Member, Author) replied Aug 27, 2024:
Yes, the original order is kept. I added several integration tests to verify this case.

@penghuo (Collaborator) left a comment:
Curious: if we enforce using a window function for dedup = 1, can we benchmark it against #521?

@penghuo added the enhancement (New feature or request) label Aug 12, 2024
@YANG-DB added the Lang:PPL (Pipe Processing Language support) label Aug 14, 2024
@YANG-DB (Member) left a comment:
CatalystQueryPlanVisitor needs refactoring - see comments above

@YANG-DB (Member) commented:
Hi,
visitDedup(...) is too large (>136 lines) and should be extracted into a dedicated DedupUtils class or similar.
We should reduce CatalystQueryPlanVisitor to a command-pattern consolidator only, and move the actual business logic into dedicated strategy classes (one per command), as sketched below.
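
For illustration only, a minimal Scala sketch of the suggested split; the names CommandStrategy and DedupStrategy are hypothetical, not classes in this PR:

// Hypothetical shape: the visitor keeps only dispatch logic; each command's
// translation lives in its own strategy. Type parameters stand in for the
// project's AST node, plan context, and Catalyst plan types.
trait CommandStrategy[Node, Ctx, Plan] {
  def translate(node: Node, context: Ctx): Plan
}

// e.g. a DedupStrategy implementing CommandStrategy would host the ~136 lines
// currently inlined in visitDedup(...)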

@LantaoJin (Member, Author) replied:
done

@LantaoJin (Member, Author) commented Aug 27, 2024

> Curious: if we enforce using a window function for dedup = 1, can we benchmark it against #521?

I ran a micro-benchmark on my laptop with OpenJDK 64-Bit Server VM 21.0.4+7-LTS on Mac OS X 13.6.9 (Apple M1).
The test data contains 9 columns with varying numbers of rows; the results are as follows:

When numRows = 100

Dedupe Benchmark:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Dedupe by Window                                     28             41           7          0.0      281901.3       1.0X
Dedupe by Aggregate                                  18             20           3          0.0      175580.8       1.6X

When numRows = 1000

Dedupe Benchmark:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Dedupe by Window                                     28             34           5          0.0       27841.0       1.0X
Dedupe by Aggregate                                  19             20           1          0.1       19365.1       1.4X

When numRows = 10000

Dedupe Benchmark:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Dedupe by Window                                     33             37           4          0.3        3293.0       1.0X
Dedupe by Aggregate                                  32             34           1          0.3        3216.1       1.0X

When numRows = 100000

Dedupe Benchmark:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Dedupe by Window                                    102            105           2          1.0        1020.5       1.0X
Dedupe by Aggregate                                 142            144           3          0.7        1417.2       0.7X

When numRows = 1000000

Dedupe Benchmark:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Dedupe by Window                                    715            724           9          1.4         714.6       1.0X
Dedupe by Aggregate                                1254           1264           7          0.8        1254.4       0.6X

An interesting phenomenon:

  • When the number of rows is below 10,000, Dedupe by Aggregate performs better than Dedupe by Window
  • When the number of rows is above 10,000, Dedupe by Aggregate performs worse than Dedupe by Window

The conclusion is: the larger the data volume, the better Window performs compared to Aggregate.
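
(A plausible factor, not verified here: dropDuplicates on a subset of columns is rewritten by Spark into an Aggregate that computes first() over every non-key column, and hash aggregation over many wide first() buffers may scale worse than the sort-based window evaluation.)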

Appendix

import org.apache.spark.benchmark.Benchmark
// Spark's benchmark harness from the sql/core test sources; provides spark, output, and noop()
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

case class Data2(value1: Float, value2: String, value3: String, value4: String,
     value5: String, value6: String, value7: String, value8: String, value9: String)

object DedupeBenchmark extends SqlBasedBenchmark {

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("Dedupe benchmark") {
      import spark.implicits._
      val numRecords = 10000 // varied from 100 to 1,000,000 across the runs above
      val benchmark = new Benchmark("Dedupe Benchmark", numRecords, output = output)

      // value2 and value3 take only 100 and 10 distinct values respectively,
      // so the dedup columns contain plenty of duplicates
      val df = spark.range(numRecords).map {
        x => Data2(x.toFloat, s"value2(${x % 100})", s"value3(${x % 10})", s"value4$x", s"value5$x",
          s"value6$x", s"value7$x", s"value8$x", s"value9$x")
      }.select($"value1", $"value2", $"value3", $"value4",
        $"value5", $"value6", $"value7", $"value8", $"value9")
      val dedupeColumns = Seq("value2", "value3")
      // Filter out rows with any null values in the deduplication columns
      val nonNullDF = df.filter(dedupeColumns.map(c => col(c).isNotNull).reduce(_ && _))
      nonNullDF.cache()
      nonNullDF.count() // force cache

      benchmark.addCase("Dedupe by Window", numIters = 5) { _ =>
        // Define a window specification over the dedup columns
        val windowSpec = Window.partitionBy(dedupeColumns.map(col): _*)
          .orderBy(dedupeColumns.map(col): _*)
        // Add a row number to each row in the partition
        val rankedDF = nonNullDF.withColumn("row_number", row_number().over(windowSpec))
        // Filter to keep the first N rows for each partition
        val deduplicatedDF = rankedDF.filter(col("row_number") <= 1).drop("row_number")
        deduplicatedDF.noop()
      }

      benchmark.addCase("Dedupe by Aggregate", numIters = 5) { _ =>
        // Deduplicate the non-null DataFrame based on multiple columns
        val deduplicatedDF = nonNullDF.dropDuplicates(dedupeColumns)
        // Trigger execution without collecting results
        deduplicatedDF.noop()
      }

      benchmark.run()
    }
  }
}
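
(For reference: this extends Spark's SqlBasedBenchmark test harness, so it is meant to live alongside Spark's sql/core test sources; noop() there writes to the noop data source to force execution without collecting results. The numRecords constant was varied from 100 to 1,000,000 to produce the tables above.)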

What are your thoughts, @penghuo? Do you think we should use Dedupe by Window for allowedDuplication=1 as well?

@YANG-DB (Member) commented Aug 28, 2024

> I made a micro-benchmark on my laptop ... do you think we should use Dedupe by Window for allowedDuplication=1 as well?

IMO let's plan for the larger-volume use case. @penghuo, your thoughts?
This is excellent work - let's try to create such benchmarks for additional use cases.

@YANG-DB (Member) commented Aug 29, 2024

@LantaoJin can you please resolve the conflict (after the latest merge)?
Thanks 🙏

@YANG-DB YANG-DB merged commit cc52b1c into opensearch-project:main Aug 30, 2024
4 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Aug 30, 2024
* Translate PPL dedup Command Part 2: allowedDuplication>1

Signed-off-by: Lantao Jin <[email protected]>

* update document

Signed-off-by: Lantao Jin <[email protected]>

* remove the CatalystQueryPlanVisitor.java.orig

Signed-off-by: Lantao Jin <[email protected]>

* refactor

Signed-off-by: Lantao Jin <[email protected]>

* add IT for reordering field list

Signed-off-by: Lantao Jin <[email protected]>

* remove useless code

Signed-off-by: Lantao Jin <[email protected]>

---------

Signed-off-by: Lantao Jin <[email protected]>
Co-authored-by: YANGDB <[email protected]>
(cherry picked from commit cc52b1c)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
@opensearch-trigger-bot commented:

The backport to 0.5-nexus failed:

The process '/usr/bin/git' failed with exit code 128

To backport manually, run these commands in your terminal:

# Navigate to the root of your repository
cd $(git rev-parse --show-toplevel)
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add ../.worktrees/opensearch-spark/backport-0.5-nexus 0.5-nexus
# Navigate to the new working tree
pushd ../.worktrees/opensearch-spark/backport-0.5-nexus
# Create a new branch
git switch --create backport/backport-543-to-0.5-nexus
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 cc52b1c3e68b87dfb735c3fcd055b5e93d015720
# Push it to GitHub
git push --set-upstream origin backport/backport-543-to-0.5-nexus
# Go back to the original working tree
popd
# Delete the working tree
git worktree remove ../.worktrees/opensearch-spark/backport-0.5-nexus

Then, create a pull request where the base branch is 0.5-nexus and the compare/head branch is backport/backport-543-to-0.5-nexus.

YANG-DB added a commit that referenced this pull request Aug 31, 2024
* Translate PPL dedup Command Part 2: allowedDuplication>1
* update document
* remove the CatalystQueryPlanVisitor.java.orig
* refactor
* add IT for reordering field list
* remove useless code

---------

(cherry picked from commit cc52b1c)

Signed-off-by: Lantao Jin <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: YANGDB <[email protected]>
LantaoJin added a commit to LantaoJin/opensearch-spark that referenced this pull request Oct 2, 2024
…h-project#543)

* Translate PPL dedup Command Part 2: allowedDuplication>1

Signed-off-by: Lantao Jin <[email protected]>

* update document

Signed-off-by: Lantao Jin <[email protected]>

* remove the CatalystQueryPlanVisitor.java.orig

Signed-off-by: Lantao Jin <[email protected]>

* refactor

Signed-off-by: Lantao Jin <[email protected]>

* add IT for reordering field list

Signed-off-by: Lantao Jin <[email protected]>

* remove useless code

Signed-off-by: Lantao Jin <[email protected]>

---------

Signed-off-by: Lantao Jin <[email protected]>
Co-authored-by: YANGDB <[email protected]>
(cherry picked from commit cc52b1c)
Labels
backport 0.5-nexus, backport 0.5, enhancement (New feature or request), Lang:PPL (Pipe Processing Language support)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEATURE] Support dedup Command with allowedDuplication>1
4 participants