Skip to content

Commit

Permalink
fix snapshot backfill epoch handling and add e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Aug 2, 2024
1 parent 84280d8 commit 5bfae5e
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 7 deletions.
12 changes: 12 additions & 0 deletions e2e_test/backfill/snapshot_backfill/check_data_equal.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# the result from batch query and materialized view should be the same
# (equality is checked in both directions because EXCEPT only reports
# rows present on the left-hand side but missing on the right)

# rows in view `v` that are missing from `mv` — expect an empty result
query T
select * from v
except
select * from mv;
----

# rows in `mv` that are missing from view `v` — expect an empty result
query T
select * from mv
except
select * from v;
----
51 changes: 51 additions & 0 deletions e2e_test/backfill/snapshot_backfill/create_nexmark_table.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Create the three Nexmark-benchmark source tables (person, auction, bid)
# used by the snapshot-backfill e2e tests. All three use the built-in
# `nexmark` connector with 8 splits and a 10000 ns minimum event gap,
# so rows are generated continuously while the test runs.

# Nexmark `Person` events: sellers/bidders with identity and location fields.
statement ok
CREATE table person (
"id" BIGINT,
"name" VARCHAR,
"email_address" VARCHAR,
"credit_card" VARCHAR,
"city" VARCHAR,
"state" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR
) with (
connector = 'nexmark',
nexmark.table.type = 'Person',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '10000'
);

# Nexmark `Auction` events: items for sale, referencing a seller (person id).
statement ok
CREATE table auction (
"id" BIGINT,
"item_name" VARCHAR,
"description" VARCHAR,
"initial_bid" BIGINT,
"reserve" BIGINT,
"date_time" TIMESTAMP,
"expires" TIMESTAMP,
"seller" BIGINT,
"category" BIGINT,
"extra" VARCHAR
) with (
connector = 'nexmark',
nexmark.table.type = 'Auction',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '10000'
);

# Nexmark `Bid` events: bids placed against auctions.
statement ok
CREATE table bid (
"auction" BIGINT,
"bidder" BIGINT,
"price" BIGINT,
"channel" VARCHAR,
"url" VARCHAR,
"date_time" TIMESTAMP,
"extra" VARCHAR
) with (
connector = 'nexmark',
nexmark.table.type = 'Bid',
nexmark.split.num = '8',
nexmark.min.event.gap.in.ns = '10000'
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Drop the Nexmark source tables in reverse creation order
# (created as person, auction, bid by create_nexmark_table.slt.part).

statement ok
drop table bid;

statement ok
drop table auction;

statement ok
drop table person;
38 changes: 38 additions & 0 deletions e2e_test/backfill/snapshot_backfill/nexmark_q3.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# E2e test: snapshot backfill on a Nexmark-q3-style join.
# Creates the Nexmark source tables, builds a view with the q3 query,
# then materializes it with snapshot backfill enabled and verifies that
# the materialized view converges to the batch result of the view.

include ./create_nexmark_table.slt.part

# let the nexmark sources generate some data before backfilling
sleep 10s

# throttle backfill so the snapshot-backfill path is actually exercised
statement ok
set backfill_rate_limit = 200;

statement ok
set streaming_use_snapshot_backfill = true;

# Nexmark q3: auctions in category 10 joined with sellers in OR/ID/CA.
statement ok
create view v as
SELECT P.name,
P.city,
P.state,
A.id
FROM auction AS A
INNER JOIN person AS P on A.seller = P.id
WHERE A.category = 10
and (P.state = 'or' OR P.state = 'id' OR P.state = 'ca');

# materializing `v` triggers snapshot backfill for the new mv
statement ok
create materialized view mv as select * from v;

# check immediately after creation finishes...
include ./check_data_equal.slt.part

sleep 3s

# ...and again after more source data has flowed through
include ./check_data_equal.slt.part


statement ok
drop materialized view mv;

statement ok
drop view v;

include ./drop_nexmark_table.slt.part
15 changes: 12 additions & 3 deletions src/meta/src/barrier/creating_job_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,17 @@ impl CreatingStreamingJobControl {
Some(self.backfill_epoch.0)
}
}
CreatingStreamingJobStatus::Finishing(_) => None,
CreatingStreamingJobStatus::Finished(_) => None,
CreatingStreamingJobStatus::Finishing(_) | CreatingStreamingJobStatus::Finished(_) => {
if self.inflight_barrier_queue.is_empty() {
None
} else if let Some((latest_collected_epoch, _)) = self.collected_barrier.last()
&& *latest_collected_epoch > self.backfill_epoch.0
{
Some(*latest_collected_epoch)
} else {
Some(self.backfill_epoch.0)
}
}
}
}

Expand Down Expand Up @@ -389,7 +398,7 @@ impl CreatingStreamingJobControl {

pub(super) fn should_finish(&self) -> Option<HashMap<FragmentId, InflightFragmentInfo>> {
if let CreatingStreamingJobStatus::ConsumingLogStore { fragment_info } = &self.status {
let temp = 0;
// TODO: should have a new policy before merged
let len = self
.collected_barrier
.iter()
Expand Down
1 change: 0 additions & 1 deletion src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ impl InflightActorInfo {
for (actor_id, node_id) in &info.actors {
assert!(to_add.insert(*actor_id, *node_id).is_none());
}
assert!(self.fragment_infos.insert(fragment_id, info).is_none());
}
CommandFragmentChanges::Reschedule { new_actors, .. } => {
let info = self
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/backfill/snapshot_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
while let Some(barrier) = upstream_buffer.take_buffered_barrier().await? {
let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
assert_eq!(barrier.epoch, recv_barrier.epoch);
assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
barrier_epoch = barrier.epoch;

debug!(?barrier_epoch, kind = ?barrier.kind, "before consume change log");
let stream = self
.upstream_table
.batch_iter_log_with_pk_bounds(barrier_epoch.curr, barrier_epoch.curr)
.batch_iter_log_with_pk_bounds(barrier_epoch.prev, barrier_epoch.prev)
.await?;
let data_types = self.upstream_table.schema().data_types();
let builder = create_builder(None, self.chunk_size, data_types);
Expand All @@ -208,8 +210,6 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {

debug!(?barrier_epoch, "after consume change log");

barrier_epoch = barrier.epoch;

yield Message::Barrier(barrier);
}
} else {
Expand Down

0 comments on commit 5bfae5e

Please sign in to comment.