From 21d18ea0c028a4ba0a65029a1897ab4ffe21c0e0 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 26 Sep 2023 11:23:40 +0800 Subject: [PATCH 1/2] add create_type --- proto/catalog.proto | 10 ++++++++++ src/frontend/src/catalog/table_catalog.rs | 4 +++- src/storage/src/filter_key_extractor.rs | 3 ++- src/tests/compaction_test/src/delete_range_runner.rs | 3 ++- 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 07aff3baee22..a8624cebbb7b 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -39,6 +39,14 @@ enum StreamJobStatus { CREATED = 2; } +// How the stream job was created will determine +// whether they are persisted. +enum CreateType { + CREATE_STATUS_UNSPECIFIED = 0; + BACKGROUND = 1; + FOREGROUND = 2; +} + message StreamSourceInfo { // deprecated plan_common.RowFormatType row_format = 1; @@ -262,6 +270,8 @@ message Table { // Used to filter created / creating tables in meta. StreamJobStatus stream_job_status = 31; + CreateType create_type = 32; + // Per-table catalog version, used by schema change. `None` for internal tables and tests. // Not to be confused with the global catalog version for notification service. TableVersion version = 100; diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 2b8ef546c9be..778b43c0598f 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -24,7 +24,7 @@ use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; -use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; +use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::DefaultColumnDesc; @@ -402,6 +402,7 @@ impl TableCatalog { created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0), cleaned_by_watermark: self.cleaned_by_watermark, stream_job_status: PbStreamJobStatus::Creating.into(), + create_type: PbCreateType::Foreground.into(), } } @@ -607,6 +608,7 @@ mod tests { created_at_epoch: None, cleaned_by_watermark: false, stream_job_status: PbStreamJobStatus::Creating.into(), + create_type: PbCreateType::Foreground.into(), } .into(); diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index c3b65bc26992..b5a79a6f6b42 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -448,7 +448,7 @@ mod tests { use risingwave_common::util::sort_util::OrderType; use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN; use risingwave_pb::catalog::table::TableType; - use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; + use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType}; use risingwave_pb::plan_common::PbColumnCatalog; @@ -550,6 +550,7 @@ mod tests { created_at_epoch: None, cleaned_by_watermark: false, stream_job_status: PbStreamJobStatus::Created.into(), + create_type: PbCreateType::Foreground.into(), } } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 486b819d3ee7..abeae5e418af 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -37,7 +37,7 @@ use risingwave_meta::hummock::test_utils::setup_compute_env_with_config; use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; -use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; +use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable}; use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo}; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::HummockMetaClient; @@ -152,6 +152,7 @@ async fn compaction_test( created_at_epoch: None, cleaned_by_watermark: false, stream_job_status: PbStreamJobStatus::Created.into(), + create_type: PbCreateType::Foreground.into(), }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2; From 8ca7ba05472f15c9ac34d61dbb3adaeab28a4e74 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 26 Sep 2023 12:08:37 +0800 Subject: [PATCH 2/2] get background / foreground from table instead of request --- proto/catalog.proto | 2 +- proto/ddl_service.proto | 7 ------ src/frontend/src/catalog/catalog_service.rs | 14 ++++------- src/frontend/src/handler/create_mv.rs | 14 +++++------ src/frontend/src/test_utils.rs | 6 ++--- src/meta/src/rpc/ddl_controller.rs | 26 +++++++-------------- src/meta/src/rpc/service/ddl_service.rs | 11 +++++---- src/rpc_client/src/meta_client.rs | 2 -- 8 files changed, 30 insertions(+), 52 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index a8624cebbb7b..f200d10d234f 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -42,7 +42,7 @@ enum StreamJobStatus { // How the stream job was created will determine // whether they are persisted. enum CreateType { - CREATE_STATUS_UNSPECIFIED = 0; + CREATE_TYPE_UNSPECIFIED = 0; BACKGROUND = 1; FOREGROUND = 2; } diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 68a74d421462..27c9f2ee82f8 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -97,16 +97,9 @@ message DropSinkResponse { uint64 version = 2; } -enum StreamJobExecutionMode { - STREAM_JOB_EXECUTION_MODE_UNSPECIFIED = 0; - BACKGROUND = 1; - FOREGROUND = 2; -} - message CreateMaterializedViewRequest { catalog.Table materialized_view = 1; stream_plan.StreamFragmentGraph fragment_graph = 2; - StreamJobExecutionMode stream_job_execution_mode = 3; } message CreateMaterializedViewResponse { diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index bc2648fe105b..8eb6b9e3e448 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -21,10 +21,10 @@ use risingwave_common::error::ErrorCode::InternalError; use risingwave_common::error::{Result, RwError}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::catalog::{ - PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, + PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, }; use risingwave_pb::ddl_service::alter_relation_name_request::Relation; -use risingwave_pb::ddl_service::{create_connection_request, StreamJobExecutionMode}; +use risingwave_pb::ddl_service::create_connection_request; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_rpc_client::MetaClient; use tokio::sync::watch::Receiver; @@ -70,7 +70,6 @@ pub trait CatalogWriter: Send + Sync { &self, table: PbTable, graph: StreamFragmentGraph, - stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<()>; async fn create_table( @@ -191,16 +190,13 @@ impl CatalogWriter for CatalogWriterImpl { &self, table: PbTable, graph: StreamFragmentGraph, - stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<()> { + let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground); let (_, version) = self .meta_client - .create_materialized_view(table, graph, stream_job_execution_mode) + .create_materialized_view(table, graph) .await?; - if matches!( - stream_job_execution_mode, - StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified - ) { + if matches!(create_type, PbCreateType::Foreground) { self.wait_version(version).await? } Ok(()) diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 3983291e2845..3fa9129f3974 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -15,8 +15,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::error::{ErrorCode, Result}; -use risingwave_pb::catalog::PbTable; -use risingwave_pb::ddl_service::StreamJobExecutionMode; +use risingwave_pb::catalog::{CreateType, PbTable}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::user::grant_privilege::Action; use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query}; @@ -164,7 +163,7 @@ pub async fn handle_create_mv( Ok(_) => {} }; - let (table, graph) = { + let (mut table, graph) = { let context = OptimizerContext::from_handler_args(handler_args); let has_order_by = !query.order_by.is_empty(); @@ -202,16 +201,17 @@ It only indicates the physical clustering of the data, which may improve the per )); let run_in_background = session.config().get_background_ddl(); - let stream_job_execution_mode = if run_in_background { - StreamJobExecutionMode::Background + let create_type = if run_in_background { + CreateType::Background } else { - StreamJobExecutionMode::Foreground + CreateType::Foreground }; + table.create_type = create_type.into(); let session = session.clone(); let catalog_writer = session.catalog_writer()?; catalog_writer - .create_materialized_view(table, graph, stream_job_execution_mode) + .create_materialized_view(table, graph) .await?; Ok(PgResponse::empty_result( diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 1823dcec9128..20eb252fc505 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -34,7 +34,7 @@ use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table, }; -use risingwave_pb::ddl_service::{create_connection_request, DdlProgress, StreamJobExecutionMode}; +use risingwave_pb::ddl_service::{create_connection_request, DdlProgress}; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta, @@ -235,7 +235,6 @@ impl CatalogWriter for MockCatalogWriter { &self, mut table: PbTable, _graph: StreamFragmentGraph, - _stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<()> { table.id = self.gen_id(); self.catalog.write().create_table(&table); @@ -261,8 +260,7 @@ impl CatalogWriter for MockCatalogWriter { table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)); } - self.create_materialized_view(table, graph, StreamJobExecutionMode::Foreground) - .await?; + self.create_materialized_view(table, graph).await?; Ok(()) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index d4b324ea6ddc..c65efc172682 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -23,10 +23,10 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::epoch::Epoch; use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider; use risingwave_pb::catalog::{ - connection, Connection, Database, Function, Schema, Source, Table, View, + connection, Connection, CreateType, Database, Function, Schema, Source, Table, View, }; use risingwave_pb::ddl_service::alter_relation_name_request::Relation; -use risingwave_pb::ddl_service::{DdlProgress, StreamJobExecutionMode}; +use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; use tokio::sync::Semaphore; use tracing::log::warn; @@ -93,11 +93,7 @@ pub enum DdlCommand { DropFunction(FunctionId), CreateView(View), DropView(ViewId, DropMode), - CreateStreamingJob( - StreamingJob, - StreamFragmentGraphProto, - StreamJobExecutionMode, - ), + CreateStreamingJob(StreamingJob, StreamFragmentGraphProto, CreateType), DropStreamingJob(StreamingJobId, DropMode), ReplaceTable(StreamingJob, StreamFragmentGraphProto, ColIndexMapping), AlterRelationName(Relation, String), @@ -240,12 +236,8 @@ impl DdlController { DdlCommand::DropView(view_id, drop_mode) => { ctrl.drop_view(view_id, drop_mode).await } - DdlCommand::CreateStreamingJob( - stream_job, - fragment_graph, - stream_job_execution_mode, - ) => { - ctrl.create_streaming_job(stream_job, fragment_graph, stream_job_execution_mode) + DdlCommand::CreateStreamingJob(stream_job, fragment_graph, create_type) => { + ctrl.create_streaming_job(stream_job, fragment_graph, create_type) .await } DdlCommand::DropStreamingJob(job_id, drop_mode) => { @@ -414,7 +406,7 @@ impl DdlController { &self, mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, - stream_job_execution_mode: StreamJobExecutionMode, + create_type: CreateType, ) -> MetaResult { let _permit = self .creating_streaming_job_permits @@ -462,12 +454,12 @@ impl DdlController { } }; - match stream_job_execution_mode { - StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified => { + match create_type { + CreateType::Foreground | CreateType::Unspecified => { self.create_streaming_job_inner(stream_job, table_fragments, ctx, internal_tables) .await } - StreamJobExecutionMode::Background => { + CreateType::Background => { let ctrl = self.clone(); let definition = stream_job.definition(); let fut = async move { diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index e22deadf947e..935d398aeacb 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -25,7 +25,7 @@ use risingwave_pb::catalog::connection::private_link_service::{ use risingwave_pb::catalog::connection::PbPrivateLinkService; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{connection, Connection, PbSource, PbTable}; +use risingwave_pb::catalog::{connection, Connection, CreateType, PbSource, PbTable}; use risingwave_pb::ddl_service::ddl_service_server::DdlService; use risingwave_pb::ddl_service::drop_table_request::PbSourceId; use risingwave_pb::ddl_service::*; @@ -232,7 +232,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::CreateStreamingJob( stream_job, fragment_graph, - StreamJobExecutionMode::Foreground, + CreateType::Foreground, )) .await?; @@ -276,6 +276,7 @@ impl DdlService for DdlServiceImpl { let req = request.into_inner(); let mview = req.get_materialized_view()?.clone(); + let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground); let fragment_graph = req.get_fragment_graph()?.clone(); let mut stream_job = StreamingJob::MaterializedView(mview); @@ -287,7 +288,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::CreateStreamingJob( stream_job, fragment_graph, - req.stream_job_execution_mode(), + create_type, )) .await?; @@ -342,7 +343,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::CreateStreamingJob( stream_job, fragment_graph, - StreamJobExecutionMode::Foreground, + CreateType::Foreground, )) .await?; @@ -438,7 +439,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::CreateStreamingJob( stream_job, fragment_graph, - StreamJobExecutionMode::Foreground, + CreateType::Foreground, )) .await?; diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 2b695f9c045b..357cd4cf37f1 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -332,12 +332,10 @@ impl MetaClient { &self, table: PbTable, graph: StreamFragmentGraph, - stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<(TableId, CatalogVersion)> { let request = CreateMaterializedViewRequest { materialized_view: Some(table), fragment_graph: Some(graph), - stream_job_execution_mode: stream_job_execution_mode as i32, }; let resp = self.inner.create_materialized_view(request).await?; // TODO: handle error in `resp.status` here