diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index ad6f5ba38942e..3556890db04cc 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -40,7 +40,7 @@ use crate::barrier::BarrierManagerRef; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{MetaSrvEnv, StreamingJob}; use crate::rpc::ddl_controller::{ - DdlCommand, DdlController, DropMode, ReplaceTableInfo, StreamingJobId, + DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId, }; use crate::stream::{GlobalStreamManagerRef, SourceManagerRef}; use crate::MetaError; @@ -91,13 +91,13 @@ impl DdlServiceImpl { source, job_type, }: ReplaceTablePlan, - ) -> ReplaceTableInfo { + ) -> ReplaceStreamJobInfo { let table = table.unwrap(); let col_index_mapping = table_col_index_mapping .as_ref() .map(ColIndexMapping::from_protobuf); - ReplaceTableInfo { + ReplaceStreamJobInfo { streaming_job: StreamingJob::Table( source, table, diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs index c91f63196c667..1d7eef6f81b5e 100644 --- a/src/meta/src/barrier/checkpoint/control.rs +++ b/src/meta/src/barrier/checkpoint/control.rs @@ -643,7 +643,7 @@ impl DatabaseCheckpointControl { node.state.resps.extend(resps); finished_jobs.push(TrackingJob::New(TrackingCommand { info, - replace_table_info: None, + replace_stream_job: None, })); }); let task = task.get_or_insert_default(); diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 2a5db2ab9eb5a..4e7a659c5029d 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -84,12 +84,14 @@ pub struct Reschedule { pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>, } -/// Replacing an old table with a new one. All actors in the table job will be rebuilt. -/// Used for `ALTER TABLE` ([`Command::ReplaceTable`]) and sink into table ([`Command::CreateStreamingJob`]). +/// Replacing an old job with a new one. All actors in the job will be rebuilt. +/// Used for `ALTER TABLE` ([`Command::ReplaceStreamJob`]) and sink into table ([`Command::CreateStreamingJob`]). #[derive(Debug, Clone)] -pub struct ReplaceTablePlan { +pub struct ReplaceStreamJobPlan { pub old_fragments: StreamJobFragments, pub new_fragments: StreamJobFragments, + /// Downstream jobs of the replaced job need to update their `Merge` node to + /// connect to the new fragment. pub merge_updates: Vec, pub dispatchers: HashMap>, /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids. @@ -104,7 +106,7 @@ pub struct ReplaceTablePlan { pub tmp_id: u32, } -impl ReplaceTablePlan { +impl ReplaceStreamJobPlan { fn fragment_changes(&self) -> HashMap { let mut fragment_changes = HashMap::new(); for fragment in self.new_fragments.fragments.values() { @@ -206,7 +208,7 @@ pub struct SnapshotBackfillInfo { #[derive(Debug, Clone)] pub enum CreateStreamingJobType { Normal, - SinkIntoTable(ReplaceTablePlan), + SinkIntoTable(ReplaceStreamJobPlan), SnapshotBackfill(SnapshotBackfillInfo), } @@ -271,13 +273,13 @@ pub enum Command { fragment_actors: HashMap>, }, - /// `ReplaceTable` command generates a `Update` barrier with the given `merge_updates`. This is + /// `ReplaceStreamJob` command generates a `Update` barrier with the given `merge_updates`. This is /// essentially switching the downstream of the old table fragments to the new ones, and /// dropping the old table fragments. 
Used for table schema change. /// /// This can be treated as a special case of `RescheduleFragment`, while the upstream fragment /// of the Merge executors are changed additionally. - ReplaceTable(ReplaceTablePlan), + ReplaceStreamJob(ReplaceStreamJobPlan), /// `SourceSplitAssignment` generates a `Splits` barrier for pushing initialized splits or /// changed splits. @@ -384,7 +386,7 @@ impl Command { }) .collect(), ), - Command::ReplaceTable(plan) => Some(plan.fragment_changes()), + Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()), Command::MergeSnapshotBackfillStreamingJobs(_) => None, Command::SourceSplitAssignment(_) => None, Command::Throttle(_) => None, @@ -688,7 +690,7 @@ impl Command { subscriptions_to_add, })); - if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan { + if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan { old_fragments, new_fragments: _, merge_updates, @@ -731,7 +733,7 @@ impl Command { })) } - Command::ReplaceTable(ReplaceTablePlan { + Command::ReplaceStreamJob(ReplaceStreamJobPlan { old_fragments, merge_updates, dispatchers, @@ -943,7 +945,7 @@ impl Command { } Some(map) } - Command::ReplaceTable(replace_table) => { + Command::ReplaceStreamJob(replace_table) => { Some(replace_table.new_fragments.actors_to_create()) } _ => None, diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs index a687312f29e16..4d652494ffd36 100644 --- a/src/meta/src/barrier/context/context_impl.rs +++ b/src/meta/src/barrier/context/context_impl.rs @@ -27,7 +27,7 @@ use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerCon use crate::barrier::progress::TrackingJob; use crate::barrier::{ BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo, - CreateStreamingJobType, RecoveryReason, ReplaceTablePlan, Scheduled, + CreateStreamingJobType, RecoveryReason, ReplaceStreamJobPlan, Scheduled, }; use crate::hummock::CommitEpochInfo; use crate::{MetaError, MetaResult}; @@ -180,7 +180,7 @@ impl CommandContext { ) .await?; - if let CreateStreamingJobType::SinkIntoTable(ReplaceTablePlan { + if let CreateStreamingJobType::SinkIntoTable(ReplaceStreamJobPlan { new_fragments, dispatchers, init_split_assignment, @@ -223,7 +223,7 @@ impl CommandContext { .await?; } - Command::ReplaceTable(ReplaceTablePlan { + Command::ReplaceStreamJob(ReplaceStreamJobPlan { old_fragments, new_fragments, dispatchers, diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 4143827027693..beb54da013ea2 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -44,8 +44,8 @@ mod utils; mod worker; pub use self::command::{ - BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan, - Reschedule, SnapshotBackfillInfo, + BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, + ReplaceStreamJobPlan, Reschedule, SnapshotBackfillInfo, }; pub use self::info::InflightSubscriptionInfo; pub use self::manager::{BarrierManagerRef, GlobalBarrierManager}; diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index eac060004116a..aa676c01b3bdb 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -27,7 +27,7 @@ use risingwave_pb::stream_service::PbBarrierCompleteResponse; use crate::barrier::info::BarrierInfo; use crate::barrier::{ - Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan, + Command, 
CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan, }; use crate::manager::{DdlType, MetadataManager}; use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments}; @@ -229,7 +229,7 @@ impl TrackingJob { .catalog_controller .finish_streaming_job( streaming_job.id() as i32, - command.replace_table_info.clone(), + command.replace_stream_job.clone(), ) .await?; Ok(()) @@ -274,7 +274,7 @@ pub struct RecoveredTrackingJob { /// The command tracking by the [`CreateMviewProgressTracker`]. pub(super) struct TrackingCommand { pub info: CreateStreamingJobCommandInfo, - pub replace_table_info: Option, + pub replace_stream_job: Option, } /// Tracking is done as follows: @@ -379,7 +379,10 @@ impl CreateMviewProgressTracker { pub(super) fn update_tracking_jobs<'a>( &mut self, - info: Option<(&CreateStreamingJobCommandInfo, Option<&ReplaceTablePlan>)>, + info: Option<( + &CreateStreamingJobCommandInfo, + Option<&ReplaceStreamJobPlan>, + )>, create_mview_progress: impl IntoIterator, version_stats: &HummockVersionStats, ) { @@ -389,9 +392,9 @@ impl CreateMviewProgressTracker { let finished_commands = { let mut commands = vec![]; // Add the command to tracker. - if let Some((create_job_info, replace_table)) = info + if let Some((create_job_info, replace_stream_job)) = info && let Some(command) = - self.add(create_job_info, replace_table, version_stats) + self.add(create_job_info, replace_stream_job, version_stats) { // Those with no actors to track can be finished immediately. commands.push(command); @@ -429,8 +432,8 @@ impl CreateMviewProgressTracker { if let Some(Command::CreateStreamingJob { info, job_type }) = command { match job_type { CreateStreamingJobType::Normal => Some((info, None)), - CreateStreamingJobType::SinkIntoTable(replace_table) => { - Some((info, Some(replace_table))) + CreateStreamingJobType::SinkIntoTable(replace_stream_job) => { + Some((info, Some(replace_stream_job))) } CreateStreamingJobType::SnapshotBackfill(_) => { // The progress of SnapshotBackfill won't be tracked here @@ -494,24 +497,24 @@ impl CreateMviewProgressTracker { pub fn add( &mut self, info: &CreateStreamingJobCommandInfo, - replace_table: Option<&ReplaceTablePlan>, + replace_stream_job: Option<&ReplaceStreamJobPlan>, version_stats: &HummockVersionStats, ) -> Option { tracing::trace!(?info, "add job to track"); let (info, actors, replace_table_info) = { let CreateStreamingJobCommandInfo { - stream_job_fragments: table_fragments, + stream_job_fragments, .. } = info; - let actors = table_fragments.tracking_progress_actor_ids(); + let actors = stream_job_fragments.tracking_progress_actor_ids(); if actors.is_empty() { // The command can be finished immediately. return Some(TrackingJob::New(TrackingCommand { info: info.clone(), - replace_table_info: replace_table.cloned(), + replace_stream_job: replace_stream_job.cloned(), })); } - (info.clone(), actors, replace_table.cloned()) + (info.clone(), actors, replace_stream_job.cloned()) }; let CreateStreamingJobCommandInfo { @@ -567,7 +570,7 @@ impl CreateMviewProgressTracker { // that the sink job has been created. 
Some(TrackingJob::New(TrackingCommand { info, - replace_table_info, + replace_stream_job: replace_table_info, })) } else { let old = self.progress_map.insert( @@ -576,7 +579,7 @@ impl CreateMviewProgressTracker { progress, TrackingJob::New(TrackingCommand { info, - replace_table_info, + replace_stream_job: replace_table_info, }), ), ); diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index f993e70beadb6..3c8c793256db9 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -34,10 +34,10 @@ use risingwave_meta_model::{ actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source, streaming_job, table, ActorId, ActorUpstreamActors, ColumnCatalogArray, CreateType, DatabaseId, ExprNodeArray, FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SinkId, SourceId, - StreamNode, StreamingParallelism, TableId, TableVersion, UserId, + StreamNode, StreamingParallelism, UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; -use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; +use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId; use risingwave_pb::catalog::{PbCreateType, PbTable}; use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use risingwave_pb::meta::relation::{PbRelationInfo, RelationInfo}; @@ -62,7 +62,7 @@ use sea_orm::{ RelationTrait, TransactionTrait, }; -use crate::barrier::{ReplaceTablePlan, Reschedule}; +use crate::barrier::{ReplaceStreamJobPlan, Reschedule}; use crate::controller::catalog::CatalogController; use crate::controller::rename::ReplaceTableExprRewriter; use crate::controller::utils::{ @@ -71,7 +71,7 @@ use crate::controller::utils::{ get_internal_tables_by_id, rebuild_fragment_mapping_from_actors, PartialObject, }; use crate::controller::ObjectModel; -use crate::manager::{NotificationVersion, StreamingJob}; +use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType}; use crate::model::{StreamContext, StreamJobFragments, TableParallelism}; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -678,7 +678,6 @@ impl CatalogController { &self, streaming_job: &StreamingJob, ctx: &StreamContext, - version: &PbTableVersion, specified_parallelism: &Option, max_parallelism: usize, ) -> MetaResult { @@ -687,18 +686,7 @@ impl CatalogController { let txn = inner.db.begin().await?; // 1. check version. - let original_version: Option = Table::find_by_id(id as TableId) - .select_only() - .column(table::Column::Version) - .into_tuple() - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), id))?; - let original_version = original_version.expect("version for table should exist"); - if version.version != original_version.to_protobuf().version + 1 { - return Err(MetaError::permission_denied("table version is stale")); - } - + streaming_job.verify_version_for_replace(&txn).await?; // 2. check concurrent replace. let referring_cnt = ObjectDependency::find() .join( @@ -716,7 +704,7 @@ impl CatalogController { .await?; if referring_cnt != 0 { return Err(MetaError::permission_denied( - "table is being altered or referenced by some creating jobs", + "job is being altered or referenced by some creating jobs", )); } @@ -727,14 +715,14 @@ impl CatalogController { .into_tuple() .one(&txn) .await? 
- .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), id))?; + .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?; if original_max_parallelism != max_parallelism as i32 { // We already override the max parallelism in `StreamFragmentGraph` before entering this function. // This should not happen in normal cases. bail!( "cannot use a different max parallelism \ - when altering or creating/dropping a sink into an existing table, \ + when replacing streaming job, \ original: {}, new: {}", original_max_parallelism, max_parallelism @@ -747,13 +735,13 @@ impl CatalogController { }; // 4. create streaming object for new replace table. - let obj_id = Self::create_streaming_job_obj( + let new_obj_id = Self::create_streaming_job_obj( &txn, - ObjectType::Table, + streaming_job.object_type(), streaming_job.owner() as _, Some(streaming_job.database_id() as _), Some(streaming_job.schema_id() as _), - PbCreateType::Foreground, + streaming_job.create_type(), ctx, parallelism, max_parallelism, @@ -763,7 +751,7 @@ impl CatalogController { // 5. record dependency for new replace table. ObjectDependency::insert(object_dependency::ActiveModel { oid: Set(id as _), - used_by: Set(obj_id as _), + used_by: Set(new_obj_id as _), ..Default::default() }) .exec(&txn) @@ -771,14 +759,14 @@ impl CatalogController { txn.commit().await?; - Ok(obj_id) + Ok(new_obj_id) } /// `finish_streaming_job` marks job related objects as `Created` and notify frontend. pub async fn finish_streaming_job( &self, job_id: ObjectId, - replace_table_job_info: Option, + replace_stream_job_info: Option, ) -> MetaResult<()> { let mut inner = self.inner.write().await; let txn = inner.db.begin().await?; @@ -913,8 +901,8 @@ impl CatalogController { let fragment_mapping = get_fragment_mappings(&txn, job_id).await?; - let replace_table_mapping_update = match replace_table_job_info { - Some(ReplaceTablePlan { + let replace_table_mapping_update = match replace_stream_job_info { + Some(ReplaceStreamJobPlan { streaming_job, merge_updates, tmp_id, @@ -926,9 +914,11 @@ impl CatalogController { tmp_id as ObjectId, merge_updates, None, - Some(incoming_sink_id as _), - None, - vec![], + SinkIntoTableContext { + creating_sink_id: Some(incoming_sink_id as _), + dropping_sink_id: None, + updated_sink_catalogs: vec![], + }, &txn, streaming_job, ) @@ -976,9 +966,7 @@ impl CatalogController { streaming_job: StreamingJob, merge_updates: Vec, table_col_index_mapping: Option, - creating_sink_id: Option, - dropping_sink_id: Option, - updated_sink_catalogs: Vec, + sink_into_table_context: SinkIntoTableContext, ) -> MetaResult { let inner = self.inner.write().await; let txn = inner.db.begin().await?; @@ -987,9 +975,7 @@ impl CatalogController { tmp_id, merge_updates, table_col_index_mapping, - creating_sink_id, - dropping_sink_id, - updated_sink_catalogs, + sink_into_table_context, &txn, streaming_job, ) @@ -1014,57 +1000,69 @@ impl CatalogController { Ok(version) } + /// TODO: make it general for other streaming jobs. + /// Currently only for replacing table. 
pub async fn finish_replace_streaming_job_inner( tmp_id: ObjectId, merge_updates: Vec, table_col_index_mapping: Option, - creating_sink_id: Option, - dropping_sink_id: Option, - updated_sink_catalogs: Vec, + SinkIntoTableContext { + creating_sink_id, + dropping_sink_id, + updated_sink_catalogs, + }: SinkIntoTableContext, txn: &DatabaseTransaction, streaming_job: StreamingJob, ) -> MetaResult<(Vec, Vec)> { - // Question: The source catalog should be remain unchanged? - let StreamingJob::Table(_, table, ..) = streaming_job else { - unreachable!("unexpected job: {streaming_job:?}") - }; + let original_job_id = streaming_job.id() as ObjectId; + let job_type = streaming_job.job_type(); - let job_id = table.id as ObjectId; + match streaming_job { + StreamingJob::Table(_source, table, _table_job_type) => { + // The source catalog should remain unchanged - let original_table_catalogs = Table::find_by_id(job_id) - .select_only() - .columns([table::Column::Columns]) - .into_tuple::() - .one(txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?; + let original_table_catalogs = Table::find_by_id(original_job_id) + .select_only() + .columns([table::Column::Columns]) + .into_tuple::() + .one(txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?; - // For sinks created in earlier versions, we need to set the original_target_columns. - for sink_id in updated_sink_catalogs { - sink::ActiveModel { - sink_id: Set(sink_id as _), - original_target_columns: Set(Some(original_table_catalogs.clone())), - ..Default::default() - } - .update(txn) - .await?; - } + // For sinks created in earlier versions, we need to set the original_target_columns. + for sink_id in updated_sink_catalogs { + sink::ActiveModel { + sink_id: Set(sink_id as _), + original_target_columns: Set(Some(original_table_catalogs.clone())), + ..Default::default() + } + .update(txn) + .await?; + } + // Update the table catalog with the new one. (column catalog is also updated here) + let mut table = table::ActiveModel::from(table); + let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone(); + if let Some(sink_id) = creating_sink_id { + debug_assert!(!incoming_sinks.contains(&{ sink_id })); + incoming_sinks.push(sink_id as _); + } - let mut table = table::ActiveModel::from(table); - let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone(); - if let Some(sink_id) = creating_sink_id { - debug_assert!(!incoming_sinks.contains(&{ sink_id })); - incoming_sinks.push(sink_id as _); - } + if let Some(sink_id) = dropping_sink_id { + let drained = incoming_sinks.extract_if(|id| *id == sink_id).collect_vec(); + debug_assert_eq!(drained, vec![sink_id]); + } - if let Some(sink_id) = dropping_sink_id { - let drained = incoming_sinks.extract_if(|id| *id == sink_id).collect_vec(); - debug_assert_eq!(drained, vec![sink_id]); + table.incoming_sinks = Set(incoming_sinks.into()); + table.update(txn).await?; + } + // TODO: support other streaming jobs + _ => unreachable!( + "invalid streaming job type: {:?}", + streaming_job.job_type_str() + ), } - table.incoming_sinks = Set(incoming_sinks.into()); - let table = table.update(txn).await?; - + // 0. update internal tables // Fields including `fragment_id` were placeholder values before. // After table fragments are created, update them for all internal tables. 
let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find() @@ -1090,14 +1088,13 @@ impl CatalogController { } } - // let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?; // 1. replace old fragments/actors with new ones. Fragment::delete_many() - .filter(fragment::Column::JobId.eq(job_id)) + .filter(fragment::Column::JobId.eq(original_job_id)) .exec(txn) .await?; Fragment::update_many() - .col_expr(fragment::Column::JobId, SimpleExpr::from(job_id)) + .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id)) .filter(fragment::Column::JobId.eq(tmp_id)) .exec(txn) .await?; @@ -1118,6 +1115,7 @@ impl CatalogController { // TODO: remove cache upstream fragment/actor ids and derive them from `actor_dispatcher` table. let mut to_update_fragment_ids = HashSet::new(); + // 2.1 update downstream actor's upstream_actor_ids for merge_update in merge_updates { assert!(merge_update.removed_upstream_actor_id.is_empty()); assert!(merge_update.new_upstream_fragment_id.is_some()); @@ -1158,6 +1156,7 @@ impl CatalogController { to_update_fragment_ids.insert(fragment_id); } + // 2.2 update downstream fragment's Merge node, and upstream_fragment_id for fragment_id in to_update_fragment_ids { let (fragment_id, mut stream_node, mut upstream_fragment_id) = Fragment::find_by_id(fragment_id) @@ -1201,14 +1200,21 @@ impl CatalogController { // 4. update catalogs and notify. let mut relations = vec![]; - let table_obj = table - .find_related(Object) - .one(txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("object", table.table_id))?; - relations.push(PbRelation { - relation_info: Some(PbRelationInfo::Table(ObjectModel(table, table_obj).into())), - }); + match job_type { + StreamingJobType::Table => { + let (table, table_obj) = Table::find_by_id(original_job_id) + .find_also_related(Object) + .one(txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Table( + ObjectModel(table, table_obj.unwrap()).into(), + )), + }) + } + _ => unreachable!("invalid streaming job type: {:?}", job_type), + } if let Some(table_col_index_mapping) = table_col_index_mapping { let expr_rewriter = ReplaceTableExprRewriter { table_col_index_mapping, @@ -1217,7 +1223,7 @@ impl CatalogController { let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find() .select_only() .columns([index::Column::IndexId, index::Column::IndexItems]) - .filter(index::Column::PrimaryTableId.eq(job_id)) + .filter(index::Column::PrimaryTableId.eq(original_job_id)) .into_tuple() .all(txn) .await?; @@ -1246,15 +1252,15 @@ impl CatalogController { } } - let fragment_mapping: Vec<_> = get_fragment_mappings(txn, job_id as _).await?; + let fragment_mapping: Vec<_> = get_fragment_mappings(txn, original_job_id as _).await?; Ok((relations, fragment_mapping)) } - /// `try_abort_replacing_streaming_job` is used to abort the replacing streaming job, the input `job_id` is the dummy job id. - pub async fn try_abort_replacing_streaming_job(&self, job_id: ObjectId) -> MetaResult<()> { + /// Abort the replacing streaming job by deleting the temporary job object. 
+ pub async fn try_abort_replacing_streaming_job(&self, tmp_job_id: ObjectId) -> MetaResult<()> { let inner = self.inner.write().await; - Object::delete_by_id(job_id).exec(&inner.db).await?; + Object::delete_by_id(tmp_job_id).exec(&inner.db).await?; Ok(()) } @@ -1891,3 +1897,13 @@ fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr { fn fragment_type_mask_intersects(value: i32) -> SimpleExpr { bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value) } + +pub struct SinkIntoTableContext { + /// For creating sink into table, this is `Some`, otherwise `None`. + pub creating_sink_id: Option, + /// For dropping sink into table, this is `Some`, otherwise `None`. + pub dropping_sink_id: Option, + /// For alter table (e.g., add column), this is the list of existing sink ids + /// otherwise empty. + pub updated_sink_catalogs: Vec, +} diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index f1c3bb0ffdd8a..cda027c2d634c 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::error::BoxedError; +use risingwave_common::error::{BoxedError, NotImplemented}; use risingwave_common::session_config::SessionConfigError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -132,6 +132,9 @@ pub enum MetaErrorInner { #[error("{0} has been deprecated, please use {1} instead.")] Deprecated(String, String), + + #[error(transparent)] + NotImplemented(#[from] NotImplemented), } impl MetaError { diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 76d7052589cd9..924cdb0124a9a 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -15,19 +15,25 @@ use std::collections::HashSet; use risingwave_common::catalog::TableVersionId; -use risingwave_common::current_cluster_version; use risingwave_common::util::epoch::Epoch; +use risingwave_common::{bail_not_implemented, current_cluster_version}; +use risingwave_meta_model::object::ObjectType; +use risingwave_meta_model::prelude::Table as TableModel; +use risingwave_meta_model::{table, TableId, TableVersion}; use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table}; use risingwave_pb::ddl_service::TableJobType; +use sea_orm::entity::prelude::*; +use sea_orm::{DatabaseTransaction, QuerySelect}; use strum::{EnumDiscriminants, EnumIs}; use super::{get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source}; use crate::model::FragmentId; -use crate::MetaResult; +use crate::{MetaError, MetaResult}; // This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and // Sink. #[derive(Debug, Clone, EnumDiscriminants, EnumIs)] +#[strum_discriminants(name(StreamingJobType))] pub enum StreamingJob { MaterializedView(Table), Sink(Sink, Option<(Table, Option)>), @@ -255,6 +261,10 @@ impl StreamingJob { } } + pub fn job_type(&self) -> StreamingJobType { + self.into() + } + pub fn job_type_str(&self) -> &'static str { match self { StreamingJob::MaterializedView(_) => "materialized view", @@ -275,6 +285,16 @@ impl StreamingJob { } } + pub fn object_type(&self) -> ObjectType { + match self { + Self::MaterializedView(_) => ObjectType::Table, // Note MV is special. 
+ Self::Sink(_, _) => ObjectType::Sink, + Self::Table(_, _, _) => ObjectType::Table, + Self::Index(_, _) => ObjectType::Index, + Self::Source(_) => ObjectType::Source, + } + } + /// Returns the [`TableVersionId`] if this job is `Table`. pub fn table_version_id(&self) -> Option { if let Self::Table(_, table, ..) = self { @@ -329,4 +349,35 @@ impl StreamingJob { StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()), } } + + /// Verify the new version is the next version of the original version. + pub async fn verify_version_for_replace(&self, txn: &DatabaseTransaction) -> MetaResult<()> { + let id = self.id(); + + match self { + StreamingJob::Table(_source, table, _table_job_type) => { + let new_version = table.get_version()?.get_version(); + let original_version: Option = TableModel::find_by_id(id as TableId) + .select_only() + .column(table::Column::Version) + .into_tuple() + .one(txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?; + let original_version = original_version + .expect("version for table should exist") + .to_protobuf(); + if new_version != original_version.version + 1 { + return Err(MetaError::permission_denied("table version is stale")); + } + } + StreamingJob::MaterializedView(_) + | StreamingJob::Sink(_, _) + | StreamingJob::Index(_, _) + | StreamingJob::Source(_) => { + bail_not_implemented!("schema change for {}", self.job_type_str()) + } + } + Ok(()) + } } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index e5490a86365b9..b87d06fdd0898 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -597,6 +597,13 @@ impl StreamJobFragments { }); self } + + /// Panics if the fragment is not found. + pub fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut Fragment { + self.fragments + .get_mut(&fragment_id) + .unwrap_or_else(|| panic!("fragment {} not found", fragment_id)) + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index dacf908071533..bb14ecefc1298 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -66,16 +66,17 @@ use tracing::Instrument; use crate::barrier::BarrierManagerRef; use crate::controller::catalog::ReleaseContext; use crate::controller::cluster::StreamingClusterInfo; +use crate::controller::streaming_job::SinkIntoTableContext; use crate::error::{bail_invalid_parameter, bail_unavailable}; use crate::manager::{ DdlType, LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION, }; -use crate::model::{FragmentId, StreamContext, StreamJobFragments, TableParallelism}; +use crate::model::{StreamContext, StreamJobFragments, TableParallelism}; use crate::stream::{ create_source_worker_handle, validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption, - GlobalStreamManagerRef, ReplaceTableContext, SourceManagerRef, StreamFragmentGraph, + GlobalStreamManagerRef, ReplaceStreamJobContext, SourceManagerRef, StreamFragmentGraph, }; use crate::{MetaError, MetaResult}; @@ -114,8 +115,9 @@ impl StreamingJobId { } } -// It’s used to describe the information of the table that needs to be replaced and it will be used during replacing table and creating sink into table operations. 
-pub struct ReplaceTableInfo { +/// It’s used to describe the information of the job that needs to be replaced +/// and it will be used during replacing table and creating sink into table operations. +pub struct ReplaceStreamJobInfo { pub streaming_job: StreamingJob, pub fragment_graph: StreamFragmentGraphProto, pub col_index_mapping: Option, @@ -136,13 +138,13 @@ pub enum DdlCommand { StreamingJob, StreamFragmentGraphProto, CreateType, - Option, + Option, HashSet, ), - DropStreamingJob(StreamingJobId, DropMode, Option), + DropStreamingJob(StreamingJobId, DropMode, Option), AlterName(alter_name_request::Object, String), AlterSwapRename(alter_swap_rename_request::Object), - ReplaceTable(ReplaceTableInfo), + ReplaceTable(ReplaceStreamJobInfo), AlterSourceColumn(Source), AlterObjectOwner(Object, UserId), AlterSetSchema(alter_set_schema_request::Object, SchemaId), @@ -325,7 +327,7 @@ impl DdlController { ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info) .await } - DdlCommand::ReplaceTable(ReplaceTableInfo { + DdlCommand::ReplaceTable(ReplaceStreamJobInfo { streaming_job, fragment_graph, col_index_mapping, @@ -670,7 +672,7 @@ impl DdlController { dropping_sink_id: Option, streaming_job: &StreamingJob, fragment_graph: StreamFragmentGraph, - ) -> MetaResult<(ReplaceTableContext, StreamJobFragments)> { + ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragments)> { let (mut replace_table_ctx, mut stream_job_fragments) = self .build_replace_table(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _) .await?; @@ -696,7 +698,7 @@ impl DdlController { let target_table = streaming_job.table().unwrap(); - let target_fragment_id = + let union_fragment_id = union_fragment_id.expect("fragment of placeholder merger not found"); if let Some(creating_sink_table_fragments) = creating_sink_table_fragments { @@ -707,8 +709,7 @@ impl DdlController { &sink_fragment, target_table, &mut replace_table_ctx, - &mut stream_job_fragments, - target_fragment_id, + stream_job_fragments.fragment_mut(union_fragment_id), None, ); } @@ -746,8 +747,7 @@ impl DdlController { &sink_fragment, target_table, &mut replace_table_ctx, - &mut stream_job_fragments, - target_fragment_id, + stream_job_fragments.fragment_mut(union_fragment_id), Some(&sink.unique_identity()), ); } @@ -773,9 +773,8 @@ impl DdlController { sink_id: Option, sink_fragment: &PbFragment, table: &Table, - replace_table_ctx: &mut ReplaceTableContext, - stream_job_fragments: &mut StreamJobFragments, - target_fragment_id: FragmentId, + replace_table_ctx: &mut ReplaceStreamJobContext, + union_fragment: &mut PbFragment, unique_identity: Option<&str>, ) { let sink_actor_ids = sink_fragment @@ -784,11 +783,6 @@ impl DdlController { .map(|a| a.actor_id) .collect_vec(); - let union_fragment = stream_job_fragments - .fragments - .get_mut(&target_fragment_id) - .unwrap(); - let downstream_actor_ids = union_fragment .actors .iter() @@ -916,7 +910,7 @@ impl DdlController { &self, mut streaming_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, - affected_table_replace_info: Option, + affected_table_replace_info: Option, dependencies: HashSet, ) -> MetaResult { let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); @@ -998,7 +992,7 @@ impl DdlController { ctx: StreamContext, mut streaming_job: StreamingJob, fragment_graph: StreamFragmentGraphProto, - affected_table_replace_info: Option, + affected_table_replace_info: Option, ) -> MetaResult { let mut fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, 
&streaming_job)?; @@ -1019,7 +1013,7 @@ impl DdlController { let affected_table_replace_info = match affected_table_replace_info { Some(replace_table_info) => { - let ReplaceTableInfo { + let ReplaceStreamJobInfo { mut streaming_job, fragment_graph, .. @@ -1116,7 +1110,7 @@ impl DdlController { object_type: ObjectType, object_id: ObjectId, drop_mode: DropMode, - target_replace_info: Option, + target_replace_info: Option, ) -> MetaResult { let (release_ctx, mut version) = match object_type { ObjectType::Database => { @@ -1159,7 +1153,7 @@ impl DdlController { let stream_ctx = StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap()); - let ReplaceTableInfo { + let ReplaceStreamJobInfo { mut streaming_job, fragment_graph, .. @@ -1187,7 +1181,7 @@ impl DdlController { streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); let streaming_job = streaming_job; - let table = streaming_job.table().unwrap(); + streaming_job.table().expect("should be table job"); tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink"); let tmp_id = self @@ -1196,7 +1190,6 @@ impl DdlController { .create_job_catalog_for_replace( &streaming_job, &stream_ctx, - table.get_version()?, &fragment_graph.specified_parallelism(), fragment_graph.max_parallelism(), ) @@ -1224,7 +1217,7 @@ impl DdlController { .await?; self.stream_manager - .replace_table(stream_job_fragments, ctx) + .replace_stream_job(stream_job_fragments, ctx) .await?; merge_updates @@ -1240,9 +1233,11 @@ impl DdlController { streaming_job, merge_updates, None, - None, - Some(sink_id), - vec![], + SinkIntoTableContext { + creating_sink_id: None, + dropping_sink_id: Some(sink_id), + updated_sink_catalogs: vec![], + }, ) .await?; Ok(version) @@ -1344,13 +1339,12 @@ impl DdlController { .create_job_catalog_for_replace( &streaming_job, &ctx, - table.get_version()?, &fragment_graph.specified_parallelism(), fragment_graph.max_parallelism(), ) .await?; - tracing::debug!(id = streaming_job.id(), "building replace streaming job"); + tracing::debug!(id = job_id, "building replace streaming job"); let mut updated_sink_catalogs = vec![]; let result: MetaResult> = try { @@ -1383,8 +1377,9 @@ impl DdlController { } } - let target_fragment_id = + let union_fragment_id = union_fragment_id.expect("fragment of placeholder merger not found"); + let union_fragment = stream_job_fragments.fragment_mut(union_fragment_id); let catalogs = self .metadata_manager @@ -1406,8 +1401,7 @@ impl DdlController { &sink_fragment, table, &mut ctx, - &mut stream_job_fragments, - target_fragment_id, + union_fragment, Some(&sink.unique_identity()), ); @@ -1424,7 +1418,7 @@ impl DdlController { .await?; self.stream_manager - .replace_table(stream_job_fragments, ctx) + .replace_stream_job(stream_job_fragments, ctx) .await?; merge_updates }; @@ -1439,9 +1433,11 @@ impl DdlController { streaming_job, merge_updates, table_col_index_mapping, - None, - None, - updated_sink_catalogs, + SinkIntoTableContext { + creating_sink_id: None, + dropping_sink_id: None, + updated_sink_catalogs, + }, ) .await?; Ok(version) @@ -1463,7 +1459,7 @@ impl DdlController { &self, job_id: StreamingJobId, drop_mode: DropMode, - target_replace_info: Option, + target_replace_info: Option, ) -> MetaResult { let (object_id, object_type) = match job_id { StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table), @@ -1637,25 +1633,24 @@ impl DdlController { } let replace_table_job_info = match affected_table_replace_info { - Some((streaming_job, fragment_graph)) => 
{ + Some((table_stream_job, fragment_graph)) => { if snapshot_backfill_info.is_some() { return Err(anyhow!( - "snapshot backfill should not have replace table info: {streaming_job:?}" + "snapshot backfill should not have replace table info: {table_stream_job:?}" ) .into()); } - let StreamingJob::Sink(s, target_table) = &mut stream_job else { + let StreamingJob::Sink(sink, target_table) = &mut stream_job else { bail!("additional replace table event only occurs when sinking into table"); }; - let table = streaming_job.table().unwrap(); + table_stream_job.table().expect("should be table job"); let tmp_id = self .metadata_manager .catalog_controller .create_job_catalog_for_replace( - &streaming_job, + &table_stream_job, &stream_ctx, - table.get_version()?, &fragment_graph.specified_parallelism(), fragment_graph.max_parallelism(), ) @@ -1666,22 +1661,22 @@ impl DdlController { tmp_id, &self.metadata_manager, stream_ctx, - Some(s), + Some(sink), Some(&stream_job_fragments), None, - &streaming_job, + &table_stream_job, fragment_graph, ) .await?; // When sinking into table occurs, some variables of the target table may be modified, // such as `fragment_id` being altered by `prepare_replace_table`. // At this point, it’s necessary to update the table info carried with the sink. - must_match!(&streaming_job, StreamingJob::Table(source, table, _) => { + must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => { // The StreamingJob in ReplaceTableInfo must be StreamingJob::Table *target_table = Some((table.clone(), source.clone())); }); - Some((streaming_job, context, table_fragments)) + Some((table_stream_job, context, table_fragments)) } None => None, }; @@ -1717,7 +1712,7 @@ impl DdlController { mut fragment_graph: StreamFragmentGraph, table_col_index_mapping: Option, tmp_table_id: TableId, - ) -> MetaResult<(ReplaceTableContext, StreamJobFragments)> { + ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragments)> { let id = stream_job.id(); let expr_context = stream_ctx.to_expr_context(); @@ -1837,7 +1832,7 @@ impl DdlController { // Note: no need to set `vnode_count` as it's already set by the frontend. // See `get_replace_table_plan`. - let ctx = ReplaceTableContext { + let ctx = ReplaceStreamJobContext { old_fragments, merge_updates, dispatchers, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index cd76e124a2d1f..34db34c3ddb2c 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -31,7 +31,7 @@ use tracing::Instrument; use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy}; use crate::barrier::{ BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, - ReplaceTablePlan, SnapshotBackfillInfo, + ReplaceStreamJobPlan, SnapshotBackfillInfo, }; use crate::error::bail_invalid_parameter; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob}; @@ -77,7 +77,7 @@ pub struct CreateStreamingJobContext { pub ddl_type: DdlType, /// Context provided for potential replace table, typically used when sinking into a table. 
- pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, StreamJobFragments)>, + pub replace_table_job_info: Option<(StreamingJob, ReplaceStreamJobContext, StreamJobFragments)>, pub snapshot_backfill_info: Option, @@ -164,10 +164,10 @@ impl CreatingStreamingJobInfo { type CreatingStreamingJobInfoRef = Arc; -/// [`ReplaceTableContext`] carries one-time infos for replacing the plan of an existing table. +/// [`ReplaceStreamJobContext`] carries one-time infos for replacing the plan of an existing stream job. /// /// Note: for better readability, keep this struct complete and immutable once created. -pub struct ReplaceTableContext { +pub struct ReplaceStreamJobContext { /// The old job fragments to be replaced. pub old_fragments: StreamJobFragments, @@ -177,7 +177,7 @@ pub struct ReplaceTableContext { /// New dispatchers to add from upstream actors to downstream actors. pub dispatchers: HashMap>, - /// The locations of the actors to build in the new table to replace. + /// The locations of the actors to build in the new job to replace. pub building_locations: Locations, /// The locations of the existing actors, essentially the downstream chain actors to update. @@ -359,7 +359,7 @@ impl GlobalStreamManager { replace_table_id = Some(tmp_table_id); - replace_table_command = Some(ReplaceTablePlan { + replace_table_command = Some(ReplaceStreamJobPlan { old_fragments: context.old_fragments, new_fragments: stream_job_fragments, merge_updates: context.merge_updates, @@ -457,32 +457,33 @@ impl GlobalStreamManager { } } - pub async fn replace_table( + /// Send replace job command to barrier scheduler. + pub async fn replace_stream_job( &self, - stream_job_fragments: StreamJobFragments, - ReplaceTableContext { + new_fragments: StreamJobFragments, + ReplaceStreamJobContext { old_fragments, merge_updates, dispatchers, tmp_id, streaming_job, .. - }: ReplaceTableContext, + }: ReplaceStreamJobContext, ) -> MetaResult<()> { - let tmp_table_id = stream_job_fragments.stream_job_id(); + let tmp_table_id = new_fragments.stream_job_id(); let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?; self.barrier_scheduler .run_config_change_command_with_pause( streaming_job.database_id().into(), - Command::ReplaceTable(ReplaceTablePlan { + Command::ReplaceStreamJob(ReplaceStreamJobPlan { old_fragments, - new_fragments: stream_job_fragments, + new_fragments, merge_updates, dispatchers, init_split_assignment, - tmp_id, streaming_job, + tmp_id, }), ) .await?;
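
A note on the `SinkIntoTableContext` introduced above: `finish_replace_streaming_job` and its `_inner` variant used to take three loosely related positional arguments (`creating_sink_id`, `dropping_sink_id`, `updated_sink_catalogs`); the PR bundles them into one named struct. The sketch below shows the three call-site shapes that appear in this diff. It is a standalone illustration, not the real code: `SinkId` is assumed to be a plain integer alias because the concrete model types are not visible in the extracted diff.

```rust
// Standalone sketch of the SinkIntoTableContext introduced in this PR.
// `SinkId` is an assumed integer alias; the real field types come from risingwave_meta_model.
type SinkId = i32;

#[derive(Debug, Default)]
pub struct SinkIntoTableContext {
    /// `Some` only when a new sink into the table is being created.
    pub creating_sink_id: Option<SinkId>,
    /// `Some` only when an existing sink into the table is being dropped.
    pub dropping_sink_id: Option<SinkId>,
    /// For ALTER TABLE (e.g. add column): existing sinks whose original target
    /// columns must be recorded; empty otherwise.
    pub updated_sink_catalogs: Vec<SinkId>,
}

fn main() {
    // CREATE SINK ... INTO <table>
    let creating = SinkIntoTableContext {
        creating_sink_id: Some(42),
        ..Default::default()
    };
    // DROP SINK that targets a table
    let dropping = SinkIntoTableContext {
        dropping_sink_id: Some(42),
        ..Default::default()
    };
    // ALTER TABLE (schema change) with existing sinks into the table
    let altering = SinkIntoTableContext {
        updated_sink_catalogs: vec![1, 2, 3],
        ..Default::default()
    };
    println!("{creating:?}\n{dropping:?}\n{altering:?}");
}
```

Grouping the three fields makes each call site state which sink-into-table scenario it handles, instead of passing two `None`s and an empty `Vec` positionally.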
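The `#[strum_discriminants(name(StreamingJobType))]` attribute added to `StreamingJob` is what makes the new `job_type()` accessor a one-liner: strum generates a fieldless `StreamingJobType` enum with one variant per `StreamingJob` variant, plus `From<&StreamingJob>` impls, so `self.into()` is enough. A minimal, self-contained illustration of that mechanism (payload types are simplified stand-ins for the real catalog protos; requires the `strum` crate with its `derive` feature, which the crate already uses):

```rust
// Minimal illustration of the strum mechanism behind the new StreamingJobType.
use strum::EnumDiscriminants;

#[derive(Debug, EnumDiscriminants)]
#[strum_discriminants(name(StreamingJobType))] // names the generated fieldless enum
enum StreamingJob {
    MaterializedView(String),
    Sink(String),
    Table(String),
    Index(String),
    Source(String),
}

impl StreamingJob {
    /// Same shape as the accessor added in this PR: strum also generates
    /// `impl From<&StreamingJob> for StreamingJobType`, so `.into()` suffices.
    fn job_type(&self) -> StreamingJobType {
        self.into()
    }
}

fn main() {
    let job = StreamingJob::Table("t".to_owned());
    assert_eq!(job.job_type(), StreamingJobType::Table);
    println!("{:?}", job.job_type());
}
```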
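On `verify_version_for_replace`: the check previously inlined in `create_job_catalog_for_replace` moves onto `StreamingJob`, and the rule itself is unchanged — a replace is accepted only if the proposed table catalog carries exactly the next version number. Non-`Table` jobs currently hit `bail_not_implemented!`, which is why `MetaErrorInner` gains a `NotImplemented` variant. A minimal sketch of the invariant, with the error type and version representation simplified:

```rust
// Sketch of the version invariant that verify_version_for_replace enforces.
// The error type and version representation are simplified assumptions.
#[derive(Debug, PartialEq)]
enum ReplaceError {
    StaleVersion { original: u64, proposed: u64 },
}

/// The proposed catalog must carry exactly `original + 1`; anything else is
/// rejected as a stale or conflicting schema change.
fn verify_version_for_replace(original: u64, proposed: u64) -> Result<(), ReplaceError> {
    if proposed != original + 1 {
        return Err(ReplaceError::StaleVersion { original, proposed });
    }
    Ok(())
}

fn main() {
    assert!(verify_version_for_replace(3, 4).is_ok());
    // Re-submitting an old plan, or skipping ahead, both fail.
    assert_eq!(
        verify_version_for_replace(3, 3),
        Err(ReplaceError::StaleVersion { original: 3, proposed: 3 })
    );
    assert!(verify_version_for_replace(3, 5).is_err());
}
```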
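Finally, `StreamJobFragments::fragment_mut` lets `inject_replace_table_plan_for_sink` borrow a single fragment (`&mut PbFragment`) instead of taking the whole fragments struct plus a fragment id. A toy sketch of the helper and the narrowed borrow, using a plain `HashMap` as a stand-in for the real fragment map:

```rust
// Toy sketch of the StreamJobFragments::fragment_mut helper added in this PR,
// with a plain HashMap standing in for the real fragment map.
use std::collections::HashMap;

type FragmentId = u32;

#[derive(Debug)]
struct Fragment {
    fragment_id: FragmentId,
}

struct StreamJobFragments {
    fragments: HashMap<FragmentId, Fragment>,
}

impl StreamJobFragments {
    /// Panics if the fragment is not found, matching the helper in the diff.
    fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut Fragment {
        self.fragments
            .get_mut(&fragment_id)
            .unwrap_or_else(|| panic!("fragment {} not found", fragment_id))
    }
}

fn main() {
    let mut job = StreamJobFragments {
        fragments: HashMap::from([(7, Fragment { fragment_id: 7 })]),
    };
    // Call sites now pass exactly one borrowed fragment to
    // inject_replace_table_plan_for_sink instead of the whole struct plus an id.
    let union_fragment = job.fragment_mut(7);
    assert_eq!(union_fragment.fragment_id, 7);
}
```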