diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml index aa8887e98bef6..5cdfdf6cf45ea 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml @@ -111,7 +111,7 @@ stream_plan: |- StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version2.k(hidden)], stream_key: [stream._row_id, k], pk_columns: [stream._row_id, k], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.k, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } + └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } ├─StreamExchange { dist: HashShard(stream.k) } │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } │ ├─StreamExchange { dist: HashShard(stream.k) } @@ -133,7 +133,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } + └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } @@ -155,7 +155,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } + └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index aa090143b925b..f94dbba36cb79 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -72,7 +72,7 @@ impl StreamTemporalJoin { let base = PlanBase::new_stream_with_core( &core, dist, - true, + append_only, false, // TODO(rc): derive EOWC property from input watermark_columns, ); diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 077ea17ca74d9..ce0c4d29621d6 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -571,8 +571,8 @@ mod phase1 { } } -impl - TemporalJoinExecutor +impl + TemporalJoinExecutor { #[allow(clippy::too_many_arguments)] pub fn new( @@ -698,7 +698,7 @@ impl let full_schema = full_schema.clone(); if T == JoinType::Inner { - let st1 = phase1::handle_chunk::( + let st1 = phase1::handle_chunk::( self.chunk_size, right_size, full_schema, @@ -731,19 +731,20 @@ impl } } else if let Some(ref cond) = self.condition { // Joined result without evaluating non-lookup conditions. - let st1 = phase1::handle_chunk::( - self.chunk_size, - right_size, - full_schema, - epoch, - &self.left_join_keys, - &mut self.right_table, - &memo_table_lookup_prefix, - &mut self.memo_table, - &null_matched, - chunk, - &self.metrics, - ); + let st1 = + phase1::handle_chunk::( + self.chunk_size, + right_size, + full_schema, + epoch, + &self.left_join_keys, + &mut self.right_table, + &memo_table_lookup_prefix, + &mut self.memo_table, + &null_matched, + chunk, + &self.metrics, + ); let mut matched_count = 0usize; #[for_await] for chunk in st1 { @@ -783,7 +784,7 @@ impl // The last row should always be marker row, assert_eq!(matched_count, 0); } else { - let st1 = phase1::handle_chunk::( + let st1 = phase1::handle_chunk::( self.chunk_size, right_size, full_schema, @@ -805,7 +806,7 @@ impl } } InternalMessage::Barrier(updates, barrier) => { - if !A { + if !APPEND_ONLY { if wait_first_barrier { wait_first_barrier = false; self.memo_table.as_mut().unwrap().init_epoch(barrier.epoch); @@ -837,8 +838,8 @@ impl } } -impl Execute - for TemporalJoinExecutor +impl Execute + for TemporalJoinExecutor { fn execute(self: Box) -> super::BoxedMessageStream { self.into_stream().boxed()