[CH] Optimization for multi joins on the same keys #8007

Open
lgbo-ustc opened this issue Nov 20, 2024 · 1 comment
Labels: bug (Something isn't working), triage

Comments

@lgbo-ustc
Contributor

lgbo-ustc commented Nov 20, 2024

Backend

CH (ClickHouse)

Bug description

The following query joins multiple tables on the same keys, which results in multiple join operators in the same stage.

0: jdbc:hive2://localhost:10000> explain select * from join_t1 left join join_t2 on join_t1.a = join_t2.a left join join_t3 on join_t1.a = join_t3.a inner join join_t4 on join_t1.a = join_t4.a;
+----------------------------------------------------+
|                        plan                        |
+----------------------------------------------------+
| == Physical Plan ==
CHNativeColumnarToRow
+- ^(37) CHShuffledHashJoinExecTransformer [a#5L], [a#22L], Inner, BuildRight
   :- ^(37) CHShuffledHashJoinExecTransformer [a#5L], [a#9L], LeftOuter, BuildRight
   :  :- ^(37) CHShuffledHashJoinExecTransformer [a#5L], [a#7L], LeftOuter, BuildRight
   :  :  :- ^(37) InputIteratorTransformer[a#5L, b#6]
   :  :  :  +- ColumnarExchange hashpartitioning(a#5L, 5), ENSURE_REQUIREMENTS, [plan_id=1983], [shuffle_writer_type=hash], [OUTPUT] List(a:LongType, b:StringType)
   :  :  :     +- ^(33) FilterExecTransformer isnotnull(a#5L)
   :  :  :        +- ^(33) FileScanTransformer parquet default.join_t1[a#5L,b#6] Batched: true, DataFilters: [isnotnull(a#5L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.3.2-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,b:string>
   :  :  +- ^(37) InputIteratorTransformer[a#7L, b#8]
   :  :     +- ColumnarExchange hashpartitioning(a#7L, 5), ENSURE_REQUIREMENTS, [plan_id=1987], [shuffle_writer_type=hash], [OUTPUT] List(a:LongType, b:StringType)
   :  :        +- ^(34) FilterExecTransformer isnotnull(a#7L)
   :  :           +- ^(34) FileScanTransformer parquet default.join_t2[a#7L,b#8] Batched: true, DataFilters: [isnotnull(a#7L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.3.2-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,b:string>
   :  +- ^(37) InputIteratorTransformer[a#9L, b#10]
   :     +- ColumnarExchange hashpartitioning(a#9L, 5), ENSURE_REQUIREMENTS, [plan_id=1992], [shuffle_writer_type=hash], [OUTPUT] List(a:LongType, b:StringType)
   :        +- ^(35) FilterExecTransformer isnotnull(a#9L)
   :           +- ^(35) FileScanTransformer parquet default.join_t3[a#9L,b#10] Batched: true, DataFilters: [isnotnull(a#9L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.3.2-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,b:string>
   +- ^(37) InputIteratorTransformer[a#22L, b#23]
      +- ColumnarExchange hashpartitioning(a#22L, 5), ENSURE_REQUIREMENTS, [plan_id=1997], [shuffle_writer_type=hash], [OUTPUT] List(a:LongType, b:StringType)
         +- ^(36) FilterExecTransformer isnotnull(a#22L)
            +- ^(36) FileScanTransformer parquet default.join_t4[a#22L,b#23] Batched: true, DataFilters: [isnotnull(a#22L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.3.2-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,b:string>

 |
+----------------------------------------------------+

Join is a memory-intensive operator, so having multiple join operators in the same stage can easily cause OOM.

A related issue: #8003

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

@lgbo-ustc added the bug (Something isn't working) and triage labels on Nov 20, 2024
@lgbo-ustc
Contributor Author

lgbo-ustc commented Nov 20, 2024

Some possible solutions

Implement a reusable join operator

Implement a reusable join operator that supports putting multiple tables' data into the same hash table. That way we avoid building multiple hash tables and reduce memory usage. A rough sketch of the idea is shown below.

But it may not be an easy job, and we are not sure its performance would be good. A lot of work, 😓
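To illustrate, here is a minimal Scala sketch of a hash table shared by several build sides that all join on the same key. Every name in it (SharedBuildHashTable, insert, probe) is hypothetical; an actual implementation would live in the CH backend.

import scala.collection.mutable

// One hash table for all build tables that join on the same key. Each
// bucket keeps per-table row lists, so a single table is built and held
// in memory instead of one hash table per join operator.
class SharedBuildHashTable[K, Row](numBuildTables: Int) {
  private val buckets =
    mutable.HashMap.empty[K, Array[mutable.ArrayBuffer[Row]]]

  // Insert a row coming from build table `tableIdx` (0-based).
  def insert(tableIdx: Int, key: K, row: Row): Unit = {
    val slots = buckets.getOrElseUpdate(
      key, Array.fill(numBuildTables)(mutable.ArrayBuffer.empty[Row]))
    slots(tableIdx) += row
  }

  // Probe once with a stream-side key and get the matches from every
  // build table at the same time; a missing key means no build table
  // matched, which an outer join turns into null-padded output.
  def probe(key: K): Option[Array[mutable.ArrayBuffer[Row]]] =
    buckets.get(key)
}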

Make join spill adaptively

At present, the join operator will not spill as long as its memory usage stays under a fixed memory limit. It has no awareness of the memory usage of other operators.

We could make the join operator spill data when the total remaining memory is not enough. This should be done in CH. A conceptual sketch follows.
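A conceptual Scala sketch of the check (the real change would be in CH's C++ join operator, and QueryMemoryTracker, maybeSpill, and safetyRatio are all hypothetical names):

// Spill not only when this join exceeds its own fixed limit, but also
// when the query-wide remaining memory runs low.
trait QueryMemoryTracker {
  def totalLimit: Long // query-wide memory budget in bytes
  def totalUsed: Long  // bytes currently used by all operators together
}

class AdaptiveSpillingJoin(
    tracker: QueryMemoryTracker,
    ownLimit: Long) { // the existing fixed per-operator limit

  private val safetyRatio = 0.1 // spill when < 10% of the budget is left
  private var ownUsage: Long = 0L

  def maybeSpill(spillOnePartition: () => Long): Unit = {
    val remaining = tracker.totalLimit - tracker.totalUsed
    val queryLow = remaining < (tracker.totalLimit * safetyRatio).toLong
    // Current behaviour spills only on `ownUsage > ownLimit`; the
    // adaptive variant also spills when the whole query is near its
    // budget, even though this operator is under its own limit.
    if (ownUsage > ownLimit || queryLow) {
      ownUsage -= spillOnePartition() // returns the bytes released
    }
  }
}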

Split joins into stages

We could limit the maximum number of joins that can be placed in the same stage. Even if we could balance the memory usage across join operators, having too many join operators in one stage is not efficient. A sketch of such a planner rule is below.
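A hypothetical sketch of such a rule against vanilla Spark's planner API (Gluten's own operator is CHShuffledHashJoinExecTransformer; ShuffledHashJoinExec is used here only for illustration). When the chain of joins on the stream side grows past maxJoinsPerStage, an extra shuffle on the same keys is inserted: the partitioning does not change, but the exchange creates a stage boundary that breaks the chain.

import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec

case class LimitJoinsPerStage(maxJoinsPerStage: Int) extends Rule[SparkPlan] {

  // Length of the join chain reachable through stream-side (left)
  // children; any other node (e.g. an exchange) ends the chain.
  private def chainLength(plan: SparkPlan): Int = plan match {
    case j: ShuffledHashJoinExec => 1 + chainLength(j.left)
    case _                       => 0
  }

  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case j: ShuffledHashJoinExec if chainLength(j) > maxJoinsPerStage =>
      // Re-shuffle the stream side on the same join keys to force a
      // stage boundary below this join.
      val boundary = ShuffleExchangeExec(
        HashPartitioning(j.leftKeys, conf.numShufflePartitions), j.left)
      j.copy(left = boundary)
  }
}

Since transformUp rebuilds the plan bottom-up, joins above an inserted boundary start a fresh chain, so each stage ends up with at most maxJoinsPerStage join operators.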
