feat(stream): enable background ddl for StreamNow, StreamSource (#19099) #19116

Merged · 1 commit · Oct 25, 2024
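In short: `CREATE MATERIALIZED VIEW` (and `CREATE SINK`) can now run as a background job when every leaf of the stream plan supports recovery, which after this change includes `StreamNow` and `StreamSource` leaves in addition to backfill table scans. A minimal sketch of the user-facing flow, distilled from the e2e tests below (table and view names are illustrative):

```sql
SET BACKGROUND_DDL = true;
SET BACKFILL_RATE_LIMIT = 1; -- throttle so the job stays visible long enough to observe

-- A NOW()-filtered view qualifies for background DDL after this change.
CREATE MATERIALIZED VIEW m1 AS
SELECT * FROM t WHERE v1 >= NOW();

-- The in-flight job is visible in the progress catalog and survives recovery.
SELECT count(*) FROM rw_catalog.rw_ddl_progress;
```
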
42 changes: 42 additions & 0 deletions e2e_test/background_ddl/basic.slt
@@ -56,6 +56,48 @@ DROP MATERIALIZED VIEW m2;
statement ok
DROP TABLE t;

################### Test stream now

statement ok
create table t (v1 timestamp with time zone, v2 timestamp with time zone);

statement ok
insert into t select to_timestamp(x) as z, to_timestamp(x) as y from generate_series(1, 1000) t(x);

statement ok
set backfill_rate_limit=1;

statement ok
create materialized view m1 as select * from t where v1 >= now() and v2 >= now();

sleep 2s

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
1

statement ok
recover;

sleep 5s

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
1

statement ok
drop materialized view m1;

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
0

statement ok
drop table t;

statement ok
SET BACKGROUND_DDL=false;

130 changes: 130 additions & 0 deletions e2e_test/source_inline/kafka/background_ddl/basic.slt.serial
@@ -0,0 +1,130 @@
control substitution on

statement ok
SET streaming_use_shared_source TO false;

############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

statement ok
create sink kafka_sink
from
kafka_seed_data with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
type = 'append-only',
force_append_only='true'
);

# Wait for the topic to be created
skipif in-memory
sleep 5s

############## Create kafka source

statement ok
create source kafka_source (v1 int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit',
scan.startup.mode = 'earliest',
source_rate_limit = 1,
) FORMAT PLAIN ENCODE JSON;

statement ok
flush;

############## Create background mv with only source upstream

statement ok
set background_ddl = true;

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;

statement ok
wait

statement ok
select * from rl_mv1;

statement ok
drop materialized view rl_mv1;

############## Create background mv joined with another mv

statement ok
set streaming_parallelism=1;

statement ok
create table t1(v1 int);

statement ok
insert into t1 select * from generate_series(1, 10);

statement ok
flush;

statement ok
set backfill_rate_limit = 1;

statement ok
set background_ddl=true;

statement ok
create materialized view rl_mv2 as select kafka_source.v1 from kafka_source join t1 on kafka_source.v1 = t1.v1;

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
1

sleep 1s

statement ok
recover

sleep 5s

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
1

statement ok
wait

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
0

statement ok
drop materialized view rl_mv2;

statement ok
drop table t1;

############## Cleanup

statement ok
drop source kafka_source;

statement ok
drop sink kafka_sink;

statement ok
drop table kafka_seed_data;

statement ok
set background_ddl=false;

statement ok
SET streaming_use_shared_source TO true;

statement ok
set streaming_parallelism=default;
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -33,7 +33,7 @@ use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
 use crate::error::Result;
 use crate::optimizer::plan_node::derive::derive_pk;
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
-use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes;
+use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
 use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
 use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
 use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -88,7 +88,7 @@ impl StreamMaterialize {

         let create_type = if matches!(table_type, TableType::MaterializedView)
             && input.ctx().session_ctx().config().background_ddl()
-            && plan_has_backfill_leaf_nodes(&input)
+            && plan_can_use_background_ddl(&input)
         {
             CreateType::Background
         } else {
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -46,7 +46,7 @@ use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProjec
 use crate::error::{ErrorCode, Result};
 use crate::expr::{ExprImpl, FunctionCall, InputRef};
 use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
-use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes;
+use crate::optimizer::plan_node::utils::plan_can_use_background_ddl;
 use crate::optimizer::plan_node::PlanTreeNodeUnary;
 use crate::optimizer::property::{Distribution, Order, RequiredDist};
 use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -380,7 +380,7 @@ impl StreamSink {
         let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
         let distribution_key = input.distribution().dist_column_indices().to_vec();
         let create_type = if input.ctx().session_ctx().config().background_ddl()
-            && plan_has_backfill_leaf_nodes(&input)
+            && plan_can_use_background_ddl(&input)
         {
             CreateType::Background
         } else {
12 changes: 7 additions & 5 deletions src/frontend/src/optimizer/plan_node/utils.rs
@@ -374,19 +374,21 @@ pub fn infer_kv_log_store_table_catalog_inner(
     table_catalog_builder.build(dist_key, read_prefix_len_hint)
 }

 /// Check that all leaf nodes must be stream table scan,
 /// since that plan node maps to `backfill` executor, which supports recovery.
-pub(crate) fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool {
+/// Some other leaf nodes like `StreamValues` do not support recovery, and they
+/// cannot use background ddl.
+pub(crate) fn plan_can_use_background_ddl(plan: &PlanRef) -> bool {
     if plan.inputs().is_empty() {
-        if let Some(scan) = plan.as_stream_table_scan() {
+        if plan.as_stream_now().is_some() || plan.as_stream_source().is_some() {
+            true
+        } else if let Some(scan) = plan.as_stream_table_scan() {
             scan.stream_scan_type() == StreamScanType::Backfill
                 || scan.stream_scan_type() == StreamScanType::ArrangementBackfill
         } else {
             false
         }
     } else {
         assert!(!plan.inputs().is_empty());
-        plan.inputs().iter().all(plan_has_backfill_leaf_nodes)
+        plan.inputs().iter().all(plan_can_use_background_ddl)
     }
 }
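
For intuition, here is a self-contained toy version of the recursion above. The `Plan` enum and `ScanType` values are hypothetical stand-ins for RisingWave's `PlanRef` and `StreamScanType`; only the shape of the check mirrors the real `plan_can_use_background_ddl`:

```rust
// Toy model: a plan can use background DDL iff every leaf supports recovery.
// `Plan` and `ScanType` are hypothetical stand-ins for RisingWave internals.

#[derive(Clone, Copy, PartialEq)]
enum ScanType {
    Backfill,
    ArrangementBackfill,
    Snapshot, // in this toy model, a scan type that cannot recover
}

enum Plan {
    Now,                 // leaf, StreamNow-like: recoverable
    Source,              // leaf, StreamSource-like: recoverable
    TableScan(ScanType), // leaf, StreamTableScan-like: depends on scan type
    Values,              // leaf that cannot recover
    Node(Vec<Plan>),     // any non-leaf operator
}

fn can_use_background_ddl(plan: &Plan) -> bool {
    match plan {
        // Leaves that support recovery qualify.
        Plan::Now | Plan::Source => true,
        Plan::TableScan(t) => {
            *t == ScanType::Backfill || *t == ScanType::ArrangementBackfill
        }
        Plan::Values => false,
        // A non-leaf qualifies only if every input subtree does.
        Plan::Node(inputs) => inputs.iter().all(can_use_background_ddl),
    }
}

fn main() {
    let ok_plan = Plan::Node(vec![
        Plan::Now,
        Plan::Source,
        Plan::TableScan(ScanType::Backfill),
    ]);
    assert!(can_use_background_ddl(&ok_plan));

    let bad_plan = Plan::Node(vec![
        Plan::Values,
        Plan::TableScan(ScanType::ArrangementBackfill),
    ]);
    assert!(!can_use_background_ddl(&bad_plan));
    assert!(!can_use_background_ddl(&Plan::TableScan(ScanType::Snapshot)));
    println!("toy background-ddl checks passed");
}
```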
