From c5b9a8393cdf392d055ab2262be3f05a084df302 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 30 Jan 2024 10:45:43 +0800 Subject: [PATCH] fix(optimizer): fix temporal join shuffle (#14848) (#14849) Co-authored-by: Dylan --- .../tests/testdata/output/temporal_join.yaml | 13 +++++++------ .../src/optimizer/plan_node/logical_join.rs | 14 +++++++------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml index f49a82be2dd78..ea844cda185b1 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml @@ -107,12 +107,13 @@ StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version2.k(hidden)], stream_key: [stream._row_id, k], pk_columns: [stream._row_id, k], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.k, stream._row_id) } └─StreamTemporalJoin { type: Inner, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } - ├─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } - │ ├─StreamExchange { dist: HashShard(stream.k) } - │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } - │ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } - │ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version1.k) } - │ └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) } + ├─StreamExchange { dist: HashShard(stream.k) } + │ └─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } + │ ├─StreamExchange { dist: HashShard(stream.k) } + │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } + │ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } + │ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version1.k) } + │ └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version2.k) } └─StreamTableScan { table: version2, columns: [version2.k, version2.x2], pk: [version2.k], dist: UpstreamHashShard(version2.k) } - name: multi-way temporal join with different keys diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 5365eb3642b79..a76fc953ee929 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1049,9 +1049,8 @@ impl LogicalJoin { let lookup_prefix_len = reorder_idx.len(); let predicate = predicate.reorder(&reorder_idx); - let left = if dist_key_in_order_key_pos.is_empty() { - self.left() - .to_stream_with_dist_required(&RequiredDist::single(), ctx)? + let required_dist = if dist_key_in_order_key_pos.is_empty() { + RequiredDist::single() } else { let left_eq_indexes = predicate.left_eq_indexes(); let left_dist_key = dist_key_in_order_key_pos @@ -1059,12 +1058,13 @@ impl LogicalJoin { .map(|pos| left_eq_indexes[*pos]) .collect_vec(); - self.left().to_stream_with_dist_required( - &RequiredDist::shard_by_key(self.left().schema().len(), &left_dist_key), - ctx, - )? + RequiredDist::hash_shard(&left_dist_key) }; + let left = self.left().to_stream(ctx)?; + // Enforce a shuffle for the temporal join LHS to let the scheduler be able to schedule the join fragment together with the RHS with a `no_shuffle` exchange. + let left = required_dist.enforce(left, &Order::any()); + if !left.append_only() { return Err(RwError::from(ErrorCode::NotSupported( "Temporal join requires an append-only left input".into(),