diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index ce7fa7985007f..47dcaf54f2643 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -196,6 +196,7 @@ pub async fn rpc_serve( Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn))) } }; + election_client.init().await?; rpc_serve_with_store( None, diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 5b81c8333bf7c..7d57adaf3522d 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -17,9 +17,10 @@ use std::collections::{HashMap, HashSet}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_meta::manager::MetadataManager; +use risingwave_meta::model; use risingwave_meta::model::ActorId; use risingwave_meta::stream::ThrottleConfig; -use risingwave_meta_model_v2::SourceId; +use risingwave_meta_model_v2::{SourceId, StreamingParallelism}; use risingwave_pb::meta::cancel_creating_jobs_request::Jobs; use risingwave_pb::meta::list_table_fragments_response::{ ActorInfo, FragmentInfo, TableFragmentInfo, @@ -280,11 +281,18 @@ impl StreamManagerService for StreamServiceImpl { let job_states = mgr.catalog_controller.list_streaming_job_states().await?; job_states .into_iter() - .map(|(table_id, state)| { + .map(|(table_id, state, parallelism)| { + let parallelism = match parallelism { + StreamingParallelism::Adaptive => model::TableParallelism::Adaptive, + StreamingParallelism::Fixed(n) => { + model::TableParallelism::Fixed(n as _) + } + }; + list_table_fragment_states_response::TableFragmentState { table_id: table_id as _, state: PbState::from(state) as _, - parallelism: None, // TODO: support parallelism. + parallelism: Some(parallelism.into()), } }) .collect_vec() diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 1fbfecf76bf47..a89fe23b43e7d 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -56,9 +56,9 @@ use crate::controller::utils::{ check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id, - get_fragment_mappings, get_referring_objects, get_referring_objects_cascade, - get_user_privilege, list_user_info_by_ids, resolve_source_register_info_for_jobs, - PartialObject, + get_fragment_mappings, get_fragment_mappings_by_jobs, get_referring_objects, + get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, + resolve_source_register_info_for_jobs, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -267,6 +267,7 @@ impl CatalogController { .into_tuple() .all(&txn) .await?; + let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs).await?; // The schema and objects in the database will be delete cascade. let res = Object::delete_by_id(database_id).exec(&txn).await?; @@ -287,6 +288,8 @@ impl CatalogController { }), ) .await; + self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings) + .await; Ok(( ReleaseContext { state_table_ids, @@ -1014,18 +1017,6 @@ impl CatalogController { .await?; } - if !index_ids.is_empty() { - let index_objs = Index::find() - .find_also_related(Object) - .filter(index::Column::IndexId.is_in(index_ids)) - .all(&txn) - .await?; - for (index, index_obj) in index_objs { - relations.push(PbRelationInfo::Index( - ObjectModel(index, index_obj.unwrap()).into(), - )); - } - } if !table_ids.is_empty() { let table_objs = Table::find() .find_also_related(Object) @@ -1038,6 +1029,19 @@ impl CatalogController { )); } } + // FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables. + if !index_ids.is_empty() { + let index_objs = Index::find() + .find_also_related(Object) + .filter(index::Column::IndexId.is_in(index_ids)) + .all(&txn) + .await?; + for (index, index_obj) in index_objs { + relations.push(PbRelationInfo::Index( + ObjectModel(index, index_obj.unwrap()).into(), + )); + } + } } ObjectType::Source => { let source = Source::find_by_id(object_id) @@ -1426,7 +1430,16 @@ impl CatalogController { DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?, DropMode::Restrict => { ensure_object_not_refer(object_type, object_id, &txn).await?; - vec![] + if obj.obj_type == ObjectType::Table { + let indexes = get_referring_objects(object_id, &txn).await?; + assert!( + indexes.iter().all(|obj| obj.obj_type == ObjectType::Index), + "only index could be dropped in restrict mode" + ); + indexes + } else { + vec![] + } } }; assert!( @@ -1440,11 +1453,15 @@ impl CatalogController { let to_drop_table_ids = to_drop_objects .iter() - .filter(|obj| obj.obj_type == ObjectType::Table) + .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index) .map(|obj| obj.oid); let mut to_drop_streaming_jobs = to_drop_objects .iter() - .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink) + .filter(|obj| { + obj.obj_type == ObjectType::Table + || obj.obj_type == ObjectType::Sink + || obj.obj_type == ObjectType::Index + }) .map(|obj| obj.oid) .collect_vec(); @@ -1479,11 +1496,6 @@ impl CatalogController { } let mut to_drop_state_table_ids = to_drop_table_ids.clone().collect_vec(); - let to_drop_index_ids = to_drop_objects - .iter() - .filter(|obj| obj.obj_type == ObjectType::Index) - .map(|obj| obj.oid) - .collect_vec(); // Add associated sources. let mut to_drop_source_ids: Vec = Table::find() @@ -1507,16 +1519,6 @@ impl CatalogController { to_drop_source_ids.push(object_id); } - // add internal tables. - let index_table_ids: Vec = Index::find() - .select_only() - .column(index::Column::IndexTableId) - .filter(index::Column::IndexId.is_in(to_drop_index_ids)) - .into_tuple() - .all(&txn) - .await?; - to_drop_streaming_jobs.extend(index_table_ids); - if !to_drop_streaming_jobs.is_empty() { let to_drop_internal_table_objs: Vec = Object::find() .select_only() @@ -1537,7 +1539,8 @@ impl CatalogController { } let (source_fragments, removed_actors) = - resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs).await?; + resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?; + let fragment_mappings = get_fragment_mappings_by_jobs(&txn, to_drop_streaming_jobs).await?; // Find affect users with privileges on all this objects. let to_update_user_ids: Vec = UserPrivilege::find() @@ -1566,58 +1569,70 @@ impl CatalogController { // notify about them. self.notify_users_update(user_infos).await; - let relations = to_drop_objects - .into_iter() - .map(|obj| match obj.obj_type { - ObjectType::Table => PbRelation { + let mut relations = vec![]; + for obj in to_drop_objects { + match obj.obj_type { + ObjectType::Table => relations.push(PbRelation { relation_info: Some(PbRelationInfo::Table(PbTable { id: obj.oid as _, schema_id: obj.schema_id.unwrap() as _, database_id: obj.database_id.unwrap() as _, ..Default::default() })), - }, - ObjectType::Source => PbRelation { + }), + ObjectType::Source => relations.push(PbRelation { relation_info: Some(PbRelationInfo::Source(PbSource { id: obj.oid as _, schema_id: obj.schema_id.unwrap() as _, database_id: obj.database_id.unwrap() as _, ..Default::default() })), - }, - ObjectType::Sink => PbRelation { + }), + ObjectType::Sink => relations.push(PbRelation { relation_info: Some(PbRelationInfo::Sink(PbSink { id: obj.oid as _, schema_id: obj.schema_id.unwrap() as _, database_id: obj.database_id.unwrap() as _, ..Default::default() })), - }, - ObjectType::View => PbRelation { + }), + ObjectType::View => relations.push(PbRelation { relation_info: Some(PbRelationInfo::View(PbView { id: obj.oid as _, schema_id: obj.schema_id.unwrap() as _, database_id: obj.database_id.unwrap() as _, ..Default::default() })), - }, - ObjectType::Index => PbRelation { - relation_info: Some(PbRelationInfo::Index(PbIndex { - id: obj.oid as _, - schema_id: obj.schema_id.unwrap() as _, - database_id: obj.database_id.unwrap() as _, - ..Default::default() - })), - }, + }), + ObjectType::Index => { + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Index(PbIndex { + id: obj.oid as _, + schema_id: obj.schema_id.unwrap() as _, + database_id: obj.database_id.unwrap() as _, + ..Default::default() + })), + }); + relations.push(PbRelation { + relation_info: Some(PbRelationInfo::Table(PbTable { + id: obj.oid as _, + schema_id: obj.schema_id.unwrap() as _, + database_id: obj.database_id.unwrap() as _, + ..Default::default() + })), + }); + } _ => unreachable!("only relations will be dropped."), - }) - .collect_vec(); + } + } let version = self .notify_frontend( NotificationOperation::Delete, NotificationInfo::RelationGroup(PbRelationGroup { relations }), ) .await; + self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings) + .await; Ok(( ReleaseContext { @@ -1730,9 +1745,12 @@ impl CatalogController { .one(&txn) .await? .unwrap(); + let obj = obj.unwrap(); let old_name = relation.name.clone(); relation.name = object_name.into(); - relation.definition = alter_relation_rename(&relation.definition, object_name); + if obj.obj_type != ObjectType::Index { + relation.definition = alter_relation_rename(&relation.definition, object_name); + } let active_model = $table::ActiveModel { $identity: Set(relation.$identity), name: Set(object_name.into()), @@ -1741,9 +1759,7 @@ impl CatalogController { }; active_model.update(&txn).await?; to_update_relations.push(PbRelation { - relation_info: Some(PbRelationInfo::$entity( - ObjectModel(relation, obj.unwrap()).into(), - )), + relation_info: Some(PbRelationInfo::$entity(ObjectModel(relation, obj).into())), }); old_name }}; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 4a9d3f3a992e5..b352c0046ac7e 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -24,7 +24,7 @@ use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, use risingwave_meta_model_v2::{ actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits, ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId, - StreamNode, TableId, VnodeBitmap, WorkerId, + StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; use risingwave_pb::common::PbParallelUnit; use risingwave_pb::meta::subscribe_response::{ @@ -634,13 +634,16 @@ impl CatalogController { ) } - pub async fn list_streaming_job_states(&self) -> MetaResult> { + pub async fn list_streaming_job_states( + &self, + ) -> MetaResult> { let inner = self.inner.read().await; - let job_states: Vec<(ObjectId, JobStatus)> = StreamingJob::find() + let job_states: Vec<(ObjectId, JobStatus, StreamingParallelism)> = StreamingJob::find() .select_only() .columns([ streaming_job::Column::JobId, streaming_job::Column::JobStatus, + streaming_job::Column::Parallelism, ]) .into_tuple() .all(&inner.db) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 62b532c6d2f46..a6273cf12067e 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -40,7 +40,6 @@ use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, }; -use risingwave_pb::meta::table_fragments::actor_status::PbActorState; use risingwave_pb::meta::table_fragments::PbActorStatus; use risingwave_pb::meta::{ FragmentParallelUnitMapping, PbRelation, PbRelationGroup, PbTableFragments, @@ -544,6 +543,7 @@ impl CatalogController { let table = table::ActiveModel::from(table).update(&txn).await?; + let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?; // 1. replace old fragments/actors with new ones. Fragment::delete_many() .filter(fragment::Column::JobId.eq(job_id)) @@ -701,6 +701,8 @@ impl CatalogController { txn.commit().await?; + self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings) + .await; self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping) .await; let version = self @@ -953,7 +955,7 @@ impl CatalogController { // actor_status PbActorStatus { parallel_unit, - state, + state: _, }, ) in newly_created_actors { @@ -990,8 +992,6 @@ impl CatalogController { ); let actor_upstreams = ActorUpstreamActors(actor_upstreams); - - let status = ActorStatus::from(PbActorState::try_from(state).unwrap()); let parallel_unit_id = parallel_unit.unwrap().id; let splits = actor_splits @@ -1001,7 +1001,7 @@ impl CatalogController { new_actors.push(actor::ActiveModel { actor_id: Set(actor_id as _), fragment_id: Set(fragment_id as _), - status: Set(status), + status: Set(ActorStatus::Running), splits: Set(splits.map(|splits| PbConnectorSplits { splits }.into())), parallel_unit_id: Set(parallel_unit_id as _), upstream_actor_ids: Set(actor_upstreams), @@ -1121,6 +1121,9 @@ impl CatalogController { for dispatcher in all_dispatchers { debug_assert!(assert_dispatcher_update_checker.insert(dispatcher.id)); + if new_created_actors.contains(&dispatcher.actor_id) { + continue; + } let mut dispatcher = dispatcher.into_active_model(); @@ -1188,7 +1191,7 @@ impl CatalogController { TableParallelism::Adaptive => StreamingParallelism::Adaptive, TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n as _), TableParallelism::Custom => { - unreachable!("sql backend does't support custom parallelism") + unreachable!("sql backend doesn't support custom parallelism") } }); diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 893f8812a86d3..e5c6044271e31 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -669,7 +669,7 @@ where Ok(actor_dispatchers_map) } -/// `get_fragment_parallel_unit_mappings` returns the fragment vnode mappings of the given job. +/// `get_fragment_mappings` returns the fragment vnode mappings of the given job. pub async fn get_fragment_mappings( db: &C, job_id: ObjectId, @@ -694,6 +694,35 @@ where .collect()) } +/// `get_fragment_mappings_by_jobs` returns the fragment vnode mappings of the given job list. +pub async fn get_fragment_mappings_by_jobs( + db: &C, + job_ids: Vec, +) -> MetaResult> +where + C: ConnectionTrait, +{ + if job_ids.is_empty() { + return Ok(vec![]); + } + + let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find() + .select_only() + .columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping]) + .filter(fragment::Column::JobId.is_in(job_ids)) + .into_tuple() + .all(db) + .await?; + + Ok(fragment_mappings + .into_iter() + .map(|(fragment_id, mapping)| PbFragmentParallelUnitMapping { + fragment_id: fragment_id as _, + mapping: Some(mapping.into_inner()), + }) + .collect()) +} + /// `get_fragment_actor_ids` returns the fragment actor ids of the given fragments. pub async fn get_fragment_actor_ids( db: &C,