[Enhancement] pushdown join runtime filter into storage layer #55124
Conversation
[BE Incremental Coverage Report] ❌ fail : 57 / 219 (26.03%)
[FE Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
[Java-Extensions Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
@@ -459,6 +471,155 @@ struct WithModuloArg {
        }
    }
};

template <TRuntimeFilterLayoutMode::type M>
struct HashValueComputeWithSelection {
This code is tedious and error-prone and looks duplicated; suggest unifying it with HashValueCompute into one.
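For illustration, a minimal sketch of one way the two functors could be unified; every name and signature below is a hypothetical stand-in, not the actual StarRocks code:

#include <cstdint>
#include <vector>

// Hypothetical sketch: one functor handles both the full-chunk and the
// selection-vector cases, so a selection-aware variant does not need to
// duplicate the per-layout hashing logic. Names are illustrative only.
template <typename LayoutTag>
struct HashValueComputeUnified {
    // Full-chunk path: hash every row.
    void operator()(const std::vector<int64_t>& keys, std::vector<uint32_t>* hashes) const {
        hashes->resize(keys.size());
        for (size_t i = 0; i < keys.size(); ++i) {
            (*hashes)[i] = hash_one(keys[i]);
        }
    }

    // Selection path: hash only rows whose selection flag is set, reusing
    // the same per-row hashing so the logic exists in exactly one place.
    void operator()(const std::vector<int64_t>& keys, const std::vector<uint8_t>& selection,
                    std::vector<uint32_t>* hashes) const {
        hashes->resize(keys.size());
        for (size_t i = 0; i < keys.size(); ++i) {
            if (selection[i]) (*hashes)[i] = hash_one(keys[i]);
        }
    }

private:
    static uint32_t hash_one(int64_t key) {
        // Placeholder hash; the real code would dispatch on the layout mode.
        return static_cast<uint32_t>(key * 2654435761u);
    }
};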
protected:
    RuntimeFilterProbeDescriptor* _rf_desc;
    const JoinRuntimeFilter* _rf = nullptr;
    ColumnId _column_id;
Do we support runtime filters whose probe_expr is an Expr other than a ColumnRef, and/or multi-column partition exprs?
@@ -1554,4 +1554,8 @@ CONF_mBool(avro_ignore_union_type_tag, "false");
// default batch size for simdjson lib
CONF_mInt32(json_parse_many_batch_size, "1000000");
CONF_mBool(enable_dynamic_batch_size_for_json_parse_many, "true");
CONF_mBool(enable_rf_pushdown, "true");
Is it possible to add a session variable to control the behaviour?
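As a rough illustration of that suggestion, a minimal hedged sketch, assuming a hypothetical per-query switch checked together with the BE config above:

// Hedged sketch: the pushdown only happens when both the BE config and a
// hypothetical per-query option are on, so a session variable could disable
// it per query without restarting the BE. The option name is an assumption.
bool should_push_down_runtime_filter(bool be_config_enable_rf_pushdown,
                                     bool query_option_enable_rf_pushdown) {
    return be_config_enable_rf_pushdown && query_option_enable_rf_pushdown;
}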
Why I'm doing:
Currently, the runtime filter of our scan operator is executed after the storage layer has read all the data. This means we lose the opportunity to use storage-layer optimizations such as late materialization.
What I'm doing:
In this PR, I support pushing the runtime filter down to the storage layer. During a query it is treated as a specific kind of predicate and evaluated in the first stage of late materialization, so we get the opportunity to use late materialization to reduce the amount of data read.
In addition, since each scan operator generates multiple parallel IO tasks, pushing the runtime filter down increases the number of threads evaluating runtime filters in parallel, which also improves computing efficiency to a certain extent.
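To make the benefit concrete, a minimal sketch of a two-stage late-materialization read with the runtime filter evaluated in stage one; all types and functions are illustrative stand-ins, not the actual storage API:

#include <cstdint>
#include <functional>
#include <vector>

// Stage one: read only the predicate/runtime-filter columns and compute the
// surviving row ids. Stage two: fetch the remaining columns for those rows only.
std::vector<uint32_t> stage_one(const std::vector<int64_t>& rf_column,
                                const std::function<bool(int64_t)>& runtime_filter) {
    std::vector<uint32_t> surviving;
    for (uint32_t row = 0; row < rf_column.size(); ++row) {
        // Rows rejected here never trigger reads of the remaining columns.
        if (runtime_filter(rf_column[row])) surviving.push_back(row);
    }
    return surviving;
}

std::vector<int64_t> stage_two(const std::vector<int64_t>& other_column,
                               const std::vector<uint32_t>& surviving) {
    std::vector<int64_t> out;
    out.reserve(surviving.size());
    for (uint32_t row : surviving) out.push_back(other_column[row]);  // only surviving rows are materialized
    return out;
}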
Design points
Somewhat differently from ordinary single-column predicates, multiple runtime filter predicates may be related to each other. For example, when there are multiple runtime filters, to balance execution efficiency we do not need to execute every one of them; we only need to select a few with good filterability. Similarly, when their filtering effects are poor, we can choose to skip all of them. A sketch of this idea follows.
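A hedged sketch of this kind of selectivity-based selection; the threshold, the sampling, and all names are assumptions for illustration, not the values used in this PR:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <numeric>
#include <vector>

// Illustrative only: estimate each runtime filter's selectivity on a small
// sample of rows, then run only the best ones; if none filters well enough,
// skip them all for the following chunks.
struct SampledFilter {
    std::function<bool(int64_t)> filter;
    double selectivity = 1.0;  // fraction of sampled rows kept; lower is better
};

std::vector<size_t> choose_filters(std::vector<SampledFilter>& filters,
                                   const std::vector<int64_t>& sample_rows,
                                   double keep_threshold = 0.5, size_t max_filters = 2) {
    for (auto& f : filters) {
        size_t kept = 0;
        for (int64_t v : sample_rows) kept += f.filter(v) ? 1 : 0;
        f.selectivity = sample_rows.empty() ? 1.0 : double(kept) / double(sample_rows.size());
    }
    // Order candidates by selectivity and keep at most max_filters good ones.
    std::vector<size_t> order(filters.size());
    std::iota(order.begin(), order.end(), 0);
    std::sort(order.begin(), order.end(),
              [&](size_t a, size_t b) { return filters[a].selectivity < filters[b].selectivity; });
    std::vector<size_t> chosen;
    for (size_t idx : order) {
        if (filters[idx].selectivity <= keep_threshold && chosen.size() < max_filters) {
            chosen.push_back(idx);
        }
    }
    return chosen;  // empty => skip all runtime filters
}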
Our storage layer currently only supports single-column predicates, which does not fit the above scenario, so I added RuntimeFilterPredicates, which is responsible for collecting all relevant runtime filters and selecting the appropriate ones to execute based on their selectivity. RuntimeFilterPredicates is executed in the first stage of late materialization; if the filterability is good, we read less data.
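A simplified, hedged sketch of what such a wrapper could look like; the class shape and method names below are assumptions, not the RuntimeFilterPredicates implementation in this PR:

#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Illustrative stand-in: groups the runtime filters that target a segment and
// evaluates them during the first stage of late materialization, updating a
// row selection vector. In the real design only the "good" filters would run.
class RuntimeFilterPredicatesSketch {
public:
    void add_filter(std::function<bool(int64_t)> filter) { _filters.push_back(std::move(filter)); }

    // selection[i] == 1 means row i is still alive after earlier predicates.
    void evaluate(const std::vector<int64_t>& column, std::vector<uint8_t>* selection) const {
        for (const auto& f : _filters) {
            for (size_t i = 0; i < column.size(); ++i) {
                if ((*selection)[i] && !f(column[i])) (*selection)[i] = 0;
            }
        }
    }

private:
    std::vector<std::function<bool(int64_t)>> _filters;
};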
When applying late materialization, we only read the columns that have predicates first, but the columns required by the runtime filters may not have predicates. The implementation here is a bit complicated; to reduce the cost of the modification, I used a somewhat tricky approach and introduced ColumnPlaceHolderPredicate, which is only used as a placeholder and does not produce any filtering effect. In the ChunkSource init stage, ColumnPlaceHolderPredicates are generated for the columns required by the runtime filter and placed into the PredicateTree; this way, the existing column selection strategy ensures that these columns are read in the first stage of late materialization. Then, in the SegmentIterator init stage, the ColumnPlaceHolderPredicate is removed by ColumnPredicateRewriter, so it does not incur any additional overhead.
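A hedged sketch of the placeholder trick; the structs and helpers below are illustrative stand-ins for ColumnPlaceHolderPredicate and ColumnPredicateRewriter, not the real interfaces:

#include <algorithm>
#include <cstdint>
#include <vector>

// Illustrative sketch: a "predicate" is reduced to a column id plus a tag;
// the real ColumnPredicate interface is much richer.
struct PredicateSketch {
    uint32_t column_id;
    bool is_placeholder;  // true => keeps all rows, exists only to force the column read
};

// ChunkSource init: add a placeholder for every column the runtime filter
// needs, so stage one of late materialization also reads those columns.
void add_placeholders(const std::vector<uint32_t>& rf_columns, std::vector<PredicateSketch>* preds) {
    for (uint32_t cid : rf_columns) preds->push_back({cid, /*is_placeholder=*/true});
}

// SegmentIterator init: a rewriter pass drops the placeholders again so they
// never add any per-row evaluation cost.
void remove_placeholders(std::vector<PredicateSketch>* preds) {
    preds->erase(std::remove_if(preds->begin(), preds->end(),
                                [](const PredicateSketch& p) { return p.is_placeholder; }),
                 preds->end());
}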
Test Result
I tested on a TPC-H 1TB data set, using four 16c64g BEs, each with two disks (each with 5000 IOPS and 140 MB/s throughput); the baseline code is main-9a6445a. Below are the test results; the query times in the table are in milliseconds.
TODO: in this PR, I only support this enhancement for internal OLAP tables under shared_nothing mode; other table types will be supported in other PRs.
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: