Skip to content

Commit

Permalink
feat(frontend): support background ddl for materialized views (#12355)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Sep 19, 2023
1 parent 588bb80 commit 161b9b0
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 26 deletions.
1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ echo "--- e2e, $mode, batch"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cluster_start
sqllogictest -p 4566 -d dev './e2e_test/ddl/**/*.slt' --junit "batch-ddl-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/background_ddl/**/*.slt' --junit "batch-ddl-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/visibility_mode/*.slt' --junit "batch-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt'
sqllogictest -p 4566 -d test './e2e_test/database/test.slt'
Expand Down
73 changes: 73 additions & 0 deletions e2e_test/background_ddl/basic.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
statement ok
SET BACKGROUND_DDL=true;

statement ok
ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 4;

statement ok
CREATE TABLE t (v1 int);

statement ok
INSERT INTO t select * from generate_series(1, 500000);

statement ok
FLUSH;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;

statement ok
CREATE MATERIALIZED VIEW m2 as SELECT * FROM t;

statement ok
CREATE MATERIALIZED VIEW m3 as SELECT * FROM t;

sleep 3s

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
3

statement error
SELECT * FROM m1;

# Meta should always reject duplicate mview.
statement error
CREATE MATERIALIZED VIEW m3 as SELECT * FROM t;

# Wait for background ddl to finish
sleep 30s

query I
select count(*) from m1;
----
500000

query I
select count(*) from m2;
----
500000

query I
select count(*) from m3;
----
500000

statement ok
DROP MATERIALIZED VIEW m1;

statement ok
DROP MATERIALIZED VIEW m2;

statement ok
DROP MATERIALIZED VIEW m3;

statement ok
DROP TABLE t;

statement ok
SET BACKGROUND_DDL=false;

statement ok
ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 1;
7 changes: 7 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,16 @@ message DropSinkResponse {
uint64 version = 2;
}

enum StreamJobExecutionMode {
STREAM_JOB_EXECUTION_MODE_UNSPECIFIED = 0;
BACKGROUND = 1;
FOREGROUND = 2;
}

message CreateMaterializedViewRequest {
catalog.Table materialized_view = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
StreamJobExecutionMode stream_job_execution_mode = 3;
}

message CreateMaterializedViewResponse {
Expand Down
20 changes: 19 additions & 1 deletion src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::util::epoch::Epoch;

// This is a hack, &'static str is not allowed as a const generics argument.
// TODO: refine this using the adt_const_params feature.
const CONFIG_KEYS: [&str; 37] = [
const CONFIG_KEYS: [&str; 38] = [
"RW_IMPLICIT_FLUSH",
"CREATE_COMPACTION_GROUP_FOR_MV",
"QUERY_MODE",
Expand Down Expand Up @@ -74,6 +74,7 @@ const CONFIG_KEYS: [&str; 37] = [
"STREAMING_RATE_LIMIT",
"CDC_BACKFILL",
"RW_STREAMING_OVER_WINDOW_CACHE_POLICY",
"BACKGROUND_DDL",
];

// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
Expand Down Expand Up @@ -115,6 +116,7 @@ const STANDARD_CONFORMING_STRINGS: usize = 33;
const STREAMING_RATE_LIMIT: usize = 34;
const CDC_BACKFILL: usize = 35;
const STREAMING_OVER_WINDOW_CACHE_POLICY: usize = 36;
const BACKGROUND_DDL: usize = 37;

trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
fn entry_name() -> &'static str;
Expand Down Expand Up @@ -339,6 +341,7 @@ type RowSecurity = ConfigBool<ROW_SECURITY, true>;
type StandardConformingStrings = ConfigString<STANDARD_CONFORMING_STRINGS>;
type StreamingRateLimit = ConfigU64<STREAMING_RATE_LIMIT, 0>;
type CdcBackfill = ConfigBool<CDC_BACKFILL, false>;
type BackgroundDdl = ConfigBool<BACKGROUND_DDL, false>;

/// Report status or notice to caller.
pub trait ConfigReporter {
Expand Down Expand Up @@ -486,6 +489,8 @@ pub struct ConfigMap {
/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "recent_first_n" or "recent_last_n".
streaming_over_window_cache_policy: OverWindowCachePolicy,

background_ddl: BackgroundDdl,
}

impl ConfigMap {
Expand Down Expand Up @@ -603,6 +608,8 @@ impl ConfigMap {
self.cdc_backfill = val.as_slice().try_into()?
} else if key.eq_ignore_ascii_case(OverWindowCachePolicy::entry_name()) {
self.streaming_over_window_cache_policy = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) {
self.background_ddl = val.as_slice().try_into()?;
} else {
return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
}
Expand Down Expand Up @@ -690,6 +697,8 @@ impl ConfigMap {
Ok(self.cdc_backfill.to_string())
} else if key.eq_ignore_ascii_case(OverWindowCachePolicy::entry_name()) {
Ok(self.streaming_over_window_cache_policy.to_string())
} else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) {
Ok(self.background_ddl.to_string())
} else {
Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
}
Expand Down Expand Up @@ -882,6 +891,11 @@ impl ConfigMap {
setting: self.streaming_over_window_cache_policy.to_string(),
description: String::from(r#"Cache policy for partition cache in streaming over window. Can be "full", "recent", "recent_first_n" or "recent_last_n"."#),
},
VariableInfo{
name: BackgroundDdl::entry_name().to_lowercase(),
setting: self.background_ddl.to_string(),
description: String::from("Run DDL statements in background"),
},
]
}

Expand Down Expand Up @@ -1020,4 +1034,8 @@ impl ConfigMap {
pub fn get_streaming_over_window_cache_policy(&self) -> OverWindowCachePolicy {
self.streaming_over_window_cache_policy
}

pub fn get_background_ddl(&self) -> bool {
self.background_ddl.0
}
}
14 changes: 11 additions & 3 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
};
use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
use risingwave_pb::ddl_service::create_connection_request;
use risingwave_pb::ddl_service::{create_connection_request, StreamJobExecutionMode};
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_rpc_client::MetaClient;
use tokio::sync::watch::Receiver;
Expand Down Expand Up @@ -70,6 +70,7 @@ pub trait CatalogWriter: Send + Sync {
&self,
table: PbTable,
graph: StreamFragmentGraph,
stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<()>;

async fn create_table(
Expand Down Expand Up @@ -190,12 +191,19 @@ impl CatalogWriter for CatalogWriterImpl {
&self,
table: PbTable,
graph: StreamFragmentGraph,
stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<()> {
let (_, version) = self
.meta_client
.create_materialized_view(table, graph)
.create_materialized_view(table, graph, stream_job_execution_mode)
.await?;
self.wait_version(version).await
if matches!(
stream_job_execution_mode,
StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified
) {
self.wait_version(version).await?
}
Ok(())
}

async fn create_view(&self, view: PbView) -> Result<()> {
Expand Down
12 changes: 11 additions & 1 deletion src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::ddl_service::StreamJobExecutionMode;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::user::grant_privilege::Action;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
Expand Down Expand Up @@ -188,6 +189,7 @@ It only indicates the physical clustering of the data, which may improve the per
(table, graph)
};

// Ensure writes to `StreamJobTracker` are atomic.
let _job_guard =
session
.env()
Expand All @@ -199,9 +201,17 @@ It only indicates the physical clustering of the data, which may improve the per
table.name.clone(),
));

let run_in_background = session.config().get_background_ddl();
let stream_job_execution_mode = if run_in_background {
StreamJobExecutionMode::Background
} else {
StreamJobExecutionMode::Foreground
};

let session = session.clone();
let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_materialized_view(table, graph)
.create_materialized_view(table, graph, stream_job_execution_mode)
.await?;

Ok(PgResponse::empty_result(
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table,
};
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress};
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress, StreamJobExecutionMode};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta,
Expand Down Expand Up @@ -235,6 +235,7 @@ impl CatalogWriter for MockCatalogWriter {
&self,
mut table: PbTable,
_graph: StreamFragmentGraph,
_stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<()> {
table.id = self.gen_id();
self.catalog.write().create_table(&table);
Expand All @@ -260,7 +261,8 @@ impl CatalogWriter for MockCatalogWriter {
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
}
self.create_materialized_view(table, graph).await?;
self.create_materialized_view(table, graph, StreamJobExecutionMode::Foreground)
.await?;
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/manager/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub type MessageStatus = Status;
pub type Notification = Result<SubscribeResponse, Status>;
pub type NotificationManagerRef = Arc<NotificationManager>;
pub type NotificationVersion = u64;
/// NOTE(kwannoel): This is just ignored, used in background DDL
pub const IGNORED_NOTIFICATION_VERSION: u64 = 0;

#[derive(Clone, Debug)]
pub enum LocalNotification {
Expand Down
Loading

0 comments on commit 161b9b0

Please sign in to comment.