From 54421333a9fcef7663984f618f15f93bd2916a38 Mon Sep 17 00:00:00 2001 From: August Date: Thu, 23 Feb 2023 15:27:14 +0800 Subject: [PATCH 01/10] fix: introduce ddl controller to make all ddl running in background --- src/meta/src/manager/catalog/fragment.rs | 4 +- src/meta/src/manager/catalog/mod.rs | 114 ++--- src/meta/src/rpc/ddl_controller.rs | 590 +++++++++++++++++++++++ src/meta/src/rpc/mod.rs | 1 + src/meta/src/rpc/service/ddl_service.rs | 548 ++++----------------- src/meta/src/stream/stream_manager.rs | 2 + 6 files changed, 742 insertions(+), 517 deletions(-) create mode 100644 src/meta/src/rpc/ddl_controller.rs diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 8ce58ba5f08d..c29e54633a44 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -825,7 +825,7 @@ where pub async fn get_upstream_mview_fragments( &self, dependent_relation_ids: &HashSet, - ) -> MetaResult> { + ) -> HashMap { let map = &self.core.read().await.table_fragments; let mut fragments = HashMap::new(); @@ -835,7 +835,7 @@ where } } - Ok(fragments) + fragments } /// Get the downstream `Chain` fragments of the specified table. diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 5e5c19e0cea3..d4c996afb398 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -193,6 +193,8 @@ where // database and schemas. user_core.increase_ref_count(database.owner, 1 + schemas_added.len()); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let mut version = self .notify_frontend(Operation::Add, Info::Database(database.to_owned())) .await; @@ -631,24 +633,23 @@ where } } - pub async fn cancel_create_table_procedure(&self, table: &Table) -> MetaResult<()> { + pub async fn cancel_create_table_procedure(&self, table: &Table) { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let user_core = &mut core.user; let key = (table.database_id, table.schema_id, table.name.clone()); - if !database_core.tables.contains_key(&table.id) - && database_core.has_in_progress_creation(&key) - { - database_core.unmark_creating(&key); - database_core.unmark_creating_streaming_job(table.id); - for &dependent_relation_id in &table.dependent_relations { - database_core.decrease_ref_count(dependent_relation_id); - } - user_core.decrease_ref(table.owner); - Ok(()) - } else { - unreachable!("table must not exist and must be in creating procedure"); + assert!( + !database_core.tables.contains_key(&table.id) + && database_core.has_in_progress_creation(&key), + "table must not exit and be in creating procedure" + ); + + database_core.unmark_creating(&key); + database_core.unmark_creating_streaming_job(table.id); + for &dependent_relation_id in &table.dependent_relations { + database_core.decrease_ref_count(dependent_relation_id); } + user_core.decrease_ref(table.owner); } /// return id of streaming jobs in the database which need to be dropped by stream manager. 
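The subject line of this first patch states the goal: every DDL command should keep running in the background even if the gRPC call that triggered it goes away. The mechanism, introduced later in this patch as `DdlController::run_command`, is to `tokio::spawn` the work and then await the `JoinHandle` rather than awaiting the work directly, because a spawned task is not cancelled when the caller's future is dropped. A minimal, self-contained sketch of that difference (the `fake_ddl` helper is hypothetical, standing in for the real catalog/stream-manager steps):

    use std::time::Duration;
    use tokio::time::{sleep, timeout};

    // Stand-in for a long-running DDL step.
    async fn fake_ddl(tag: &str) {
        sleep(Duration::from_millis(50)).await;
        println!("{tag}: DDL finished");
    }

    #[tokio::main]
    async fn main() {
        // Awaiting the work directly: when the caller gives up (a timeout here,
        // a dropped RPC future in practice), the DDL is cancelled mid-flight.
        let _ = timeout(Duration::from_millis(10), fake_ddl("direct")).await;

        // Spawning first: the task keeps running on the runtime even though
        // the caller stopped waiting for its JoinHandle.
        let handle = tokio::spawn(fake_ddl("spawned"));
        let _ = timeout(Duration::from_millis(10), handle).await;

        // Give the detached task time to complete; only "spawned: DDL finished"
        // is printed.
        sleep(Duration::from_millis(100)).await;
    }

The patch's `run_command` follows the second form: it spawns the command and then awaits the handle, so the result still reaches the client while the connection is alive, but the DDL itself is detached from the RPC's lifetime.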
@@ -1035,29 +1036,24 @@ where } } - pub async fn cancel_create_table_procedure_with_source( - &self, - source: &Source, - table: &Table, - ) -> MetaResult<()> { + pub async fn cancel_create_table_procedure_with_source(&self, source: &Source, table: &Table) { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let user_core = &mut core.user; let source_key = (source.database_id, source.schema_id, source.name.clone()); let table_key = (table.database_id, table.schema_id, table.name.clone()); - if !database_core.sources.contains_key(&source.id) - && !database_core.tables.contains_key(&table.id) - && database_core.has_in_progress_creation(&source_key) - && database_core.has_in_progress_creation(&table_key) - { - database_core.unmark_creating(&source_key); - database_core.unmark_creating(&table_key); - database_core.unmark_creating_streaming_job(table.id); - user_core.decrease_ref_count(source.owner, 2); // source and table - Ok(()) - } else { - unreachable!("source must not exist and must be in creating procedure"); - } + assert!( + database_core.sources.contains_key(&source.id) + && !database_core.tables.contains_key(&table.id) + && database_core.has_in_progress_creation(&source_key) + && database_core.has_in_progress_creation(&table_key), + "table and source must not exit and be in creating procedure" + ); + + database_core.unmark_creating(&source_key); + database_core.unmark_creating(&table_key); + database_core.unmark_creating_streaming_job(table.id); + user_core.decrease_ref_count(source.owner, 2); // source and table } /// return id of streaming jobs in the database which need to be dropped by stream manager. @@ -1232,29 +1228,24 @@ where } } - pub async fn cancel_create_index_procedure( - &self, - index: &Index, - index_table: &Table, - ) -> MetaResult<()> { + pub async fn cancel_create_index_procedure(&self, index: &Index, index_table: &Table) { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let user_core = &mut core.user; let key = (index.database_id, index.schema_id, index.name.clone()); - if !database_core.indexes.contains_key(&index.id) - && database_core.has_in_progress_creation(&key) - { - database_core.unmark_creating(&key); - database_core.unmark_creating_streaming_job(index_table.id); - for &dependent_relation_id in &index_table.dependent_relations { - database_core.decrease_ref_count(dependent_relation_id); - } - // index table and index. - user_core.decrease_ref_count(index.owner, 2); - Ok(()) - } else { - unreachable!("index must not exist and must be in creating procedure"); + assert!( + database_core.indexes.contains_key(&index.id) + && database_core.has_in_progress_creation(&key), + "index must not exist and be in creating procedure" + ); + + database_core.unmark_creating(&key); + database_core.unmark_creating_streaming_job(index_table.id); + for &dependent_relation_id in &index_table.dependent_relations { + database_core.decrease_ref_count(dependent_relation_id); } + // index table and index. 
+ user_core.decrease_ref_count(index.owner, 2); } pub async fn finish_create_index_procedure( @@ -1361,24 +1352,23 @@ where } } - pub async fn cancel_create_sink_procedure(&self, sink: &Sink) -> MetaResult<()> { + pub async fn cancel_create_sink_procedure(&self, sink: &Sink) { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let user_core = &mut core.user; let key = (sink.database_id, sink.schema_id, sink.name.clone()); - if !database_core.sinks.contains_key(&sink.id) - && database_core.has_in_progress_creation(&key) - { - database_core.unmark_creating(&key); - database_core.unmark_creating_streaming_job(sink.id); - for &dependent_relation_id in &sink.dependent_relations { - database_core.decrease_ref_count(dependent_relation_id); - } - user_core.decrease_ref(sink.owner); - Ok(()) - } else { - unreachable!("sink must not exist and must be in creating procedure"); + assert!( + database_core.sinks.contains_key(&sink.id) + && database_core.has_in_progress_creation(&key), + "sink must not exit and be in creating procedure" + ); + + database_core.unmark_creating(&key); + database_core.unmark_creating_streaming_job(sink.id); + for &dependent_relation_id in &sink.dependent_relations { + database_core.decrease_ref_count(dependent_relation_id); } + user_core.decrease_ref(sink.owner); } pub async fn drop_sink( diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs new file mode 100644 index 000000000000..12fe933e5d0b --- /dev/null +++ b/src/meta/src/rpc/ddl_controller.rs @@ -0,0 +1,590 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
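The new `ddl_controller.rs` below centralizes all of these operations behind a single `DdlCommand` enum and one `run_command` entry point, which is what lets every DDL share the same background-task treatment. A rough standalone illustration of that shape, using simplified stand-in types rather than the real `DdlCommand`/`DdlController` (the error and version types here are placeholders):

    // Simplified stand-in for the controller pattern introduced in this file:
    // one enum describing every DDL, one entry point that runs it on a
    // background task and returns a catalog "version".
    #[derive(Debug)]
    enum Command {
        CreateDatabase(String),
        DropDatabase(u32),
    }

    #[derive(Clone)]
    struct Controller;

    impl Controller {
        async fn run_command(&self, command: Command) -> Result<u64, String> {
            let ctrl = self.clone();
            // Spawn so the command outlives the caller, then await its handle.
            tokio::spawn(async move {
                match command {
                    Command::CreateDatabase(name) => ctrl.create_database(name).await,
                    Command::DropDatabase(id) => ctrl.drop_database(id).await,
                }
            })
            .await
            .map_err(|e| e.to_string())?
        }

        async fn create_database(&self, _name: String) -> Result<u64, String> {
            Ok(1) // pretend this persists the catalog change and bumps the version
        }

        async fn drop_database(&self, _id: u32) -> Result<u64, String> {
            Ok(2)
        }
    }

    #[tokio::main]
    async fn main() {
        let ctrl = Controller;
        let version = ctrl.run_command(Command::CreateDatabase("db".into())).await;
        println!("{version:?}");
    }

In the patch itself the enum variants carry the real protobuf catalog objects (`Database`, `Source`, `StreamingJob`, ...) and the controller holds the catalog, stream, source, cluster, fragment and barrier managers; the sketch only mirrors the dispatch shape.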
+ +use itertools::Itertools; +use risingwave_pb::catalog::{Database, Function, Schema, Source, Table, View}; +use risingwave_pb::ddl_service::DdlProgress; +use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; + +use crate::barrier::BarrierManagerRef; +use crate::manager::{ + CatalogManagerRef, ClusterManagerRef, DatabaseId, FragmentManagerRef, FunctionId, IndexId, + MetaSrvEnv, NotificationVersion, SchemaId, SinkId, SourceId, StreamingJob, TableId, ViewId, +}; +use crate::model::{StreamEnvironment, TableFragments}; +use crate::storage::MetaStore; +use crate::stream::{ + ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, + CreateStreamingJobContext, GlobalStreamManagerRef, ReplaceTableContext, SourceManagerRef, + StreamFragmentGraph, +}; +use crate::{MetaError, MetaResult}; + +pub enum StreamingJobId { + MaterializedView(TableId), + Sink(SinkId), + Table(Option, TableId), + Index(IndexId), +} + +impl StreamingJobId { + fn id(&self) -> TableId { + match self { + StreamingJobId::MaterializedView(id) + | StreamingJobId::Sink(id) + | StreamingJobId::Table(_, id) + | StreamingJobId::Index(id) => *id, + } + } +} + +pub enum DdlCommand { + CreateDatabase(Database), + DropDatabase(DatabaseId), + CreateSchema(Schema), + DropSchema(SchemaId), + CreateSource(Source), + DropSource(SourceId), + CreateFunction(Function), + DropFunction(FunctionId), + CreateView(View), + DropView(ViewId), + CreatingStreamingJob(StreamingJob, StreamFragmentGraphProto), + DropStreamingJob(StreamingJobId), + ReplaceStreamingJob(StreamingJob, StreamFragmentGraphProto), +} + +#[derive(Clone)] +pub struct DdlController { + env: MetaSrvEnv, + + catalog_manager: CatalogManagerRef, + stream_manager: GlobalStreamManagerRef, + source_manager: SourceManagerRef, + cluster_manager: ClusterManagerRef, + fragment_manager: FragmentManagerRef, + barrier_manager: BarrierManagerRef, +} + +impl DdlController +where + S: MetaStore, +{ + pub(crate) fn new( + env: MetaSrvEnv, + catalog_manager: CatalogManagerRef, + stream_manager: GlobalStreamManagerRef, + source_manager: SourceManagerRef, + cluster_manager: ClusterManagerRef, + fragment_manager: FragmentManagerRef, + barrier_manager: BarrierManagerRef, + ) -> Self { + Self { + env, + catalog_manager, + stream_manager, + source_manager, + cluster_manager, + fragment_manager, + barrier_manager, + } + } + + /// `check_barrier_manager_status` checks the status of the barrier manager, return unavailable + /// when it's not running. 
+ async fn check_barrier_manager_status(&self) -> MetaResult<()> { + if !self.barrier_manager.is_running().await { + return Err(MetaError::unavailable( + "The cluster is starting or recovering".into(), + )); + } + Ok(()) + } + + pub(crate) async fn run_command(&self, command: DdlCommand) -> MetaResult { + self.check_barrier_manager_status().await?; + let ctrl = self.clone(); + let handler = tokio::spawn(async move { + match command { + DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await, + DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await, + DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await, + DdlCommand::DropSchema(schema_id) => ctrl.drop_schema(schema_id).await, + DdlCommand::CreateSource(source) => ctrl.create_source(source).await, + DdlCommand::DropSource(source_id) => ctrl.drop_source(source_id).await, + DdlCommand::CreateFunction(function) => ctrl.create_function(function).await, + DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await, + DdlCommand::CreateView(view) => ctrl.create_view(view).await, + DdlCommand::DropView(view_id) => ctrl.drop_view(view_id).await, + DdlCommand::CreatingStreamingJob(stream_job, fragment_graph) => { + ctrl.create_streaming_job(stream_job, fragment_graph).await + } + DdlCommand::DropStreamingJob(job_id) => ctrl.drop_streaming_job(job_id).await, + DdlCommand::ReplaceStreamingJob(stream_job, fragment_graph) => { + ctrl.replace_streaming_job(stream_job, fragment_graph).await + } + } + }); + handler.await.unwrap() + } + + pub(crate) async fn get_ddl_progress(&self) -> Vec { + self.barrier_manager.get_ddl_progress().await + } + + async fn create_database(&self, database: Database) -> MetaResult { + self.catalog_manager.create_database(&database).await + } + + async fn drop_database(&self, database_id: DatabaseId) -> MetaResult { + // 1. drop all catalogs in this database. + let (version, streaming_ids, source_ids) = + self.catalog_manager.drop_database(database_id).await?; + // 2. Unregister source connector worker. + self.source_manager.unregister_sources(source_ids).await; + // 3. drop streaming jobs. + if !streaming_ids.is_empty() { + self.stream_manager.drop_streaming_jobs(streaming_ids).await; + } + Ok(version) + } + + async fn create_schema(&self, schema: Schema) -> MetaResult { + self.catalog_manager.create_schema(&schema).await + } + + async fn drop_schema(&self, schema_id: SchemaId) -> MetaResult { + self.catalog_manager.drop_schema(schema_id).await + } + + async fn create_source(&self, source: Source) -> MetaResult { + self.catalog_manager + .start_create_source_procedure(&source) + .await?; + + if let Err(e) = self.source_manager.register_source(&source).await { + self.catalog_manager + .cancel_create_source_procedure(&source) + .await?; + return Err(e); + } + + self.catalog_manager + .finish_create_source_procedure(&source) + .await + } + + async fn drop_source(&self, source_id: SourceId) -> MetaResult { + // 1. Drop source in catalog. + let version = self.catalog_manager.drop_source(source_id).await?; + // 2. Unregister source connector worker. 
+ self.source_manager + .unregister_sources(vec![source_id]) + .await; + + Ok(version) + } + + async fn create_function(&self, function: Function) -> MetaResult { + self.catalog_manager.create_function(&function).await + } + + async fn drop_function(&self, function_id: FunctionId) -> MetaResult { + self.catalog_manager.drop_function(function_id).await + } + + async fn create_view(&self, view: View) -> MetaResult { + self.catalog_manager.create_view(&view).await + } + + async fn drop_view(&self, view_id: ViewId) -> MetaResult { + self.catalog_manager.drop_view(view_id).await + } + + async fn create_streaming_job( + &self, + mut stream_job: StreamingJob, + fragment_graph: StreamFragmentGraphProto, + ) -> MetaResult { + let (ctx, table_fragments) = self + .prepare_stream_job(&mut stream_job, fragment_graph) + .await?; + + let internal_tables = ctx.internal_tables(); + let result = try { + if let Some(source) = stream_job.source() { + self.source_manager.register_source(source).await?; + } + self.stream_manager + .create_streaming_job(table_fragments, ctx) + .await?; + }; + + match result { + Ok(_) => self.finish_stream_job(&stream_job, internal_tables).await, + Err(err) => { + self.cancel_stream_job(&stream_job, internal_tables).await; + Err(err) + } + } + } + + async fn drop_streaming_job(&self, job_id: StreamingJobId) -> MetaResult { + let table_fragments = self + .fragment_manager + .select_table_fragments_by_table_id(&job_id.id().into()) + .await?; + let internal_table_ids = table_fragments.internal_table_ids(); + let (version, streaming_job_ids) = match job_id { + StreamingJobId::MaterializedView(table_id) => { + self.catalog_manager + .drop_table(table_id, internal_table_ids) + .await? + } + StreamingJobId::Sink(sink_id) => { + let version = self + .catalog_manager + .drop_sink(sink_id, internal_table_ids) + .await?; + (version, vec![sink_id.into()]) + } + StreamingJobId::Table(source_id, table_id) => { + self.drop_table_inner(source_id, table_id, internal_table_ids) + .await? + } + StreamingJobId::Index(index_id) => { + let index_table_id = self.catalog_manager.get_index_table(index_id).await?; + let version = self + .catalog_manager + .drop_index(index_id, index_table_id) + .await?; + (version, vec![index_table_id.into()]) + } + }; + + self.stream_manager + .drop_streaming_jobs(streaming_job_ids) + .await; + Ok(version) + } + + /// `prepare_stream_job` prepares a stream job and returns the context and table fragments. + async fn prepare_stream_job( + &self, + stream_job: &mut StreamingJob, + fragment_graph: StreamFragmentGraphProto, + ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> { + // 1. Get the env for streaming jobs. + let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); + + // 2. Build fragment graph. + let fragment_graph = + StreamFragmentGraph::new(fragment_graph, self.env.id_gen_manager_ref(), &*stream_job) + .await?; + let default_parallelism = fragment_graph.default_parallelism(); + let internal_tables = fragment_graph.internal_tables(); + + // 3. Set the graph-related fields and freeze the `stream_job`. + stream_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + let dependent_relations = fragment_graph.dependent_relations().clone(); + stream_job.set_dependent_relations(dependent_relations.clone()); + + let id = stream_job.id(); + let stream_job = &*stream_job; + + // 4. Mark current relation as "creating" and add reference count to dependent relations. 
+ self.catalog_manager + .start_create_stream_job_procedure(stream_job) + .await?; + + // 5. Resolve the upstream fragments, extend the fragment graph to a complete graph that + // contains all information needed for building the actor graph. + let upstream_mview_fragments = self + .fragment_manager + .get_upstream_mview_fragments(&dependent_relations) + .await; + let upstream_mview_actors = upstream_mview_fragments + .iter() + .map(|(&table_id, fragment)| { + ( + table_id, + fragment.actors.iter().map(|a| a.actor_id).collect_vec(), + ) + }) + .collect(); + + let res: MetaResult<(CreateStreamingJobContext, TableFragments)> = try { + let complete_graph = CompleteStreamFragmentGraph::with_upstreams( + fragment_graph, + upstream_mview_fragments, + )?; + + // 6. Build the actor graph. + let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; + let actor_graph_builder = + ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; + + let ActorGraphBuildResult { + graph, + building_locations, + existing_locations, + dispatchers, + merge_updates, + } = actor_graph_builder + .generate_graph(self.env.id_gen_manager_ref(), stream_job) + .await?; + assert!(merge_updates.is_empty()); + + // 8. Build the table fragments structure that will be persisted in the stream manager, + // and the context that contains all information needed for building the + // actors on the compute nodes. + let table_fragments = + TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); + + let ctx = CreateStreamingJobContext { + dispatchers, + upstream_mview_actors, + internal_tables, + building_locations, + existing_locations, + table_properties: stream_job.properties(), + definition: stream_job.mview_definition(), + }; + + // 9. Mark creating tables, including internal tables and the table of the stream job. + let creating_tables = ctx + .internal_tables() + .into_iter() + .chain(stream_job.table().cloned()) + .collect_vec(); + + self.catalog_manager + .mark_creating_tables(&creating_tables) + .await; + + (ctx, table_fragments) + }; + if let Err(err) = &res { + tracing::error!("failed to build streaming graph for streaming job: {}", err); + self.cancel_stream_job(stream_job, vec![]).await; + } + + res + } + + /// `cancel_stream_job` cancels a stream job and clean some states. + async fn cancel_stream_job(&self, stream_job: &StreamingJob, internal_tables: Vec) { + let mut creating_internal_table_ids = + internal_tables.into_iter().map(|t| t.id).collect_vec(); + // 1. cancel create procedure. + match stream_job { + StreamingJob::MaterializedView(table) => { + creating_internal_table_ids.push(table.id); + self.catalog_manager + .cancel_create_table_procedure(table) + .await; + } + StreamingJob::Sink(sink) => { + self.catalog_manager + .cancel_create_sink_procedure(sink) + .await; + } + StreamingJob::Table(source, table) => { + creating_internal_table_ids.push(table.id); + if let Some(source) = source { + self.catalog_manager + .cancel_create_table_procedure_with_source(source, table) + .await; + } else { + self.catalog_manager + .cancel_create_table_procedure(table) + .await; + } + } + StreamingJob::Index(index, table) => { + creating_internal_table_ids.push(table.id); + self.catalog_manager + .cancel_create_index_procedure(index, table) + .await; + } + } + // 2. unmark creating tables. + self.catalog_manager + .unmark_creating_tables(&creating_internal_table_ids, true) + .await; + } + + /// `finish_stream_job` finishes a stream job and clean some states. 
+    async fn finish_stream_job(
+        &self,
+        stream_job: &StreamingJob,
+        internal_tables: Vec<Table>
, + ) -> MetaResult { + // 1. finish procedure. + let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec(); + let version = match stream_job { + StreamingJob::MaterializedView(table) => { + creating_internal_table_ids.push(table.id); + self.catalog_manager + .finish_create_table_procedure(internal_tables, table) + .await? + } + StreamingJob::Sink(sink) => { + self.catalog_manager + .finish_create_sink_procedure(internal_tables, sink) + .await? + } + StreamingJob::Table(source, table) => { + creating_internal_table_ids.push(table.id); + if let Some(source) = source { + let internal_tables: [_; 1] = internal_tables.try_into().unwrap(); + self.catalog_manager + .finish_create_table_procedure_with_source( + source, + table, + &internal_tables[0], + ) + .await? + } else { + assert!(internal_tables.is_empty()); + // Though `internal_tables` is empty here, we pass it as a parameter to reuse + // the method. + self.catalog_manager + .finish_create_table_procedure(internal_tables, table) + .await? + } + } + StreamingJob::Index(index, table) => { + creating_internal_table_ids.push(table.id); + self.catalog_manager + .finish_create_index_procedure(index, table) + .await? + } + }; + + // 2. unmark creating tables. + self.catalog_manager + .unmark_creating_tables(&creating_internal_table_ids, false) + .await; + + Ok(version) + } + + async fn drop_table_inner( + &self, + source_id: Option, + table_id: TableId, + internal_table_ids: Vec, + ) -> MetaResult<( + NotificationVersion, + Vec, + )> { + if let Some(source_id) = source_id { + // Drop table and source in catalog. Check `source_id` if it is the table's + // `associated_source_id`. Indexes also need to be dropped atomically. + assert_eq!(internal_table_ids.len(), 1); + let (version, delete_jobs) = self + .catalog_manager + .drop_table_with_source(source_id, table_id, internal_table_ids[0]) + .await?; + // Unregister source connector worker. + self.source_manager + .unregister_sources(vec![source_id]) + .await; + Ok((version, delete_jobs)) + } else { + assert!(internal_table_ids.is_empty()); + self.catalog_manager + .drop_table(table_id, internal_table_ids) + .await + } + } + + async fn replace_streaming_job( + &self, + mut stream_job: StreamingJob, + fragment_graph: StreamFragmentGraphProto, + ) -> MetaResult { + let (_ctx, _table_fragments) = self + .prepare_replace_table(&mut stream_job, fragment_graph) + .await?; + Ok(u64::MAX) + } + + /// Prepares a table replacement and returns the context and table fragments. + async fn prepare_replace_table( + &self, + stream_job: &mut StreamingJob, + fragment_graph: StreamFragmentGraphProto, + ) -> MetaResult<(ReplaceTableContext, TableFragments)> { + let id = stream_job.id(); + + // 1. Get the env for streaming jobs. + let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); + + // 2. Build fragment graph. + let fragment_graph = + StreamFragmentGraph::new(fragment_graph, self.env.id_gen_manager_ref(), &*stream_job) + .await?; + let default_parallelism = fragment_graph.default_parallelism(); + assert!(fragment_graph.internal_tables().is_empty()); + + // 3. Set the graph-related fields and freeze the `stream_job`. + stream_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + let stream_job = &*stream_job; + + // TODO: 4. Mark current relation as "updating". + + // 5. Resolve the downstream fragments, extend the fragment graph to a complete graph that + // contains all information needed for building the actor graph. 
+ let downstream_fragments = self + .fragment_manager + .get_downstream_chain_fragments(id.into()) + .await?; + + let complete_graph = + CompleteStreamFragmentGraph::with_downstreams(fragment_graph, downstream_fragments)?; + + // 6. Build the actor graph. + let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; + let actor_graph_builder = + ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; + + let ActorGraphBuildResult { + graph, + building_locations, + existing_locations, + dispatchers, + merge_updates, + } = actor_graph_builder + .generate_graph(self.env.id_gen_manager_ref(), stream_job) + .await?; + assert!(dispatchers.is_empty()); + + // 7. Build the table fragments structure that will be persisted in the stream manager, and + // the context that contains all information needed for building the actors on the compute + // nodes. + let table_fragments = + TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); + + let ctx = ReplaceTableContext { + merge_updates, + building_locations, + existing_locations, + table_properties: stream_job.properties(), + }; + + Ok((ctx, table_fragments)) + } +} diff --git a/src/meta/src/rpc/mod.rs b/src/meta/src/rpc/mod.rs index a5afb8ee6dc4..dc323ea69284 100644 --- a/src/meta/src/rpc/mod.rs +++ b/src/meta/src/rpc/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod ddl_controller; mod election_client; mod intercept; pub mod metrics; diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index d53c0986349b..0482bad4bf4e 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -12,41 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use itertools::Itertools; -use risingwave_common::catalog::CatalogVersion; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::Table; use risingwave_pb::ddl_service::ddl_service_server::DdlService; use risingwave_pb::ddl_service::drop_table_request::SourceId as ProstSourceId; use risingwave_pb::ddl_service::*; use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; use tonic::{Request, Response, Status}; use crate::barrier::BarrierManagerRef; use crate::manager::{ CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, IdCategory, IdCategoryType, - MetaSrvEnv, NotificationVersion, SourceId, StreamingJob, TableId, + MetaSrvEnv, StreamingJob, }; -use crate::model::{StreamEnvironment, TableFragments}; +use crate::rpc::ddl_controller::{DdlCommand, DdlController, StreamingJobId}; use crate::storage::MetaStore; -use crate::stream::{ - visit_fragment, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, - CreateStreamingJobContext, GlobalStreamManagerRef, ReplaceTableContext, SourceManagerRef, - StreamFragmentGraph, -}; -use crate::{MetaError, MetaResult}; +use crate::stream::{visit_fragment, GlobalStreamManagerRef, SourceManagerRef}; +use crate::MetaResult; #[derive(Clone)] pub struct DdlServiceImpl { env: MetaSrvEnv, catalog_manager: CatalogManagerRef, - stream_manager: GlobalStreamManagerRef, - source_manager: SourceManagerRef, - cluster_manager: ClusterManagerRef, - fragment_manager: FragmentManagerRef, - barrier_manager: BarrierManagerRef, + ddl_controller: DdlController, } impl DdlServiceImpl @@ -63,14 +51,19 @@ where fragment_manager: FragmentManagerRef, barrier_manager: BarrierManagerRef, ) -> Self { - Self { - env, - catalog_manager, + let ddl_controller = DdlController::new( + env.clone(), + catalog_manager.clone(), stream_manager, source_manager, cluster_manager, fragment_manager, barrier_manager, + ); + Self { + env, + catalog_manager, + ddl_controller, } } } @@ -88,7 +81,10 @@ where let id = self.gen_unique_id::<{ IdCategory::Database }>().await?; let mut database = req.get_db()?.clone(); database.id = id; - let version = self.catalog_manager.create_database(&database).await?; + let version = self + .ddl_controller + .run_command(DdlCommand::CreateDatabase(database)) + .await?; Ok(Response::new(CreateDatabaseResponse { status: None, @@ -101,19 +97,13 @@ where &self, request: Request, ) -> Result, Status> { - self.check_barrier_manager_status().await?; let req = request.into_inner(); let database_id = req.get_database_id(); - // 1. drop all catalogs in this database. - let (version, streaming_ids, source_ids) = - self.catalog_manager.drop_database(database_id).await?; - // 2. Unregister source connector worker. - self.source_manager.unregister_sources(source_ids).await; - // 3. drop streaming jobs. 
- if !streaming_ids.is_empty() { - self.stream_manager.drop_streaming_jobs(streaming_ids).await; - } + let version = self + .ddl_controller + .run_command(DdlCommand::DropDatabase(database_id)) + .await?; Ok(Response::new(DropDatabaseResponse { status: None, @@ -129,7 +119,10 @@ where let id = self.gen_unique_id::<{ IdCategory::Schema }>().await?; let mut schema = req.get_schema()?.clone(); schema.id = id; - let version = self.catalog_manager.create_schema(&schema).await?; + let version = self + .ddl_controller + .run_command(DdlCommand::CreateSchema(schema)) + .await?; Ok(Response::new(CreateSchemaResponse { status: None, @@ -144,7 +137,10 @@ where ) -> Result, Status> { let req = request.into_inner(); let schema_id = req.get_schema_id(); - let version = self.catalog_manager.drop_schema(schema_id).await?; + let version = self + .ddl_controller + .run_command(DdlCommand::DropSchema(schema_id)) + .await?; Ok(Response::new(DropSchemaResponse { status: None, version, @@ -160,20 +156,9 @@ where let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; source.id = id; - self.catalog_manager - .start_create_source_procedure(&source) - .await?; - - if let Err(e) = self.source_manager.register_source(&source).await { - self.catalog_manager - .cancel_create_source_procedure(&source) - .await?; - return Err(e.into()); - } - let version = self - .catalog_manager - .finish_create_source_procedure(&source) + .ddl_controller + .run_command(DdlCommand::CreateSource(source)) .await?; Ok(Response::new(CreateSourceResponse { status: None, @@ -187,13 +172,10 @@ where request: Request, ) -> Result, Status> { let source_id = request.into_inner().source_id; - - // 1. Drop source in catalog. - let version = self.catalog_manager.drop_source(source_id).await?; - // 2. Unregister source connector worker. - self.source_manager - .unregister_sources(vec![source_id]) - .await; + let version = self + .ddl_controller + .run_command(DdlCommand::DropSource(source_id)) + .await?; Ok(Response::new(DropSourceResponse { status: None, @@ -212,13 +194,17 @@ where let fragment_graph = req.get_fragment_graph()?.clone(); let mut stream_job = StreamingJob::Sink(sink); + let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; + stream_job.set_id(id); + let version = self - .create_stream_job(&mut stream_job, fragment_graph) + .ddl_controller + .run_command(DdlCommand::CreatingStreamingJob(stream_job, fragment_graph)) .await?; Ok(Response::new(CreateSinkResponse { status: None, - sink_id: stream_job.id(), + sink_id: id, version, })) } @@ -227,22 +213,12 @@ where &self, request: Request, ) -> Result, Status> { - self.check_barrier_manager_status().await?; let sink_id = request.into_inner().sink_id; - let table_fragment = self - .fragment_manager - .select_table_fragments_by_table_id(&sink_id.into()) - .await?; - let internal_tables = table_fragment.internal_table_ids(); - // 1. Drop sink in catalog. + let version = self - .catalog_manager - .drop_sink(sink_id, internal_tables) + .ddl_controller + .run_command(DdlCommand::DropStreamingJob(StreamingJobId::Sink(sink_id))) .await?; - // 2. drop streaming job of sink. 
- self.stream_manager - .drop_streaming_jobs(vec![sink_id.into()]) - .await; Ok(Response::new(DropSinkResponse { status: None, @@ -261,13 +237,17 @@ where let fragment_graph = req.get_fragment_graph()?.clone(); let mut stream_job = StreamingJob::MaterializedView(mview); + let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; + stream_job.set_id(id); + let version = self - .create_stream_job(&mut stream_job, fragment_graph) + .ddl_controller + .run_command(DdlCommand::CreatingStreamingJob(stream_job, fragment_graph)) .await?; Ok(Response::new(CreateMaterializedViewResponse { status: None, - table_id: stream_job.id(), + table_id: id, version, })) } @@ -276,24 +256,17 @@ where &self, request: Request, ) -> Result, Status> { - self.check_barrier_manager_status().await?; self.env.idle_manager().record_activity(); let request = request.into_inner(); let table_id = request.table_id; - let table_fragment = self - .fragment_manager - .select_table_fragments_by_table_id(&table_id.into()) - .await?; - let internal_tables = table_fragment.internal_table_ids(); - // 1. Drop table in catalog. Ref count will be checked. - let (version, delete_jobs) = self - .catalog_manager - .drop_table(table_id, internal_tables) + let version = self + .ddl_controller + .run_command(DdlCommand::DropStreamingJob( + StreamingJobId::MaterializedView(table_id), + )) .await?; - // 2. Drop streaming jobs. - self.stream_manager.drop_streaming_jobs(delete_jobs).await; Ok(Response::new(DropMaterializedViewResponse { status: None, @@ -313,13 +286,17 @@ where let fragment_graph = req.get_fragment_graph()?.clone(); let mut stream_job = StreamingJob::Index(index, index_table); + let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; + stream_job.set_id(id); + let version = self - .create_stream_job(&mut stream_job, fragment_graph) + .ddl_controller + .run_command(DdlCommand::CreatingStreamingJob(stream_job, fragment_graph)) .await?; Ok(Response::new(CreateIndexResponse { status: None, - index_id: stream_job.id(), + index_id: id, version, })) } @@ -328,21 +305,15 @@ where &self, request: Request, ) -> Result, Status> { - self.check_barrier_manager_status().await?; self.env.idle_manager().record_activity(); let index_id = request.into_inner().index_id; - let index_table_id = self.catalog_manager.get_index_table(index_id).await?; - - // 1. Drop index in catalog. Ref count will be checked. let version = self - .catalog_manager - .drop_index(index_id, index_table_id) + .ddl_controller + .run_command(DdlCommand::DropStreamingJob(StreamingJobId::Index( + index_id, + ))) .await?; - // 2. drop streaming jobs of the index tables. 
- self.stream_manager - .drop_streaming_jobs(vec![index_table_id.into()]) - .await; Ok(Response::new(DropIndexResponse { status: None, @@ -358,7 +329,10 @@ where let id = self.gen_unique_id::<{ IdCategory::Function }>().await?; let mut function = req.get_function()?.clone(); function.id = id; - let version = self.catalog_manager.create_function(&function).await?; + let version = self + .ddl_controller + .run_command(DdlCommand::CreateFunction(function)) + .await?; Ok(Response::new(CreateFunctionResponse { status: None, @@ -371,12 +345,11 @@ where &self, request: Request, ) -> Result, Status> { - self.check_barrier_manager_status().await?; let request = request.into_inner(); let version = self - .catalog_manager - .drop_function(request.function_id) + .ddl_controller + .run_command(DdlCommand::DropFunction(request.function_id)) .await?; Ok(Response::new(DropFunctionResponse { @@ -421,13 +394,17 @@ where } let mut stream_job = StreamingJob::Table(source, mview); + let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; + stream_job.set_id(id); + let version = self - .create_stream_job(&mut stream_job, fragment_graph) + .ddl_controller + .run_command(DdlCommand::CreatingStreamingJob(stream_job, fragment_graph)) .await?; Ok(Response::new(CreateTableResponse { status: None, - table_id: stream_job.id(), + table_id: id, version, })) } @@ -436,13 +413,16 @@ where &self, request: Request, ) -> Result, Status> { - self.check_barrier_manager_status().await?; let request = request.into_inner(); let source_id = request.source_id; let table_id = request.table_id; let version = self - .drop_table_inner(source_id.map(|ProstSourceId::Id(id)| id), table_id) + .ddl_controller + .run_command(DdlCommand::DropStreamingJob(StreamingJobId::Table( + source_id.map(|ProstSourceId::Id(id)| id), + table_id, + ))) .await?; Ok(Response::new(DropTableResponse { @@ -460,7 +440,10 @@ where let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; view.id = id; - let version = self.catalog_manager.create_view(&view).await?; + let version = self + .ddl_controller + .run_command(DdlCommand::CreateView(view)) + .await?; Ok(Response::new(CreateViewResponse { status: None, @@ -475,7 +458,10 @@ where ) -> Result, Status> { let req = request.into_inner(); let view_id = req.get_view_id(); - let version = self.catalog_manager.drop_view(view_id).await?; + let version = self + .ddl_controller + .run_command(DdlCommand::DropView(view_id)) + .await?; Ok(Response::new(DropViewResponse { status: None, version, @@ -496,11 +482,12 @@ where ) -> Result, Status> { let req = request.into_inner(); - let mut stream_job = StreamingJob::Table(None, req.table.unwrap()); + let stream_job = StreamingJob::Table(None, req.table.unwrap()); let fragment_graph = req.fragment_graph.unwrap(); - let (_ctx, _table_fragments) = self - .prepare_replace_table(&mut stream_job, fragment_graph) + let _version = self + .ddl_controller + .run_command(DdlCommand::ReplaceStreamingJob(stream_job, fragment_graph)) .await?; Err(Status::unimplemented( @@ -537,7 +524,7 @@ where _request: Request, ) -> Result, Status> { Ok(Response::new(GetDdlProgressResponse { - ddl_progress: self.barrier_manager.get_ddl_progress().await, + ddl_progress: self.ddl_controller.get_ddl_progress().await, })) } } @@ -546,351 +533,6 @@ impl DdlServiceImpl where S: MetaStore, { - /// `check_barrier_manager_status` checks the status of the barrier manager, return unavailable - /// when it's not running. 
- async fn check_barrier_manager_status(&self) -> MetaResult<()> { - if !self.barrier_manager.is_running().await { - return Err(MetaError::unavailable( - "The cluster is starting or recovering".into(), - )); - } - Ok(()) - } - - /// `create_stream_job` creates a stream job and returns the version of the catalog. - async fn create_stream_job( - &self, - stream_job: &mut StreamingJob, - fragment_graph: StreamFragmentGraphProto, - ) -> MetaResult { - self.check_barrier_manager_status().await?; - - let (ctx, table_fragments) = self.prepare_stream_job(stream_job, fragment_graph).await?; - - let internal_tables = ctx.internal_tables(); - let result = try { - if let Some(source) = stream_job.source() { - self.source_manager.register_source(source).await?; - } - self.stream_manager - .create_streaming_job(table_fragments, ctx) - .await?; - }; - - match result { - Ok(_) => self.finish_stream_job(stream_job, internal_tables).await, - Err(err) => { - self.cancel_stream_job(stream_job, internal_tables).await?; - Err(err) - } - } - } - - /// `prepare_stream_job` prepares a stream job and returns the context and table fragments. - async fn prepare_stream_job( - &self, - stream_job: &mut StreamingJob, - fragment_graph: StreamFragmentGraphProto, - ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> { - // 1. Assign a new id to the stream job. - let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; - stream_job.set_id(id); - - // 2. Get the env for streaming jobs. - let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); - - // 3. Build fragment graph. - let fragment_graph = - StreamFragmentGraph::new(fragment_graph, self.env.id_gen_manager_ref(), &*stream_job) - .await?; - let default_parallelism = fragment_graph.default_parallelism(); - let internal_tables = fragment_graph.internal_tables(); - - // 4. Set the graph-related fields and freeze the `stream_job`. - stream_job.set_table_fragment_id(fragment_graph.table_fragment_id()); - let dependent_relations = fragment_graph.dependent_relations().clone(); - stream_job.set_dependent_relations(dependent_relations.clone()); - - let stream_job = &*stream_job; - - // 5. Mark current relation as "creating" and add reference count to dependent relations. - self.catalog_manager - .start_create_stream_job_procedure(stream_job) - .await?; - - // 6. Resolve the upstream fragments, extend the fragment graph to a complete graph that - // contains all information needed for building the actor graph. - let upstream_mview_fragments = self - .fragment_manager - .get_upstream_mview_fragments(&dependent_relations) - .await?; - let upstream_mview_actors = upstream_mview_fragments - .iter() - .map(|(&table_id, fragment)| { - ( - table_id, - fragment.actors.iter().map(|a| a.actor_id).collect_vec(), - ) - }) - .collect(); - - let complete_graph = - CompleteStreamFragmentGraph::with_upstreams(fragment_graph, upstream_mview_fragments)?; - - // 7. Build the actor graph. - let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; - let actor_graph_builder = - ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; - - let ActorGraphBuildResult { - graph, - building_locations, - existing_locations, - dispatchers, - merge_updates, - } = actor_graph_builder - .generate_graph(self.env.id_gen_manager_ref(), stream_job) - .await?; - assert!(merge_updates.is_empty()); - - // 8. 
Build the table fragments structure that will be persisted in the stream manager, and - // the context that contains all information needed for building the actors on the compute - // nodes. - let table_fragments = - TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); - - let ctx = CreateStreamingJobContext { - dispatchers, - upstream_mview_actors, - internal_tables, - building_locations, - existing_locations, - table_properties: stream_job.properties(), - definition: stream_job.mview_definition(), - }; - - // 9. Mark creating tables, including internal tables and the table of the stream job. - // Note(bugen): should we take `Sink` into account as well? - let creating_tables = ctx - .internal_tables() - .into_iter() - .chain(stream_job.table().cloned()) - .collect_vec(); - - self.catalog_manager - .mark_creating_tables(&creating_tables) - .await; - - Ok((ctx, table_fragments)) - } - - /// `cancel_stream_job` cancels a stream job and clean some states. - async fn cancel_stream_job( - &self, - stream_job: &StreamingJob, - internal_tables: Vec
, - ) -> MetaResult<()> { - let mut creating_internal_table_ids = - internal_tables.into_iter().map(|t| t.id).collect_vec(); - // 1. cancel create procedure. - match stream_job { - StreamingJob::MaterializedView(table) => { - creating_internal_table_ids.push(table.id); - self.catalog_manager - .cancel_create_table_procedure(table) - .await?; - } - StreamingJob::Sink(sink) => { - self.catalog_manager - .cancel_create_sink_procedure(sink) - .await?; - } - StreamingJob::Table(source, table) => { - creating_internal_table_ids.push(table.id); - if let Some(source) = source { - self.catalog_manager - .cancel_create_table_procedure_with_source(source, table) - .await?; - } else { - self.catalog_manager - .cancel_create_table_procedure(table) - .await?; - } - } - StreamingJob::Index(index, table) => { - creating_internal_table_ids.push(table.id); - self.catalog_manager - .cancel_create_index_procedure(index, table) - .await?; - } - } - // 2. unmark creating tables. - self.catalog_manager - .unmark_creating_tables(&creating_internal_table_ids, true) - .await; - - Ok(()) - } - - /// `finish_stream_job` finishes a stream job and clean some states. - async fn finish_stream_job( - &self, - stream_job: &StreamingJob, - internal_tables: Vec
, - ) -> MetaResult { - // 1. finish procedure. - let mut creating_internal_table_ids = internal_tables.iter().map(|t| t.id).collect_vec(); - let version = match stream_job { - StreamingJob::MaterializedView(table) => { - creating_internal_table_ids.push(table.id); - self.catalog_manager - .finish_create_table_procedure(internal_tables, table) - .await? - } - StreamingJob::Sink(sink) => { - self.catalog_manager - .finish_create_sink_procedure(internal_tables, sink) - .await? - } - StreamingJob::Table(source, table) => { - creating_internal_table_ids.push(table.id); - if let Some(source) = source { - let internal_tables: [_; 1] = internal_tables.try_into().unwrap(); - self.catalog_manager - .finish_create_table_procedure_with_source( - source, - table, - &internal_tables[0], - ) - .await? - } else { - assert!(internal_tables.is_empty()); - // Though `internal_tables` is empty here, we pass it as a parameter to reuse - // the method. - self.catalog_manager - .finish_create_table_procedure(internal_tables, table) - .await? - } - } - StreamingJob::Index(index, table) => { - creating_internal_table_ids.push(table.id); - self.catalog_manager - .finish_create_index_procedure(index, table) - .await? - } - }; - - // 2. unmark creating tables. - self.catalog_manager - .unmark_creating_tables(&creating_internal_table_ids, false) - .await; - - Ok(version) - } - - async fn drop_table_inner( - &self, - source_id: Option, - table_id: TableId, - ) -> MetaResult { - let table_fragment = self - .fragment_manager - .select_table_fragments_by_table_id(&table_id.into()) - .await?; - let internal_table_ids = table_fragment.internal_table_ids(); - - let (version, delete_jobs) = if let Some(source_id) = source_id { - // Drop table and source in catalog. Check `source_id` if it is the table's - // `associated_source_id`. Indexes also need to be dropped atomically. - assert_eq!(internal_table_ids.len(), 1); - let (version, delete_jobs) = self - .catalog_manager - .drop_table_with_source(source_id, table_id, internal_table_ids[0]) - .await?; - // Unregister source connector worker. - self.source_manager - .unregister_sources(vec![source_id]) - .await; - (version, delete_jobs) - } else { - assert!(internal_table_ids.is_empty()); - self.catalog_manager - .drop_table(table_id, internal_table_ids) - .await? - }; - - // Drop streaming jobs. - self.stream_manager.drop_streaming_jobs(delete_jobs).await; - - Ok(version) - } - - /// Prepares a table replacement and returns the context and table fragments. - async fn prepare_replace_table( - &self, - stream_job: &mut StreamingJob, - fragment_graph: StreamFragmentGraphProto, - ) -> MetaResult<(ReplaceTableContext, TableFragments)> { - let id = stream_job.id(); - - // 1. Get the env for streaming jobs. - let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); - - // 2. Build fragment graph. - let fragment_graph = - StreamFragmentGraph::new(fragment_graph, self.env.id_gen_manager_ref(), &*stream_job) - .await?; - let default_parallelism = fragment_graph.default_parallelism(); - assert!(fragment_graph.internal_tables().is_empty()); - - // 3. Set the graph-related fields and freeze the `stream_job`. - stream_job.set_table_fragment_id(fragment_graph.table_fragment_id()); - let stream_job = &*stream_job; - - // TODO: 4. Mark current relation as "updating". - - // 5. Resolve the downstream fragments, extend the fragment graph to a complete graph that - // contains all information needed for building the actor graph. 
- let downstream_fragments = self - .fragment_manager - .get_downstream_chain_fragments(id.into()) - .await?; - - let complete_graph = - CompleteStreamFragmentGraph::with_downstreams(fragment_graph, downstream_fragments)?; - - // 6. Build the actor graph. - let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; - let actor_graph_builder = - ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; - - let ActorGraphBuildResult { - graph, - building_locations, - existing_locations, - dispatchers, - merge_updates, - } = actor_graph_builder - .generate_graph(self.env.id_gen_manager_ref(), stream_job) - .await?; - assert!(dispatchers.is_empty()); - - // 7. Build the table fragments structure that will be persisted in the stream manager, and - // the context that contains all information needed for building the actors on the compute - // nodes. - let table_fragments = - TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); - - let ctx = ReplaceTableContext { - merge_updates, - building_locations, - existing_locations, - table_properties: stream_job.properties(), - }; - - Ok((ctx, table_fragments)) - } - async fn gen_unique_id(&self) -> MetaResult { let id = self.env.id_gen_manager().generate::().await? as u32; Ok(id) diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 2d243699dec7..7b14731cb1da 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -106,6 +106,7 @@ impl CreatingStreamingJobInfo { async fn delete_job(&self, job_id: TableId) { let mut jobs = self.streaming_jobs.lock().await; + tracing::info!("delete job: {}", job_id); jobs.remove(&job_id); } @@ -321,6 +322,7 @@ where .. }: CreateStreamingJobContext, ) -> MetaResult<()> { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; let actor_map = table_fragments.actor_map(); // Actors on each stream node will need to know where their upstream lies. `actor_info` From a99c4e41c648e1de91122aac79046e43e16f32b7 Mon Sep 17 00:00:00 2001 From: August Date: Thu, 23 Feb 2023 15:45:08 +0800 Subject: [PATCH 02/10] clean --- src/meta/src/manager/catalog/mod.rs | 2 -- src/meta/src/stream/stream_manager.rs | 1 - 2 files changed, 3 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index d4c996afb398..1d2a3573ad69 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -193,8 +193,6 @@ where // database and schemas. user_core.increase_ref_count(database.owner, 1 + schemas_added.len()); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - let mut version = self .notify_frontend(Operation::Add, Info::Database(database.to_owned())) .await; diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 7b14731cb1da..1994c67233e7 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -322,7 +322,6 @@ where .. }: CreateStreamingJobContext, ) -> MetaResult<()> { - tokio::time::sleep(std::time::Duration::from_secs(2)).await; let actor_map = table_fragments.actor_map(); // Actors on each stream node will need to know where their upstream lies. 
`actor_info` From 0935a72545f6597804bd0fbe58f79c28a3d4eec5 Mon Sep 17 00:00:00 2001 From: August Date: Thu, 23 Feb 2023 16:02:43 +0800 Subject: [PATCH 03/10] fix --- src/meta/src/manager/catalog/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 1d2a3573ad69..f73a3c81ceae 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -1041,7 +1041,7 @@ where let source_key = (source.database_id, source.schema_id, source.name.clone()); let table_key = (table.database_id, table.schema_id, table.name.clone()); assert!( - database_core.sources.contains_key(&source.id) + !database_core.sources.contains_key(&source.id) && !database_core.tables.contains_key(&table.id) && database_core.has_in_progress_creation(&source_key) && database_core.has_in_progress_creation(&table_key), @@ -1232,7 +1232,7 @@ where let user_core = &mut core.user; let key = (index.database_id, index.schema_id, index.name.clone()); assert!( - database_core.indexes.contains_key(&index.id) + !database_core.indexes.contains_key(&index.id) && database_core.has_in_progress_creation(&key), "index must not exist and be in creating procedure" ); @@ -1356,7 +1356,7 @@ where let user_core = &mut core.user; let key = (sink.database_id, sink.schema_id, sink.name.clone()); assert!( - database_core.sinks.contains_key(&sink.id) + !database_core.sinks.contains_key(&sink.id) && database_core.has_in_progress_creation(&key), "sink must not exit and be in creating procedure" ); From a2e4cad3e9ebaf38d02f2f25254503025b587fce Mon Sep 17 00:00:00 2001 From: August Date: Thu, 23 Feb 2023 16:35:12 +0800 Subject: [PATCH 04/10] fmt --- src/meta/src/manager/catalog/mod.rs | 250 +++++++++++++------------- src/meta/src/stream/stream_manager.rs | 1 - 2 files changed, 123 insertions(+), 128 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index f73a3c81ceae..389c69ac4f94 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -602,33 +602,32 @@ where let database_core = &mut core.database; let mut tables = BTreeMapTransaction::new(&mut database_core.tables); let key = (table.database_id, table.schema_id, table.name.clone()); - if !tables.contains_key(&table.id) - && database_core.in_progress_creation_tracker.contains(&key) - { - database_core.in_progress_creation_tracker.remove(&key); - database_core - .in_progress_creation_streaming_job - .remove(&table.id); + assert!( + !tables.contains_key(&table.id) + && database_core.in_progress_creation_tracker.contains(&key), + "table must be in creating procedure" + ); + database_core.in_progress_creation_tracker.remove(&key); + database_core + .in_progress_creation_streaming_job + .remove(&table.id); + tables.insert(table.id, table.clone()); + for table in &internal_tables { tables.insert(table.id, table.clone()); - for table in &internal_tables { - tables.insert(table.id, table.clone()); - } - commit_meta!(self, tables)?; - - for internal_table in internal_tables { - self.notify_frontend(Operation::Add, Info::Table(internal_table)) - .await; - } + } + commit_meta!(self, tables)?; - let version = self - .notify_frontend(Operation::Add, Info::Table(table.to_owned())) + for internal_table in internal_tables { + self.notify_frontend(Operation::Add, Info::Table(internal_table)) .await; - - Ok(version) - } else { - unreachable!("table must not exist and must be in creating procedure"); } + + let 
version = self + .notify_frontend(Operation::Add, Info::Table(table.to_owned())) + .await; + + Ok(version) } pub async fn cancel_create_table_procedure(&self, table: &Table) { @@ -639,7 +638,7 @@ where assert!( !database_core.tables.contains_key(&table.id) && database_core.has_in_progress_creation(&key), - "table must not exit and be in creating procedure" + "table must be in creating procedure" ); database_core.unmark_creating(&key); @@ -878,22 +877,21 @@ where let database_core = &mut core.database; let mut sources = BTreeMapTransaction::new(&mut database_core.sources); let key = (source.database_id, source.schema_id, source.name.clone()); - if !sources.contains_key(&source.id) - && database_core.in_progress_creation_tracker.contains(&key) - { - database_core.in_progress_creation_tracker.remove(&key); - sources.insert(source.id, source.clone()); + assert!( + !sources.contains_key(&source.id) + && database_core.in_progress_creation_tracker.contains(&key), + "source must be in creating procedure" + ); + database_core.in_progress_creation_tracker.remove(&key); + sources.insert(source.id, source.clone()); - commit_meta!(self, sources)?; + commit_meta!(self, sources)?; - let version = self - .notify_frontend(Operation::Add, Info::Source(source.to_owned())) - .await; + let version = self + .notify_frontend(Operation::Add, Info::Source(source.to_owned())) + .await; - Ok(version) - } else { - unreachable!("source must not exist and must be in creating procedure"); - } + Ok(version) } pub async fn cancel_create_source_procedure(&self, source: &Source) -> MetaResult<()> { @@ -901,15 +899,15 @@ where let database_core = &mut core.database; let user_core = &mut core.user; let key = (source.database_id, source.schema_id, source.name.clone()); - if !database_core.sources.contains_key(&source.id) - && database_core.has_in_progress_creation(&key) - { - database_core.unmark_creating(&key); - user_core.decrease_ref(source.owner); - Ok(()) - } else { - unreachable!("source must not exist and must be in creating procedure"); - } + assert!( + !database_core.sources.contains_key(&source.id) + && database_core.has_in_progress_creation(&key), + "source must be in creating procedure" + ); + + database_core.unmark_creating(&key); + user_core.decrease_ref(source.owner); + Ok(()) } pub async fn drop_source(&self, source_id: SourceId) -> MetaResult { @@ -994,44 +992,42 @@ where let source_key = (source.database_id, source.schema_id, source.name.clone()); let mview_key = (mview.database_id, mview.schema_id, mview.name.clone()); - if !sources.contains_key(&source.id) - && !tables.contains_key(&mview.id) - && database_core - .in_progress_creation_tracker - .contains(&source_key) - && database_core - .in_progress_creation_tracker - .contains(&mview_key) - { - database_core - .in_progress_creation_tracker - .remove(&source_key); - database_core - .in_progress_creation_tracker - .remove(&mview_key); - database_core - .in_progress_creation_streaming_job - .remove(&mview.id); - - sources.insert(source.id, source.clone()); - tables.insert(mview.id, mview.clone()); - tables.insert(internal_table.id, internal_table.clone()); - - commit_meta!(self, sources, tables)?; - - self.notify_frontend(Operation::Add, Info::Table(internal_table.to_owned())) - .await; - self.notify_frontend(Operation::Add, Info::Table(mview.to_owned())) - .await; + assert!( + !sources.contains_key(&source.id) + && !tables.contains_key(&mview.id) + && database_core + .in_progress_creation_tracker + .contains(&source_key) + && database_core + 
.in_progress_creation_tracker + .contains(&mview_key), + "table and source must be in creating procedure" + ); + database_core + .in_progress_creation_tracker + .remove(&source_key); + database_core + .in_progress_creation_tracker + .remove(&mview_key); + database_core + .in_progress_creation_streaming_job + .remove(&mview.id); + + sources.insert(source.id, source.clone()); + tables.insert(mview.id, mview.clone()); + tables.insert(internal_table.id, internal_table.clone()); + + commit_meta!(self, sources, tables)?; + self.notify_frontend(Operation::Add, Info::Table(internal_table.to_owned())) + .await; + self.notify_frontend(Operation::Add, Info::Table(mview.to_owned())) + .await; - // Currently frontend uses source's version - let version = self - .notify_frontend(Operation::Add, Info::Source(source.to_owned())) - .await; - Ok(version) - } else { - unreachable!("source must not exist and must be in creating procedure"); - } + // Currently frontend uses source's version + let version = self + .notify_frontend(Operation::Add, Info::Source(source.to_owned())) + .await; + Ok(version) } pub async fn cancel_create_table_procedure_with_source(&self, source: &Source, table: &Table) { @@ -1045,7 +1041,7 @@ where && !database_core.tables.contains_key(&table.id) && database_core.has_in_progress_creation(&source_key) && database_core.has_in_progress_creation(&table_key), - "table and source must not exit and be in creating procedure" + "table and source must be in creating procedure" ); database_core.unmark_creating(&source_key); @@ -1234,7 +1230,7 @@ where assert!( !database_core.indexes.contains_key(&index.id) && database_core.has_in_progress_creation(&key), - "index must not exist and be in creating procedure" + "index must be in creating procedure" ); database_core.unmark_creating(&key); @@ -1257,30 +1253,30 @@ where let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes); let mut tables = BTreeMapTransaction::new(&mut database_core.tables); - if !indexes.contains_key(&index.id) - && database_core.in_progress_creation_tracker.contains(&key) - { - database_core.in_progress_creation_tracker.remove(&key); - database_core - .in_progress_creation_streaming_job - .remove(&table.id); + assert!( + !indexes.contains_key(&index.id) + && database_core.in_progress_creation_tracker.contains(&key), + "index must be in creating procedure" + ); - indexes.insert(index.id, index.clone()); - tables.insert(table.id, table.clone()); + database_core.in_progress_creation_tracker.remove(&key); + database_core + .in_progress_creation_streaming_job + .remove(&table.id); - commit_meta!(self, indexes, tables)?; + indexes.insert(index.id, index.clone()); + tables.insert(table.id, table.clone()); - self.notify_frontend(Operation::Add, Info::Table(table.to_owned())) - .await; + commit_meta!(self, indexes, tables)?; - let version = self - .notify_frontend(Operation::Add, Info::Index(index.to_owned())) - .await; + self.notify_frontend(Operation::Add, Info::Table(table.to_owned())) + .await; - Ok(version) - } else { - unreachable!("index must not exist and must be in creating procedure"); - } + let version = self + .notify_frontend(Operation::Add, Info::Index(index.to_owned())) + .await; + + Ok(version) } pub async fn start_create_sink_procedure(&self, sink: &Sink) -> MetaResult<()> { @@ -1321,33 +1317,33 @@ where let key = (sink.database_id, sink.schema_id, sink.name.clone()); let mut tables = BTreeMapTransaction::new(&mut database_core.tables); let mut sinks = BTreeMapTransaction::new(&mut database_core.sinks); - 
if !sinks.contains_key(&sink.id) - && database_core.in_progress_creation_tracker.contains(&key) - { - database_core.in_progress_creation_tracker.remove(&key); - database_core - .in_progress_creation_streaming_job - .remove(&sink.id); - - sinks.insert(sink.id, sink.clone()); - for table in &internal_tables { - tables.insert(table.id, table.clone()); - } - commit_meta!(self, sinks, tables)?; + assert!( + !sinks.contains_key(&sink.id) + && database_core.in_progress_creation_tracker.contains(&key), + "sink must be in creating procedure" + ); - for internal_table in internal_tables { - self.notify_frontend(Operation::Add, Info::Table(internal_table)) - .await; - } + database_core.in_progress_creation_tracker.remove(&key); + database_core + .in_progress_creation_streaming_job + .remove(&sink.id); - let version = self - .notify_frontend(Operation::Add, Info::Sink(sink.to_owned())) - .await; + sinks.insert(sink.id, sink.clone()); + for table in &internal_tables { + tables.insert(table.id, table.clone()); + } + commit_meta!(self, sinks, tables)?; - Ok(version) - } else { - unreachable!("sink must not exist and must be in creating procedure"); + for internal_table in internal_tables { + self.notify_frontend(Operation::Add, Info::Table(internal_table)) + .await; } + + let version = self + .notify_frontend(Operation::Add, Info::Sink(sink.to_owned())) + .await; + + Ok(version) } pub async fn cancel_create_sink_procedure(&self, sink: &Sink) { @@ -1358,7 +1354,7 @@ where assert!( !database_core.sinks.contains_key(&sink.id) && database_core.has_in_progress_creation(&key), - "sink must not exit and be in creating procedure" + "sink must be in creating procedure" ); database_core.unmark_creating(&key); diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 1994c67233e7..2d243699dec7 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -106,7 +106,6 @@ impl CreatingStreamingJobInfo { async fn delete_job(&self, job_id: TableId) { let mut jobs = self.streaming_jobs.lock().await; - tracing::info!("delete job: {}", job_id); jobs.remove(&job_id); } From 133225cb8c46debfef6fd49f2b1b2e184d721ec9 Mon Sep 17 00:00:00 2001 From: August Date: Thu, 23 Feb 2023 17:14:33 +0800 Subject: [PATCH 05/10] chore change --- src/meta/src/manager/catalog/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 389c69ac4f94..7f5b7125786a 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -774,7 +774,8 @@ where } pub async fn get_index_table(&self, index_id: IndexId) -> MetaResult { - let index = Index::select(self.env.meta_store(), &index_id).await?; + let guard = self.core.lock().await; + let index = guard.database.indexes.get(&index_id); if let Some(index) = index { Ok(index.index_table_id) } else { From 5ce6a204651fc18d738b80da80fdeff239b4952a Mon Sep 17 00:00:00 2001 From: August Date: Fri, 24 Feb 2023 15:00:57 +0800 Subject: [PATCH 06/10] comments --- src/meta/src/rpc/ddl_controller.rs | 111 ++++++++++++++--------------- 1 file changed, 53 insertions(+), 58 deletions(-) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 12fe933e5d0b..4f2d51fd7fef 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -112,6 +112,10 @@ where Ok(()) } + /// `run_command` spawns a tokio coroutine to execute the target ddl command. 
When the client + /// has been interrupted during executing, the request will be cancelled by tonic. Since we have + /// a lot of logic for revert, status management, notification and so on, ensuring consistency + /// would be a huge hassle and pain if we don't spawn here. pub(crate) async fn run_command(&self, command: DdlCommand) -> MetaResult { self.check_barrier_manager_status().await?; let ctrl = self.clone(); @@ -217,12 +221,13 @@ where mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, ) -> MetaResult { - let (ctx, table_fragments) = self - .prepare_stream_job(&mut stream_job, fragment_graph) - .await?; - - let internal_tables = ctx.internal_tables(); + let mut internal_tables = vec![]; let result = try { + let (ctx, table_fragments) = self + .prepare_stream_job(&mut stream_job, fragment_graph) + .await?; + + internal_tables = ctx.internal_tables(); if let Some(source) = stream_job.source() { self.source_manager.register_source(source).await?; } @@ -324,63 +329,53 @@ where }) .collect(); - let res: MetaResult<(CreateStreamingJobContext, TableFragments)> = try { - let complete_graph = CompleteStreamFragmentGraph::with_upstreams( - fragment_graph, - upstream_mview_fragments, - )?; - - // 6. Build the actor graph. - let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; - let actor_graph_builder = - ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; - - let ActorGraphBuildResult { - graph, - building_locations, - existing_locations, - dispatchers, - merge_updates, - } = actor_graph_builder - .generate_graph(self.env.id_gen_manager_ref(), stream_job) - .await?; - assert!(merge_updates.is_empty()); - - // 8. Build the table fragments structure that will be persisted in the stream manager, - // and the context that contains all information needed for building the - // actors on the compute nodes. - let table_fragments = - TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); - - let ctx = CreateStreamingJobContext { - dispatchers, - upstream_mview_actors, - internal_tables, - building_locations, - existing_locations, - table_properties: stream_job.properties(), - definition: stream_job.mview_definition(), - }; - - // 9. Mark creating tables, including internal tables and the table of the stream job. - let creating_tables = ctx - .internal_tables() - .into_iter() - .chain(stream_job.table().cloned()) - .collect_vec(); + let complete_graph = + CompleteStreamFragmentGraph::with_upstreams(fragment_graph, upstream_mview_fragments)?; - self.catalog_manager - .mark_creating_tables(&creating_tables) - .await; + // 6. Build the actor graph. + let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; + let actor_graph_builder = + ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; - (ctx, table_fragments) + let ActorGraphBuildResult { + graph, + building_locations, + existing_locations, + dispatchers, + merge_updates, + } = actor_graph_builder + .generate_graph(self.env.id_gen_manager_ref(), stream_job) + .await?; + assert!(merge_updates.is_empty()); + + // 8. Build the table fragments structure that will be persisted in the stream manager, + // and the context that contains all information needed for building the + // actors on the compute nodes. 
+ let table_fragments = + TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); + + let ctx = CreateStreamingJobContext { + dispatchers, + upstream_mview_actors, + internal_tables, + building_locations, + existing_locations, + table_properties: stream_job.properties(), + definition: stream_job.mview_definition(), }; - if let Err(err) = &res { - tracing::error!("failed to build streaming graph for streaming job: {}", err); - self.cancel_stream_job(stream_job, vec![]).await; - } - res + // 9. Mark creating tables, including internal tables and the table of the stream job. + let creating_tables = ctx + .internal_tables() + .into_iter() + .chain(stream_job.table().cloned()) + .collect_vec(); + + self.catalog_manager + .mark_creating_tables(&creating_tables) + .await; + + Ok((ctx, table_fragments)) } /// `cancel_stream_job` cancels a stream job and clean some states. From db1bb908fa1e7643f95ed8727e916e5feb07a72e Mon Sep 17 00:00:00 2001 From: August Date: Fri, 24 Feb 2023 15:55:57 +0800 Subject: [PATCH 07/10] cancel --- src/meta/src/manager/catalog/mod.rs | 2 +- src/meta/src/rpc/ddl_controller.rs | 139 +++++++++++++++------------- 2 files changed, 75 insertions(+), 66 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 7f5b7125786a..b8f8f3245dec 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -862,7 +862,7 @@ where user_core.ensure_user_id(source.owner)?; if database_core.has_in_progress_creation(&key) { - bail!("table is in creating procedure"); + bail!("source is in creating procedure"); } else { database_core.mark_creating(&key); user_core.increase_ref(source.owner); diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 4f2d51fd7fef..68dc822c7734 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -221,13 +221,12 @@ where mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, ) -> MetaResult { - let mut internal_tables = vec![]; - let result = try { - let (ctx, table_fragments) = self - .prepare_stream_job(&mut stream_job, fragment_graph) - .await?; + let (ctx, table_fragments) = self + .prepare_stream_job(&mut stream_job, fragment_graph) + .await?; - internal_tables = ctx.internal_tables(); + let internal_tables = ctx.internal_tables(); + let result = try { if let Some(source) = stream_job.source() { self.source_manager.register_source(source).await?; } @@ -313,69 +312,79 @@ where .start_create_stream_job_procedure(stream_job) .await?; - // 5. Resolve the upstream fragments, extend the fragment graph to a complete graph that - // contains all information needed for building the actor graph. - let upstream_mview_fragments = self - .fragment_manager - .get_upstream_mview_fragments(&dependent_relations) - .await; - let upstream_mview_actors = upstream_mview_fragments - .iter() - .map(|(&table_id, fragment)| { - ( - table_id, - fragment.actors.iter().map(|a| a.actor_id).collect_vec(), - ) - }) - .collect(); - - let complete_graph = - CompleteStreamFragmentGraph::with_upstreams(fragment_graph, upstream_mview_fragments)?; - - // 6. Build the actor graph. 
- let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; - let actor_graph_builder = - ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; - - let ActorGraphBuildResult { - graph, - building_locations, - existing_locations, - dispatchers, - merge_updates, - } = actor_graph_builder - .generate_graph(self.env.id_gen_manager_ref(), stream_job) - .await?; - assert!(merge_updates.is_empty()); + let res: MetaResult<(CreateStreamingJobContext, TableFragments)> = try { + // 5. Resolve the upstream fragments, extend the fragment graph to a complete graph that + // contains all information needed for building the actor graph. + let upstream_mview_fragments = self + .fragment_manager + .get_upstream_mview_fragments(&dependent_relations) + .await; + let upstream_mview_actors = upstream_mview_fragments + .iter() + .map(|(&table_id, fragment)| { + ( + table_id, + fragment.actors.iter().map(|a| a.actor_id).collect_vec(), + ) + }) + .collect(); + + let complete_graph = CompleteStreamFragmentGraph::with_upstreams( + fragment_graph, + upstream_mview_fragments, + )?; + + // 6. Build the actor graph. + let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; + let actor_graph_builder = + ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; + + let ActorGraphBuildResult { + graph, + building_locations, + existing_locations, + dispatchers, + merge_updates, + } = actor_graph_builder + .generate_graph(self.env.id_gen_manager_ref(), stream_job) + .await?; + assert!(merge_updates.is_empty()); + + // 8. Build the table fragments structure that will be persisted in the stream manager, + // and the context that contains all information needed for building the + // actors on the compute nodes. + let table_fragments = + TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); + + let ctx = CreateStreamingJobContext { + dispatchers, + upstream_mview_actors, + internal_tables, + building_locations, + existing_locations, + table_properties: stream_job.properties(), + definition: stream_job.mview_definition(), + }; + + // 9. Mark creating tables, including internal tables and the table of the stream job. + let creating_tables = ctx + .internal_tables() + .into_iter() + .chain(stream_job.table().cloned()) + .collect_vec(); - // 8. Build the table fragments structure that will be persisted in the stream manager, - // and the context that contains all information needed for building the - // actors on the compute nodes. - let table_fragments = - TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); + self.catalog_manager + .mark_creating_tables(&creating_tables) + .await; - let ctx = CreateStreamingJobContext { - dispatchers, - upstream_mview_actors, - internal_tables, - building_locations, - existing_locations, - table_properties: stream_job.properties(), - definition: stream_job.mview_definition(), + (ctx, table_fragments) }; + if let Err(err) = &res { + tracing::error!("failed to build streaming graph for streaming job: {}", err); + self.cancel_stream_job(stream_job, vec![]).await; + } - // 9. Mark creating tables, including internal tables and the table of the stream job. - let creating_tables = ctx - .internal_tables() - .into_iter() - .chain(stream_job.table().cloned()) - .collect_vec(); - - self.catalog_manager - .mark_creating_tables(&creating_tables) - .await; - - Ok((ctx, table_fragments)) + res } /// `cancel_stream_job` cancels a stream job and clean some states. 
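
The doc comment added to `run_command` in PATCH 06 leans on a property of `tokio::spawn`: a spawned task is owned by the runtime, so dropping the RPC future that started it does not cancel the work. Below is a minimal, self-contained sketch of that detach-by-spawning pattern; `run_detached` and `work` are names invented for this illustration and are not part of the meta-service code.

    use tokio::task::JoinHandle;

    // Sketch only: the real `DdlController::run_command` has its own command enum,
    // error type and status checks. Here we keep just the spawning shape.
    async fn run_detached<F, T>(work: F) -> Result<T, tokio::task::JoinError>
    where
        F: std::future::Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // The spawned task is driven by the runtime, not by the caller's future.
        let handle: JoinHandle<T> = tokio::spawn(work);
        // If the caller (e.g. a tonic request future) is dropped at this await point,
        // only the await is abandoned; the spawned task still runs to completion, so
        // revert, status-management and notification logic inside `work` is never
        // cut off halfway.
        handle.await
    }

Awaiting the `JoinHandle` from the gRPC handler keeps the success path unchanged, while a client disconnect merely abandons the await, which is what lets the cleanup logic in the surrounding patches assume the command either finished or failed on its own terms.
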
From 0785ae98cb15b6636df749ccc5f579a1309fc9ca Mon Sep 17 00:00:00 2001 From: August Date: Fri, 24 Feb 2023 17:01:41 +0800 Subject: [PATCH 08/10] split --- src/meta/src/manager/catalog/fragment.rs | 4 +- src/meta/src/manager/streaming_job.rs | 11 ++ src/meta/src/rpc/ddl_controller.rs | 172 ++++++++++++----------- 3 files changed, 101 insertions(+), 86 deletions(-) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index c29e54633a44..c243eff93e2f 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -824,12 +824,12 @@ where /// Get and filter the upstream `Materialize` fragments of the specified relations. pub async fn get_upstream_mview_fragments( &self, - dependent_relation_ids: &HashSet, + dependent_relation_ids: impl IntoIterator, ) -> HashMap { let map = &self.core.read().await.table_fragments; let mut fragments = HashMap::new(); - for &table_id in dependent_relation_ids { + for table_id in dependent_relation_ids.into_iter() { if let Some(table_fragments) = map.get(&table_id) && let Some(fragment) = table_fragments.mview_fragment() { fragments.insert(table_id, fragment); } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 9073f6dec93d..5e06da77d379 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -93,6 +93,17 @@ impl StreamingJob { } } + pub fn dependent_relations(&self) -> impl IntoIterator { + let ids = match self { + StreamingJob::MaterializedView(table) => table.dependent_relations.clone(), + StreamingJob::Sink(sink) => sink.dependent_relations.clone(), + StreamingJob::Table(_, _) => vec![], + StreamingJob::Index(_, index_table) => index_table.dependent_relations.clone(), + }; + + ids.into_iter().map(TableId::from) + } + pub fn schema_id(&self) -> u32 { match self { Self::MaterializedView(table) => table.schema_id, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 68dc822c7734..489bd872dbc6 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -221,12 +221,19 @@ where mut stream_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, ) -> MetaResult { - let (ctx, table_fragments) = self + let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); + let fragment_graph = self .prepare_stream_job(&mut stream_job, fragment_graph) .await?; - let internal_tables = ctx.internal_tables(); + let mut internal_tables = vec![]; let result = try { + let (ctx, table_fragments) = self + .build_stream_job(env, &stream_job, fragment_graph) + .await?; + + internal_tables = ctx.internal_tables(); + if let Some(source) = stream_job.source() { self.source_manager.register_source(source).await?; } @@ -283,108 +290,105 @@ where Ok(version) } - /// `prepare_stream_job` prepares a stream job and returns the context and table fragments. + /// `prepare_stream_job` prepares a stream job and returns the stream fragment graph. async fn prepare_stream_job( &self, stream_job: &mut StreamingJob, fragment_graph: StreamFragmentGraphProto, - ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> { - // 1. Get the env for streaming jobs. - let env = StreamEnvironment::from_protobuf(fragment_graph.get_env().unwrap()); - - // 2. Build fragment graph. + ) -> MetaResult { + // 1. Build fragment graph. 
let fragment_graph = StreamFragmentGraph::new(fragment_graph, self.env.id_gen_manager_ref(), &*stream_job) .await?; - let default_parallelism = fragment_graph.default_parallelism(); - let internal_tables = fragment_graph.internal_tables(); - // 3. Set the graph-related fields and freeze the `stream_job`. + // 2. Set the graph-related fields and freeze the `stream_job`. stream_job.set_table_fragment_id(fragment_graph.table_fragment_id()); let dependent_relations = fragment_graph.dependent_relations().clone(); - stream_job.set_dependent_relations(dependent_relations.clone()); - - let id = stream_job.id(); + stream_job.set_dependent_relations(dependent_relations); let stream_job = &*stream_job; - // 4. Mark current relation as "creating" and add reference count to dependent relations. + // 3. Mark current relation as "creating" and add reference count to dependent relations. self.catalog_manager .start_create_stream_job_procedure(stream_job) .await?; - let res: MetaResult<(CreateStreamingJobContext, TableFragments)> = try { - // 5. Resolve the upstream fragments, extend the fragment graph to a complete graph that - // contains all information needed for building the actor graph. - let upstream_mview_fragments = self - .fragment_manager - .get_upstream_mview_fragments(&dependent_relations) - .await; - let upstream_mview_actors = upstream_mview_fragments - .iter() - .map(|(&table_id, fragment)| { - ( - table_id, - fragment.actors.iter().map(|a| a.actor_id).collect_vec(), - ) - }) - .collect(); - - let complete_graph = CompleteStreamFragmentGraph::with_upstreams( - fragment_graph, - upstream_mview_fragments, - )?; - - // 6. Build the actor graph. - let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; - let actor_graph_builder = - ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; - - let ActorGraphBuildResult { - graph, - building_locations, - existing_locations, - dispatchers, - merge_updates, - } = actor_graph_builder - .generate_graph(self.env.id_gen_manager_ref(), stream_job) - .await?; - assert!(merge_updates.is_empty()); - - // 8. Build the table fragments structure that will be persisted in the stream manager, - // and the context that contains all information needed for building the - // actors on the compute nodes. - let table_fragments = - TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); - - let ctx = CreateStreamingJobContext { - dispatchers, - upstream_mview_actors, - internal_tables, - building_locations, - existing_locations, - table_properties: stream_job.properties(), - definition: stream_job.mview_definition(), - }; - - // 9. Mark creating tables, including internal tables and the table of the stream job. - let creating_tables = ctx - .internal_tables() - .into_iter() - .chain(stream_job.table().cloned()) - .collect_vec(); + Ok(fragment_graph) + } - self.catalog_manager - .mark_creating_tables(&creating_tables) - .await; + /// `build_stream_job` builds a streaming job and returns the context and table fragments. + async fn build_stream_job( + &self, + env: StreamEnvironment, + stream_job: &StreamingJob, + fragment_graph: StreamFragmentGraph, + ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> { + let id = stream_job.id(); + let default_parallelism = fragment_graph.default_parallelism(); + let internal_tables = fragment_graph.internal_tables(); + + // 1. 
Resolve the upstream fragments, extend the fragment graph to a complete graph that + // contains all information needed for building the actor graph. + let upstream_mview_fragments = self + .fragment_manager + .get_upstream_mview_fragments(stream_job.dependent_relations()) + .await; + let upstream_mview_actors = upstream_mview_fragments + .iter() + .map(|(&table_id, fragment)| { + ( + table_id, + fragment.actors.iter().map(|a| a.actor_id).collect_vec(), + ) + }) + .collect(); - (ctx, table_fragments) + let complete_graph = + CompleteStreamFragmentGraph::with_upstreams(fragment_graph, upstream_mview_fragments)?; + + // 2. Build the actor graph. + let cluster_info = self.cluster_manager.get_streaming_cluster_info().await; + let actor_graph_builder = + ActorGraphBuilder::new(complete_graph, cluster_info, default_parallelism)?; + + let ActorGraphBuildResult { + graph, + building_locations, + existing_locations, + dispatchers, + merge_updates, + } = actor_graph_builder + .generate_graph(self.env.id_gen_manager_ref(), stream_job) + .await?; + assert!(merge_updates.is_empty()); + + // 3. Build the table fragments structure that will be persisted in the stream manager, + // and the context that contains all information needed for building the + // actors on the compute nodes. + let table_fragments = + TableFragments::new(id.into(), graph, &building_locations.actor_locations, env); + + let ctx = CreateStreamingJobContext { + dispatchers, + upstream_mview_actors, + internal_tables, + building_locations, + existing_locations, + table_properties: stream_job.properties(), + definition: stream_job.mview_definition(), }; - if let Err(err) = &res { - tracing::error!("failed to build streaming graph for streaming job: {}", err); - self.cancel_stream_job(stream_job, vec![]).await; - } - res + // 4. Mark creating tables, including internal tables and the table of the stream job. + let creating_tables = ctx + .internal_tables() + .into_iter() + .chain(stream_job.table().cloned()) + .collect_vec(); + + self.catalog_manager + .mark_creating_tables(&creating_tables) + .await; + + Ok((ctx, table_fragments)) } /// `cancel_stream_job` cancels a stream job and clean some states. From 6c9282be697032e2d40b939f34a504af15591c0b Mon Sep 17 00:00:00 2001 From: August Date: Fri, 24 Feb 2023 17:09:56 +0800 Subject: [PATCH 09/10] fmt --- src/meta/src/manager/catalog/fragment.rs | 4 ++-- src/meta/src/manager/streaming_job.rs | 11 ----------- src/meta/src/rpc/ddl_controller.rs | 2 +- 3 files changed, 3 insertions(+), 14 deletions(-) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index c243eff93e2f..380e462a5d40 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -824,12 +824,12 @@ where /// Get and filter the upstream `Materialize` fragments of the specified relations. 
pub async fn get_upstream_mview_fragments( &self, - dependent_relation_ids: impl IntoIterator, + dependent_relation_ids: &HashSet, ) -> HashMap { let map = &self.core.read().await.table_fragments; let mut fragments = HashMap::new(); - for table_id in dependent_relation_ids.into_iter() { + for &table_id in dependent_relation_ids.into_iter() { if let Some(table_fragments) = map.get(&table_id) && let Some(fragment) = table_fragments.mview_fragment() { fragments.insert(table_id, fragment); } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 5e06da77d379..9073f6dec93d 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -93,17 +93,6 @@ impl StreamingJob { } } - pub fn dependent_relations(&self) -> impl IntoIterator { - let ids = match self { - StreamingJob::MaterializedView(table) => table.dependent_relations.clone(), - StreamingJob::Sink(sink) => sink.dependent_relations.clone(), - StreamingJob::Table(_, _) => vec![], - StreamingJob::Index(_, index_table) => index_table.dependent_relations.clone(), - }; - - ids.into_iter().map(TableId::from) - } - pub fn schema_id(&self) -> u32 { match self { Self::MaterializedView(table) => table.schema_id, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 489bd872dbc6..b9543110048f 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -330,7 +330,7 @@ where // contains all information needed for building the actor graph. let upstream_mview_fragments = self .fragment_manager - .get_upstream_mview_fragments(stream_job.dependent_relations()) + .get_upstream_mview_fragments(fragment_graph.dependent_relations()) .await; let upstream_mview_actors = upstream_mview_fragments .iter() From d71a2a8134e7d173d043708bd01f78a0940ec143 Mon Sep 17 00:00:00 2001 From: August Date: Fri, 24 Feb 2023 17:22:33 +0800 Subject: [PATCH 10/10] clippy --- src/meta/src/manager/catalog/fragment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 380e462a5d40..c29e54633a44 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -829,7 +829,7 @@ where let map = &self.core.read().await.table_fragments; let mut fragments = HashMap::new(); - for &table_id in dependent_relation_ids.into_iter() { + for &table_id in dependent_relation_ids { if let Some(table_fragments) = map.get(&table_id) && let Some(fragment) = table_fragments.mview_fragment() { fragments.insert(table_id, fragment); }
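
Taken together, PATCH 07 and PATCH 08 converge on a prepare / build / cancel shape for `create_streaming_job`. The sketch below compresses that control flow into free functions with made-up signatures (the real methods live on `DdlController` and return `MetaResult`); only the ordering and the cancel-on-error handling mirror the patches, and a stable-Rust `async` block stands in for the nightly `try` block used in the diffs.

    // Placeholders standing in for the corresponding `DdlController` steps.
    async fn prepare_stream_job() -> anyhow::Result<()> { Ok(()) }      // mark the job as "creating"
    async fn build_stream_job() -> anyhow::Result<String> { Ok("graph".into()) } // resolve upstreams, build the actor graph
    async fn create_on_compute_nodes(_fragments: String) -> anyhow::Result<()> { Ok(()) }
    async fn finish_stream_job() -> anyhow::Result<u64> { Ok(1) }       // commit catalog, notify frontends
    async fn cancel_stream_job() {}                                     // revert "creating" marks and ref counts

    async fn create_streaming_job_flow() -> anyhow::Result<u64> {
        prepare_stream_job().await?;
        // Everything after `prepare` runs inside one fallible scope so a single
        // error path can decide between finishing and cancelling.
        let result: anyhow::Result<u64> = async {
            let fragments = build_stream_job().await?;
            create_on_compute_nodes(fragments).await?;
            finish_stream_job().await
        }
        .await;
        if result.is_err() {
            cancel_stream_job().await;
        }
        result
    }

Splitting `prepare` from `build` in PATCH 08 keeps the catalog bookkeeping in one place: the graph-construction code can propagate errors with plain `?` and leave cleanup to the caller, which owns the single cancel path.
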