From b160d0b8050cdc1a384746269fe42da5da368dc9 Mon Sep 17 00:00:00 2001
From: Shanicky Chen
Date: Thu, 7 Dec 2023 21:49:15 +0800
Subject: [PATCH] refactor(meta): carry the sink-into-table target table in StreamingJob::Sink

Signed-off-by: Shanicky Chen
---
 src/meta/service/src/ddl_service.rs   |  10 +-
 src/meta/src/manager/catalog/mod.rs   |  27 +++++-
 src/meta/src/manager/streaming_job.rs |  32 +++----
 src/meta/src/rpc/ddl_controller.rs    | 126 +++++++++++---------------
 src/meta/src/stream/stream_manager.rs |  78 ++++------------
 5 files changed, 118 insertions(+), 155 deletions(-)

diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs
index 09ceea2c1eac8..4b8e9e983a9bc 100644
--- a/src/meta/service/src/ddl_service.rs
+++ b/src/meta/service/src/ddl_service.rs
@@ -292,7 +292,15 @@ impl DdlService for DdlServiceImpl {
             self.validate_connection(connection_id).await?;
         }
 
-        let mut stream_job = StreamingJob::Sink(sink);
+        let mut stream_job = match &affected_table_change {
+            None => StreamingJob::Sink(sink, None),
+            Some(change) => {
+                let table = change.table.clone().unwrap();
+                let source = change.source.clone();
+                StreamingJob::Sink(sink, Some((table, source)))
+            }
+        };
+
         let id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
         stream_job.set_id(id);
 
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 638da1971ec46..8250862f9a71c 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -34,8 +34,8 @@ use risingwave_common::catalog::{
 use risingwave_common::{bail, ensure};
 use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, TableType};
 use risingwave_pb::catalog::{
-    Comment, Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Sink,
-    Source, StreamJobStatus, Table, View,
+    Comment, Connection, CreateType, Database, Function, Index, PbSource, PbStreamJobStatus,
+    Schema, Sink, Source, StreamJobStatus, Table, View,
 };
 use risingwave_pb::ddl_service::{alter_owner_request, alter_set_schema_request};
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
@@ -646,7 +646,7 @@ impl CatalogManager {
                 self.start_create_table_procedure(table, internal_tables)
                     .await
             }
-            StreamingJob::Sink(sink) => self.start_create_sink_procedure(sink).await,
+            StreamingJob::Sink(sink, _) => self.start_create_sink_procedure(sink).await,
             StreamingJob::Index(index, index_table) => {
                 self.start_create_index_procedure(index, index_table).await
             }
@@ -2811,7 +2811,11 @@ impl CatalogManager {
         Ok(version)
     }
 
-    pub async fn cancel_create_sink_procedure(&self, sink: &Sink) {
+    pub async fn cancel_create_sink_procedure(
+        &self,
+        sink: &Sink,
+        target_table: &Option<(Table, Option<PbSource>)>,
+    ) {
         let core = &mut *self.core.lock().await;
         let database_core = &mut core.database;
         let user_core = &mut core.user;
@@ -2828,6 +2832,10 @@ impl CatalogManager {
         }
         user_core.decrease_ref(sink.owner);
         refcnt_dec_connection(database_core, sink.connection_id);
+
+        if let Some((table, source)) = target_table {
+            Self::cancel_replace_table_procedure_inner(source, table, core);
+        }
     }
 
     /// This is used for `ALTER TABLE ADD/DROP COLUMN`.
@@ -2974,6 +2982,16 @@ impl CatalogManager {
             unreachable!("unexpected job: {stream_job:?}")
         };
         let core = &mut *self.core.lock().await;
+
+        Self::cancel_replace_table_procedure_inner(source, table, core);
+        Ok(())
+    }
+
+    fn cancel_replace_table_procedure_inner(
+        source: &Option<PbSource>,
+        table: &Table,
+        core: &mut CatalogManagerCore,
+    ) {
         let database_core = &mut core.database;
         let key = (table.database_id, table.schema_id, table.name.clone());
@@ -2999,7 +3017,6 @@
         // TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must
         // occur after it's created. We may need to add a new tracker for `alter` procedure.s
         database_core.unmark_creating(&key);
-        Ok(())
     }
 
     pub async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs
index da70542421016..cc7545c29f340 100644
--- a/src/meta/src/manager/streaming_job.rs
+++ b/src/meta/src/manager/streaming_job.rs
@@ -26,7 +26,7 @@ use crate::model::FragmentId;
 #[derive(Debug, Clone)]
 pub enum StreamingJob {
     MaterializedView(Table),
-    Sink(Sink),
+    Sink(Sink, Option<(Table, Option<PbSource>)>),
     Table(Option<PbSource>, Table, TableJobType),
     Index(Index, Table),
     Source(PbSource),
@@ -55,7 +55,7 @@ impl From<&StreamingJob> for DdlType {
     fn from(job: &StreamingJob) -> Self {
         match job {
             StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
-            StreamingJob::Sink(_) => DdlType::Sink,
+            StreamingJob::Sink(_, _) => DdlType::Sink,
             StreamingJob::Table(_, _, _) => DdlType::Table,
             StreamingJob::Index(_, _) => DdlType::Index,
             StreamingJob::Source(_) => DdlType::Source,
@@ -68,7 +68,7 @@ impl StreamingJob {
         let created_at_epoch = Some(Epoch::now().0);
         match self {
             StreamingJob::MaterializedView(table) => table.created_at_epoch = created_at_epoch,
-            StreamingJob::Sink(table) => table.created_at_epoch = created_at_epoch,
+            StreamingJob::Sink(table, _) => table.created_at_epoch = created_at_epoch,
             StreamingJob::Table(source, table, ..) => {
                 table.created_at_epoch = created_at_epoch;
                 if let Some(source) = source {
@@ -90,7 +90,7 @@ impl StreamingJob {
             StreamingJob::MaterializedView(table) => {
                 table.initialized_at_epoch = initialized_at_epoch
             }
-            StreamingJob::Sink(table) => table.initialized_at_epoch = initialized_at_epoch,
+            StreamingJob::Sink(table, _) => table.initialized_at_epoch = initialized_at_epoch,
             StreamingJob::Table(source, table, ..) => {
                 table.initialized_at_epoch = initialized_at_epoch;
                 if let Some(source) = source {
@@ -111,7 +111,7 @@ impl StreamingJob {
     pub fn set_id(&mut self, id: u32) {
         match self {
             Self::MaterializedView(table) => table.id = id,
-            Self::Sink(sink) => sink.id = id,
+            Self::Sink(sink, _) => sink.id = id,
             Self::Table(_, table, ..) => table.id = id,
             Self::Index(index, index_table) => {
                 index.id = id;
@@ -132,7 +132,7 @@ impl StreamingJob {
             Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
                 table.fragment_id = id;
             }
-            Self::Sink(_) | Self::Source(_) => {}
+            Self::Sink(_, _) | Self::Source(_) => {}
         }
     }
 
@@ -142,7 +142,7 @@ impl StreamingJob {
             Self::Table(_, table, ..) => {
                 table.dml_fragment_id = id;
             }
-            Self::MaterializedView(_) | Self::Index(_, _) | Self::Sink(_) => {}
+            Self::MaterializedView(_) | Self::Index(_, _) | Self::Sink(_, _) => {}
             Self::Source(_) => {}
         }
     }
 
@@ -150,7 +150,7 @@ impl StreamingJob {
     pub fn id(&self) -> u32 {
         match self {
             Self::MaterializedView(table) => table.id,
-            Self::Sink(sink) => sink.id,
+            Self::Sink(sink, _) => sink.id,
             Self::Table(_, table, ..) => table.id,
             Self::Index(index, _) => index.id,
             Self::Source(source) => source.id,
@@ -160,7 +160,7 @@ impl StreamingJob {
     pub fn mv_table(&self) -> Option<u32> {
         match self {
             Self::MaterializedView(table) => Some(table.id),
-            Self::Sink(_sink) => None,
+            Self::Sink(_, _) => None,
             Self::Table(_, table, ..) => Some(table.id),
             Self::Index(_, table) => Some(table.id),
             Self::Source(_) => None,
@@ -173,14 +173,14 @@ impl StreamingJob {
             Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
                 Some(table)
             }
-            Self::Sink(_) | Self::Source(_) => None,
+            Self::Sink(_, _) | Self::Source(_) => None,
         }
     }
 
     pub fn schema_id(&self) -> u32 {
         match self {
             Self::MaterializedView(table) => table.schema_id,
-            Self::Sink(sink) => sink.schema_id,
+            Self::Sink(sink, _) => sink.schema_id,
             Self::Table(_, table, ..) => table.schema_id,
             Self::Index(index, _) => index.schema_id,
             Self::Source(source) => source.schema_id,
@@ -190,7 +190,7 @@ impl StreamingJob {
     pub fn database_id(&self) -> u32 {
         match self {
             Self::MaterializedView(table) => table.database_id,
-            Self::Sink(sink) => sink.database_id,
+            Self::Sink(sink, _) => sink.database_id,
             Self::Table(_, table, ..) => table.database_id,
             Self::Index(index, _) => index.database_id,
             Self::Source(source) => source.database_id,
@@ -200,7 +200,7 @@ impl StreamingJob {
     pub fn name(&self) -> String {
         match self {
             Self::MaterializedView(table) => table.name.clone(),
-            Self::Sink(sink) => sink.name.clone(),
+            Self::Sink(sink, _) => sink.name.clone(),
             Self::Table(_, table, ..) => table.name.clone(),
             Self::Index(index, _) => index.name.clone(),
             Self::Source(source) => source.name.clone(),
@@ -210,7 +210,7 @@ impl StreamingJob {
     pub fn owner(&self) -> u32 {
         match self {
             StreamingJob::MaterializedView(mv) => mv.owner,
-            StreamingJob::Sink(sink) => sink.owner,
+            StreamingJob::Sink(sink, _) => sink.owner,
             StreamingJob::Table(_, table, ..) => table.owner,
             StreamingJob::Index(index, _) => index.owner,
             StreamingJob::Source(source) => source.owner,
@@ -222,7 +222,7 @@ impl StreamingJob {
             Self::MaterializedView(table) => table.definition.clone(),
             Self::Table(_, table, ..) => table.definition.clone(),
             Self::Index(_, table) => table.definition.clone(),
-            Self::Sink(sink) => sink.definition.clone(),
+            Self::Sink(sink, _) => sink.definition.clone(),
             Self::Source(source) => source.definition.clone(),
         }
     }
@@ -230,7 +230,7 @@ impl StreamingJob {
     pub fn properties(&self) -> HashMap<String, String> {
         match self {
             Self::MaterializedView(table) => table.properties.clone(),
-            Self::Sink(sink) => sink.properties.clone(),
+            Self::Sink(sink, _) => sink.properties.clone(),
             Self::Table(_, table, ..) => table.properties.clone(),
             Self::Index(_, index_table) => index_table.properties.clone(),
             Self::Source(source) => source.properties.clone(),
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 57aeb361ca493..93ea8943d2873 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -59,8 +59,8 @@ use crate::model::{FragmentId, StreamEnvironment, TableFragments};
 use crate::rpc::cloud_provider::AwsEc2Client;
 use crate::stream::{
     validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
-    CreateStreamingJobContext, GlobalStreamManagerRef, ReplaceTableContext, ReplaceTableJobForSink,
-    SourceManagerRef, StreamFragmentGraph,
+    CreateStreamingJobContext, GlobalStreamManagerRef, ReplaceTableContext, SourceManagerRef,
+    StreamFragmentGraph,
 };
 use crate::{MetaError, MetaResult};
 
@@ -99,7 +99,7 @@ impl StreamingJobId {
     }
 }
 
-/// It’s used to describe the information of the table that needs to be replaced and it will be used during `ReplaceTable` and `CreateSink`/`DropSink` operations.
+/// Describes the table that needs to be replaced; used during replace-table and sink-into-table operations.
 pub struct ReplaceTableInfo {
     pub streaming_job: StreamingJob,
     pub fragment_graph: StreamFragmentGraphProto,
@@ -504,7 +504,7 @@ impl DdlController {
         let mut internal_tables = vec![];
         let result = try {
             tracing::debug!(id = stream_job.id(), "building stream job");
-            let (ctx, table_fragments, replace_table_job) = self
+            let (ctx, table_fragments) = self
                 .build_stream_job(
                     env.clone(),
                     &stream_job,
@@ -523,7 +523,16 @@ impl DdlController {
                     // Register the source on the connector node.
                     self.source_manager.register_source(source).await?;
                 }
-                StreamingJob::Sink(ref sink) => {
+                StreamingJob::Sink(ref sink, ref mut target_table) => {
+                    // When sinking into a table, some fields of the target table may have been
+                    // modified, e.g. `fragment_id` is updated by `prepare_replace_table`, so the
+                    // table info carried with the sink must be refreshed here.
+                    if let Some((StreamingJob::Table(source, table, _), ..)) =
+                        &ctx.replace_table_job_info
+                    {
+                        *target_table = Some((table.clone(), source.clone()));
+                    }
+
                     // Validate the sink on the connector node.
                     validate_sink(sink).await?;
                 }
@@ -533,10 +542,10 @@ impl DdlController {
                 }
                 _ => {}
             }
-            (ctx, table_fragments, replace_table_job)
+            (ctx, table_fragments)
         };
-        let (ctx, table_fragments, replace_table_job) = match result {
+        let (ctx, table_fragments) = match result {
             Ok(r) => r,
             Err(e) => {
                 self.cancel_stream_job(&stream_job, internal_tables, Some(&e))
                     .await?;
@@ -547,14 +556,8 @@ impl DdlController {
 
         match create_type {
             CreateType::Foreground | CreateType::Unspecified => {
-                self.create_streaming_job_inner(
-                    stream_job,
-                    table_fragments,
-                    ctx,
-                    internal_tables,
-                    replace_table_job,
-                )
-                .await
+                self.create_streaming_job_inner(stream_job, table_fragments, ctx, internal_tables)
+                    .await
             }
             CreateType::Background => {
                 let ctrl = self.clone();
@@ -566,7 +569,6 @@ impl DdlController {
                             table_fragments,
                             ctx,
                             internal_tables,
-                            replace_table_job,
                         )
                         .await;
                     match result {
@@ -631,7 +633,7 @@ impl DdlController {
             fragment_graph,
             ..
         }: ReplaceTableInfo,
-    ) -> MetaResult<ReplaceTableJobForSink> {
+    ) -> MetaResult<(StreamingJob, ReplaceTableContext, TableFragments)> {
         let fragment_graph = self
             .prepare_replace_table(&mut streaming_job, fragment_graph)
             .await?;
@@ -674,11 +676,7 @@ impl DdlController {
             target_fragment_id,
         );
 
-        Ok(ReplaceTableJobForSink {
-            streaming_job,
-            context: Some(replace_table_ctx),
-            table_fragments: Some(table_fragments),
-        })
+        Ok((streaming_job, replace_table_ctx, table_fragments))
     }
 
     fn inject_replace_table_plan_for_sink(
@@ -771,23 +769,13 @@ impl DdlController {
         table_fragments: TableFragments,
         ctx: CreateStreamingJobContext,
         internal_tables: Vec<Table>,
-        replace_table_job: Option<ReplaceTableJobForSink>,
     ) -> MetaResult<NotificationVersion> {
         let job_id = stream_job.id();
         tracing::debug!(id = job_id, "creating stream job");
 
-        let mut replace_table_job = replace_table_job;
-
-        let replace_table_detail = replace_table_job.as_mut().map(|replace_table_job| {
-            let context = replace_table_job.context.take().unwrap();
-            let table_fragments = replace_table_job.table_fragments.take().unwrap();
-
-            (context, table_fragments)
-        });
-
         let result = self
             .stream_manager
-            .create_streaming_job(table_fragments, ctx, replace_table_detail)
+            .create_streaming_job(table_fragments, ctx)
             .await;
 
         if let Err(e) = result {
@@ -800,35 +788,15 @@ impl DdlController {
                 _ => {
                     self.cancel_stream_job(&stream_job, internal_tables, Some(&e))
                         .await?;
-
-                    if let Some(replace_table_job) = &replace_table_job {
-                        let _ = self
-                            .cancel_replace_table(&replace_table_job.streaming_job)
-                            .await;
-                    }
                 }
             }
             return Err(e);
         };
 
-        let sink_id = if let StreamingJob::Sink(s) = &stream_job {
-            Some(s.id as SinkId)
-        } else {
-            None
-        };
-
         tracing::debug!(id = job_id, "finishing stream job");
         let version = self.finish_stream_job(stream_job, internal_tables).await?;
         tracing::debug!(id = job_id, "finished stream job");
 
-        if let Some(replace_table_job) = &replace_table_job {
-            let version = self
-                .finish_replace_table(&replace_table_job.streaming_job, None, sink_id)
-                .await?;
-
-            return Ok(version);
-        }
-
         Ok(version)
     }
 
@@ -968,11 +936,7 @@ impl DdlController {
         stream_job: &StreamingJob,
         fragment_graph: StreamFragmentGraph,
         affected_table_replace_info: Option<ReplaceTableInfo>,
-    ) -> MetaResult<(
-        CreateStreamingJobContext,
-        TableFragments,
-        Option<ReplaceTableJobForSink>,
-    )> {
+    ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> {
         let id = stream_job.id();
         let default_parallelism = fragment_graph.default_parallelism();
         let internal_tables = fragment_graph.internal_tables();
@@ -1033,6 +997,14 @@ impl DdlController {
             env.clone(),
         );
 
+        let replace_table_job_info = match affected_table_replace_info {
+            Some(replace_table_info) => Some(
+                self.inject_replace_table_job(env, &table_fragments, replace_table_info)
+                    .await?,
+            ),
+            None => None,
+        };
+
         let ctx = CreateStreamingJobContext {
             dispatchers,
             upstream_mview_actors: upstream_actors,
@@ -1044,6 +1016,7 @@ impl DdlController {
             mv_table_id: stream_job.mv_table(),
             create_type: stream_job.create_type(),
             ddl_type: stream_job.into(),
+            replace_table_job_info,
         };
 
         // 4. Mark creating tables, including internal tables and the table of the stream job.
@@ -1057,18 +1030,7 @@ impl DdlController {
             .mark_creating_tables(&creating_tables)
             .await;
 
-        // 5. If we have other tables that will be affected (for example, Sink into table),
-        // we need to modify them to generate a replace table plan to change their operational status.
-        let replace_table_job = if let Some(replace_table_info) = affected_table_replace_info {
-            Some(
-                self.inject_replace_table_job(env, &table_fragments, replace_table_info)
-                    .await?,
-            )
-        } else {
-            None
-        };
-
-        Ok((ctx, table_fragments, replace_table_job))
+        Ok((ctx, table_fragments))
     }
 
     /// This is NOT used by `CANCEL JOBS`.
@@ -1105,9 +1067,9 @@ impl DdlController {
                     tracing::warn!("Failed to cancel create table procedure, perhaps barrier manager has already cleaned it. Reason: {e:#?}");
                 }
             }
-            StreamingJob::Sink(sink) => {
+            StreamingJob::Sink(sink, target_table) => {
                 self.catalog_manager
-                    .cancel_create_sink_procedure(sink)
+                    .cancel_create_sink_procedure(sink, target_table)
                     .await;
             }
            StreamingJob::Table(source, table, ..) => {
@@ -1167,10 +1129,24 @@ impl DdlController {
                     .finish_create_table_procedure(internal_tables, table)
                     .await?
             }
-            StreamingJob::Sink(sink) => {
-                self.catalog_manager
+            StreamingJob::Sink(sink, target_table) => {
+                let sink_id = sink.id;
+
+                let mut version = self
+                    .catalog_manager
                     .finish_create_sink_procedure(internal_tables, sink)
-                    .await?
+                    .await?;
+
+                if let Some((table, source)) = target_table {
+                    let streaming_job =
+                        StreamingJob::Table(source, table, TableJobType::Unspecified);
+
+                    version = self
+                        .finish_replace_table(&streaming_job, None, Some(sink_id))
+                        .await?;
+                }
+
+                version
             }
             StreamingJob::Table(source, table, ..) => {
                 creating_internal_table_ids.push(table.id);
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 8a1c2eeea10a3..30266b7e92951 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -71,6 +71,9 @@ pub struct CreateStreamingJobContext {
     pub create_type: CreateType,
 
     pub ddl_type: DdlType,
+
+    /// Context for a potential replace-table job, typically used when sinking into a table.
+    pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, TableFragments)>,
 }
 
 impl CreateStreamingJobContext {
@@ -175,13 +178,6 @@ pub struct ReplaceTableContext {
     pub table_properties: HashMap<String, String>,
 }
 
-// This is used to replace the downstream table during the sinking into table process.
-pub struct ReplaceTableJobForSink {
-    pub streaming_job: StreamingJob,
-    pub context: Option<ReplaceTableContext>,
-    pub table_fragments: Option<TableFragments>,
-}
-
 /// `GlobalStreamManager` manages all the streams in the system.
 pub struct GlobalStreamManager {
     pub env: MetaSrvEnv,
@@ -248,7 +244,6 @@ impl GlobalStreamManager {
         self: &Arc<Self>,
         table_fragments: TableFragments,
         ctx: CreateStreamingJobContext,
-        replace_table_detail: Option<(ReplaceTableContext, TableFragments)>,
     ) -> MetaResult<()> {
         let table_id = table_fragments.table_id();
         let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
@@ -259,12 +254,7 @@ impl GlobalStreamManager {
         let fut = async move {
             let mut revert_funcs = vec![];
             let res = stream_manager
-                .create_streaming_job_impl(
-                    &mut revert_funcs,
-                    table_fragments,
-                    ctx,
-                    replace_table_detail,
-                )
+                .create_streaming_job_impl(&mut revert_funcs, table_fragments, ctx)
                 .await;
             match res {
                 Ok(_) => {
@@ -456,8 +446,8 @@ impl GlobalStreamManager {
             internal_tables,
             create_type,
             ddl_type,
+            replace_table_job_info,
         }: CreateStreamingJobContext,
-        replace_table_detail: Option<(ReplaceTableContext, TableFragments)>,
     ) -> MetaResult<()> {
         let mut replace_table_command = None;
         let mut replace_table_id = None;
@@ -486,7 +476,7 @@ impl GlobalStreamManager {
         self.build_actors(&table_fragments, &building_locations, &existing_locations)
             .await?;
 
-        if let Some((context, table_fragments)) = replace_table_detail {
+        if let Some((_, context, table_fragments)) = replace_table_job_info {
             self.build_actors(
                 &table_fragments,
                 &context.building_locations,
@@ -526,53 +516,25 @@ impl GlobalStreamManager {
         let init_split_assignment = self.source_manager.pre_allocate_splits(&table_id).await?;
 
-        let command = if let Some(ReplaceTablePlan {
-            old_table_fragments,
-            new_table_fragments,
-            merge_updates,
-            dispatchers: table_dispatchers,
-            init_split_assignment: table_init_split_assignment,
-        }) = replace_table_command
-        {
-            Command::CreateStreamingJob {
-                table_fragments,
-                upstream_mview_actors,
-                dispatchers,
-                init_split_assignment,
-                definition: definition.to_string(),
-                ddl_type,
-                replace_table: Some(ReplaceTablePlan {
-                    old_table_fragments,
-                    new_table_fragments,
-                    merge_updates,
-                    dispatchers: table_dispatchers,
-                    init_split_assignment: table_init_split_assignment,
-                }),
-            }
-        } else {
-            Command::CreateStreamingJob {
-                table_fragments,
-                upstream_mview_actors,
-                dispatchers,
-                init_split_assignment,
-                definition: definition.to_string(),
-                ddl_type,
-                replace_table: None,
-            }
+        let command = Command::CreateStreamingJob {
+            table_fragments,
+            upstream_mview_actors,
+            dispatchers,
+            init_split_assignment,
+            definition: definition.to_string(),
+            ddl_type,
+            replace_table: replace_table_command,
         };
 
         if let Err(err) = self.barrier_scheduler.run_command(command).await {
             if create_type == CreateType::Foreground {
-                self.fragment_manager
-                    .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(table_id)))
-                    .await?;
+                let mut table_ids = HashSet::from_iter(std::iter::once(table_id));
                 if let Some(dummy_table_id) = replace_table_id {
-                    self.fragment_manager
-                        .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(
-                            dummy_table_id,
-                        )))
-                        .await?;
+                    table_ids.insert(dummy_table_id);
                 }
+                self.fragment_manager
+                    .drop_table_fragments_vec(&table_ids)
+                    .await?;
             }
 
             return Err(err);
@@ -1052,7 +1014,7 @@ mod tests {
                 .start_create_table_procedure(&table, vec![])
                 .await?;
             self.global_stream_manager
-                .create_streaming_job(table_fragments, ctx)
+                .create_streaming_job(table_fragments, ctx)
                 .await?;
            self.catalog_manager
                .finish_create_table_procedure(vec![], table)
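
The shape of the change above can be summarized with a short standalone sketch. The types below are simplified stand-ins, not RisingWave's actual `Sink`/`Table`/`Source`/`StreamingJob` definitions; they only illustrate the pattern the patch introduces, namely that an optional (target table, optional source) pair rides along with the sink job and is consumed when the job is finished.

// Minimal, self-contained sketch under the assumptions stated above.

#[derive(Clone, Debug)]
struct Table {
    id: u32,
    name: String,
}

#[derive(Clone, Debug)]
struct Source {
    id: u32,
}

#[derive(Clone, Debug)]
struct Sink {
    id: u32,
    name: String,
}

#[derive(Debug)]
enum StreamingJob {
    // The second field is the optional sink-into-table target: the table being
    // replaced plus its optional associated source.
    Sink(Sink, Option<(Table, Option<Source>)>),
}

fn finish_streaming_job(job: &StreamingJob) {
    match job {
        StreamingJob::Sink(sink, target_table) => {
            println!("finish create-sink procedure for sink {} ({})", sink.id, sink.name);
            // Only a sink that targets an existing table also drives the
            // replace-table step, keyed by the sink id.
            if let Some((table, source)) = target_table {
                println!(
                    "finish replace-table for table {} (id {}, source {:?}) triggered by sink {}",
                    table.name, table.id, source, sink.id
                );
            }
        }
    }
}

fn main() {
    // Plain `CREATE SINK`: no target table is carried along.
    let plain = StreamingJob::Sink(Sink { id: 1, name: "s1".into() }, None);

    // `CREATE SINK ... INTO t`: the target table rides with the sink job.
    let into_table = StreamingJob::Sink(
        Sink { id: 2, name: "s2".into() },
        Some((Table { id: 10, name: "t".into() }, Some(Source { id: 42 }))),
    );

    finish_streaming_job(&plain);
    finish_streaming_job(&into_table);
}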