From ae7514b01f2b5d21ca34f8966bf0ce9fa6d11512 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 26 Sep 2023 12:08:37 +0800 Subject: [PATCH] get background / foreground from table instead of request --- proto/catalog.proto | 2 +- proto/ddl_service.proto | 7 ------ src/frontend/src/catalog/catalog_service.rs | 14 ++++------- src/frontend/src/handler/create_mv.rs | 14 +++++------ src/frontend/src/test_utils.rs | 6 ++--- src/meta/src/rpc/ddl_controller.rs | 26 +++++++-------------- src/meta/src/rpc/service/ddl_service.rs | 11 +++++---- src/rpc_client/src/meta_client.rs | 2 -- 8 files changed, 30 insertions(+), 52 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index a8624cebbb7b4..f200d10d234ff 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -42,7 +42,7 @@ enum StreamJobStatus { // How the stream job was created will determine // whether they are persisted. enum CreateType { - CREATE_STATUS_UNSPECIFIED = 0; + CREATE_TYPE_UNSPECIFIED = 0; BACKGROUND = 1; FOREGROUND = 2; } diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 68a74d421462c..27c9f2ee82f83 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -97,16 +97,9 @@ message DropSinkResponse { uint64 version = 2; } -enum StreamJobExecutionMode { - STREAM_JOB_EXECUTION_MODE_UNSPECIFIED = 0; - BACKGROUND = 1; - FOREGROUND = 2; -} - message CreateMaterializedViewRequest { catalog.Table materialized_view = 1; stream_plan.StreamFragmentGraph fragment_graph = 2; - StreamJobExecutionMode stream_job_execution_mode = 3; } message CreateMaterializedViewResponse { diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index bc2648fe105bd..8eb6b9e3e4485 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -21,10 +21,10 @@ use risingwave_common::error::ErrorCode::InternalError; use risingwave_common::error::{Result, RwError}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::catalog::{ - PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, + PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, }; use risingwave_pb::ddl_service::alter_relation_name_request::Relation; -use risingwave_pb::ddl_service::{create_connection_request, StreamJobExecutionMode}; +use risingwave_pb::ddl_service::create_connection_request; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_rpc_client::MetaClient; use tokio::sync::watch::Receiver; @@ -70,7 +70,6 @@ pub trait CatalogWriter: Send + Sync { &self, table: PbTable, graph: StreamFragmentGraph, - stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<()>; async fn create_table( @@ -191,16 +190,13 @@ impl CatalogWriter for CatalogWriterImpl { &self, table: PbTable, graph: StreamFragmentGraph, - stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<()> { + let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground); let (_, version) = self .meta_client - .create_materialized_view(table, graph, stream_job_execution_mode) + .create_materialized_view(table, graph) .await?; - if matches!( - stream_job_execution_mode, - StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified - ) { + if matches!(create_type, PbCreateType::Foreground) { self.wait_version(version).await? } Ok(()) diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 3983291e2845f..3fa9129f39743 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -15,8 +15,7 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::error::{ErrorCode, Result}; -use risingwave_pb::catalog::PbTable; -use risingwave_pb::ddl_service::StreamJobExecutionMode; +use risingwave_pb::catalog::{CreateType, PbTable}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::user::grant_privilege::Action; use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query}; @@ -164,7 +163,7 @@ pub async fn handle_create_mv( Ok(_) => {} }; - let (table, graph) = { + let (mut table, graph) = { let context = OptimizerContext::from_handler_args(handler_args); let has_order_by = !query.order_by.is_empty(); @@ -202,16 +201,17 @@ It only indicates the physical clustering of the data, which may improve the per )); let run_in_background = session.config().get_background_ddl(); - let stream_job_execution_mode = if run_in_background { - StreamJobExecutionMode::Background + let create_type = if run_in_background { + CreateType::Background } else { - StreamJobExecutionMode::Foreground + CreateType::Foreground }; + table.create_type = create_type.into(); let session = session.clone(); let catalog_writer = session.catalog_writer()?; catalog_writer - .create_materialized_view(table, graph, stream_job_execution_mode) + .create_materialized_view(table, graph) .await?; Ok(PgResponse::empty_result( diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 1823dcec91281..20eb252fc5053 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -34,7 +34,7 @@ use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table, }; -use risingwave_pb::ddl_service::{create_connection_request, DdlProgress, StreamJobExecutionMode}; +use risingwave_pb::ddl_service::{create_connection_request, DdlProgress}; use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta, @@ -235,7 +235,6 @@ impl CatalogWriter for MockCatalogWriter { &self, mut table: PbTable, _graph: StreamFragmentGraph, - _stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<()> { table.id = self.gen_id(); self.catalog.write().create_table(&table); @@ -261,8 +260,7 @@ impl CatalogWriter for MockCatalogWriter { table.optional_associated_source_id = Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)); } - self.create_materialized_view(table, graph, StreamJobExecutionMode::Foreground) - .await?; + self.create_materialized_view(table, graph).await?; Ok(()) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index d4b324ea6ddc6..c65efc1726825 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -23,10 +23,10 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::epoch::Epoch; use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider; use risingwave_pb::catalog::{ - connection, Connection, Database, Function, Schema, Source, Table, View, + connection, Connection, CreateType, Database, Function, Schema, Source, Table, View, }; use risingwave_pb::ddl_service::alter_relation_name_request::Relation; -use risingwave_pb::ddl_service::{DdlProgress, StreamJobExecutionMode}; +use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; use tokio::sync::Semaphore; use tracing::log::warn; @@ -93,11 +93,7 @@ pub enum DdlCommand { DropFunction(FunctionId), CreateView(View), DropView(ViewId, DropMode), - CreateStreamingJob( - StreamingJob, - StreamFragmentGraphProto, - StreamJobExecutionMode, - ), + CreateStreamingJob(StreamingJob, StreamFragmentGraphProto, CreateType), DropStreamingJob(StreamingJobId, DropMode), ReplaceTable(StreamingJob, StreamFragmentGraphProto, ColIndexMapping), AlterRelationName(Relation, String), @@ -240,12 +236,8 @@ impl DdlController { DdlCommand::DropView(view_id, drop_mode) => { ctrl.drop_view(view_id, drop_mode).await } - DdlCommand::CreateStreamingJob( - stream_job, - fragment_graph, - stream_job_execution_mode, - ) => { - ctrl.create_streaming_job(stream_job, fragment_graph, stream_job_execution_mode) + DdlCommand::CreateStreamingJob(stream_job, fragment_graph, create_type) => { + ctrl.create_streaming_job(stream_job, fragment_graph, create_type) .await } DdlCommand::DropStreamingJob(job_id, drop_mode) => { @@ -414,7 +406,7 @@ impl DdlController { &self, mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, - stream_job_execution_mode: StreamJobExecutionMode, + create_type: CreateType, ) -> MetaResult { let _permit = self .creating_streaming_job_permits @@ -462,12 +454,12 @@ impl DdlController { } }; - match stream_job_execution_mode { - StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified => { + match create_type { + CreateType::Foreground | CreateType::Unspecified => { self.create_streaming_job_inner(stream_job, table_fragments, ctx, internal_tables) .await } - StreamJobExecutionMode::Background => { + CreateType::Background => { let ctrl = self.clone(); let definition = stream_job.definition(); let fut = async move { diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index e22deadf947e8..935d398aeacb0 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -25,7 +25,7 @@ use risingwave_pb::catalog::connection::private_link_service::{ use risingwave_pb::catalog::connection::PbPrivateLinkService; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{connection, Connection, PbSource, PbTable}; +use risingwave_pb::catalog::{connection, Connection, CreateType, PbSource, PbTable}; use risingwave_pb::ddl_service::ddl_service_server::DdlService; use risingwave_pb::ddl_service::drop_table_request::PbSourceId; use risingwave_pb::ddl_service::*; @@ -232,7 +232,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::CreateStreamingJob( stream_job, fragment_graph, - StreamJobExecutionMode::Foreground, + CreateType::Foreground, )) .await?; @@ -276,6 +276,7 @@ impl DdlService for DdlServiceImpl { let req = request.into_inner(); let mview = req.get_materialized_view()?.clone(); + let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground); let fragment_graph = req.get_fragment_graph()?.clone(); let mut stream_job = StreamingJob::MaterializedView(mview); @@ -287,7 +288,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::CreateStreamingJob( stream_job, fragment_graph, - req.stream_job_execution_mode(), + create_type, )) .await?; @@ -342,7 +343,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::CreateStreamingJob( stream_job, fragment_graph, - StreamJobExecutionMode::Foreground, + CreateType::Foreground, )) .await?; @@ -438,7 +439,7 @@ impl DdlService for DdlServiceImpl { .run_command(DdlCommand::CreateStreamingJob( stream_job, fragment_graph, - StreamJobExecutionMode::Foreground, + CreateType::Foreground, )) .await?; diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 2b695f9c045b0..357cd4cf37f1d 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -332,12 +332,10 @@ impl MetaClient { &self, table: PbTable, graph: StreamFragmentGraph, - stream_job_execution_mode: StreamJobExecutionMode, ) -> Result<(TableId, CatalogVersion)> { let request = CreateMaterializedViewRequest { materialized_view: Some(table), fragment_graph: Some(graph), - stream_job_execution_mode: stream_job_execution_mode as i32, }; let resp = self.inner.create_materialized_view(request).await?; // TODO: handle error in `resp.status` here