Skip to content

Commit

Permalink
fix(stream): fix temporal join append only property (#17239)
Browse files (browse the repository at this point in the history)
  • Loading branch information
chenzl25 authored Jun 13, 2024
1 parent 3b823d0 commit 2c6f7c3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 24 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -111,7 +111,7 @@
stream_plan: |-
StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version2.k(hidden)], stream_key: [stream._row_id, k], pk_columns: [stream._row_id, k], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream.k, stream._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] }
└─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] }
├─StreamExchange { dist: HashShard(stream.k) }
│ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] }
│ ├─StreamExchange { dist: HashShard(stream.k) }
Expand All @@ -133,7 +133,7 @@
stream_plan: |-
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] }
└─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] }
├─StreamExchange { dist: HashShard(stream.id2) }
│ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] }
│ ├─StreamExchange { dist: HashShard(stream.id1) }
Expand All @@ -155,7 +155,7 @@
stream_plan: |-
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] }
└─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] }
├─StreamExchange { dist: HashShard(stream.id2) }
│ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] }
│ ├─StreamExchange { dist: HashShard(stream.id1) }
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -72,7 +72,7 @@ impl StreamTemporalJoin {
let base = PlanBase::new_stream_with_core(
&core,
dist,
true,
append_only,
false, // TODO(rc): derive EOWC property from input
watermark_columns,
);
Expand Down
41 changes: 21 additions & 20 deletions src/stream/src/executor/temporal_join.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -571,8 +571,8 @@ mod phase1 {
}
}

impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
TemporalJoinExecutor<K, S, T, A>
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool>
TemporalJoinExecutor<K, S, T, APPEND_ONLY>
{
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down Expand Up @@ -698,7 +698,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
let full_schema = full_schema.clone();

if T == JoinType::Inner {
let st1 = phase1::handle_chunk::<K, S, phase1::Inner, A>(
let st1 = phase1::handle_chunk::<K, S, phase1::Inner, APPEND_ONLY>(
self.chunk_size,
right_size,
full_schema,
Expand Down Expand Up @@ -731,19 +731,20 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
}
} else if let Some(ref cond) = self.condition {
// Joined result without evaluating non-lookup conditions.
let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond, A>(
self.chunk_size,
right_size,
full_schema,
epoch,
&self.left_join_keys,
&mut self.right_table,
&memo_table_lookup_prefix,
&mut self.memo_table,
&null_matched,
chunk,
&self.metrics,
);
let st1 =
phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond, APPEND_ONLY>(
self.chunk_size,
right_size,
full_schema,
epoch,
&self.left_join_keys,
&mut self.right_table,
&memo_table_lookup_prefix,
&mut self.memo_table,
&null_matched,
chunk,
&self.metrics,
);
let mut matched_count = 0usize;
#[for_await]
for chunk in st1 {
Expand Down Expand Up @@ -783,7 +784,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
// The last row should always be marker row,
assert_eq!(matched_count, 0);
} else {
let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter, A>(
let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter, APPEND_ONLY>(
self.chunk_size,
right_size,
full_schema,
Expand All @@ -805,7 +806,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
}
}
InternalMessage::Barrier(updates, barrier) => {
if !A {
if !APPEND_ONLY {
if wait_first_barrier {
wait_first_barrier = false;
self.memo_table.as_mut().unwrap().init_epoch(barrier.epoch);
Expand Down Expand Up @@ -837,8 +838,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
}
}

impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool> Execute
for TemporalJoinExecutor<K, S, T, A>
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool> Execute
for TemporalJoinExecutor<K, S, T, APPEND_ONLY>
{
fn execute(self: Box<Self>) -> super::BoxedMessageStream {
self.into_stream().boxed()
Expand Down

0 comments on commit 2c6f7c3

Please sign in to comment.