Skip to content

Commit

Permalink
cherry pick
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Jun 13, 2024
1 parent 71ede25 commit 2d58d2e
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
stream_plan: |-
StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version2.k(hidden)], stream_key: [stream._row_id, k], pk_columns: [stream._row_id, k], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream.k, stream._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] }
└─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] }
├─StreamExchange { dist: HashShard(stream.k) }
│ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] }
│ ├─StreamExchange { dist: HashShard(stream.k) }
Expand All @@ -133,7 +133,7 @@
stream_plan: |-
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] }
└─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] }
├─StreamExchange { dist: HashShard(stream.id2) }
│ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] }
│ ├─StreamExchange { dist: HashShard(stream.id1) }
Expand All @@ -155,7 +155,7 @@
stream_plan: |-
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) }
└─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] }
└─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] }
├─StreamExchange { dist: HashShard(stream.id2) }
│ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] }
│ ├─StreamExchange { dist: HashShard(stream.id1) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl StreamTemporalJoin {
let base = PlanBase::new_stream_with_core(
&core,
dist,
true,
append_only,
false, // TODO(rc): derive EOWC property from input
watermark_columns,
);
Expand Down
17 changes: 9 additions & 8 deletions src/stream/src/executor/temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,8 @@ mod phase1 {
}
}

impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
TemporalJoinExecutor<K, S, T, A>
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool>
TemporalJoinExecutor<K, S, T, APPEND_ONLY>
{
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down Expand Up @@ -709,7 +709,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
let full_schema = full_schema.clone();

if T == JoinType::Inner {
let st1 = phase1::handle_chunk::<K, S, phase1::Inner, A>(
let st1 = phase1::handle_chunk::<K, S, phase1::Inner, APPEND_ONLY>(
self.chunk_size,
right_size,
full_schema,
Expand Down Expand Up @@ -741,7 +741,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
}
} else if let Some(ref cond) = self.condition {
// Joined result without evaluating non-lookup conditions.
let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond, A>(
let st1 =
phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond, APPEND_ONLY>(
self.chunk_size,
right_size,
full_schema,
Expand Down Expand Up @@ -792,7 +793,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
// The last row should always be marker row,
assert_eq!(matched_count, 0);
} else {
let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter, A>(
let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter, APPEND_ONLY>(
self.chunk_size,
right_size,
full_schema,
Expand All @@ -813,7 +814,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
}
}
InternalMessage::Barrier(updates, barrier) => {
if !A {
if !APPEND_ONLY {
if wait_first_barrier {
wait_first_barrier = false;
self.memo_table.as_mut().unwrap().init_epoch(barrier.epoch);
Expand Down Expand Up @@ -846,8 +847,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool>
}
}

impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const A: bool> Execute
for TemporalJoinExecutor<K, S, T, A>
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: bool> Execute
for TemporalJoinExecutor<K, S, T, APPEND_ONLY>
{
fn execute(self: Box<Self>) -> super::BoxedMessageStream {
self.into_stream().boxed()
Expand Down

0 comments on commit 2d58d2e

Please sign in to comment.