diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs
index 1865c973dd026..57b6a176986f5 100644
--- a/src/meta/model_v2/src/table.rs
+++ b/src/meta/model_v2/src/table.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use risingwave_pb::catalog::table::PbTableType;
+use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType};
 use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable};
 use sea_orm::entity::prelude::*;
 use sea_orm::ActiveValue::Set;
@@ -187,10 +187,28 @@ impl From<PbTable> for ActiveModel {
         let table_type = pb_table.table_type();
         let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
 
+        let fragment_id = if pb_table.fragment_id == u32::MAX - 1 {
+            NotSet
+        } else {
+            Set(Some(pb_table.fragment_id as FragmentId))
+        };
+        let dml_fragment_id = pb_table
+            .dml_fragment_id
+            .map(|x| Set(Some(x as FragmentId)))
+            .unwrap_or_default();
+        let optional_associated_source_id =
+            if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
+                pb_table.optional_associated_source_id
+            {
+                Set(Some(src_id as SourceId))
+            } else {
+                NotSet
+            };
+
         Self {
             table_id: Set(pb_table.id as _),
             name: Set(pb_table.name),
-            optional_associated_source_id: NotSet,
+            optional_associated_source_id,
             table_type: Set(table_type.into()),
             belongs_to_job_id: Set(None),
             columns: Set(pb_table.columns.into()),
@@ -199,7 +217,7 @@ impl From<PbTable> for ActiveModel {
             stream_key: Set(pb_table.stream_key.into()),
             append_only: Set(pb_table.append_only),
             properties: Set(pb_table.properties.into()),
-            fragment_id: NotSet,
+            fragment_id,
             vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
             row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
             value_indices: Set(pb_table.value_indices.into()),
@@ -208,7 +226,7 @@ impl From<PbTable> for ActiveModel {
             read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
             watermark_indices: Set(pb_table.watermark_indices.into()),
             dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
-            dml_fragment_id: NotSet,
+            dml_fragment_id,
             cardinality: Set(pb_table.cardinality.map(|x| x.into())),
             cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
             description: Set(pb_table.description),
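The conversion above leans on sea-orm's `ActiveValue` semantics: a `NotSet` column is skipped in the generated UPDATE (the stored value survives), while `Set(None)` would overwrite it with NULL. A minimal, std-only sketch of the sentinel-to-`ActiveValue` mapping; the `ActiveValue` enum here is a stand-in for illustration, not sea-orm itself:

```rust
// Stand-in for sea_orm::ActiveValue, enough to show the update semantics:
// `NotSet` columns are skipped on UPDATE, so the existing value survives.
#[derive(Debug, PartialEq)]
enum ActiveValue<T> {
    Set(T),
    NotSet,
}

// Sentinel checked in the conversion above (`u32::MAX - 1`).
const PLACEHOLDER_FRAGMENT_ID: u32 = u32::MAX - 1;

// Mirror of the conversion: the sentinel maps to `NotSet` (leave the column
// alone), anything else to `Set(Some(..))`.
fn fragment_id_value(pb_fragment_id: u32) -> ActiveValue<Option<u32>> {
    if pb_fragment_id == PLACEHOLDER_FRAGMENT_ID {
        ActiveValue::NotSet
    } else {
        ActiveValue::Set(Some(pb_fragment_id))
    }
}

fn main() {
    assert_eq!(
        fragment_id_value(PLACEHOLDER_FRAGMENT_ID),
        ActiveValue::NotSet
    );
    assert_eq!(fragment_id_value(42), ActiveValue::Set(Some(42)));
}
```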
diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs
index f6aa1be0d08f5..7672ef7f8ed96 100644
--- a/src/meta/node/src/lib.rs
+++ b/src/meta/node/src/lib.rs
@@ -214,9 +214,11 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
         },
         MetaBackend::Mem => MetaStoreBackend::Mem,
     };
-    let sql_backend = opts
-        .sql_endpoint
-        .map(|endpoint| MetaStoreSqlBackend { endpoint });
+    // TODO: temporarily hardcoded to force the SQL backend during development;
+    // restore the `opts.sql_endpoint`-based configuration before merging.
+    let sql_backend = Some(MetaStoreSqlBackend {
+        endpoint: "postgres://postgres:@localhost:5432/postgres".to_string(),
+    });
 
     validate_config(&config);
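Hardcoding the endpoint (flagged by the TODO above) forces the SQL backend during development. A less intrusive override is an environment-variable fallback; the sketch below is hypothetical, and the `RW_SQL_ENDPOINT` variable name is invented for illustration, not an existing flag:

```rust
use std::env;

// Hypothetical helper: prefer the CLI-provided endpoint, fall back to an
// environment variable for local testing, otherwise leave the backend off.
fn resolve_sql_endpoint(cli_endpoint: Option<String>) -> Option<String> {
    cli_endpoint.or_else(|| env::var("RW_SQL_ENDPOINT").ok())
}

fn main() {
    // A CLI value always wins over the environment.
    assert_eq!(
        resolve_sql_endpoint(Some("postgres://localhost/meta".into())),
        Some("postgres://localhost/meta".into())
    );
    // Without a CLI value, the environment decides.
    assert_eq!(resolve_sql_endpoint(None), env::var("RW_SQL_ENDPOINT").ok());
}
```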
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index c4703d3fd8e03..954d9c93d7c86 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -777,11 +777,8 @@ impl CommandContext {
             Command::Resume(_) => {}
 
             Command::SourceSplitAssignment(split_assignment) => {
-                let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
-                else {
-                    unimplemented!("implement config change funcs in v2");
-                };
-                mgr.fragment_manager
+                self.barrier_manager_context
+                    .metadata_manager
                     .update_actor_splits_by_split_assignment(split_assignment)
                     .await?;
                 self.barrier_manager_context
@@ -981,26 +978,40 @@ impl CommandContext {
                 dispatchers,
                 init_split_assignment,
             }) => {
-                let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
-                else {
-                    unimplemented!("implement replace funcs in v2");
-                };
                 let table_ids =
                     HashSet::from_iter(std::iter::once(old_table_fragments.table_id()));
 
-                // Tell compute nodes to drop actors.
-                let node_actors = mgr.fragment_manager.table_node_actors(&table_ids).await?;
+                // Tell compute nodes to drop actors.
+                let node_actors = self
+                    .barrier_manager_context
+                    .metadata_manager
+                    .get_worker_actor_ids(table_ids)
+                    .await?;
                 self.clean_up(node_actors).await?;
 
-                // Drop fragment info in meta store.
-                mgr.fragment_manager
-                    .post_replace_table(
-                        old_table_fragments,
-                        new_table_fragments,
-                        merge_updates,
-                        dispatchers,
-                        init_split_assignment.clone(),
-                    )
-                    .await?;
+                match &self.barrier_manager_context.metadata_manager {
+                    MetadataManager::V1(mgr) => {
+                        // Drop fragment info in meta store.
+                        mgr.fragment_manager
+                            .post_replace_table(
+                                old_table_fragments,
+                                new_table_fragments,
+                                merge_updates,
+                                dispatchers,
+                                init_split_assignment.clone(),
+                            )
+                            .await?;
+                    }
+                    MetadataManager::V2(mgr) => {
+                        // Update actors and actor_dispatchers for new table fragments.
+                        mgr.catalog_controller
+                            .post_collect_table_fragments(
+                                new_table_fragments.table_id().table_id as _,
+                                new_table_fragments.actor_ids(),
+                                dispatchers.clone(),
+                                init_split_assignment,
+                            )
+                            .await?;
+                    }
+                }
             }
         }
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index d0c39694692b1..cc6383499cc81 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -57,7 +57,7 @@ use crate::controller::utils::{
 };
 use crate::manager::{ActorInfos, LocalNotification};
 use crate::stream::SplitAssignment;
-use crate::MetaResult;
+use crate::{MetaError, MetaResult};
 
 impl CatalogControllerInner {
     /// List all fragment vnode mapping info for all CREATED streaming jobs.
@@ -986,15 +986,58 @@ impl CatalogController {
         Ok(node_actors)
     }
 
+    pub async fn get_worker_actor_ids(
+        &self,
+        job_ids: Vec<ObjectId>,
+    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
+        let inner = self.inner.read().await;
+        let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
+        let actor_pu: Vec<(ActorId, i32)> = Actor::find()
+            .select_only()
+            .columns([actor::Column::ActorId, actor::Column::ParallelUnitId])
+            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
+            .filter(fragment::Column::JobId.is_in(job_ids))
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+
+        let mut worker_actors = BTreeMap::new();
+        for (actor_id, pu_id) in actor_pu {
+            let worker_id = parallel_units_map
+                .get(&(pu_id as _))
+                .unwrap()
+                .worker_node_id as WorkerId;
+            worker_actors
+                .entry(worker_id)
+                .or_insert_with(Vec::new)
+                .push(actor_id);
+        }
+
+        Ok(worker_actors)
+    }
+
     pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
         let inner = self.inner.read().await;
         let txn = inner.db.begin().await?;
         for assignments in split_assignment.values() {
             for (actor_id, splits) in assignments {
+                let actor_splits: Option<ConnectorSplits> = Actor::find_by_id(*actor_id as ActorId)
+                    .select_only()
+                    .column(actor::Column::Splits)
+                    .into_tuple()
+                    .one(&txn)
+                    .await?
+                    .ok_or_else(|| MetaError::catalog_id_not_found("actor_id", actor_id))?;
+
+                let mut actor_splits = actor_splits
+                    .map(|splits| splits.0.splits)
+                    .unwrap_or_default();
+                actor_splits.extend(splits.iter().map(Into::into));
+
                 Actor::update(actor::ActiveModel {
                     actor_id: Set(*actor_id as _),
                     splits: Set(Some(ConnectorSplits(PbConnectorSplits {
-                        splits: splits.iter().map(Into::into).collect(),
+                        splits: actor_splits,
                     }))),
                     ..Default::default()
                 })
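`get_worker_actor_ids` does the worker grouping in application code after a single joined query. A std-only sketch of that fold, with simplified id types; `group_actors_by_worker` is illustrative, not the controller API:

```rust
use std::collections::{BTreeMap, HashMap};

type ActorId = i32;
type WorkerId = u32;
type ParallelUnitId = i32;

// Fold the queried `(actor, parallel unit)` pairs into a per-worker actor
// list, using the parallel-unit -> worker mapping loaded up front.
fn group_actors_by_worker(
    actor_pu: Vec<(ActorId, ParallelUnitId)>,
    pu_to_worker: &HashMap<ParallelUnitId, WorkerId>,
) -> BTreeMap<WorkerId, Vec<ActorId>> {
    let mut worker_actors = BTreeMap::new();
    for (actor_id, pu_id) in actor_pu {
        // The real code `unwrap()`s here; a missing parallel unit would
        // indicate inconsistent metadata.
        let worker_id = pu_to_worker[&pu_id];
        worker_actors
            .entry(worker_id)
            .or_insert_with(Vec::new)
            .push(actor_id);
    }
    worker_actors
}

fn main() {
    let pu_to_worker = HashMap::from([(0, 1), (1, 1), (2, 2)]);
    let grouped = group_actors_by_worker(vec![(10, 0), (11, 2), (12, 1)], &pu_to_worker);
    assert_eq!(grouped[&1], vec![10, 12]);
    assert_eq!(grouped[&2], vec![11]);
}
```

Note also that `update_actor_splits` now extends the persisted split list instead of overwriting it, so splits assigned earlier survive a new assignment.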
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index bcd86e28c5287..576c5c3c40699 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -12,39 +12,49 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 use itertools::Itertools;
+use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_common::util::stream_graph_visitor::visit_stream_node;
 use risingwave_meta_model_v2::actor::ActorStatus;
 use risingwave_meta_model_v2::object::ObjectType;
 use risingwave_meta_model_v2::prelude::{
-    Actor, ActorDispatcher, Fragment, Object, ObjectDependency, Source, Table,
+    Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Source, Table,
 };
 use risingwave_meta_model_v2::{
     actor, actor_dispatcher, fragment, index, object_dependency, sink, source, streaming_job,
-    table, ActorId, CreateType, DatabaseId, FragmentId, JobStatus, ObjectId, SchemaId, SourceId,
-    StreamNode, UserId,
+    table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, FragmentId,
+    I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode, TableId, TableVersion,
+    UserId,
 };
 use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
-use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
+use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion};
 use risingwave_pb::catalog::{PbCreateType, PbTable};
-use risingwave_pb::meta::PbTableFragments;
+use risingwave_pb::meta::relation::PbRelationInfo;
+use risingwave_pb::meta::subscribe_response::{
+    Info as NotificationInfo, Operation as NotificationOperation,
+};
+use risingwave_pb::meta::{PbRelation, PbRelationGroup, PbTableFragments};
 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use risingwave_pb::stream_plan::{Dispatcher, PbFragmentTypeFlag};
+use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate;
+use risingwave_pb::stream_plan::{PbDispatcher, PbFragmentTypeFlag};
 use sea_orm::sea_query::SimpleExpr;
 use sea_orm::ActiveValue::Set;
 use sea_orm::{
     ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
-    NotSet, QueryFilter, QuerySelect, TransactionTrait,
+    ModelTrait, NotSet, QueryFilter, QuerySelect, TransactionTrait,
 };
 
 use crate::controller::catalog::CatalogController;
+use crate::controller::rename::ReplaceTableExprRewriter;
 use crate::controller::utils::{
     check_relation_name_duplicate, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
+    get_fragment_mappings,
 };
-use crate::manager::StreamingJob;
+use crate::controller::ObjectModel;
+use crate::manager::{NotificationVersion, SinkId, StreamingJob};
 use crate::model::StreamContext;
 use crate::stream::SplitAssignment;
 use crate::{MetaError, MetaResult};
@@ -243,6 +253,7 @@ impl CatalogController {
             let mut table: table::ActiveModel = table.into();
             table.table_id = Set(table_id as _);
             table.belongs_to_job_id = Set(Some(job_id as _));
+            table.fragment_id = NotSet;
             table.insert(&txn).await?;
         }
         txn.commit().await?;
@@ -254,6 +265,7 @@ impl CatalogController {
         &self,
         table_fragment: PbTableFragments,
         streaming_job: &StreamingJob,
+        for_replace: bool,
     ) -> MetaResult<()> {
         let fragment_actors =
             Self::extract_fragment_and_actors_from_table_fragments(table_fragment)?;
@@ -263,7 +275,7 @@ impl CatalogController {
         // Add fragments, actors and actor dispatchers.
         for (fragment, actors, actor_dispatchers) in fragment_actors {
             let fragment = fragment.into_active_model();
-            fragment.insert(&txn).await?;
+            let fragment = fragment.insert(&txn).await?;
             for actor in actors {
                 let actor = actor.into_active_model();
                 actor.insert(&txn).await?;
@@ -275,31 +287,31 @@ impl CatalogController {
                     actor_dispatcher.insert(&txn).await?;
                 }
             }
+            // Update fragment id for all state tables.
+            if !for_replace {
+                for state_table_id in fragment.state_table_ids.into_inner() {
+                    table::ActiveModel {
+                        table_id: Set(state_table_id as _),
+                        fragment_id: Set(Some(fragment.fragment_id as _)),
+                        ..Default::default()
+                    }
+                    .update(&txn)
+                    .await?;
+                }
+            }
         }
 
-        // Update fragment id and dml fragment id.
-        match streaming_job {
-            StreamingJob::MaterializedView(table)
-            | StreamingJob::Index(_, table)
-            | StreamingJob::Table(_, table, ..) => {
+        if !for_replace {
+            // Update dml fragment id.
+            if let StreamingJob::Table(_, table, ..) = streaming_job {
                 Table::update(table::ActiveModel {
                     table_id: Set(table.id as _),
-                    fragment_id: Set(Some(table.fragment_id as _)),
+                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
                     ..Default::default()
                 })
                 .exec(&txn)
                 .await?;
             }
-            _ => {}
-        }
-        if let StreamingJob::Table(_, table, ..) = streaming_job {
-            Table::update(table::ActiveModel {
-                table_id: Set(table.id as _),
-                dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
-                ..Default::default()
-            })
-            .exec(&txn)
-            .await?;
-        }
         }
 
         txn.commit().await?;
@@ -344,7 +356,7 @@ impl CatalogController {
         &self,
         job_id: ObjectId,
         actor_ids: Vec<crate::model::ActorId>,
-        new_actor_dispatchers: HashMap<crate::model::ActorId, Vec<Dispatcher>>,
+        new_actor_dispatchers: HashMap<crate::model::ActorId, Vec<PbDispatcher>>,
         split_assignment: &SplitAssignment,
     ) -> MetaResult<()> {
         let inner = self.inner.write().await;
@@ -406,6 +418,251 @@ impl CatalogController {
         Ok(())
     }
 
+    pub async fn create_job_catalog_for_replace(
+        &self,
+        streaming_job: &StreamingJob,
+        ctx: &StreamContext,
+        version: &PbTableVersion,
+    ) -> MetaResult<ObjectId> {
+        let id = streaming_job.id();
+        let inner = self.inner.write().await;
+        let txn = inner.db.begin().await?;
+
+        // 1. check version.
+        let original_version: Option<TableVersion> = Table::find_by_id(id as TableId)
+            .select_only()
+            .column(table::Column::Version)
+            .into_tuple()
+            .one(&txn)
+            .await?
+            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), id))?;
+        let original_version = original_version.expect("version for table should exist");
+        if version.version != original_version.inner_ref().version + 1 {
+            return Err(MetaError::permission_denied("table version is stale"));
+        }
+
+        // 2. create streaming object for new replace table.
+        let obj_id = Self::create_streaming_job_obj(
+            &txn,
+            ObjectType::Table,
+            streaming_job.owner() as _,
+            Some(streaming_job.database_id() as _),
+            Some(streaming_job.schema_id() as _),
+            PbCreateType::Foreground,
+            ctx,
+        )
+        .await?;
+
+        // 3. record dependency for new replace table.
+        ObjectDependency::insert(object_dependency::ActiveModel {
+            oid: Set(id as _),
+            used_by: Set(obj_id as _),
+            ..Default::default()
+        })
+        .exec(&txn)
+        .await?;
+
+        txn.commit().await?;
+
+        Ok(obj_id)
+    }
+
+    pub async fn finish_replace_streaming_job(
+        &self,
+        dummy_id: ObjectId,
+        streaming_job: StreamingJob,
+        merge_updates: Vec<PbMergeUpdate>,
+        table_col_index_mapping: Option<ColIndexMapping>,
+        _creating_sink_id: Option<SinkId>,
+        _dropping_sink_id: Option<SinkId>,
+    ) -> MetaResult<NotificationVersion> {
+        // Question: should the source catalog remain unchanged?
+        let StreamingJob::Table(_, table, ..) = streaming_job else {
+            unreachable!("unexpected job: {streaming_job:?}")
+        };
+
+        let inner = self.inner.write().await;
+        let txn = inner.db.begin().await?;
+        let job_id = table.id as ObjectId;
+
+        let table = table::ActiveModel::from(table).update(&txn).await?;
+
+        // 1. replace old fragments/actors with new ones.
+        Fragment::delete_many()
+            .filter(fragment::Column::JobId.eq(job_id))
+            .exec(&txn)
+            .await?;
+        Fragment::update_many()
+            .col_expr(fragment::Column::JobId, SimpleExpr::from(job_id))
+            .filter(fragment::Column::JobId.eq(dummy_id))
+            .exec(&txn)
+            .await?;
+
+        // 2. update merges.
+        let fragment_replace_map: HashMap<_, _> = merge_updates
+            .iter()
+            .map(|update| {
+                (
+                    update.upstream_fragment_id,
+                    (
+                        update.new_upstream_fragment_id.unwrap(),
+                        update.added_upstream_actor_id.clone(),
+                    ),
+                )
+            })
+            .collect();
+
+        // TODO: remove cache upstream fragment/actor ids and derive them from `actor_dispatcher` table.
+        let mut to_update_fragment_ids = HashSet::new();
+        for merge_update in merge_updates {
+            assert!(merge_update.removed_upstream_actor_id.is_empty());
+            assert!(merge_update.new_upstream_fragment_id.is_some());
+            let (actor_id, fragment_id, mut upstream_actors) =
+                Actor::find_by_id(merge_update.actor_id as ActorId)
+                    .select_only()
+                    .columns([
+                        actor::Column::ActorId,
+                        actor::Column::FragmentId,
+                        actor::Column::UpstreamActorIds,
+                    ])
+                    .into_tuple::<(ActorId, FragmentId, ActorUpstreamActors)>()
+                    .one(&txn)
+                    .await?
+                    .ok_or_else(|| {
+                        MetaError::catalog_id_not_found("actor", merge_update.actor_id)
+                    })?;
+
+            assert!(upstream_actors
+                .0
+                .remove(&(merge_update.upstream_fragment_id as FragmentId))
+                .is_some());
+            upstream_actors.0.insert(
+                merge_update.new_upstream_fragment_id.unwrap() as _,
+                merge_update
+                    .added_upstream_actor_id
+                    .iter()
+                    .map(|id| *id as _)
+                    .collect(),
+            );
+            actor::ActiveModel {
+                actor_id: Set(actor_id),
+                upstream_actor_ids: Set(upstream_actors),
+                ..Default::default()
+            }
+            .update(&txn)
+            .await?;
+
+            to_update_fragment_ids.insert(fragment_id);
+        }
+        for fragment_id in to_update_fragment_ids {
+            let (fragment_id, mut stream_node, mut upstream_fragment_id) =
+                Fragment::find_by_id(fragment_id)
+                    .select_only()
+                    .columns([
+                        fragment::Column::FragmentId,
+                        fragment::Column::StreamNode,
+                        fragment::Column::UpstreamFragmentId,
+                    ])
+                    .into_tuple::<(FragmentId, StreamNode, I32Array)>()
+                    .one(&txn)
+                    .await?
+                    .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
+            visit_stream_node(&mut stream_node.0, |body| {
+                if let PbNodeBody::Merge(m) = body
+                    && let Some((new_fragment_id, new_actor_ids)) =
+                        fragment_replace_map.get(&m.upstream_fragment_id)
+                {
+                    m.upstream_fragment_id = *new_fragment_id;
+                    m.upstream_actor_id = new_actor_ids.clone();
+                }
+            });
+            for fragment_id in &mut upstream_fragment_id.0 {
+                if let Some((new_fragment_id, _)) = fragment_replace_map.get(&(*fragment_id as _)) {
+                    *fragment_id = *new_fragment_id as _;
+                }
+            }
+            fragment::ActiveModel {
+                fragment_id: Set(fragment_id),
+                stream_node: Set(stream_node),
+                upstream_fragment_id: Set(upstream_fragment_id),
+                ..Default::default()
+            }
+            .update(&txn)
+            .await?;
+        }
+
+        // 3. remove dummy object.
+        Object::delete_by_id(dummy_id).exec(&txn).await?;
+
+        // 4. update catalogs and notify.
+        let mut relations = vec![];
+        let table_obj = table
+            .find_related(Object)
+            .one(&txn)
+            .await?
+            .ok_or_else(|| MetaError::catalog_id_not_found("object", table.table_id))?;
+        relations.push(PbRelation {
+            relation_info: Some(PbRelationInfo::Table(ObjectModel(table, table_obj).into())),
+        });
+        if let Some(table_col_index_mapping) = table_col_index_mapping {
+            let expr_rewriter = ReplaceTableExprRewriter {
+                table_col_index_mapping,
+            };
+
+            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
+                .select_only()
+                .columns([index::Column::IndexId, index::Column::IndexItems])
+                .filter(index::Column::PrimaryTableId.eq(job_id))
+                .into_tuple()
+                .all(&txn)
+                .await?;
+            for (index_id, mut nodes) in index_items {
+                nodes
+                    .0
+                    .iter_mut()
+                    .for_each(|x| expr_rewriter.rewrite_expr(x));
+                let index = index::ActiveModel {
+                    index_id: Set(index_id),
+                    index_items: Set(nodes),
+                    ..Default::default()
+                }
+                .update(&txn)
+                .await?;
+                let index_obj = index
+                    .find_related(Object)
+                    .one(&txn)
+                    .await?
+                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
+                relations.push(PbRelation {
+                    relation_info: Some(PbRelationInfo::Index(
+                        ObjectModel(index, index_obj).into(),
+                    )),
+                });
+            }
+        }
+        let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;
+
+        txn.commit().await?;
+
+        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
+            .await;
+        let version = self
+            .notify_frontend(
+                NotificationOperation::Update,
+                NotificationInfo::RelationGroup(PbRelationGroup { relations }),
+            )
+            .await;
+
+        Ok(version)
+    }
+
+    /// `try_abort_replacing_streaming_job` is used to abort the replacing streaming job, the input `job_id` is the dummy job id.
+    pub async fn try_abort_replacing_streaming_job(&self, job_id: ObjectId) -> MetaResult<()> {
+        let inner = self.inner.write().await;
+        Object::delete_by_id(job_id).exec(&inner.db).await?;
+        Ok(())
+    }
+
     // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments
     // return the actor_ids to be applied
     pub async fn update_source_rate_limit_by_source_id(
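The merge rewrite in `finish_replace_streaming_job` builds `fragment_replace_map` once and then patches every affected `Merge` node to point at the fragment (and actors) that replaced its old upstream. A simplified, runnable sketch of that remapping; the `MergeNode` struct stands in for the protobuf node:

```rust
use std::collections::HashMap;

type FragmentId = u32;
type ActorId = u32;

// Stand-in for the Merge executor's protobuf node.
struct MergeNode {
    upstream_fragment_id: FragmentId,
    upstream_actor_id: Vec<ActorId>,
}

// Point a Merge at its replacement upstream, if one exists in the map.
fn rewrite_merge(
    merge: &mut MergeNode,
    fragment_replace_map: &HashMap<FragmentId, (FragmentId, Vec<ActorId>)>,
) {
    if let Some((new_fragment_id, new_actor_ids)) =
        fragment_replace_map.get(&merge.upstream_fragment_id)
    {
        merge.upstream_fragment_id = *new_fragment_id;
        merge.upstream_actor_id = new_actor_ids.clone();
    }
}

fn main() {
    // Old upstream fragment 7 was replaced by fragment 9 with actors [21, 22].
    let replace_map = HashMap::from([(7, (9, vec![21, 22]))]);
    let mut merge = MergeNode {
        upstream_fragment_id: 7,
        upstream_actor_id: vec![1, 2],
    };
    rewrite_merge(&mut merge, &replace_map);
    assert_eq!(merge.upstream_fragment_id, 9);
    assert_eq!(merge.upstream_actor_id, vec![21, 22]);
}
```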
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index f272e6d1ff198..3ecc57658bd85 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -3018,7 +3018,7 @@ impl CatalogManager {
 
         let mut updated_indexes = vec![];
 
-        if let Some(table_col_index_mapping) = table_col_index_mapping.clone() {
+        if let Some(table_col_index_mapping) = table_col_index_mapping {
             let expr_rewriter = ReplaceTableExprRewriter {
                 table_col_index_mapping,
             };
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 0d50f7e1dc8c4..c994f02474811 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -12,17 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
 
 use risingwave_common::catalog::{TableId, TableOption};
 use risingwave_meta_model_v2::SourceId;
-use risingwave_pb::catalog::PbSource;
+use risingwave_pb::catalog::{PbSource, PbTable};
 use risingwave_pb::common::worker_node::{PbResource, State};
 use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType};
 use risingwave_pb::ddl_service::TableJobType;
 use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
-use risingwave_pb::meta::table_fragments::Fragment;
-use risingwave_pb::stream_plan::PbStreamActor;
+use risingwave_pb::meta::table_fragments::{Fragment, PbFragment};
+use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor};
 
 use crate::controller::catalog::CatalogControllerRef;
 use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo};
@@ -30,6 +30,7 @@ use crate::manager::{
     CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, StreamingClusterInfo, WorkerId,
 };
 use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments};
+use crate::stream::SplitAssignment;
 use crate::MetaResult;
 
 #[derive(Clone)]
@@ -248,6 +249,59 @@ impl MetadataManager {
         }
     }
 
+    pub async fn get_table_catalog_by_ids(&self, ids: Vec<u32>) -> MetaResult<Vec<PbTable>> {
+        match &self {
+            MetadataManager::V1(mgr) => Ok(mgr.catalog_manager.get_tables(&ids).await),
+            MetadataManager::V2(mgr) => {
+                mgr.catalog_controller
+                    .get_table_by_ids(ids.into_iter().map(|id| id as _).collect())
+                    .await
+            }
+        }
+    }
+
+    pub async fn get_downstream_chain_fragments(
+        &self,
+        job_id: u32,
+    ) -> MetaResult<Vec<(PbDispatchStrategy, PbFragment)>> {
+        match &self {
+            MetadataManager::V1(mgr) => {
+                mgr.fragment_manager
+                    .get_downstream_fragments(job_id.into())
+                    .await
+            }
+            MetadataManager::V2(mgr) => {
+                mgr.catalog_controller
+                    .get_downstream_chain_fragments(job_id as _)
+                    .await
+            }
+        }
+    }
+
+    pub async fn get_worker_actor_ids(
+        &self,
+        job_ids: HashSet<TableId>,
+    ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
+        match &self {
+            MetadataManager::V1(mgr) => mgr.fragment_manager.table_node_actors(&job_ids).await,
+            MetadataManager::V2(mgr) => {
+                let worker_actors = mgr
+                    .catalog_controller
+                    .get_worker_actor_ids(job_ids.into_iter().map(|id| id.table_id as _).collect())
+                    .await?;
+                Ok(worker_actors
+                    .into_iter()
+                    .map(|(id, actors)| {
+                        (
+                            id as WorkerId,
+                            actors.into_iter().map(|id| id as ActorId).collect(),
+                        )
+                    })
+                    .collect())
+            }
+        }
+    }
+
     pub async fn get_job_id_to_internal_table_ids_mapping(&self) -> Option<Vec<(u32, Vec<u32>)>> {
         match &self {
             MetadataManager::V1(mgr) => mgr
@@ -415,4 +469,22 @@ impl MetadataManager {
             }
         }
     }
+
+    pub async fn update_actor_splits_by_split_assignment(
+        &self,
+        split_assignment: &SplitAssignment,
+    ) -> MetaResult<()> {
+        match self {
+            MetadataManager::V1(mgr) => {
+                mgr.fragment_manager
+                    .update_actor_splits_by_split_assignment(split_assignment)
+                    .await
+            }
+            MetadataManager::V2(mgr) => {
+                mgr.catalog_controller
+                    .update_actor_splits(split_assignment)
+                    .await
+            }
+        }
+    }
 }
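`MetadataManager` is the facade that lets call sites such as `CommandContext` stay backend-agnostic: one enum wraps both backends, and each method matches once. A toy sketch of the shape; the `V1`/`V2` structs are stand-ins for the real managers:

```rust
// Stand-ins for the V1 and V2 metadata backends.
struct V1;
struct V2;

impl V1 {
    fn tables(&self) -> Vec<String> {
        vec!["t1".into()]
    }
}
impl V2 {
    fn tables(&self) -> Vec<String> {
        vec!["t1".into()]
    }
}

// One enum wraps both backends.
enum MetadataManager {
    V1(V1),
    V2(V2),
}

impl MetadataManager {
    // Call sites no longer need a `let-else` on V1; the facade dispatches.
    fn get_tables(&self) -> Vec<String> {
        match self {
            MetadataManager::V1(mgr) => mgr.tables(),
            MetadataManager::V2(mgr) => mgr.tables(),
        }
    }
}

fn main() {
    assert_eq!(MetadataManager::V1(V1).get_tables(), vec!["t1".to_string()]);
    assert_eq!(MetadataManager::V2(V2).get_tables(), vec!["t1".to_string()]);
}
```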
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 9f093c601b2dc..fada573369c6b 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -858,9 +858,14 @@ impl DdlController {
                 fragment_graph,
             )
             .await?;
+        let dummy_id = self
+            .env
+            .id_gen_manager()
+            .generate::<{ IdCategory::Table }>()
+            .await? as u32;
 
         let (mut replace_table_ctx, mut table_fragments) = self
-            .build_replace_table(mgr, stream_ctx, &streaming_job, fragment_graph, None)
+            .build_replace_table(stream_ctx, &streaming_job, fragment_graph, None, dummy_id)
             .await?;
 
         let mut union_fragment_id = None;
@@ -1581,7 +1586,9 @@ impl DdlController {
         table_col_index_mapping: Option<ColIndexMapping>,
     ) -> MetaResult<NotificationVersion> {
         let MetadataManager::V1(mgr) = &self.metadata_manager else {
-            unimplemented!("support replace table in v2");
+            return self
+                .replace_table_v2(stream_job, fragment_graph, table_col_index_mapping)
+                .await;
         };
         let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
         let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
@@ -1589,15 +1596,20 @@ impl DdlController {
         let fragment_graph = self
             .prepare_replace_table(mgr.catalog_manager.clone(), &mut stream_job, fragment_graph)
             .await?;
+        let dummy_id = self
+            .env
+            .id_gen_manager()
+            .generate::<{ IdCategory::Table }>()
+            .await? as u32;
 
         let result = try {
             let (ctx, table_fragments) = self
                 .build_replace_table(
-                    mgr,
                     stream_ctx,
                     &stream_job,
                     fragment_graph,
                     table_col_index_mapping.clone(),
+                    dummy_id,
                 )
                 .await?;
 
@@ -1658,39 +1670,38 @@ impl DdlController {
 
     /// `build_replace_table` builds a table replacement and returns the context and new table
     /// fragments.
-    async fn build_replace_table(
+    pub(crate) async fn build_replace_table(
        &self,
-        mgr: &MetadataManagerV1,
         stream_ctx: StreamContext,
         stream_job: &StreamingJob,
         mut fragment_graph: StreamFragmentGraph,
         table_col_index_mapping: Option<ColIndexMapping>,
+        dummy_table_id: TableId,
     ) -> MetaResult<(ReplaceTableContext, TableFragments)> {
         let id = stream_job.id();
         let default_parallelism = fragment_graph.default_parallelism();
         let expr_context = stream_ctx.to_expr_context();
 
-        let old_table_fragments = mgr
-            .fragment_manager
-            .select_table_fragments_by_table_id(&id.into())
+        let old_table_fragments = self
+            .metadata_manager
+            .get_job_fragments_by_id(&id.into())
             .await?;
         let old_internal_table_ids = old_table_fragments.internal_table_ids();
-        let old_internal_tables = mgr
-            .catalog_manager
-            .get_tables(&old_internal_table_ids)
-            .await;
+        let old_internal_tables = self
+            .metadata_manager
+            .get_table_catalog_by_ids(old_internal_table_ids)
+            .await?;
 
         fragment_graph.fit_internal_table_ids(old_internal_tables)?;
 
         // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
         // graph that contains all information needed for building the actor graph.
-        let original_table_fragment = mgr.fragment_manager.get_mview_fragment(id.into()).await?;
+        let original_table_fragment = old_table_fragments
+            .mview_fragment()
+            .expect("mview fragment not found");
 
         // Map the column indices in the dispatchers with the given mapping.
-        let downstream_fragments = mgr
-            .fragment_manager
-            .get_downstream_fragments(id.into())
-            .await?
+        let downstream_fragments = self.metadata_manager.get_downstream_chain_fragments(id).await?
             .into_iter()
             .map(|(d, f)|
                 if let Some(mapping) = &table_col_index_mapping {
@@ -1713,7 +1724,7 @@ impl DdlController {
         )?;
 
         // 2. Build the actor graph.
-        let cluster_info = mgr.cluster_manager.get_streaming_cluster_info().await;
+        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
 
         let parallelism = self.resolve_stream_parallelism(default_parallelism, &cluster_info)?;
         let actor_graph_builder =
@@ -1735,11 +1746,6 @@ impl DdlController {
         // FIXME: we use a dummy table ID for new table fragments, so we can drop the old fragments
         // with the real table ID, then replace the dummy table ID with the real table ID. This is a
         // workaround for not having the version info in the fragment manager.
-        let dummy_id = self
-            .env
-            .id_gen_manager()
-            .generate::<{ IdCategory::Table }>()
-            .await? as u32;
 
         let table_parallelism = match default_parallelism {
             None => TableParallelism::Auto,
@@ -1750,7 +1756,7 @@ impl DdlController {
         // the context that contains all information needed for building the actors on the compute
         // nodes.
         let table_fragments = TableFragments::new(
-            dummy_id.into(),
+            dummy_table_id.into(),
             graph,
             &building_locations.actor_locations,
             stream_ctx,
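Hoisting the `dummy_id` generation out of `build_replace_table` is what lets the V1 path (id from the id-generator) and the V2 path (id from the new catalog object) share the builder. A trimmed sketch of that inversion; the types are simplified stand-ins:

```rust
type TableId = u32;

// Stand-in for the built fragments of the replacement job.
struct TableFragments {
    table_id: TableId,
}

// The builder no longer generates the id itself; the caller injects it, so
// different backends can supply ids from different sources. Per the FIXME
// above, the new fragments live under the dummy id until the final swap.
fn build_replace_table(dummy_table_id: TableId) -> TableFragments {
    TableFragments {
        table_id: dummy_table_id,
    }
}

fn main() {
    // V1 path: id from the id-generator; V2 path: id of the dummy catalog object.
    let from_id_gen: TableId = 1001;
    assert_eq!(build_replace_table(from_id_gen).table_id, 1001);
}
```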
diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs
index 852d9776d61db..948c05a3f8ead 100644
--- a/src/meta/src/rpc/ddl_controller_v2.rs
+++ b/src/meta/src/rpc/ddl_controller_v2.rs
@@ -13,10 +13,12 @@
 // limitations under the License.
 
 use itertools::Itertools;
+use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_common::util::stream_graph_visitor::visit_fragment;
 use risingwave_pb::catalog::CreateType;
 use risingwave_pb::ddl_service::TableJobType;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
+use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate;
 use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto;
 use thiserror_ext::AsReport;
 
@@ -151,7 +153,7 @@ impl DdlController {
         }
 
         mgr.catalog_controller
-            .prepare_streaming_job(table_fragments.to_protobuf(), streaming_job)
+            .prepare_streaming_job(table_fragments.to_protobuf(), streaming_job, false)
             .await?;
 
         // create streaming jobs.
@@ -191,4 +193,85 @@ impl DdlController {
             }
         }
     }
+
+    /// This is used for `ALTER TABLE ADD/DROP COLUMN`.
+    pub async fn replace_table_v2(
+        &self,
+        mut streaming_job: StreamingJob,
+        fragment_graph: StreamFragmentGraphProto,
+        table_col_index_mapping: Option<ColIndexMapping>,
+    ) -> MetaResult<NotificationVersion> {
+        let MetadataManager::V2(mgr) = &self.metadata_manager else {
+            unreachable!("MetadataManager should be V2")
+        };
+        let job_id = streaming_job.id();
+
+        let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await;
+        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
+
+        // 1. build fragment graph.
+        let fragment_graph =
+            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job).await?;
+        streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id());
+        streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id());
+        let streaming_job = streaming_job;
+
+        let StreamingJob::Table(_, table, ..) = &streaming_job else {
+            unreachable!("unexpected job: {streaming_job:?}")
+        };
+        let dummy_id = mgr
+            .catalog_controller
+            .create_job_catalog_for_replace(&streaming_job, &ctx, table.get_version()?)
+            .await?;
+
+        tracing::debug!(id = streaming_job.id(), "building replace streaming job");
+        let result: MetaResult<Vec<PbMergeUpdate>> = try {
+            let (ctx, table_fragments) = self
+                .build_replace_table(
+                    ctx,
+                    &streaming_job,
+                    fragment_graph,
+                    table_col_index_mapping.clone(),
+                    dummy_id as _,
+                )
+                .await?;
+            let merge_updates = ctx.merge_updates.clone();
+
+            mgr.catalog_controller
+                .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true)
+                .await?;
+
+            self.stream_manager
+                .replace_table(table_fragments, ctx)
+                .await?;
+            merge_updates
+        };
+
+        match result {
+            Ok(merge_updates) => {
+                let version = mgr
+                    .catalog_controller
+                    .finish_replace_streaming_job(
+                        dummy_id,
+                        streaming_job,
+                        merge_updates,
+                        table_col_index_mapping,
+                        None,
+                        None,
+                    )
+                    .await?;
+                Ok(version)
+            }
+            Err(err) => {
+                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace table");
+                let _ = mgr
+                    .catalog_controller
+                    .try_abort_replacing_streaming_job(dummy_id)
+                    .await.inspect_err(|err| {
+                        tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing table");
+                    });
+                Err(err)
+            }
+        }
+    }
 }
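`replace_table_v2` follows a compensating-action shape: once the dummy catalog object exists, any later failure triggers a best-effort abort (dropping the dummy object) while the original error is still surfaced to the caller. A std-only sketch of that control flow, with illustrative names:

```rust
// Stand-in for `try_abort_replacing_streaming_job`.
fn try_abort(_dummy_id: u32) -> Result<(), String> {
    Ok(())
}

// Run the fallible steps; on failure, compensate, then propagate the
// original error rather than the abort's outcome.
fn replace_table(fail: bool) -> Result<&'static str, String> {
    let dummy_id = 42_u32; // stand-in for `create_job_catalog_for_replace`

    let result: Result<&'static str, String> = if fail {
        Err("replace failed".to_string())
    } else {
        Ok("new version")
    };

    match result {
        Ok(version) => Ok(version),
        Err(err) => {
            // Best-effort abort: its own failure is only logged, so the
            // caller always sees the original error.
            if let Err(abort_err) = try_abort(dummy_id) {
                eprintln!("failed to abort replacing table: {abort_err}");
            }
            Err(err)
        }
    }
}

fn main() {
    assert_eq!(replace_table(false).unwrap(), "new version");
    assert!(replace_table(true).is_err());
}
```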