[Enhancement] pushdown join runtime filter into storage layer #55124

Open · silverbullet233 wants to merge 3 commits into main
Conversation

silverbullet233 (Contributor) commented on Jan 16, 2025

Why I'm doing:

Currently, the runtime filters of our scan operator are evaluated only after the storage layer has read all the data. This means we lose the opportunity to apply storage-layer optimizations such as late materialization.

What I'm doing:

In this PR, I add support for pushing runtime filters down to the storage layer. During a query, a pushed-down runtime filter is treated as a special predicate and evaluated in the first stage of late materialization, which gives us the opportunity to use late materialization to reduce the amount of data read.

In addition, since each scan operator generates multiple parallel I/O tasks, pushing the runtime filter down increases the number of threads evaluating runtime filters in parallel, which also improves compute efficiency to some extent.

Design points

  1. How to represent the runtime filter predicate in the storage layer?

Somewhat unlike ordinary single-column predicates, multiple runtime filter predicates may be related to each other.
For example, when there are multiple runtime filters, we do not need to execute every one of them; to balance execution efficiency, we only select the few with good filtering power. Likewise, when none of them filters well, we can skip all of them.

Our storage layer currently supports only single-column predicates, which does not fit the scenario above, so I added RuntimeFilterPredicates, which collects all relevant runtime filters and, based on observed selectivity, picks the appropriate ones to execute.
RuntimeFilterPredicates is evaluated in the first stage of late materialization; when its filtering power is good, we read less data. A sketch of this selection logic follows.
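
For illustration, here is a minimal C++ sketch of the selectivity-based selection idea. The interfaces, the sampling interval, and the selectivity threshold are assumptions made to keep the example self-contained; this is not the actual StarRocks implementation.

```cpp
#include <cstdint>
#include <functional>
#include <vector>

struct Chunk {}; // stand-in for the engine's chunk type

// Hypothetical runtime-filter handle: evaluate() updates the selection
// vector (1 = row kept); the caller measures how many rows survive.
struct RuntimeFilter {
    std::function<void(const Chunk&, std::vector<uint8_t>&)> evaluate;
};

class RuntimeFilterPredicates {
public:
    void add_filter(RuntimeFilter* rf) { _candidates.push_back(rf); }

    // Evaluate runtime filters against one chunk. Every kSampleInterval
    // chunks, measure each candidate's selectivity and keep only the ones
    // that prune enough rows; if none helps, _selected stays empty and all
    // runtime filters are skipped on the non-sampling chunks.
    void evaluate(const Chunk& chunk, std::vector<uint8_t>& selection) {
        if (_chunks_seen++ % kSampleInterval == 0) {
            _selected.clear();
            for (RuntimeFilter* rf : _candidates) {
                size_t before = count_selected(selection);
                rf->evaluate(chunk, selection);
                size_t after = count_selected(selection);
                // keep the filter only if it prunes a meaningful fraction
                if (before > 0 &&
                    static_cast<double>(after) / before < kKeepThreshold) {
                    _selected.push_back(rf);
                }
            }
        } else {
            for (RuntimeFilter* rf : _selected) {
                rf->evaluate(chunk, selection);
            }
        }
    }

private:
    static size_t count_selected(const std::vector<uint8_t>& sel) {
        size_t n = 0;
        for (uint8_t v : sel) n += v;
        return n;
    }

    static constexpr size_t kSampleInterval = 16; // assumed re-sampling period
    static constexpr double kKeepThreshold = 0.5; // assumed selectivity cutoff
    std::vector<RuntimeFilter*> _candidates;
    std::vector<RuntimeFilter*> _selected;
    size_t _chunks_seen = 0;
};
```

The key design choice here is periodic re-sampling: a filter's usefulness can vary across the data, so the selected set is revisited instead of being fixed once.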

  2. How to ensure that these columns have already been read when the runtime filters are evaluated?
    When applying late materialization, only the columns that have predicates are read first, and the columns required by the runtime filters may not have any predicate. The implementation here is a bit involved. To reduce the cost of the change, I used a trick and introduced ColumnPlaceHolderPredicate, which serves purely as a placeholder and produces no filtering effect.

In the ChunkSource init stage, a ColumnPlaceHolderPredicate is generated for each column required by the runtime filters and placed into the PredicateTree. In this way, the existing column-selection strategy guarantees that these columns are read in the first stage of late materialization.
Then, in the SegmentIterator init stage, ColumnPredicateRewriter removes the ColumnPlaceHolderPredicates so that they incur no additional overhead. This flow is sketched below.
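
The placeholder trick can be sketched as follows. ColumnPredicate's real interface is richer; the is_placeholder flag and the free functions here are simplifications for illustration only.

```cpp
#include <algorithm>
#include <memory>
#include <vector>

// Simplified predicate interface; the real one also evaluates against columns.
struct ColumnPredicate {
    virtual ~ColumnPredicate() = default;
    virtual bool is_placeholder() const { return false; }
};

// Never evaluated: it only makes the column-selection logic treat the
// runtime filter's probe column as "has a predicate", so that column is
// read in the first stage of late materialization.
struct ColumnPlaceHolderPredicate final : ColumnPredicate {
    explicit ColumnPlaceHolderPredicate(int column_id) : _column_id(column_id) {}
    bool is_placeholder() const override { return true; }
    int column_id() const { return _column_id; }

private:
    int _column_id;
};

using PredicateList = std::vector<std::unique_ptr<ColumnPredicate>>;

// ChunkSource init: add a placeholder for every column a runtime filter needs.
void add_placeholders(PredicateList& predicates, const std::vector<int>& rf_columns) {
    for (int cid : rf_columns) {
        predicates.push_back(std::make_unique<ColumnPlaceHolderPredicate>(cid));
    }
}

// SegmentIterator init: strip the placeholders again (done by
// ColumnPredicateRewriter in the actual patch) so they add no evaluation cost.
void remove_placeholders(PredicateList& predicates) {
    predicates.erase(std::remove_if(predicates.begin(), predicates.end(),
                                    [](const std::unique_ptr<ColumnPredicate>& p) {
                                        return p->is_placeholder();
                                    }),
                     predicates.end());
}
```

Because the placeholder is removed before predicate evaluation begins, it influences only which columns the first stage reads, at no per-row cost.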

Test Result

I tested on a TPC-H 1 TB data set, using four 16-core/64 GB BEs, each with two disks (each with 5000 IOPS and 140 MB/s throughput); the baseline code is main-9a6445a.

Below are the test results; query times in the table are in milliseconds.

| Query | baseline (ms) | patched (ms) | patched/baseline |
|-------|--------------:|-------------:|-----------------:|
| Q01 | 15909 | 15802 | 99.33% |
| Q02 | 559 | 410 | 73.35% |
| Q03 | 7988 | 7252 | 90.79% |
| Q04 | 5093 | 4098 | 80.46% |
| Q05 | 10720 | 9135 | 85.21% |
| Q06 | 257 | 258 | 100.39% |
| Q07 | 8781 | 7709 | 87.79% |
| Q08 | 7791 | 7767 | 99.69% |
| Q09 | 32526 | 29677 | 91.24% |
| Q10 | 10393 | 9465 | 91.07% |
| Q11 | 1354 | 983 | 72.60% |
| Q12 | 1392 | 1135 | 81.54% |
| Q13 | 19735 | 19834 | 100.50% |
| Q14 | 2179 | 1861 | 85.41% |
| Q15 | 1116 | 1091 | 97.76% |
| Q16 | 2387 | 1843 | 77.21% |
| Q17 | 4303 | 4018 | 93.38% |
| Q18 | 18646 | 18337 | 98.34% |
| Q19 | 5231 | 5269 | 100.73% |
| Q20 | 2530 | 1938 | 76.60% |
| Q21 | 16875 | 13046 | 77.31% |
| Q22 | 2898 | 2648 | 91.37% |
| Total | 178663 | 163576 | 91.56% |

TODO: in this PR, I only support this enhancement for internal OLAP tables under shared-nothing mode; other table types will be supported in other PRs.

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.4
    • 3.3
    • 3.2
    • 3.1
    • 3.0

silverbullet233 requested review from a team as code owners on January 16, 2025 02:35
silverbullet233 changed the title from [WIP] pushdown join runtime filter into storage layer to [Enhancement] pushdown join runtime filter into storage layer on Jan 16, 2025

[BE Incremental Coverage Report]

fail : 57 / 219 (26.03%)

file detail

| path | covered_line | new_line | coverage | not_covered_line_detail |
|------|-------------:|---------:|---------:|-------------------------|
| 🔵 src/storage/column_predicate_rewriter.cpp | 0 | 3 | 00.00% | [319, 320, 321] |
| 🔵 src/exec/pipeline/scan/olap_chunk_source.cpp | 0 | 4 | 00.00% | [196, 276, 277, 279] |
| 🔵 src/column/binary_column.h | 0 | 1 | 00.00% | [256] |
| 🔵 src/exprs/runtime_filter.h | 7 | 111 | 06.31% | [315, 318, 319, 320, 321, 322, 475, 477, 478, 481, 482, 484, 487, 490, 491, 494, 497, 498, 499, 500, 501, 502, 504, 505, 506, 509, 510, 511, 514, 515, 516, 517, 519, 520, 521, 522, 524, 525, 526, 529, 530, 531, 534, 537, 538, 539, 540, 541, 542, 544, 545, 546, 549, 551, 552, 553, 555, 557, 558, 559, 560, 562, 563, 564, 565, 566, 567, 569, 570, 571, 572, 573, 576, 577, 578, 579, 580, 585, 588, 589, 590, 592, 593, 595, 596, 598, 610, 611, 612, 616, 617, 618, 621, 622, 778, 780, 781, 784, 785, 786, 791, 792, 794, 796] |
| 🔵 src/storage/rowset/segment_iterator.cpp | 1 | 10 | 10.00% | [1622, 1623, 1624, 1625, 1626, 1628, 1629, 1630, 1631] |
| 🔵 src/column/column.h | 1 | 4 | 25.00% | [382, 385, 387] |
| 🔵 src/exprs/runtime_filter_bank.cpp | 3 | 11 | 27.27% | [424, 425, 426, 427, 430, 607, 608, 609] |
| 🔵 src/column/fixed_length_column_base.cpp | 10 | 24 | 41.67% | [234, 235, 259, 260, 261, 267, 268, 279, 281, 283, 286, 287, 292, 293] |
| 🔵 src/column/nullable_column.cpp | 11 | 23 | 47.83% | [369, 370, 371, 372, 374, 375, 376, 379, 381, 405, 406, 407] |
| 🔵 src/column/binary_column.cpp | 22 | 26 | 84.62% | [630, 631, 632, 633] |
| 🔵 src/exprs/runtime_filter_bank.h | 1 | 1 | 100.00% | [] |
| 🔵 src/storage/rowset/rowset.cpp | 1 | 1 | 100.00% | [] |


[FE Incremental Coverage Report]

pass : 0 / 0 (0%)


[Java-Extensions Incremental Coverage Report]

pass : 0 / 0 (0%)

@@ -459,6 +471,155 @@ struct WithModuloArg {
}
}
};

template <TRuntimeFilterLayoutMode::type M>
struct HashValueComputeWithSelection {
Review comment (Contributor): This code is tedious and error-prone and looks duplicated; I suggest unifying it with HashValueCompute.

protected:
RuntimeFilterProbeDescriptor* _rf_desc;
const JoinRuntimeFilter* _rf = nullptr;
ColumnId _column_id;
Review comment (Contributor): Do we support runtime filters whose probe_expr is an Expr other than ColumnRef, and/or multi-column partition exprs?

@@ -1554,4 +1554,8 @@ CONF_mBool(avro_ignore_union_type_tag, "false");
// default batch size for simdjson lib
CONF_mInt32(json_parse_many_batch_size, "1000000");
CONF_mBool(enable_dynamic_batch_size_for_json_parse_many, "true");
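// enable pushing join runtime filters down into the storage layer (this PR)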
CONF_mBool(enable_rf_pushdown, "true");
Review comment (Contributor): Is it possible to add a session variable to control this behaviour?
