From 2d58d2e3489fc8a3eb009a0a368899445352efe2 Mon Sep 17 00:00:00 2001 From: Dylan Date: Thu, 13 Jun 2024 16:14:47 +0800 Subject: [PATCH] cherry pick --- .../tests/testdata/output/temporal_join.yaml | 6 +++--- .../optimizer/plan_node/stream_temporal_join.rs | 2 +- src/stream/src/executor/temporal_join.rs | 17 +++++++++-------- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml index aa8887e98bef6..5cdfdf6cf45ea 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml @@ -111,7 +111,7 @@ stream_plan: |- StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version2.k(hidden)], stream_key: [stream._row_id, k], pk_columns: [stream._row_id, k], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.k, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } + └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } ├─StreamExchange { dist: HashShard(stream.k) } │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } │ ├─StreamExchange { dist: HashShard(stream.k) } @@ -133,7 +133,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } + └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } @@ -155,7 +155,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } + └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index ce8753b9ddbc8..7108099b46de9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -74,7 +74,7 @@ impl StreamTemporalJoin { let base = PlanBase::new_stream_with_core( &core, dist, - true, + append_only, false, // TODO(rc): derive EOWC property from input watermark_columns, ); diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index b744fb245a3b4..df9dd4a588996 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -579,8 +579,8 @@ mod phase1 { } } -impl - TemporalJoinExecutor +impl + TemporalJoinExecutor { #[allow(clippy::too_many_arguments)] pub fn new( @@ -709,7 +709,7 @@ impl let full_schema = full_schema.clone(); if T == JoinType::Inner { - let st1 = phase1::handle_chunk::( + let st1 = phase1::handle_chunk::( self.chunk_size, right_size, full_schema, @@ -741,7 +741,8 @@ impl } } else if let Some(ref cond) = self.condition { // Joined result without evaluating non-lookup conditions. - let st1 = phase1::handle_chunk::( + let st1 = + phase1::handle_chunk::( self.chunk_size, right_size, full_schema, @@ -792,7 +793,7 @@ impl // The last row should always be marker row, assert_eq!(matched_count, 0); } else { - let st1 = phase1::handle_chunk::( + let st1 = phase1::handle_chunk::( self.chunk_size, right_size, full_schema, @@ -813,7 +814,7 @@ impl } } InternalMessage::Barrier(updates, barrier) => { - if !A { + if !APPEND_ONLY { if wait_first_barrier { wait_first_barrier = false; self.memo_table.as_mut().unwrap().init_epoch(barrier.epoch); @@ -846,8 +847,8 @@ impl } } -impl Execute - for TemporalJoinExecutor +impl Execute + for TemporalJoinExecutor { fn execute(self: Box) -> super::BoxedMessageStream { self.into_stream().boxed()