diff --git a/Cargo.lock b/Cargo.lock index 5c4d757a6d114..b682b91d2211f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9448,6 +9448,7 @@ dependencies = [ name = "risingwave_meta_model_v2" version = "1.7.0-alpha" dependencies = [ + "risingwave_common", "risingwave_hummock_sdk", "risingwave_pb", "sea-orm", diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 14d44c9c0bd2d..9bf025061f9b9 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -65,6 +65,8 @@ pub const DEFAULT_SUPER_USER_FOR_PG_ID: u32 = 2; pub const NON_RESERVED_USER_ID: i32 = 11; pub const NON_RESERVED_SYS_CATALOG_ID: i32 = 1001; +pub const OBJECT_ID_PLACEHOLDER: u32 = u32::MAX - 1; + pub const SYSTEM_SCHEMAS: [&str; 3] = [ PG_CATALOG_SCHEMA_NAME, INFORMATION_SCHEMA_SCHEMA_NAME, @@ -164,7 +166,7 @@ impl DatabaseId { pub fn placeholder() -> Self { DatabaseId { - database_id: u32::MAX - 1, + database_id: OBJECT_ID_PLACEHOLDER, } } } @@ -200,7 +202,7 @@ impl SchemaId { pub fn placeholder() -> Self { SchemaId { - schema_id: u32::MAX - 1, + schema_id: OBJECT_ID_PLACEHOLDER, } } } @@ -237,7 +239,7 @@ impl TableId { /// Sometimes the id field is filled later, we use this value for better debugging. pub const fn placeholder() -> Self { TableId { - table_id: u32::MAX - 1, + table_id: OBJECT_ID_PLACEHOLDER, } } @@ -328,7 +330,7 @@ impl IndexId { /// Sometimes the id field is filled later, we use this value for better debugging. pub const fn placeholder() -> Self { IndexId { - index_id: u32::MAX - 1, + index_id: OBJECT_ID_PLACEHOLDER, } } @@ -357,7 +359,7 @@ impl FunctionId { } pub const fn placeholder() -> Self { - FunctionId(u32::MAX - 1) + FunctionId(OBJECT_ID_PLACEHOLDER) } pub fn function_id(&self) -> u32 { @@ -396,7 +398,7 @@ impl UserId { pub const fn placeholder() -> Self { UserId { - user_id: u32::MAX - 1, + user_id: OBJECT_ID_PLACEHOLDER, } } } @@ -428,7 +430,7 @@ impl ConnectionId { } pub const fn placeholder() -> Self { - ConnectionId(u32::MAX - 1) + ConnectionId(OBJECT_ID_PLACEHOLDER) } pub fn connection_id(&self) -> u32 { diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index be4f94fa0706b..d4e38cac4d1c9 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -20,6 +20,7 @@ use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::{ ColumnCatalog, ConnectionId, DatabaseId, Field, Schema, SchemaId, TableId, UserId, + OBJECT_ID_PLACEHOLDER, }; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; @@ -43,7 +44,7 @@ impl SinkId { /// Sometimes the id field is filled later, we use this value for better debugging. pub const fn placeholder() -> Self { SinkId { - sink_id: u32::MAX - 1, + sink_id: OBJECT_ID_PLACEHOLDER, } } diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index b25cb7d40aa79..c0b58701f4140 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -17,7 +17,7 @@ use std::assert_matches::assert_matches; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, TableId}; +use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, TableId, OBJECT_ID_PLACEHOLDER}; use risingwave_common::error::Result; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; @@ -29,7 +29,6 @@ use super::stream::StreamPlanRef; use super::utils::{childless_record, Distill}; use super::{reorganize_elements_id, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::catalog::table_catalog::{CreateType, TableCatalog, TableType, TableVersion}; -use crate::catalog::FragmentId; use crate::optimizer::plan_node::derive::derive_pk; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::generic::GenericPlanRef; @@ -235,8 +234,7 @@ impl StreamMaterialize { append_only, owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID, properties, - // TODO(zehua): replace it with FragmentId::placeholder() - fragment_id: FragmentId::MAX - 1, + fragment_id: OBJECT_ID_PLACEHOLDER, dml_fragment_id: None, vnode_col_index: None, row_id_index, diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index ec5d66dba868e..75d2f8b60604b 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -20,12 +20,12 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode}; use risingwave_common::catalog::{ - ColumnCatalog, ColumnDesc, ConflictBehavior, Field, FieldDisplay, Schema, + ColumnCatalog, ColumnDesc, ConflictBehavior, Field, FieldDisplay, Schema, OBJECT_ID_PLACEHOLDER, }; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use crate::catalog::table_catalog::{CreateType, TableType}; -use crate::catalog::{ColumnId, FragmentId, TableCatalog, TableId}; +use crate::catalog::{ColumnId, TableCatalog, TableId}; use crate::optimizer::property::Cardinality; use crate::utils::WithOptions; @@ -160,8 +160,7 @@ impl TableCatalogBuilder { append_only: false, owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID, properties: self.properties, - // TODO(zehua): replace it with FragmentId::placeholder() - fragment_id: FragmentId::MAX - 1, + fragment_id: OBJECT_ID_PLACEHOLDER, dml_fragment_id: None, vnode_col_index: self.vnode_col_idx, row_id_index: None, diff --git a/src/meta/model_v2/Cargo.toml b/src/meta/model_v2/Cargo.toml index f080645fc1c6a..8cc1407983f36 100644 --- a/src/meta/model_v2/Cargo.toml +++ b/src/meta/model_v2/Cargo.toml @@ -14,6 +14,7 @@ ignored = ["workspace-hack"] normal = ["workspace-hack"] [dependencies] +risingwave_common = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_pb = { workspace = true } sea-orm = { version = "0.12.0", features = [ diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs index 77707d3be5932..572e59c820356 100644 --- a/src/meta/model_v2/src/lib.rs +++ b/src/meta/model_v2/src/lib.rs @@ -75,7 +75,6 @@ pub type CompactionTaskId = i64; pub type HummockSstableObjectId = i64; pub type FragmentId = i32; - pub type ActorId = i32; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 1865c973dd026..446c928718cf6 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::catalog::table::PbTableType; +use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER; +use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType}; use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable}; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; @@ -187,10 +188,28 @@ impl From for ActiveModel { let table_type = pb_table.table_type(); let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior(); + let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER { + NotSet + } else { + Set(Some(pb_table.fragment_id as FragmentId)) + }; + let dml_fragment_id = pb_table + .dml_fragment_id + .map(|x| Set(Some(x as FragmentId))) + .unwrap_or_default(); + let optional_associated_source_id = + if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) = + pb_table.optional_associated_source_id + { + Set(Some(src_id as SourceId)) + } else { + NotSet + }; + Self { table_id: Set(pb_table.id as _), name: Set(pb_table.name), - optional_associated_source_id: NotSet, + optional_associated_source_id, table_type: Set(table_type.into()), belongs_to_job_id: Set(None), columns: Set(pb_table.columns.into()), @@ -199,7 +218,7 @@ impl From for ActiveModel { stream_key: Set(pb_table.stream_key.into()), append_only: Set(pb_table.append_only), properties: Set(pb_table.properties.into()), - fragment_id: NotSet, + fragment_id, vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)), row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)), value_indices: Set(pb_table.value_indices.into()), @@ -208,7 +227,7 @@ impl From for ActiveModel { read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _), watermark_indices: Set(pb_table.watermark_indices.into()), dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()), - dml_fragment_id: NotSet, + dml_fragment_id, cardinality: Set(pb_table.cardinality.map(|x| x.into())), cleaned_by_watermark: Set(pb_table.cleaned_by_watermark), description: Set(pb_table.description), diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 61fc59c49c3ae..0072f81ef1a8e 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -777,11 +777,8 @@ impl CommandContext { Command::Resume(_) => {} Command::SourceSplitAssignment(split_assignment) => { - let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager - else { - unimplemented!("implement config change funcs in v2"); - }; - mgr.fragment_manager + self.barrier_manager_context + .metadata_manager .update_actor_splits_by_split_assignment(split_assignment) .await?; self.barrier_manager_context @@ -981,26 +978,40 @@ impl CommandContext { dispatchers, init_split_assignment, }) => { - let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager - else { - unimplemented!("implement replace funcs in v2"); - }; let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id())); - // Tell compute nodes to drop actors. - let node_actors = mgr.fragment_manager.table_node_actors(&table_ids).await?; + let node_actors = self + .barrier_manager_context + .metadata_manager + .get_worker_actor_ids(table_ids) + .await?; self.clean_up(node_actors).await?; - // Drop fragment info in meta store. - mgr.fragment_manager - .post_replace_table( - old_table_fragments, - new_table_fragments, - merge_updates, - dispatchers, - init_split_assignment.clone(), - ) - .await?; + match &self.barrier_manager_context.metadata_manager { + MetadataManager::V1(mgr) => { + // Drop fragment info in meta store. + mgr.fragment_manager + .post_replace_table( + old_table_fragments, + new_table_fragments, + merge_updates, + dispatchers, + init_split_assignment.clone(), + ) + .await?; + } + MetadataManager::V2(mgr) => { + // Update actors and actor_dispatchers for new table fragments. + mgr.catalog_controller + .post_collect_table_fragments( + new_table_fragments.table_id().table_id as _, + new_table_fragments.actor_ids(), + dispatchers.clone(), + init_split_assignment, + ) + .await?; + } + } // Apply the split changes in source manager. self.barrier_manager_context diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index bec61ff0f2166..67a81f194b10a 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -56,7 +56,7 @@ use crate::controller::utils::{ }; use crate::manager::{ActorInfos, LocalNotification}; use crate::stream::SplitAssignment; -use crate::MetaResult; +use crate::{MetaError, MetaResult}; impl CatalogControllerInner { /// List all fragment vnode mapping info for all CREATED streaming jobs. @@ -985,15 +985,58 @@ impl CatalogController { Ok(node_actors) } + pub async fn get_worker_actor_ids( + &self, + job_ids: Vec, + ) -> MetaResult>> { + let inner = self.inner.read().await; + let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?; + let actor_pu: Vec<(ActorId, i32)> = Actor::find() + .select_only() + .columns([actor::Column::ActorId, actor::Column::ParallelUnitId]) + .join(JoinType::InnerJoin, actor::Relation::Fragment.def()) + .filter(fragment::Column::JobId.is_in(job_ids)) + .into_tuple() + .all(&inner.db) + .await?; + + let mut worker_actors = BTreeMap::new(); + for (actor_id, pu_id) in actor_pu { + let worker_id = parallel_units_map + .get(&(pu_id as _)) + .unwrap() + .worker_node_id as WorkerId; + worker_actors + .entry(worker_id) + .or_insert_with(Vec::new) + .push(actor_id); + } + + Ok(worker_actors) + } + pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> { let inner = self.inner.read().await; let txn = inner.db.begin().await?; for assignments in split_assignment.values() { for (actor_id, splits) in assignments { + let actor_splits: Option = Actor::find_by_id(*actor_id as ActorId) + .select_only() + .column(actor::Column::Splits) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("actor_id", actor_id))?; + + let mut actor_splits = actor_splits + .map(|splits| splits.0.splits) + .unwrap_or_default(); + actor_splits.extend(splits.iter().map(Into::into)); + Actor::update(actor::ActiveModel { actor_id: Set(*actor_id as _), splits: Set(Some(ConnectorSplits(PbConnectorSplits { - splits: splits.iter().map(Into::into).collect(), + splits: actor_splits, }))), ..Default::default() }) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index bcd86e28c5287..576c5c3c40699 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -12,39 +12,49 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use itertools::Itertools; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::object::ObjectType; use risingwave_meta_model_v2::prelude::{ - Actor, ActorDispatcher, Fragment, Object, ObjectDependency, Source, Table, + Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Source, Table, }; use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, index, object_dependency, sink, source, streaming_job, - table, ActorId, CreateType, DatabaseId, FragmentId, JobStatus, ObjectId, SchemaId, SourceId, - StreamNode, UserId, + table, ActorId, ActorUpstreamActors, CreateType, DatabaseId, ExprNodeArray, FragmentId, + I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamNode, TableId, TableVersion, + UserId, }; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; -use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId; +use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; use risingwave_pb::catalog::{PbCreateType, PbTable}; -use risingwave_pb::meta::PbTableFragments; +use risingwave_pb::meta::relation::PbRelationInfo; +use risingwave_pb::meta::subscribe_response::{ + Info as NotificationInfo, Operation as NotificationOperation, +}; +use risingwave_pb::meta::{PbRelation, PbRelationGroup, PbTableFragments}; use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use risingwave_pb::stream_plan::{Dispatcher, PbFragmentTypeFlag}; +use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; +use risingwave_pb::stream_plan::{PbDispatcher, PbFragmentTypeFlag}; use sea_orm::sea_query::SimpleExpr; use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, - NotSet, QueryFilter, QuerySelect, TransactionTrait, + ModelTrait, NotSet, QueryFilter, QuerySelect, TransactionTrait, }; use crate::controller::catalog::CatalogController; +use crate::controller::rename::ReplaceTableExprRewriter; use crate::controller::utils::{ check_relation_name_duplicate, ensure_object_id, ensure_user_id, get_fragment_actor_ids, + get_fragment_mappings, }; -use crate::manager::StreamingJob; +use crate::controller::ObjectModel; +use crate::manager::{NotificationVersion, SinkId, StreamingJob}; use crate::model::StreamContext; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -243,6 +253,7 @@ impl CatalogController { let mut table: table::ActiveModel = table.into(); table.table_id = Set(table_id as _); table.belongs_to_job_id = Set(Some(job_id as _)); + table.fragment_id = NotSet; table.insert(&txn).await?; } txn.commit().await?; @@ -254,6 +265,7 @@ impl CatalogController { &self, table_fragment: PbTableFragments, streaming_job: &StreamingJob, + for_replace: bool, ) -> MetaResult<()> { let fragment_actors = Self::extract_fragment_and_actors_from_table_fragments(table_fragment)?; @@ -263,7 +275,7 @@ impl CatalogController { // Add fragments, actors and actor dispatchers. for (fragment, actors, actor_dispatchers) in fragment_actors { let fragment = fragment.into_active_model(); - fragment.insert(&txn).await?; + let fragment = fragment.insert(&txn).await?; for actor in actors { let actor = actor.into_active_model(); actor.insert(&txn).await?; @@ -275,31 +287,31 @@ impl CatalogController { actor_dispatcher.insert(&txn).await?; } } + // Update fragment id for all state tables. + if !for_replace { + for state_table_id in fragment.state_table_ids.into_inner() { + table::ActiveModel { + table_id: Set(state_table_id as _), + fragment_id: Set(Some(fragment.fragment_id as _)), + ..Default::default() + } + .update(&txn) + .await?; + } + } } - // Update fragment id and dml fragment id. - match streaming_job { - StreamingJob::MaterializedView(table) - | StreamingJob::Index(_, table) - | StreamingJob::Table(_, table, ..) => { + if !for_replace { + // // Update dml fragment id. + if let StreamingJob::Table(_, table, ..) = streaming_job { Table::update(table::ActiveModel { table_id: Set(table.id as _), - fragment_id: Set(Some(table.fragment_id as _)), + dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)), ..Default::default() }) .exec(&txn) .await?; } - _ => {} - } - if let StreamingJob::Table(_, table, ..) = streaming_job { - Table::update(table::ActiveModel { - table_id: Set(table.id as _), - dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)), - ..Default::default() - }) - .exec(&txn) - .await?; } txn.commit().await?; @@ -344,7 +356,7 @@ impl CatalogController { &self, job_id: ObjectId, actor_ids: Vec, - new_actor_dispatchers: HashMap>, + new_actor_dispatchers: HashMap>, split_assignment: &SplitAssignment, ) -> MetaResult<()> { let inner = self.inner.write().await; @@ -406,6 +418,251 @@ impl CatalogController { Ok(()) } + pub async fn create_job_catalog_for_replace( + &self, + streaming_job: &StreamingJob, + ctx: &StreamContext, + version: &PbTableVersion, + ) -> MetaResult { + let id = streaming_job.id(); + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + + // 1. check version. + let original_version: Option = Table::find_by_id(id as TableId) + .select_only() + .column(table::Column::Version) + .into_tuple() + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), id))?; + let original_version = original_version.expect("version for table should exist"); + if version.version != original_version.inner_ref().version + 1 { + return Err(MetaError::permission_denied("table version is stale")); + } + + // 2. create streaming object for new replace table. + let obj_id = Self::create_streaming_job_obj( + &txn, + ObjectType::Table, + streaming_job.owner() as _, + Some(streaming_job.database_id() as _), + Some(streaming_job.schema_id() as _), + PbCreateType::Foreground, + ctx, + ) + .await?; + + // 3. record dependency for new replace table. + ObjectDependency::insert(object_dependency::ActiveModel { + oid: Set(id as _), + used_by: Set(obj_id as _), + ..Default::default() + }) + .exec(&txn) + .await?; + + txn.commit().await?; + + Ok(obj_id) + } + + pub async fn finish_replace_streaming_job( + &self, + dummy_id: ObjectId, + streaming_job: StreamingJob, + merge_updates: Vec, + table_col_index_mapping: Option, + _creating_sink_id: Option, + _dropping_sink_id: Option, + ) -> MetaResult { + // Question: The source catalog should be remain unchanged? + let StreamingJob::Table(_, table, ..) = streaming_job else { + unreachable!("unexpected job: {streaming_job:?}") + }; + + let inner = self.inner.write().await; + let txn = inner.db.begin().await?; + let job_id = table.id as ObjectId; + + let table = table::ActiveModel::from(table).update(&txn).await?; + + // 1. replace old fragments/actors with new ones. + Fragment::delete_many() + .filter(fragment::Column::JobId.eq(job_id)) + .exec(&txn) + .await?; + Fragment::update_many() + .col_expr(fragment::Column::JobId, SimpleExpr::from(job_id)) + .filter(fragment::Column::JobId.eq(dummy_id)) + .exec(&txn) + .await?; + + // 2. update merges. + let fragment_replace_map: HashMap<_, _> = merge_updates + .iter() + .map(|update| { + ( + update.upstream_fragment_id, + ( + update.new_upstream_fragment_id.unwrap(), + update.added_upstream_actor_id.clone(), + ), + ) + }) + .collect(); + + // TODO: remove cache upstream fragment/actor ids and derive them from `actor_dispatcher` table. + let mut to_update_fragment_ids = HashSet::new(); + for merge_update in merge_updates { + assert!(merge_update.removed_upstream_actor_id.is_empty()); + assert!(merge_update.new_upstream_fragment_id.is_some()); + let (actor_id, fragment_id, mut upstream_actors) = + Actor::find_by_id(merge_update.actor_id as ActorId) + .select_only() + .columns([ + actor::Column::ActorId, + actor::Column::FragmentId, + actor::Column::UpstreamActorIds, + ]) + .into_tuple::<(ActorId, FragmentId, ActorUpstreamActors)>() + .one(&txn) + .await? + .ok_or_else(|| { + MetaError::catalog_id_not_found("actor", merge_update.actor_id) + })?; + + assert!(upstream_actors + .0 + .remove(&(merge_update.upstream_fragment_id as FragmentId)) + .is_some()); + upstream_actors.0.insert( + merge_update.new_upstream_fragment_id.unwrap() as _, + merge_update + .added_upstream_actor_id + .iter() + .map(|id| *id as _) + .collect(), + ); + actor::ActiveModel { + actor_id: Set(actor_id), + upstream_actor_ids: Set(upstream_actors), + ..Default::default() + } + .update(&txn) + .await?; + + to_update_fragment_ids.insert(fragment_id); + } + for fragment_id in to_update_fragment_ids { + let (fragment_id, mut stream_node, mut upstream_fragment_id) = + Fragment::find_by_id(fragment_id) + .select_only() + .columns([ + fragment::Column::FragmentId, + fragment::Column::StreamNode, + fragment::Column::UpstreamFragmentId, + ]) + .into_tuple::<(FragmentId, StreamNode, I32Array)>() + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?; + visit_stream_node(&mut stream_node.0, |body| { + if let PbNodeBody::Merge(m) = body + && let Some((new_fragment_id, new_actor_ids)) = + fragment_replace_map.get(&m.upstream_fragment_id) + { + m.upstream_fragment_id = *new_fragment_id; + m.upstream_actor_id = new_actor_ids.clone(); + } + }); + for fragment_id in &mut upstream_fragment_id.0 { + if let Some((new_fragment_id, _)) = fragment_replace_map.get(&(*fragment_id as _)) { + *fragment_id = *new_fragment_id as _; + } + } + fragment::ActiveModel { + fragment_id: Set(fragment_id), + stream_node: Set(stream_node), + upstream_fragment_id: Set(upstream_fragment_id), + ..Default::default() + } + .update(&txn) + .await?; + } + + // 3. remove dummy object. + Object::delete_by_id(dummy_id).exec(&txn).await?; + + // 4. update catalogs and notify. + let mut relations = vec![]; + let table_obj = table + .find_related(Object) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("object", table.table_id))?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Table(ObjectModel(table, table_obj).into())), + }); + if let Some(table_col_index_mapping) = table_col_index_mapping { + let expr_rewriter = ReplaceTableExprRewriter { + table_col_index_mapping, + }; + + let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find() + .select_only() + .columns([index::Column::IndexId, index::Column::IndexItems]) + .filter(index::Column::PrimaryTableId.eq(job_id)) + .into_tuple() + .all(&txn) + .await?; + for (index_id, mut nodes) in index_items { + nodes + .0 + .iter_mut() + .for_each(|x| expr_rewriter.rewrite_expr(x)); + let index = index::ActiveModel { + index_id: Set(index_id), + index_items: Set(nodes), + ..Default::default() + } + .update(&txn) + .await?; + let index_obj = index + .find_related(Object) + .one(&txn) + .await? + .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?; + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Index( + ObjectModel(index, index_obj).into(), + )), + }); + } + } + let fragment_mapping = get_fragment_mappings(&txn, job_id).await?; + + txn.commit().await?; + + self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping) + .await; + let version = self + .notify_frontend( + NotificationOperation::Update, + NotificationInfo::RelationGroup(PbRelationGroup { relations }), + ) + .await; + + Ok(version) + } + + /// `try_abort_replacing_streaming_job` is used to abort the replacing streaming job, the input `job_id` is the dummy job id. + pub async fn try_abort_replacing_streaming_job(&self, job_id: ObjectId) -> MetaResult<()> { + let inner = self.inner.write().await; + Object::delete_by_id(job_id).exec(&inner.db).await?; + Ok(()) + } + // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments // return the actor_ids to be applied pub async fn update_source_rate_limit_by_source_id( diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index cace146d7fb62..89294fdcd43df 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -3019,7 +3019,7 @@ impl CatalogManager { let mut updated_indexes = vec![]; - if let Some(table_col_index_mapping) = table_col_index_mapping.clone() { + if let Some(table_col_index_mapping) = table_col_index_mapping { let expr_rewriter = ReplaceTableExprRewriter { table_col_index_mapping, }; diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 3cc909fb28f26..e40fbe4f17010 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -322,7 +322,6 @@ impl MetaSrvEnv { let hummock_seq = meta_store_sql .clone() .map(|m| Arc::new(SequenceGenerator::new(m.conn))); - let sql_id_gen_manager = if let Some(store) = &meta_store_sql { Some(Arc::new(SqlIdGeneratorManager::new(&store.conn).await?)) } else { diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 450a920c379d0..d4ed2e0e15577 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_meta_model_v2::SourceId; -use risingwave_pb::catalog::PbSource; +use risingwave_pb::catalog::{PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; -use risingwave_pb::meta::table_fragments::Fragment; -use risingwave_pb::stream_plan::PbStreamActor; +use risingwave_pb::meta::table_fragments::{Fragment, PbFragment}; +use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor}; use crate::controller::catalog::CatalogControllerRef; use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo}; @@ -29,6 +29,7 @@ use crate::manager::{ CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, StreamingClusterInfo, WorkerId, }; use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments}; +use crate::stream::SplitAssignment; use crate::MetaResult; #[derive(Clone)] @@ -259,6 +260,59 @@ impl MetadataManager { } } + pub async fn get_table_catalog_by_ids(&self, ids: Vec) -> MetaResult> { + match &self { + MetadataManager::V1(mgr) => Ok(mgr.catalog_manager.get_tables(&ids).await), + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .get_table_by_ids(ids.into_iter().map(|id| id as _).collect()) + .await + } + } + } + + pub async fn get_downstream_chain_fragments( + &self, + job_id: u32, + ) -> MetaResult> { + match &self { + MetadataManager::V1(mgr) => { + mgr.fragment_manager + .get_downstream_fragments(job_id.into()) + .await + } + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .get_downstream_chain_fragments(job_id as _) + .await + } + } + } + + pub async fn get_worker_actor_ids( + &self, + job_ids: HashSet, + ) -> MetaResult>> { + match &self { + MetadataManager::V1(mgr) => mgr.fragment_manager.table_node_actors(&job_ids).await, + MetadataManager::V2(mgr) => { + let worker_actors = mgr + .catalog_controller + .get_worker_actor_ids(job_ids.into_iter().map(|id| id.table_id as _).collect()) + .await?; + Ok(worker_actors + .into_iter() + .map(|(id, actors)| { + ( + id as WorkerId, + actors.into_iter().map(|id| id as ActorId).collect(), + ) + }) + .collect()) + } + } + } + pub async fn get_job_id_to_internal_table_ids_mapping(&self) -> Option)>> { match &self { MetadataManager::V1(mgr) => mgr @@ -426,4 +480,22 @@ impl MetadataManager { } } } + + pub async fn update_actor_splits_by_split_assignment( + &self, + split_assignment: &SplitAssignment, + ) -> MetaResult<()> { + match self { + MetadataManager::V1(mgr) => { + mgr.fragment_manager + .update_actor_splits_by_split_assignment(split_assignment) + .await + } + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .update_actor_splits(split_assignment) + .await + } + } + } } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 1ab3aad2ddf3d..fa160e0658b47 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -860,9 +860,14 @@ impl DdlController { fragment_graph, ) .await?; + let dummy_id = self + .env + .id_gen_manager() + .generate::<{ IdCategory::Table }>() + .await? as u32; let (mut replace_table_ctx, mut table_fragments) = self - .build_replace_table(mgr, stream_ctx, &streaming_job, fragment_graph, None) + .build_replace_table(stream_ctx, &streaming_job, fragment_graph, None, dummy_id) .await?; let mut union_fragment_id = None; @@ -1586,7 +1591,9 @@ impl DdlController { table_col_index_mapping: Option, ) -> MetaResult { let MetadataManager::V1(mgr) = &self.metadata_manager else { - unimplemented!("support replace table in v2"); + return self + .replace_table_v2(stream_job, fragment_graph, table_col_index_mapping) + .await; }; let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; let stream_ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); @@ -1594,15 +1601,20 @@ impl DdlController { let fragment_graph = self .prepare_replace_table(mgr.catalog_manager.clone(), &mut stream_job, fragment_graph) .await?; + let dummy_id = self + .env + .id_gen_manager() + .generate::<{ IdCategory::Table }>() + .await? as u32; let result = try { let (ctx, table_fragments) = self .build_replace_table( - mgr, stream_ctx, &stream_job, fragment_graph, table_col_index_mapping.clone(), + dummy_id, ) .await?; @@ -1663,39 +1675,41 @@ impl DdlController { /// `build_replace_table` builds a table replacement and returns the context and new table /// fragments. - async fn build_replace_table( + /// + /// Note that we use a dummy ID for the new table fragments and replace it with the real one after + /// replacement is finished. + pub(crate) async fn build_replace_table( &self, - mgr: &MetadataManagerV1, stream_ctx: StreamContext, stream_job: &StreamingJob, mut fragment_graph: StreamFragmentGraph, table_col_index_mapping: Option, + dummy_table_id: TableId, ) -> MetaResult<(ReplaceTableContext, TableFragments)> { let id = stream_job.id(); let default_parallelism = fragment_graph.default_parallelism(); let expr_context = stream_ctx.to_expr_context(); - let old_table_fragments = mgr - .fragment_manager - .select_table_fragments_by_table_id(&id.into()) + let old_table_fragments = self + .metadata_manager + .get_job_fragments_by_id(&id.into()) .await?; let old_internal_table_ids = old_table_fragments.internal_table_ids(); - let old_internal_tables = mgr - .catalog_manager - .get_tables(&old_internal_table_ids) - .await; + let old_internal_tables = self + .metadata_manager + .get_table_catalog_by_ids(old_internal_table_ids) + .await?; fragment_graph.fit_internal_table_ids(old_internal_tables)?; // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete // graph that contains all information needed for building the actor graph. - let original_table_fragment = mgr.fragment_manager.get_mview_fragment(id.into()).await?; + let original_table_fragment = old_table_fragments + .mview_fragment() + .expect("mview fragment not found"); // Map the column indices in the dispatchers with the given mapping. - let downstream_fragments = mgr - .fragment_manager - .get_downstream_fragments(id.into()) - .await? + let downstream_fragments = self.metadata_manager.get_downstream_chain_fragments(id).await? .into_iter() .map(|(d, f)| if let Some(mapping) = &table_col_index_mapping { @@ -1719,7 +1733,7 @@ impl DdlController { )?; // 2. Build the actor graph. - let cluster_info = mgr.cluster_manager.get_streaming_cluster_info().await; + let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; let parallelism = self.resolve_stream_parallelism(default_parallelism, &cluster_info)?; let actor_graph_builder = @@ -1736,27 +1750,16 @@ impl DdlController { .await?; assert!(dispatchers.is_empty()); - // 3. Assign a new dummy ID for the new table fragments. - // - // FIXME: we use a dummy table ID for new table fragments, so we can drop the old fragments - // with the real table ID, then replace the dummy table ID with the real table ID. This is a - // workaround for not having the version info in the fragment manager. - let dummy_id = self - .env - .id_gen_manager() - .generate::<{ IdCategory::Table }>() - .await? as u32; - let table_parallelism = match default_parallelism { None => TableParallelism::Auto, Some(parallelism) => TableParallelism::Fixed(parallelism.get()), }; - // 4. Build the table fragments structure that will be persisted in the stream manager, and + // 3. Build the table fragments structure that will be persisted in the stream manager, and // the context that contains all information needed for building the actors on the compute // nodes. let table_fragments = TableFragments::new( - dummy_id.into(), + dummy_table_id.into(), graph, &building_locations.actor_locations, stream_ctx, diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 4a83914f4f08a..fc620f6386a54 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -13,10 +13,12 @@ // limitations under the License. use itertools::Itertools; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_fragment; use risingwave_pb::catalog::CreateType; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::stream_plan::stream_node::NodeBody; +use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto; use thiserror_ext::AsReport; @@ -151,7 +153,7 @@ impl DdlController { } mgr.catalog_controller - .prepare_streaming_job(table_fragments.to_protobuf(), streaming_job) + .prepare_streaming_job(table_fragments.to_protobuf(), streaming_job, false) .await?; // create streaming jobs. @@ -191,4 +193,85 @@ impl DdlController { } } } + + /// This is used for `ALTER TABLE ADD/DROP COLUMN`. + pub async fn replace_table_v2( + &self, + mut streaming_job: StreamingJob, + fragment_graph: StreamFragmentGraphProto, + table_col_index_mapping: Option, + ) -> MetaResult { + let MetadataManager::V2(mgr) = &self.metadata_manager else { + unreachable!("MetadataManager should be V2") + }; + let job_id = streaming_job.id(); + + let _reschedule_job_lock = self.stream_manager.reschedule_lock.read().await; + let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); + + // 1. build fragment graph. + let fragment_graph = + StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job).await?; + streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + let streaming_job = streaming_job; + + let StreamingJob::Table(_, table, ..) = &streaming_job else { + unreachable!("unexpected job: {streaming_job:?}") + }; + let dummy_id = mgr + .catalog_controller + .create_job_catalog_for_replace(&streaming_job, &ctx, table.get_version()?) + .await?; + + tracing::debug!(id = streaming_job.id(), "building replace streaming job"); + let result: MetaResult> = try { + let (ctx, table_fragments) = self + .build_replace_table( + ctx, + &streaming_job, + fragment_graph, + table_col_index_mapping.clone(), + dummy_id as _, + ) + .await?; + let merge_updates = ctx.merge_updates.clone(); + + mgr.catalog_controller + .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .await?; + + self.stream_manager + .replace_table(table_fragments, ctx) + .await?; + merge_updates + }; + + match result { + Ok(merge_updates) => { + let version = mgr + .catalog_controller + .finish_replace_streaming_job( + dummy_id, + streaming_job, + merge_updates, + table_col_index_mapping, + None, + None, + ) + .await?; + Ok(version) + } + Err(err) => { + tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace table"); + let _ = mgr + .catalog_controller + .try_abort_replacing_streaming_job(dummy_id) + .await.inspect_err(|err| { + tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing table"); + }); + Err(err) + } + } + } }