diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs
index dc33c0c05b31f..13c7a7a4a1881 100644
--- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs
+++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs
@@ -14,6 +14,7 @@ impl MigrationTrait for Migration {
         assert!(!manager.has_table(UserPrivilege::Table.to_string()).await?);
         assert!(!manager.has_table(Database::Table.to_string()).await?);
         assert!(!manager.has_table(Schema::Table.to_string()).await?);
+        assert!(!manager.has_table(StreamingJob::Table.to_string()).await?);
         assert!(!manager.has_table(Fragment::Table.to_string()).await?);
         assert!(!manager.has_table(Actor::Table.to_string()).await?);
         assert!(!manager.has_table(Table::Table.to_string()).await?);
@@ -318,6 +319,25 @@ impl MigrationTrait for Migration {
                     .to_owned(),
             )
             .await?;
+        manager
+            .create_table(
+                MigrationTable::create()
+                    .table(StreamingJob::Table)
+                    .col(ColumnDef::new(StreamingJob::JobId).integer().primary_key())
+                    .col(ColumnDef::new(StreamingJob::JobStatus).string().not_null())
+                    .col(ColumnDef::new(StreamingJob::CreateType).string().not_null())
+                    .col(ColumnDef::new(StreamingJob::Timezone).string())
+                    .foreign_key(
+                        &mut ForeignKey::create()
+                            .name("FK_streaming_job_object_id")
+                            .from(StreamingJob::Table, StreamingJob::JobId)
+                            .to(Object::Table, Object::Oid)
+                            .on_delete(ForeignKeyAction::Cascade)
+                            .to_owned(),
+                    )
+                    .to_owned(),
+            )
+            .await?;
         manager
             .create_table(
                 MigrationTable::create()
@@ -328,7 +348,7 @@ impl MigrationTrait for Migration {
                             .primary_key()
                             .auto_increment(),
                     )
-                    .col(ColumnDef::new(Fragment::TableId).integer().not_null())
+                    .col(ColumnDef::new(Fragment::JobId).integer().not_null())
                     .col(
                         ColumnDef::new(Fragment::FragmentTypeMask)
                             .integer()
@@ -340,13 +360,13 @@ impl MigrationTrait for Migration {
                             .not_null(),
                     )
                     .col(ColumnDef::new(Fragment::StreamNode).json().not_null())
-                    .col(ColumnDef::new(Fragment::VnodeMapping).json())
+                    .col(ColumnDef::new(Fragment::VnodeMapping).json().not_null())
                     .col(ColumnDef::new(Fragment::StateTableIds).json())
                     .col(ColumnDef::new(Fragment::UpstreamFragmentId).json())
                     .foreign_key(
                         &mut ForeignKey::create()
                             .name("FK_fragment_table_id")
-                            .from(Fragment::Table, Fragment::TableId)
+                            .from(Fragment::Table, Fragment::JobId)
                             .to(Object::Table, Object::Oid)
                             .on_delete(ForeignKeyAction::Cascade)
                             .to_owned(),
                     )
@@ -365,7 +385,7 @@ impl MigrationTrait for Migration {
                             .auto_increment(),
                     )
                     .col(ColumnDef::new(Actor::FragmentId).integer().not_null())
-                    .col(ColumnDef::new(Actor::Status).json().not_null())
+                    .col(ColumnDef::new(Actor::Status).string().not_null())
                     .col(ColumnDef::new(Actor::Splits).json())
                     .col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
                     .col(ColumnDef::new(Actor::UpstreamActorIds).json())
@@ -767,6 +787,7 @@ impl MigrationTrait for Migration {
             UserPrivilege,
             Database,
             Schema,
+            StreamingJob,
             Fragment,
             Actor,
             Table,
@@ -854,7 +875,7 @@ enum Schema {
 enum Fragment {
     Table,
     FragmentId,
-    TableId,
+    JobId,
     FragmentTypeMask,
     DistributionType,
     StreamNode,
@@ -876,6 +897,15 @@ enum Actor {
     VnodeBitmap,
 }
 
+#[derive(DeriveIden)]
+enum StreamingJob {
+    Table,
+    JobId,
+    JobStatus,
+    Timezone,
+    CreateType,
+}
+
 #[derive(DeriveIden)]
 #[allow(clippy::enum_variant_names)]
 enum Table {
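The two enum-valued columns (`job_status`, `create_type`) are plain strings here because the model layer below maps them with `DeriveActiveEnum`. As a sanity check on naming, here is a standalone sketch (not part of the patch) of how the `DeriveIden` identifiers render into the SQL names that the `has_table` assertions receive:

```rust
// Minimal sketch, assuming the `sea-orm-migration` prelude as used by the
// migration crate itself. `Iden::to_string()` is exactly what
// `manager.has_table(...)` is given above.
use sea_orm_migration::prelude::*;

#[derive(DeriveIden)]
enum StreamingJob {
    Table, // the special `Table` variant renders as the enum's own name
    JobId,
    JobStatus,
}

fn main() {
    assert_eq!(StreamingJob::Table.to_string(), "streaming_job");
    assert_eq!(StreamingJob::JobId.to_string(), "job_id");
    assert_eq!(StreamingJob::JobStatus.to_string(), "job_status");
}
```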
diff --git a/src/meta/model_v2/src/actor.rs b/src/meta/model_v2/src/actor.rs
index dd3ed244209ea..75dd3806aca74 100644
--- a/src/meta/model_v2/src/actor.rs
+++ b/src/meta/model_v2/src/actor.rs
@@ -12,12 +12,38 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
 use sea_orm::entity::prelude::*;
 
-use crate::{
-    ActorId, ActorStatus, ActorUpstreamActors, ConnectorSplits, Dispatchers, FragmentId,
-    VnodeBitmap,
-};
+use crate::{ActorId, ActorUpstreamActors, ConnectorSplits, Dispatchers, FragmentId, VnodeBitmap};
+
+#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
+#[sea_orm(rs_type = "String", db_type = "String(None)")]
+pub enum ActorStatus {
+    #[sea_orm(string_value = "INACTIVE")]
+    Inactive,
+    #[sea_orm(string_value = "RUNNING")]
+    Running,
+}
+
+impl From<PbActorState> for ActorStatus {
+    fn from(val: PbActorState) -> Self {
+        match val {
+            PbActorState::Unspecified => unreachable!(),
+            PbActorState::Inactive => ActorStatus::Inactive,
+            PbActorState::Running => ActorStatus::Running,
+        }
+    }
+}
+
+impl From<ActorStatus> for PbActorState {
+    fn from(val: ActorStatus) -> Self {
+        match val {
+            ActorStatus::Inactive => PbActorState::Inactive,
+            ActorStatus::Running => PbActorState::Running,
+        }
+    }
+}
 
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
 #[sea_orm(table_name = "actor")]
diff --git a/src/meta/model_v2/src/fragment.rs b/src/meta/model_v2/src/fragment.rs
index a364e0709e3d4..155995624fcc4 100644
--- a/src/meta/model_v2/src/fragment.rs
+++ b/src/meta/model_v2/src/fragment.rs
@@ -15,18 +15,18 @@
 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
 use sea_orm::entity::prelude::*;
 
-use crate::{FragmentId, FragmentVnodeMapping, I32Array, StreamNode, TableId};
+use crate::{FragmentId, FragmentVnodeMapping, I32Array, ObjectId, StreamNode};
 
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
 #[sea_orm(table_name = "fragment")]
 pub struct Model {
     #[sea_orm(primary_key)]
     pub fragment_id: FragmentId,
-    pub table_id: TableId,
+    pub job_id: ObjectId,
     pub fragment_type_mask: i32,
     pub distribution_type: DistributionType,
     pub stream_node: StreamNode,
-    pub vnode_mapping: Option<FragmentVnodeMapping>,
+    pub vnode_mapping: FragmentVnodeMapping,
     pub state_table_ids: I32Array,
     pub upstream_fragment_id: I32Array,
 }
@@ -65,7 +65,7 @@ pub enum Relation {
     Actor,
     #[sea_orm(
         belongs_to = "super::object::Entity",
-        from = "Column::TableId",
+        from = "Column::JobId",
         to = "super::object::Column::Oid",
         on_update = "NoAction",
         on_delete = "Cascade"
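With this change, `actor.status` becomes a plain string enum column instead of a JSON-encoded protobuf. A minimal sketch of the round-trip the two `From` impls are meant to guarantee (paths as imported in the patch):

```rust
// Sketch only: exercises the conversions added above.
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;

fn main() {
    for state in [PbActorState::Inactive, PbActorState::Running] {
        let model: ActorStatus = state.into();
        let back: PbActorState = model.into();
        assert_eq!(state, back);
    }
    // PbActorState::Unspecified is deliberately unrepresentable and hits the
    // `unreachable!()` arm in the patch.
}
```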
diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs
index 1f101a29f2f77..04f41eb26fd30 100644
--- a/src/meta/model_v2/src/lib.rs
+++ b/src/meta/model_v2/src/lib.rs
@@ -15,6 +15,7 @@
 use std::collections::{BTreeMap, HashMap};
 
 use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
+use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
 use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
 use serde::{Deserialize, Serialize};
 
@@ -39,6 +40,7 @@ pub mod object_dependency;
 pub mod schema;
 pub mod sink;
 pub mod source;
+pub mod streaming_job;
 pub mod system_parameter;
 pub mod table;
 pub mod user;
@@ -92,6 +94,16 @@ impl From<JobStatus> for PbStreamJobStatus {
     }
 }
 
+// todo: deprecate job status in catalog and unify with this one.
+impl From<JobStatus> for PbStreamJobState {
+    fn from(status: JobStatus) -> Self {
+        match status {
+            JobStatus::Creating => PbStreamJobState::Creating,
+            JobStatus::Created => PbStreamJobState::Created,
+        }
+    }
+}
+
 #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
 #[sea_orm(rs_type = "String", db_type = "String(None)")]
 pub enum CreateType {
@@ -187,10 +199,6 @@ derive_from_json_struct!(StreamNode, risingwave_pb::stream_plan::PbStreamNode);
 derive_from_json_struct!(Dispatchers, Vec<risingwave_pb::stream_plan::PbDispatcher>);
 derive_from_json_struct!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
 
-derive_from_json_struct!(
-    ActorStatus,
-    risingwave_pb::meta::table_fragments::PbActorStatus
-);
 derive_from_json_struct!(VnodeBitmap, risingwave_pb::common::Buffer);
 
 derive_from_json_struct!(
diff --git a/src/meta/model_v2/src/prelude.rs b/src/meta/model_v2/src/prelude.rs
index ab9670f712f04..f55f6ebc9a49d 100644
--- a/src/meta/model_v2/src/prelude.rs
+++ b/src/meta/model_v2/src/prelude.rs
@@ -31,6 +31,7 @@ pub use super::object_dependency::Entity as ObjectDependency;
 pub use super::schema::Entity as Schema;
 pub use super::sink::Entity as Sink;
 pub use super::source::Entity as Source;
+pub use super::streaming_job::Entity as StreamingJob;
 pub use super::system_parameter::Entity as SystemParameter;
 pub use super::table::Entity as Table;
 pub use super::user::Entity as User;
diff --git a/src/meta/model_v2/src/streaming_job.rs b/src/meta/model_v2/src/streaming_job.rs
new file mode 100644
index 0000000000000..e2fe673ebd057
--- /dev/null
+++ b/src/meta/model_v2/src/streaming_job.rs
@@ -0,0 +1,47 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use sea_orm::entity::prelude::*;
+
+use crate::{CreateType, JobStatus};
+
+#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
+#[sea_orm(table_name = "streaming_job")]
+pub struct Model {
+    #[sea_orm(primary_key, auto_increment = false)]
+    pub job_id: i32,
+    pub job_status: JobStatus,
+    pub create_type: CreateType,
+    pub timezone: Option<String>,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(
+        belongs_to = "super::object::Entity",
+        from = "Column::JobId",
+        to = "super::object::Column::Oid",
+        on_update = "NoAction",
+        on_delete = "Cascade"
+    )]
+    Object,
+}
+
+impl Related<super::object::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Object.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}
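For reference, a hypothetical usage sketch of the new entity. The connection, the job id, and the `CreateType::Foreground` variant name are assumptions for illustration (the patch shows the `CreateType` enum exists but not its variants):

```rust
// Sketch only: persist one row of the new `streaming_job` entity.
use risingwave_meta_model_v2::streaming_job;
use risingwave_meta_model_v2::{CreateType, JobStatus};
use sea_orm::{ActiveModelTrait, DatabaseConnection, Set};

async fn persist_job(db: &DatabaseConnection, job_id: i32) -> Result<(), sea_orm::DbErr> {
    let job = streaming_job::ActiveModel {
        job_id: Set(job_id), // must reference an existing `object.oid`
        job_status: Set(JobStatus::Creating),
        create_type: Set(CreateType::Foreground), // variant name assumed
        timezone: Set(None),
    };
    // Fails if the FK to `object` is violated, matching the
    // `FK_streaming_job_object_id` constraint added in the migration.
    job.insert(db).await?;
    Ok(())
}
```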
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index fddd2bd44f88e..cd69cdb630f40 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::iter;
+use std::sync::Arc;
 
 use anyhow::anyhow;
 use itertools::Itertools;
@@ -53,6 +54,8 @@ use crate::manager::{MetaSrvEnv, NotificationVersion, StreamingJob};
 use crate::rpc::ddl_controller::DropMode;
 use crate::{MetaError, MetaResult};
 
+pub type CatalogControllerRef = Arc<CatalogController>;
+
 /// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
 pub struct CatalogController {
     pub(crate) env: MetaSrvEnv,
@@ -61,9 +64,9 @@ pub struct CatalogController {
 
 #[derive(Clone, Default)]
 pub struct ReleaseContext {
-    streaming_jobs: Vec<ObjectId>,
-    source_ids: Vec<SourceId>,
-    connections: Vec<PrivateLinkService>,
+    pub(crate) streaming_jobs: Vec<ObjectId>,
+    pub(crate) source_ids: Vec<SourceId>,
+    pub(crate) connections: Vec<PrivateLinkService>,
 }
 
 impl CatalogController {
@@ -978,6 +981,40 @@ impl CatalogController {
 
         Ok(version)
     }
+
+    pub async fn alter_source_column(
+        &self,
+        pb_source: PbSource,
+    ) -> MetaResult<NotificationVersion> {
+        let source_id = pb_source.id as SourceId;
+        let inner = self.inner.write().await;
+        let txn = inner.db.begin().await?;
+
+        let original_version: i64 = Source::find_by_id(source_id)
+            .select_only()
+            .column(source::Column::Version)
+            .into_tuple()
+            .one(&txn)
+            .await?
+            .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
+        if original_version + 1 != pb_source.version as i64 {
+            return Err(MetaError::permission_denied(
+                "source version is stale".to_string(),
+            ));
+        }
+
+        let source: source::ActiveModel = pb_source.clone().into();
+        source.update(&txn).await?;
+        txn.commit().await?;
+
+        let version = self
+            .notify_frontend_relation_info(
+                NotificationOperation::Update,
+                PbRelationInfo::Source(pb_source),
+            )
+            .await;
+        Ok(version)
+    }
 }
 
 #[cfg(test)]
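The version check in `alter_source_column` is optimistic concurrency control: an update is accepted only when the incoming catalog is exactly one version ahead of the stored row. The predicate in isolation, as a sketch:

```rust
// Standalone rendering of the stale-version check above.
fn version_check(original_version: i64, incoming_version: u64) -> Result<(), &'static str> {
    if original_version + 1 != incoming_version as i64 {
        return Err("source version is stale");
    }
    Ok(())
}

fn main() {
    assert!(version_check(3, 4).is_ok()); // exactly one ahead: accepted
    assert!(version_check(3, 3).is_err()); // replay of the current version: stale
    assert!(version_check(3, 6).is_err()); // skipped versions: also rejected
}
```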
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index 6fa5d8c2aac0f..306ef73345bde 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -12,26 +12,63 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::mem::swap;
 
 use anyhow::Context;
+use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::util::stream_graph_visitor::visit_stream_node;
+use risingwave_meta_model_v2::actor::ActorStatus;
+use risingwave_meta_model_v2::prelude::{Actor, Fragment, StreamingJob};
 use risingwave_meta_model_v2::{
-    actor, fragment, ActorStatus, ConnectorSplits, Dispatchers, FragmentVnodeMapping, StreamNode,
-    TableId, VnodeBitmap,
+    actor, fragment, ActorId, ConnectorSplits, Dispatchers, FragmentId, FragmentVnodeMapping,
+    I32Array, ObjectId, StreamNode, TableId, VnodeBitmap, WorkerId,
 };
+use risingwave_pb::common::PbParallelUnit;
+use risingwave_pb::ddl_service::PbTableJobType;
+use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
 use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment, PbState};
-use risingwave_pb::meta::PbTableFragments;
+use risingwave_pb::meta::{FragmentParallelUnitMapping, PbTableFragments};
 use risingwave_pb::source::PbConnectorSplits;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
-use risingwave_pb::stream_plan::{PbStreamEnvironment, StreamActor};
+use risingwave_pb::stream_plan::{
+    PbDispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamEnvironment,
+};
+use sea_orm::sea_query::{Expr, Value};
+use sea_orm::ActiveValue::Set;
+use sea_orm::{
+    ColumnTrait, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter, QuerySelect,
+    RelationTrait, TransactionTrait,
+};
 
-use crate::controller::catalog::CatalogController;
+use crate::controller::catalog::{CatalogController, CatalogControllerInner};
+use crate::controller::utils::get_parallel_unit_mapping;
+use crate::manager::ActorInfos;
+use crate::stream::SplitAssignment;
 use crate::MetaResult;
 
+impl CatalogControllerInner {
+    /// List all fragment vnode mappings.
+    pub async fn all_running_fragment_mappings(
+        &self,
+    ) -> MetaResult<impl Iterator<Item = FragmentParallelUnitMapping> + '_> {
+        let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find()
+            .select_only()
+            .columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping])
+            .into_tuple()
+            .all(&self.db)
+            .await?;
+        Ok(fragment_mappings.into_iter().map(|(fragment_id, mapping)| {
+            FragmentParallelUnitMapping {
+                fragment_id: fragment_id as _,
+                mapping: Some(mapping.into_inner()),
+            }
+        }))
+    }
+}
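A note on the query style used throughout this file: `select_only().columns([...]).into_tuple()` fetches just the named columns as tuples instead of materializing whole models. A reduced sketch of the same shape (assuming the same SeaORM setup as the patch):

```rust
// Sketch only: fetch (actor_id, fragment_id) pairs without loading full rows.
use risingwave_meta_model_v2::{actor, prelude::Actor};
use sea_orm::{DatabaseConnection, DbErr, EntityTrait, QuerySelect};

async fn actor_fragment_pairs(db: &DatabaseConnection) -> Result<Vec<(i32, i32)>, DbErr> {
    // Roughly: SELECT actor_id, fragment_id FROM actor;
    Actor::find()
        .select_only()
        .columns([actor::Column::ActorId, actor::Column::FragmentId])
        .into_tuple()
        .all(db)
        .await
}
```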
 
 impl CatalogController {
     pub fn extract_fragment_and_actors_from_table_fragments(
         PbTableFragments {
@@ -61,7 +98,7 @@ impl CatalogController {
     }
 
     pub fn extract_fragment_and_actors(
-        table_id: TableId,
+        job_id: ObjectId,
         pb_fragment: PbFragment,
         pb_actor_status: &HashMap<u32, PbActorStatus>,
         pb_actor_splits: &HashMap<u32, PbConnectorSplits>,
@@ -112,7 +149,7 @@ impl CatalogController {
                 }
             });
 
-            let StreamActor {
+            let PbStreamActor {
                 actor_id,
                 fragment_id,
                 nodes: _,
@@ -123,9 +160,7 @@ impl CatalogController {
             } = actor;
 
             let splits = pb_actor_splits.get(&actor_id).cloned().map(ConnectorSplits);
-            let status = pb_actor_status.get(&actor_id).cloned().map(ActorStatus);
-
-            let status = status.ok_or_else(|| {
+            let status = pb_actor_status.get(&actor_id).cloned().ok_or_else(|| {
                 anyhow::anyhow!(
                     "actor {} in fragment {} has no actor_status",
                     actor_id,
@@ -134,7 +169,6 @@ impl CatalogController {
             })?;
 
             let parallel_unit_id = status
-                .inner_ref()
                 .parallel_unit
                 .as_ref()
                 .map(|parallel_unit| parallel_unit.id)
@@ -156,7 +190,7 @@ impl CatalogController {
             actors.push(actor::Model {
                 actor_id: actor_id as _,
                 fragment_id: fragment_id as _,
-                status,
+                status: status.get_state().unwrap().into(),
                 splits,
                 parallel_unit_id,
                 upstream_actor_ids: upstream_actors.into(),
@@ -167,7 +201,7 @@ impl CatalogController {
 
         let upstream_fragment_id = pb_upstream_fragment_ids.into();
 
-        let vnode_mapping = pb_vnode_mapping.map(FragmentVnodeMapping);
+        let vnode_mapping = pb_vnode_mapping.map(FragmentVnodeMapping).unwrap();
 
         let stream_node = StreamNode(stream_node);
 
@@ -177,7 +211,7 @@ impl CatalogController {
 
         let fragment = fragment::Model {
             fragment_id: pb_fragment_id as _,
-            table_id,
+            job_id,
             fragment_type_mask: pb_fragment_type_mask as _,
             distribution_type,
             stream_node,
@@ -194,6 +228,7 @@ impl CatalogController {
         state: PbState,
         env: Option<PbStreamEnvironment>,
         fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
+        parallel_units_map: HashMap<u32, PbParallelUnit>,
     ) -> MetaResult<PbTableFragments> {
         let mut pb_fragments = HashMap::new();
         let mut pb_actor_splits = HashMap::new();
@@ -201,7 +236,7 @@ impl CatalogController {
 
         for (fragment, actors) in fragments {
             let (fragment, fragment_actor_status, fragment_actor_splits) =
-                Self::compose_fragment(fragment, actors)?;
+                Self::compose_fragment(fragment, actors, &parallel_units_map)?;
 
             pb_fragments.insert(fragment.fragment_id, fragment);
 
@@ -225,6 +260,7 @@ impl CatalogController {
     pub(crate) fn compose_fragment(
         fragment: fragment::Model,
         actors: Vec<actor::Model>,
+        parallel_units_map: &HashMap<u32, PbParallelUnit>,
     ) -> MetaResult<(
         PbFragment,
         HashMap<u32, PbActorStatus>,
         HashMap<u32, PbConnectorSplits>,
     )> {
         let fragment::Model {
             fragment_id,
-            table_id: _,
+            job_id: _,
             fragment_type_mask,
             distribution_type,
             stream_node,
@@ -262,6 +298,7 @@ impl CatalogController {
                 actor_id,
                 fragment_id,
                 status,
+                parallel_unit_id,
                 splits,
                 upstream_actor_ids,
                 dispatchers,
@@ -295,13 +332,24 @@ impl CatalogController {
 
             let pb_dispatcher = dispatchers.into_inner();
 
-            pb_actor_status.insert(actor_id as _, status.into_inner());
+            pb_actor_status.insert(
+                actor_id as _,
+                PbActorStatus {
+                    parallel_unit: Some(
+                        parallel_units_map
+                            .get(&(parallel_unit_id as _))
+                            .unwrap()
+                            .clone(),
+                    ),
+                    state: PbActorState::from(status) as _,
+                },
+            );
 
             if let Some(splits) = splits {
                 pb_actor_splits.insert(actor_id as _, splits.into_inner());
             }
 
-            pb_actors.push(StreamActor {
+            pb_actors.push(PbStreamActor {
                 actor_id: actor_id as _,
                 fragment_id: fragment_id as _,
                 nodes: pb_nodes,
@@ -313,7 +361,7 @@ impl CatalogController {
         }
 
         let pb_upstream_fragment_ids = upstream_fragment_id.into_u32_array();
-        let pb_vnode_mapping = vnode_mapping.map(|mapping| mapping.into_inner());
+        let pb_vnode_mapping = vnode_mapping.into_inner();
         let pb_state_table_ids = state_table_ids.into_u32_array();
         let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
         let pb_fragment = PbFragment {
@@ -321,13 +369,470 @@ impl CatalogController {
             fragment_type_mask: fragment_type_mask as _,
             distribution_type: pb_distribution_type,
             actors: pb_actors,
-            vnode_mapping: pb_vnode_mapping,
+            vnode_mapping: Some(pb_vnode_mapping),
             state_table_ids: pb_state_table_ids,
             upstream_fragment_ids: pb_upstream_fragment_ids,
         };
 
         Ok((pb_fragment, pb_actor_status, pb_actor_splits))
     }
+
+    pub async fn running_fragment_parallelisms(
+        &self,
+        id_filter: Option<HashSet<FragmentId>>,
+    ) -> MetaResult<HashMap<FragmentId, usize>> {
+        let inner = self.inner.read().await;
+        let mut select = Actor::find()
+            .select_only()
+            .column(actor::Column::FragmentId)
+            .column_as(actor::Column::ActorId.count(), "count")
+            .group_by(actor::Column::FragmentId);
+        if let Some(id_filter) = id_filter {
+            select = select.having(actor::Column::FragmentId.is_in(id_filter));
+        }
+        let fragment_parallelisms: Vec<(FragmentId, i64)> =
+            select.into_tuple().all(&inner.db).await?;
+        Ok(fragment_parallelisms
+            .into_iter()
+            .map(|(fragment_id, count)| (fragment_id, count as usize))
+            .collect())
+    }
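The aggregation above is roughly `SELECT fragment_id, COUNT(actor_id) FROM actor GROUP BY fragment_id`, with the optional `HAVING fragment_id IN (...)` acting as a filter on the grouped column. A pure-Rust rendering of the same computation, as a mental model only:

```rust
// Sketch only: count actors per fragment, in memory.
use std::collections::HashMap;

fn fragment_parallelisms(actor_fragments: &[(i32, i32)]) -> HashMap<i32, usize> {
    let mut counts = HashMap::new();
    for (_actor_id, fragment_id) in actor_fragments {
        *counts.entry(*fragment_id).or_insert(0) += 1;
    }
    counts
}

fn main() {
    // Three actors on fragment 1, one on fragment 2.
    let pairs = [(100, 1), (101, 1), (102, 1), (103, 2)];
    let counts = fragment_parallelisms(&pairs);
    assert_eq!(counts[&1], 3);
    assert_eq!(counts[&2], 1);
}
```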
+
+    pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
+        let inner = self.inner.read().await;
+        let fragment_jobs: Vec<(FragmentId, ObjectId)> = Fragment::find()
+            .select_only()
+            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+        Ok(fragment_jobs.into_iter().collect())
+    }
+
+    /// Gets the counts for each upstream relation that each stream job
+    /// indicated by `job_ids` depends on.
+    /// For example in the following query:
+    /// ```sql
+    /// CREATE MATERIALIZED VIEW m1 AS
+    /// SELECT * FROM t1 JOIN t2 ON t1.a = t2.a JOIN t3 ON t2.b = t3.b
+    /// ```
+    ///
+    /// We have t1, t2 and t3 each occurring once as an upstream of m1.
+    pub async fn get_upstream_job_counts(
+        &self,
+        job_ids: Vec<ObjectId>,
+    ) -> MetaResult<HashMap<ObjectId, HashMap<ObjectId, usize>>> {
+        let inner = self.inner.read().await;
+        let upstream_fragments: Vec<(ObjectId, i32, I32Array)> = Fragment::find()
+            .select_only()
+            .columns([
+                fragment::Column::JobId,
+                fragment::Column::FragmentTypeMask,
+                fragment::Column::UpstreamFragmentId,
+            ])
+            .filter(fragment::Column::JobId.is_in(job_ids))
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+
+        // keep only fragments that contain a stream scan node.
+        let upstream_fragments = upstream_fragments
+            .into_iter()
+            .filter(|(_, mask, _)| (*mask & PbFragmentTypeFlag::StreamScan as i32) != 0)
+            .map(|(obj, _, upstream_fragments)| (obj, upstream_fragments.into_inner()))
+            .collect_vec();
+
+        // count by fragment id.
+        let upstream_fragment_counts = upstream_fragments
+            .iter()
+            .flat_map(|(_, upstream_fragments)| upstream_fragments.iter().cloned())
+            .counts();
+
+        // get fragment id to job id mapping.
+        let fragment_job_ids: Vec<(FragmentId, ObjectId)> = Fragment::find()
+            .select_only()
+            .columns([fragment::Column::FragmentId, fragment::Column::JobId])
+            .filter(
+                fragment::Column::FragmentId
+                    .is_in(upstream_fragment_counts.keys().cloned().collect_vec()),
+            )
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+        let fragment_job_mapping: HashMap<FragmentId, ObjectId> =
+            fragment_job_ids.into_iter().collect();
+
+        // get upstream job counts.
+        let upstream_job_counts = upstream_fragments
+            .into_iter()
+            .map(|(job_id, upstream_fragments)| {
+                let upstream_job_counts = upstream_fragments
+                    .into_iter()
+                    .map(|upstream_fragment_id| {
+                        let upstream_job_id =
+                            fragment_job_mapping.get(&upstream_fragment_id).unwrap();
+                        (
+                            *upstream_job_id,
+                            *upstream_fragment_counts.get(&upstream_fragment_id).unwrap(),
+                        )
+                    })
+                    .collect();
+                (job_id, upstream_job_counts)
+            })
+            .collect();
+        Ok(upstream_job_counts)
+    }
+
+    pub async fn get_job_fragments_by_id(&self, job_id: ObjectId) -> MetaResult<PbTableFragments> {
+        let inner = self.inner.read().await;
+        let fragment_actors = Fragment::find()
+            .find_with_related(Actor)
+            .filter(fragment::Column::JobId.eq(job_id))
+            .all(&inner.db)
+            .await?;
+        let job_info = StreamingJob::find_by_id(job_id)
+            .one(&inner.db)
+            .await?
+            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
+
+        let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
+        Self::compose_table_fragments(
+            job_id as _,
+            job_info.job_status.into(),
+            job_info
+                .timezone
+                .map(|tz| PbStreamEnvironment { timezone: tz }),
+            fragment_actors,
+            parallel_units_map,
+        )
+    }
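The two-query counting in `get_upstream_job_counts` above can be hard to follow; here is a self-contained, in-memory rendering of the same logic on the `m1` example from the doc comment (all ids are made up for illustration):

```rust
// Sketch only: mirror of the counting logic in `get_upstream_job_counts`.
use std::collections::HashMap;

use itertools::Itertools;

fn main() {
    // The stream-scan fragment of job 10 (the MV) reads upstream fragments
    // 1 (t1), 2 (t2) and 3 (t3).
    let scan_fragments: Vec<(i32, Vec<i32>)> = vec![(10, vec![1, 2, 3])];
    // Which job owns each upstream fragment.
    let fragment_job: HashMap<i32, i32> = HashMap::from([(1, 1), (2, 2), (3, 3)]);

    // Count occurrences per upstream fragment id (same as `.counts()` above).
    let upstream_fragment_counts = scan_fragments
        .iter()
        .flat_map(|(_, ups)| ups.iter().cloned())
        .counts();

    // Regroup the counts per (job, upstream job).
    let counts: HashMap<i32, HashMap<i32, usize>> = scan_fragments
        .iter()
        .map(|(job, ups)| {
            let per_job = ups
                .iter()
                .map(|f| (fragment_job[f], upstream_fragment_counts[f]))
                .collect();
            (*job, per_job)
        })
        .collect();

    // t1, t2 and t3 each occur once as upstreams of the MV job.
    assert_eq!(counts[&10], HashMap::from([(1, 1), (2, 1), (3, 1)]));
}
```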
+
+    /// Get all actor ids in the target streaming jobs.
+    pub async fn get_job_actor_mapping(
+        &self,
+        job_ids: Vec<ObjectId>,
+    ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
+        let inner = self.inner.read().await;
+        let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
+            .select_only()
+            .column(fragment::Column::JobId)
+            .column(actor::Column::ActorId)
+            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
+            .filter(fragment::Column::JobId.is_in(job_ids))
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+        Ok(job_actors.into_iter().into_group_map())
+    }
+
+    /// Try to get internal table ids of each streaming job, used by metrics collection.
+    pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<i32>)>> {
+        if let Ok(inner) = self.inner.try_read() {
+            if let Ok(job_state_tables) = Fragment::find()
+                .select_only()
+                .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
+                .into_tuple::<(ObjectId, I32Array)>()
+                .all(&inner.db)
+                .await
+            {
+                let mut job_internal_table_ids = HashMap::new();
+                for (job_id, state_table_ids) in job_state_tables {
+                    job_internal_table_ids
+                        .entry(job_id)
+                        .or_insert_with(Vec::new)
+                        .extend(state_table_ids.into_inner());
+                }
+                return Some(job_internal_table_ids.into_iter().collect());
+            }
+        }
+        None
+    }
+
+    pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
+        let inner = self.inner.read().await;
+        let count = Fragment::find().count(&inner.db).await?;
+        Ok(count > 0)
+    }
+
+    pub fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, PbTableFragments>> {
+        unimplemented!(
+            "This function is too heavy, we should avoid using it and implement others on demand."
+        )
+    }
+
+    /// Check if the fragment type mask is injectable.
+    fn is_injectable(fragment_type_mask: u32) -> bool {
+        (fragment_type_mask
+            & (PbFragmentTypeFlag::Source as u32
+                | PbFragmentTypeFlag::Now as u32
+                | PbFragmentTypeFlag::Values as u32
+                | PbFragmentTypeFlag::BarrierRecv as u32))
+            != 0
+    }
+
+    /// Used in [`crate::barrier::GlobalBarrierManager`]: load all actors that barriers need to
+    /// be sent to or collected from.
+    pub async fn load_all_actor(
+        &self,
+        parallel_units_map: &HashMap<u32, PbParallelUnit>,
+        check_state: impl Fn(PbActorState, ObjectId, ActorId) -> bool,
+    ) -> MetaResult<ActorInfos> {
+        let inner = self.inner.read().await;
+        let actor_info: Vec<(ActorId, ActorStatus, i32, ObjectId, i32)> = Actor::find()
+            .select_only()
+            .column(actor::Column::ActorId)
+            .column(actor::Column::Status)
+            .column(actor::Column::ParallelUnitId)
+            .column(fragment::Column::JobId)
+            .column(fragment::Column::FragmentTypeMask)
+            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+
+        let mut actor_maps = HashMap::new();
+        let mut barrier_inject_actor_maps = HashMap::new();
+
+        for (actor_id, status, parallel_unit_id, job_id, type_mask) in actor_info {
+            let status = PbActorState::from(status);
+            let worker_id = parallel_units_map
+                .get(&(parallel_unit_id as _))
+                .unwrap()
+                .worker_node_id;
+            if check_state(status, job_id, actor_id) {
+                actor_maps
+                    .entry(worker_id)
+                    .or_insert_with(Vec::new)
+                    .push(actor_id as _);
+                if Self::is_injectable(type_mask as _) {
+                    barrier_inject_actor_maps
+                        .entry(worker_id)
+                        .or_insert_with(Vec::new)
+                        .push(actor_id as _);
+                }
+            }
+        }
+
+        Ok(ActorInfos {
+            actor_maps,
+            barrier_inject_actor_maps,
+        })
+    }
+
+    pub async fn migrate_actors(&self, plan: HashMap<u32, PbParallelUnit>) -> MetaResult<()> {
+        let inner = self.inner.read().await;
+        let txn = inner.db.begin().await?;
+        for (from_pu_id, to_pu_id) in plan {
+            Actor::update_many()
+                .col_expr(
+                    actor::Column::ParallelUnitId,
+                    Expr::value(Value::Int(Some(to_pu_id.id as _))),
+                )
+                .filter(actor::Column::ParallelUnitId.eq(from_pu_id as i32))
+                .exec(&txn)
+                .await?;
+        }
+        txn.commit().await?;
+
+        Ok(())
+    }
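`is_injectable` above is a plain bitmask membership test over `PbFragmentTypeFlag`: a fragment is a barrier injection point if any of the source/now/values/barrier-recv bits is set. A standalone sketch of the same test:

```rust
// Sketch only: the flag enum is the one imported by the patch.
use risingwave_pb::stream_plan::PbFragmentTypeFlag;

fn is_injectable(fragment_type_mask: u32) -> bool {
    (fragment_type_mask
        & (PbFragmentTypeFlag::Source as u32
            | PbFragmentTypeFlag::Now as u32
            | PbFragmentTypeFlag::Values as u32
            | PbFragmentTypeFlag::BarrierRecv as u32))
        != 0
}

fn main() {
    // A fragment can carry several flags at once; one injectable bit suffices.
    let source_mview = PbFragmentTypeFlag::Source as u32 | PbFragmentTypeFlag::Mview as u32;
    assert!(is_injectable(source_mview)); // contains a source: barriers injected here
    assert!(!is_injectable(PbFragmentTypeFlag::Mview as u32)); // a plain mview is not
}
```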
+
+    pub async fn all_inuse_parallel_units(&self) -> MetaResult<Vec<i32>> {
+        let inner = self.inner.read().await;
+        let parallel_units: Vec<i32> = Actor::find()
+            .select_only()
+            .column(actor::Column::ParallelUnitId)
+            .distinct()
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+        Ok(parallel_units)
+    }
+
+    pub async fn all_node_actors(
+        &self,
+        include_inactive: bool,
+    ) -> MetaResult<HashMap<WorkerId, Vec<PbStreamActor>>> {
+        let inner = self.inner.read().await;
+        let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
+        let fragment_actors = if include_inactive {
+            Fragment::find()
+                .find_with_related(Actor)
+                .all(&inner.db)
+                .await?
+        } else {
+            Fragment::find()
+                .find_with_related(Actor)
+                .filter(actor::Column::Status.eq(ActorStatus::Running))
+                .all(&inner.db)
+                .await?
+        };
+
+        let mut node_actors = HashMap::new();
+        for (fragment, actors) in fragment_actors {
+            let (table_fragments, actor_status, _) =
+                Self::compose_fragment(fragment, actors, &parallel_units_map)?;
+            for actor in table_fragments.actors {
+                let node_id = actor_status[&actor.actor_id]
+                    .get_parallel_unit()
+                    .unwrap()
+                    .worker_node_id as WorkerId;
+                node_actors
+                    .entry(node_id)
+                    .or_insert_with(Vec::new)
+                    .push(actor);
+            }
+        }
+
+        Ok(node_actors)
+    }
+
+    pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
+        let inner = self.inner.read().await;
+        let txn = inner.db.begin().await?;
+        for assignments in split_assignment.values() {
+            for (actor_id, splits) in assignments {
+                Actor::update(actor::ActiveModel {
+                    actor_id: Set(*actor_id as _),
+                    splits: Set(Some(ConnectorSplits(PbConnectorSplits {
+                        splits: splits.iter().map(Into::into).collect(),
+                    }))),
+                    ..Default::default()
+                })
+                .exec(&txn)
+                .await?;
+            }
+        }
+        txn.commit().await?;
+
+        Ok(())
+    }
+
+    /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
+    pub async fn get_running_actors_by_fragment(
+        &self,
+        fragment_id: FragmentId,
+    ) -> MetaResult<Vec<ActorId>> {
+        let inner = self.inner.read().await;
+        let actors: Vec<ActorId> = Actor::find()
+            .select_only()
+            .column(actor::Column::ActorId)
+            .filter(actor::Column::FragmentId.eq(fragment_id))
+            .filter(actor::Column::Status.eq(ActorStatus::Running))
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+        Ok(actors)
+    }
+
+    pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
+        let inner = self.inner.read().await;
+        let actors: Vec<ActorId> = Actor::find()
+            .select_only()
+            .column(actor::Column::ActorId)
+            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
+            .filter(fragment::Column::JobId.is_in(job_ids))
+            .into_tuple()
+            .all(&inner.db)
+            .await?;
+        Ok(actors)
+    }
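`update_actor_splits` relies on SeaORM's partial-update semantics: only the columns that are `Set` in the `ActiveModel` are written; everything left at `NotSet` is untouched. A hypothetical sketch of the same pattern (connection and actor id are placeholders):

```rust
// Sketch only: clear the `splits` column of one actor, touching nothing else.
use risingwave_meta_model_v2::{actor, ConnectorSplits};
use risingwave_pb::source::PbConnectorSplits;
use sea_orm::ActiveValue::Set;
use sea_orm::{DatabaseConnection, DbErr, EntityTrait};

async fn clear_splits(db: &DatabaseConnection, actor_id: i32) -> Result<(), DbErr> {
    let update = actor::ActiveModel {
        actor_id: Set(actor_id), // primary key selects the row
        splits: Set(Some(ConnectorSplits(PbConnectorSplits { splits: vec![] }))),
        ..Default::default() // all other columns stay NotSet and are not written
    };
    actor::Entity::update(update).exec(db).await?;
    Ok(())
}
```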
+
+    /// Get and filter the upstream `Materialize` or `Source` fragments of the specified relations.
+    pub async fn get_upstream_root_fragments(
+        &self,
+        upstream_job_ids: Vec<ObjectId>,
+        job_type: Option<PbTableJobType>,
+    ) -> MetaResult<HashMap<ObjectId, PbFragment>> {
+        let inner = self.inner.read().await;
+
+        let mut fragments = Fragment::find()
+            .filter(fragment::Column::JobId.is_in(upstream_job_ids))
+            .all(&inner.db)
+            .await?;
+        fragments.retain(|f| match job_type {
+            Some(PbTableJobType::SharedCdcSource) => {
+                f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0
+            }
+            // MV on MV, and other kinds of table job
+            None | Some(PbTableJobType::General) | Some(PbTableJobType::Unspecified) => {
+                f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0
+            }
+        });
+
+        let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
+        let mut root_fragments = HashMap::new();
+        for fragment in fragments {
+            let actors = fragment.find_related(Actor).all(&inner.db).await?;
+            root_fragments.insert(
+                fragment.job_id,
+                Self::compose_fragment(fragment, actors, &parallel_units_map)?.0,
+            );
+        }
+
+        Ok(root_fragments)
+    }
+
+    /// Get the downstream `Chain` fragments of the specified table.
+    pub async fn get_downstream_chain_fragments(
+        &self,
+        job_id: ObjectId,
+    ) -> MetaResult<Vec<(PbDispatchStrategy, PbFragment)>> {
+        let mview_fragment = self.get_mview_fragment(job_id).await?;
+        let downstream_dispatches: HashMap<_, _> = mview_fragment.actors[0]
+            .dispatcher
+            .iter()
+            .map(|d| {
+                let fragment_id = d.dispatcher_id as FragmentId;
+                let strategy = PbDispatchStrategy {
+                    r#type: d.r#type,
+                    dist_key_indices: d.dist_key_indices.clone(),
+                    output_indices: d.output_indices.clone(),
+                    downstream_table_name: d.downstream_table_name.clone(),
+                };
+                (fragment_id, strategy)
+            })
+            .collect();
+
+        let inner = self.inner.read().await;
+        let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
+        let mut chain_fragments = vec![];
+        for (fragment_id, dispatch_strategy) in downstream_dispatches {
+            let mut fragment_actors = Fragment::find_by_id(fragment_id)
+                .find_with_related(Actor)
+                .all(&inner.db)
+                .await?;
+            if fragment_actors.is_empty() {
+                bail!("No fragment found for fragment id {}", fragment_id);
+            }
+            assert_eq!(fragment_actors.len(), 1);
+            let (fragment, actors) = fragment_actors.pop().unwrap();
+            let fragment = Self::compose_fragment(fragment, actors, &parallel_units_map)?.0;
+            chain_fragments.push((dispatch_strategy, fragment));
+        }
+
+        Ok(chain_fragments)
+    }
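In `get_downstream_chain_fragments` above, each dispatcher of the mview fragment's first actor identifies one downstream fragment, with `dispatcher_id` doubling as the downstream fragment id. A reduced sketch of that extraction:

```rust
// Sketch only: mirrors the dispatcher -> strategy mapping in the patch.
use std::collections::HashMap;

use risingwave_pb::stream_plan::{PbDispatchStrategy, PbDispatcher};

fn downstream_strategies(dispatchers: &[PbDispatcher]) -> HashMap<i32, PbDispatchStrategy> {
    dispatchers
        .iter()
        .map(|d| {
            let strategy = PbDispatchStrategy {
                r#type: d.r#type,
                dist_key_indices: d.dist_key_indices.clone(),
                output_indices: d.output_indices.clone(),
                downstream_table_name: d.downstream_table_name.clone(),
            };
            (d.dispatcher_id as i32, strategy)
        })
        .collect()
}

fn main() {
    let d = PbDispatcher {
        dispatcher_id: 5, // doubles as the downstream fragment id
        ..Default::default()
    };
    assert!(downstream_strategies(&[d]).contains_key(&5));
}
```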
+
+    /// Get the `Materialize` fragment of the specified table.
+    pub async fn get_mview_fragment(&self, job_id: ObjectId) -> MetaResult<PbFragment> {
+        let inner = self.inner.read().await;
+        let mut fragments = Fragment::find()
+            .filter(fragment::Column::JobId.eq(job_id))
+            .all(&inner.db)
+            .await?;
+        fragments.retain(|f| f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0);
+        if fragments.is_empty() {
+            bail!("No mview fragment found for job {}", job_id);
+        }
+        assert_eq!(fragments.len(), 1);
+
+        let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
+        let fragment = fragments.pop().unwrap();
+        let actors = fragment.find_related(Actor).all(&inner.db).await?;
+
+        Ok(Self::compose_fragment(fragment, actors, &parallel_units_map)?.0)
+    }
 }
 
 #[cfg(test)]
@@ -339,19 +844,21 @@ mod tests {
     use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};
     use risingwave_common::util::iter_util::ZipEqDebug;
     use risingwave_common::util::stream_graph_visitor::visit_stream_node;
+    use risingwave_meta_model_v2::actor::ActorStatus;
     use risingwave_meta_model_v2::fragment::DistributionType;
     use risingwave_meta_model_v2::{
-        actor, fragment, ActorId, ActorStatus, ActorUpstreamActors, ConnectorSplits, Dispatchers,
-        FragmentId, FragmentVnodeMapping, I32Array, StreamNode, TableId, VnodeBitmap,
+        actor, fragment, ActorId, ActorUpstreamActors, ConnectorSplits, Dispatchers, FragmentId,
+        FragmentVnodeMapping, I32Array, ObjectId, StreamNode, TableId, VnodeBitmap,
     };
     use risingwave_pb::common::ParallelUnit;
+    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
     use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
     use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment};
     use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
     use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
     use risingwave_pb::stream_plan::{
         Dispatcher, MergeNode, PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor,
-        PbStreamNode, PbUnionNode, StreamActor,
+        PbStreamNode, PbUnionNode,
     };
 
     use crate::controller::catalog::CatalogController;
@@ -361,7 +868,7 @@ mod tests {
 
     const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;
 
-    const TEST_TABLE_ID: TableId = 1;
+    const TEST_JOB_ID: ObjectId = 1;
 
     const TEST_STATE_TABLE_ID: TableId = 1000;
 
@@ -476,7 +983,7 @@ mod tests {
                     actor_id,
                     PbActorStatus {
                         parallel_unit: Some(parallel_units[actor_id as usize].clone()),
-                        ..Default::default()
+                        state: PbActorState::Running as _,
                     },
                 )
             })
@@ -485,7 +992,7 @@ mod tests {
         let pb_actor_splits = Default::default();
 
         let (fragment, actors) = CatalogController::extract_fragment_and_actors(
-            TEST_TABLE_ID,
+            TEST_JOB_ID,
             pb_fragment.clone(),
             &pb_actor_status,
             &pb_actor_splits,
@@ -543,6 +1050,11 @@ mod tests {
             })
             .collect();
 
+        let parallel_units_map = parallel_units
+            .iter()
+            .map(|parallel_unit| (parallel_unit.id, parallel_unit.clone()))
+            .collect();
+
         let actors = (0..actor_count)
             .map(|actor_id| {
                 let parallel_unit_id = actor_id as ParallelUnitId;
@@ -551,11 +1063,6 @@ mod tests {
                     .remove(&parallel_unit_id)
                     .map(|m| VnodeBitmap(m.to_protobuf()));
 
-                let actor_status = ActorStatus(PbActorStatus {
-                    parallel_unit: Some(parallel_units[actor_id as usize].clone()),
-                    ..Default::default()
-                });
-
                 let actor_splits = Some(ConnectorSplits(PbConnectorSplits {
                     splits: vec![PbConnectorSplit {
                         split_type: "dummy".to_string(),
@@ -570,7 +1077,7 @@ mod tests {
                 actor::Model {
                     actor_id: actor_id as ActorId,
                     fragment_id: TEST_FRAGMENT_ID,
-                    status: actor_status,
+                    status: ActorStatus::Running,
                     splits: actor_splits,
                     parallel_unit_id: parallel_unit_id as i32,
                     upstream_actor_ids: ActorUpstreamActors(actor_upstream_actor_ids),
@@ -595,17 +1102,21 @@ mod tests {
 
         let fragment = fragment::Model {
             fragment_id: TEST_FRAGMENT_ID,
-            table_id: TEST_TABLE_ID,
+            job_id: TEST_JOB_ID,
             fragment_type_mask: 0,
             distribution_type: DistributionType::Hash,
             stream_node: StreamNode(stream_node),
-            vnode_mapping: Some(FragmentVnodeMapping(parallel_unit_mapping.to_protobuf())),
+            vnode_mapping: FragmentVnodeMapping(parallel_unit_mapping.to_protobuf()),
             state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
             upstream_fragment_id: I32Array::default(),
         };
 
-        let (pb_fragment, pb_actor_status, pb_actor_splits) =
-            CatalogController::compose_fragment(fragment.clone(), actors.clone()).unwrap();
+        let (pb_fragment, pb_actor_status, pb_actor_splits) = CatalogController::compose_fragment(
+            fragment.clone(),
+            actors.clone(),
+            &parallel_units_map,
+        )
+        .unwrap();
 
         assert_eq!(pb_actor_status.len(), actor_count as usize);
         assert_eq!(pb_actor_splits.len(), actor_count as usize);
@@ -629,8 +1140,8 @@ mod tests {
 
     fn check_actors(
         actors: Vec<actor::Model>,
-        pb_actors: Vec<StreamActor>,
-        pb_actor_status: HashMap<u32, PbActorStatus>,
+        pb_actors: Vec<PbStreamActor>,
+        _pb_actor_status: HashMap<u32, PbActorStatus>,
         pb_actor_splits: HashMap<u32, PbConnectorSplits>,
     ) {
         for (
@@ -644,7 +1155,7 @@ mod tests {
                 dispatchers,
                 vnode_bitmap,
             },
-            StreamActor {
+            PbStreamActor {
                 actor_id: pb_actor_id,
                 fragment_id: pb_fragment_id,
                 nodes: pb_nodes,
@@ -690,14 +1201,7 @@ mod tests {
                 }
             });
 
-            assert_eq!(
-                status,
-                pb_actor_status
-                    .get(&pb_actor_id)
-                    .cloned()
-                    .map(ActorStatus)
-                    .unwrap()
-            );
+            assert_eq!(status, ActorStatus::Running);
 
             assert_eq!(
                 splits,
@@ -727,7 +1231,7 @@ mod tests {
             PbFragmentDistributionType::from(fragment.distribution_type) as i32
         );
         assert_eq!(
-            pb_vnode_mapping.map(FragmentVnodeMapping),
+            pb_vnode_mapping.map(FragmentVnodeMapping).unwrap(),
             fragment.vnode_mapping
        );
diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs
index 6291633ba81ca..eb61d9895b617 100644
--- a/src/meta/src/controller/utils.rs
+++ b/src/meta/src/controller/utils.rs
@@ -12,15 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use anyhow::anyhow;
 use risingwave_meta_model_migration::WithQuery;
 use risingwave_meta_model_v2::object::ObjectType;
 use risingwave_meta_model_v2::prelude::*;
 use risingwave_meta_model_v2::{
     connection, function, index, object, object_dependency, schema, sink, source, table, user,
-    user_privilege, view, DataTypeArray, DatabaseId, ObjectId, PrivilegeId, SchemaId, UserId,
+    user_privilege, view, worker_property, DataTypeArray, DatabaseId, I32Array, ObjectId,
+    PrivilegeId, SchemaId, UserId, WorkerId,
 };
 use risingwave_pb::catalog::{PbConnection, PbFunction};
+use risingwave_pb::common::PbParallelUnit;
 use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject};
 use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
 use sea_orm::sea_query::{
@@ -557,3 +561,37 @@
         _ => unreachable!("invalid object type: {:?}", object),
     }
 }
+
+// todo: deprecate parallel units and avoid this query.
+pub async fn get_parallel_unit_mapping<C>(db: &C) -> MetaResult<HashMap<u32, PbParallelUnit>>
+where
+    C: ConnectionTrait,
+{
+    let parallel_units: Vec<(WorkerId, I32Array)> = WorkerProperty::find()
+        .select_only()
+        .columns([
+            worker_property::Column::WorkerId,
+            worker_property::Column::ParallelUnitIds,
+        ])
+        .into_tuple()
+        .all(db)
+        .await?;
+    let parallel_units_map = parallel_units
+        .into_iter()
+        .flat_map(|(worker_id, parallel_unit_ids)| {
+            parallel_unit_ids
+                .into_inner()
+                .into_iter()
+                .map(move |parallel_unit_id| {
+                    (
+                        parallel_unit_id as _,
+                        PbParallelUnit {
+                            id: parallel_unit_id as _,
+                            worker_node_id: worker_id as _,
+                        },
+                    )
+                })
+        })
+        .collect();
+    Ok(parallel_units_map)
+}
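A pure-data sketch of the flattening `get_parallel_unit_mapping` performs: per-worker lists of parallel unit ids become a single id-to-`PbParallelUnit` map (the ids here are made up):

```rust
// Sketch only: same shape as the flat_map in `get_parallel_unit_mapping`.
use std::collections::HashMap;

use risingwave_pb::common::PbParallelUnit;

fn flatten(workers: Vec<(u32, Vec<i32>)>) -> HashMap<u32, PbParallelUnit> {
    workers
        .into_iter()
        .flat_map(|(worker_id, pu_ids)| {
            pu_ids.into_iter().map(move |id| {
                (
                    id as u32,
                    PbParallelUnit {
                        id: id as u32,
                        worker_node_id: worker_id,
                    },
                )
            })
        })
        .collect()
}

fn main() {
    // Worker 1 owns parallel units 0 and 1; worker 2 owns 2 and 3.
    let map = flatten(vec![(1, vec![0, 1]), (2, vec![2, 3])]);
    assert_eq!(map[&3].worker_node_id, 2);
}
```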