feat(stream): enable background ddl for StreamNow, StreamSource (#…
kwannoel authored and kwannoel committed Oct 24, 2024
1 parent 3f48242 commit 8ddc7e9
Showing 5 changed files with 182 additions and 7 deletions.
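In short: the frontend admits a streaming job for background DDL only if every leaf of its stream plan maps to an executor that can survive recovery. The gate is `plan_can_use_background_ddl` (renamed here from the misspelled `plan_can_use_backgronud_ddl`), which previously accepted only backfill-capable scan leaves; this commit extends it to also accept `StreamNow` and `StreamSource` leaf nodes. A minimal sketch of what this enables, reusing names in the spirit of the tests below:

SET BACKGROUND_DDL = true;
-- now() in the predicate plans a StreamNow leaf, so this MV can now
-- be created as a background job...
CREATE MATERIALIZED VIEW m AS SELECT * FROM t WHERE v1 >= now();
-- ...and so can an MV over a bare (non-backfilling) source leaf.
CREATE MATERIALIZED VIEW s AS SELECT count(*) FROM kafka_source;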
42 changes: 42 additions & 0 deletions e2e_test/background_ddl/basic.slt
@@ -56,6 +56,48 @@ DROP MATERIALIZED VIEW m2;
statement ok
DROP TABLE t;

################### Test stream now
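# `now()` in the MV predicate plans a StreamNow leaf node, which this
# commit allows under background DDL.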

statement ok
create table t (v1 timestamp with time zone, v2 timestamp with time zone);

statement ok
insert into t select to_timestamp(x) as z, to_timestamp(x) as y from generate_series(1, 1000) t(x);

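# Throttle the backfill so the job is still in progress when we inspect it.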
statement ok
set backfill_rate_limit=1;

statement ok
create materialized view m1 as select * from t where v1 >= now() and v2 >= now();

sleep 2s

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
1

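# `recover` restarts the cluster; the background job should be restored and
# still show up as in-progress afterwards.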
statement ok
recover;

sleep 5s

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
1

statement ok
drop materialized view m1;

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
0

statement ok
drop table t;

statement ok
SET BACKGROUND_DDL=false;

130 changes: 130 additions & 0 deletions e2e_test/source_inline/kafka/background_ddl/basic.slt.serial
@@ -0,0 +1,130 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

statement ok
create sink kafka_sink
from
kafka_seed_data with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
type = 'append-only',
force_append_only='true'
);

# Wait for the topic to be created
skipif in-memory
sleep 5s

############## Create kafka source

statement ok
create source kafka_source (v1 int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
scan.startup.mode = 'earliest',
source_rate_limit = 1,
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

############## Create background mv with only source upstream
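# A bare StreamSource leaf (no backfill scan) is now accepted by background DDL.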

statement ok
set background_ddl = true;

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;

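# `wait` blocks until all in-flight streaming jobs finish creation.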
statement ok
wait

statement ok
select * from rl_mv1;

statement ok
drop materialized view rl_mv1;

############## Create background mv joined with another mv

statement ok
set streaming_parallelism=1;

statement ok
create table t1(v1 int);

statement ok
insert into t1 select * from generate_series(1, 10);

statement ok
flush;

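# Throttle backfill again so rl_mv2 stays visible in rw_ddl_progress.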
statement ok
set backfill_rate_limit = 1;

statement ok
set background_ddl=true;

statement ok
create materialized view rl_mv2 as select kafka_source.v1 from kafka_source join t1 on kafka_source.v1 = t1.v1;

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
1

sleep 1s

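# Restart the cluster mid-backfill; the job should survive recovery and resume.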
statement ok
recover

sleep 5s

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
1

statement ok
wait

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
0

statement ok
drop materialized view rl_mv2;

statement ok
drop table t1;

############## Cleanup

statement ok
drop source kafka_source;

statement ok
drop sink kafka_sink;

statement ok
drop table kafka_seed_data;

statement ok
set background_ddl=false;

statement ok
SET streaming_use_shared_source TO true;

statement ok
set streaming_parallelism=default;
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -34,7 +34,7 @@ use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::error::Result;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
- use crate::optimizer::plan_node::utils::plan_can_use_backgronud_ddl;
+ use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -89,7 +89,7 @@ impl StreamMaterialize {

let create_type = if matches!(table_type, TableType::MaterializedView)
&& input.ctx().session_ctx().config().background_ddl()
-     && plan_can_use_backgronud_ddl(&input)
+     && plan_can_use_background_ddl(&input)
{
CreateType::Background
} else {
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -46,7 +46,7 @@ use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProjec
use crate::error::{ErrorCode, Result};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
- use crate::optimizer::plan_node::utils::plan_can_use_backgronud_ddl;
+ use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -385,7 +385,7 @@ impl StreamSink {
let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
let distribution_key = input.distribution().dist_column_indices().to_vec();
let create_type = if input.ctx().session_ctx().config().background_ddl()
-     && plan_can_use_backgronud_ddl(&input)
+     && plan_can_use_background_ddl(&input)
{
CreateType::Background
} else {
9 changes: 6 additions & 3 deletions src/frontend/src/optimizer/plan_node/utils.rs
@@ -380,9 +380,12 @@ pub fn infer_kv_log_store_table_catalog_inner(
/// since that plan node maps to `backfill` executor, which supports recovery.
/// Some other leaf nodes like `StreamValues` do not support recovery, and they
/// cannot use background ddl.
- pub(crate) fn plan_can_use_backgronud_ddl(plan: &PlanRef) -> bool {
+ pub(crate) fn plan_can_use_background_ddl(plan: &PlanRef) -> bool {
if plan.inputs().is_empty() {
-     if plan.as_stream_source_scan().is_some() {
+     if plan.as_stream_source_scan().is_some()
+         || plan.as_stream_now().is_some()
+         || plan.as_stream_source().is_some()
+     {
true
} else if let Some(scan) = plan.as_stream_table_scan() {
scan.stream_scan_type() == StreamScanType::Backfill
@@ -392,7 +395,7 @@ pub(crate) fn plan_can_use_backgronud_ddl(plan: &PlanRef) -> bool {
}
} else {
assert!(!plan.inputs().is_empty());
-     plan.inputs().iter().all(plan_can_use_backgronud_ddl)
+     plan.inputs().iter().all(plan_can_use_background_ddl)
}
}
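For contrast, a plan with a leaf that cannot recover still falls back to foreground creation. A hypothetical example, per the doc comment above (`StreamValues` does not support recovery; the MV name is made up):

SET BACKGROUND_DDL = true;
-- VALUES plans a StreamValues leaf, so this MV is still created in the foreground.
CREATE MATERIALIZED VIEW mv_values AS SELECT * FROM (VALUES (1), (2), (3)) AS v(x);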
