Skip to content

Commit

Permalink
fix(stream): left temporal join handle non-matched rows incorrectly (#…
Browse files Browse the repository at this point in the history
…15327)

Signed-off-by: TennyZhuang <[email protected]>
  • Loading branch information
TennyZhuang authored Feb 29, 2024
1 parent 0c2c2b9 commit 16e3685
Show file tree
Hide file tree
Showing 5 changed files with 469 additions and 113 deletions.
31 changes: 31 additions & 0 deletions e2e_test/streaming/temporal_join/issue_15257.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2

statement ok
insert into version values(1, 11, 111);

statement ok
insert into stream values(1, 10, 111);

query IIII rowsort
select * from v;
----
1 10 NULL NULL

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
109 changes: 109 additions & 0 deletions e2e_test/streaming/temporal_join/temporal_join_multiple_rows.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# The suite tests the cases that multiple rows are matched.

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(a1 int, b1 int) APPEND ONLY;

statement ok
create table version(a2 int, b2 int);

statement ok
create index idx on version (a2);

statement ok
create materialized view v as
select a1, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2;

statement ok
insert into stream values
(1,1)
,(2,1)
;


statement ok
insert into version values
(1,1)
,(1,2)
,(1,3)
;

statement ok
insert into stream values
(1,1)
,(2,1)
;

query II rowsort
select a1, a2 from v;
----
1 1
1 1
1 1
1 NULL
2 NULL
2 NULL

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;

# Test non equal conditions

statement ok
create table stream(a1 int, b1 int) APPEND ONLY;

statement ok
create table version(a2 int, b2 int);

statement ok
create index idx on version (a2);

statement ok
create materialized view v as
select a1, a2, b2
from stream left join version FOR SYSTEM_TIME AS OF PROCTIME()
on a1 = a2 and b1 > b2;

statement ok
insert into version values
(1,1)
,(1,2)
,(1,3)
;

statement ok
insert into stream values
(1,0)
,(1,3)
,(1,6)
,(2,1)
;


query III rowsort
select a1, a2, b2 from v;
----
1 1 1
1 1 1
1 1 2
1 1 2
1 1 3
1 NULL NULL
2 NULL NULL

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
29 changes: 29 additions & 0 deletions e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,32 @@ drop table stream;

statement ok
drop table version;

statement ok
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2;

statement ok
insert into version values (1, 12, 111);

statement ok
insert into stream values (1, 11, 111), (1, 13, 111);

query IIII rowsort
select * from v;
----
1 13 1 12

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
28 changes: 26 additions & 2 deletions src/common/src/array/stream_chunk_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use crate::array::stream_record::Record;
use crate::array::{ArrayBuilderImpl, Op, StreamChunk};
use crate::buffer::BitmapBuilder;
use crate::row::Row;
use crate::types::{DataType, DatumRef};
use crate::util::iter_util::ZipEqFast;
Expand All @@ -26,6 +27,9 @@ pub struct StreamChunkBuilder {
/// arrays in the data chunk to build
column_builders: Vec<ArrayBuilderImpl>,

/// Visibility
vis_builder: BitmapBuilder,

/// Data types of columns
data_types: Vec<DataType>,

Expand Down Expand Up @@ -61,6 +65,7 @@ impl StreamChunkBuilder {
ops,
column_builders,
data_types,
vis_builder: BitmapBuilder::default(),
capacity: chunk_size,
size: 0,
}
Expand Down Expand Up @@ -93,18 +98,34 @@ impl StreamChunkBuilder {
&mut self,
op: Op,
iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
) -> Option<StreamChunk> {
self.append_iter_inner::<true>(op, iter)
}

#[must_use]
fn append_iter_inner<'a, const VIS: bool>(
&mut self,
op: Op,
iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>,
) -> Option<StreamChunk> {
self.ops.push(op);
for (i, datum) in iter {
self.column_builders[i].append(datum);
}
self.vis_builder.append(VIS);
self.inc_size()
}

/// Append a row to the builder, return a chunk if the builder is full.
#[must_use]
pub fn append_row(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
self.append_iter(op, row.iter().enumerate())
self.append_iter_inner::<true>(op, row.iter().enumerate())
}

/// Append an invisible row to the builder, return a chunk if the builder is full.
#[must_use]
pub fn append_row_invisible(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> {
self.append_iter_inner::<false>(op, row.iter().enumerate())
}

/// Append a record to the builder, return a chunk if the builder is full.
Expand Down Expand Up @@ -138,9 +159,12 @@ impl StreamChunkBuilder {
.map(Into::into)
.collect::<Vec<_>>();

Some(StreamChunk::new(
let vis = std::mem::take(&mut self.vis_builder).finish();

Some(StreamChunk::with_visibility(
std::mem::replace(&mut self.ops, Vec::with_capacity(self.capacity)),
new_columns,
vis,
))
}
}
Loading

0 comments on commit 16e3685

Please sign in to comment.