Skip to content

Commit

Permalink
fix(meta): only return for background sink once initial barrier colle…
Browse files Browse the repository at this point in the history
…cted (#16367)
  • Loading branch information
kwannoel authored Apr 18, 2024
1 parent ebf0104 commit 4aed67f
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
9 changes: 6 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,8 +774,11 @@ impl DdlController {
}
};

match create_type {
CreateType::Foreground | CreateType::Unspecified => {
match (create_type, &stream_job) {
(CreateType::Foreground, _)
| (CreateType::Unspecified, _)
// FIXME(kwannoel): Unify background stream's creation path with MV below.
| (CreateType::Background, &StreamingJob::Sink(_, _)) => {
self.create_streaming_job_inner(
mgr,
stream_job,
Expand All @@ -785,7 +788,7 @@ impl DdlController {
)
.await
}
CreateType::Background => {
(CreateType::Background, _) => {
let ctrl = self.clone();
let mgr = mgr.clone();
let stream_job_id = stream_job.id();
Expand Down
9 changes: 6 additions & 3 deletions src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,11 @@ impl DdlController {

// create streaming jobs.
let stream_job_id = streaming_job.id();
match streaming_job.create_type() {
CreateType::Unspecified | CreateType::Foreground => {
match (streaming_job.create_type(), streaming_job) {
(CreateType::Unspecified, _)
| (CreateType::Foreground, _)
// FIXME(kwannoel): Unify background stream's creation path with MV below.
| (CreateType::Background, StreamingJob::Sink(_, _)) => {
let replace_table_job_info = ctx.replace_table_job_info.as_ref().map(
|(streaming_job, ctx, table_fragments)| {
(
Expand Down Expand Up @@ -241,7 +244,7 @@ impl DdlController {

Ok(version)
}
CreateType::Background => {
(CreateType::Background, _) => {
let ctrl = self.clone();
let mgr = mgr.clone();
let fut = async move {
Expand Down
11 changes: 9 additions & 2 deletions src/tests/simulation/src/slt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ pub async fn run_slt_task(
// NOTE(kwannoel): For background ddl
let mut background_ddl_enabled = false;

// If background ddl is set to true within the test case, prevent random setting of background_ddl to true.
// We can revert it back to false only if we encounter a record that sets background_ddl to false.
let mut manual_background_ddl_enabled = false;

for record in sqllogictest::parse_file(path).expect("failed to parse file") {
// uncomment to print metrics for task counts
// let metrics = madsim::runtime::Handle::current().metrics();
Expand Down Expand Up @@ -254,8 +258,10 @@ pub async fn run_slt_task(
};
tracing::debug!(?cmd, "Running");

if matches!(cmd, SqlCmd::SetBackgroundDdl { .. }) && background_ddl_rate > 0.0 {
panic!("We cannot run background_ddl statement with background_ddl_rate > 0.0, since it could be reset");
if background_ddl_rate > 0.0
&& let SqlCmd::SetBackgroundDdl { enable } = cmd
{
manual_background_ddl_enabled = enable;
}

// For each background ddl compatible statement, provide a chance for background_ddl=true.
Expand All @@ -266,6 +272,7 @@ pub async fn run_slt_task(
..
} = &record
&& matches!(cmd, SqlCmd::CreateMaterializedView { .. })
&& !manual_background_ddl_enabled
{
let background_ddl_setting = rng.gen_bool(background_ddl_rate);
let set_background_ddl = Record::Statement {
Expand Down

0 comments on commit 4aed67f

Please sign in to comment.