diff --git a/src/meta/model_v2/src/fragment.rs b/src/meta/model_v2/src/fragment.rs
index bd6641f36165a..9d6f618ae7217 100644
--- a/src/meta/model_v2/src/fragment.rs
+++ b/src/meta/model_v2/src/fragment.rs
@@ -15,7 +15,7 @@
 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
 use sea_orm::entity::prelude::*;
 
-use crate::{FragmentId, FragmentVnodeMapping, StreamNode, TableId, U32Array, VnodeBitmap};
+use crate::{FragmentId, FragmentVnodeMapping, StreamNode, TableId, U32Array};
 
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
 #[sea_orm(table_name = "fragment")]
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index 8c1153e41718b..0abfd5f4b354f 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -14,11 +14,10 @@
 
 use std::iter;
 
-use anyhow::{anyhow, Context};
+use anyhow::anyhow;
 use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::catalog::{DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
-use risingwave_common::util::stream_graph_visitor::visit_stream_node;
 use risingwave_meta_model_v2::object::ObjectType;
 use risingwave_meta_model_v2::prelude::*;
 use risingwave_meta_model_v2::{
@@ -55,8 +54,8 @@ use crate::{MetaError, MetaResult};
 
 /// `CatalogController` is the controller for catalog related operations, including database, schema, table, view, etc.
 pub struct CatalogController {
-    env: MetaSrvEnv,
-    inner: RwLock<CatalogControllerInner>,
+    pub(crate) env: MetaSrvEnv,
+    pub(crate) inner: RwLock<CatalogControllerInner>,
 }
 
 #[derive(Clone, Default)]
@@ -80,8 +79,8 @@ impl CatalogController {
     }
 }
 
-struct CatalogControllerInner {
-    db: DatabaseConnection,
+pub(crate) struct CatalogControllerInner {
+    pub(crate) db: DatabaseConnection,
 }
 
 impl CatalogController {
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index f590eb5a3fab5..a3f8d3f7dc6c2 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::mem::swap;
 
 use anyhow::Context;
@@ -20,43 +20,19 @@
 use risingwave_common::bail;
 use risingwave_common::util::stream_graph_visitor::visit_stream_node;
 use risingwave_meta_model_v2::{
     actor, fragment, ActorStatus, ActorUpstreamActors, ConnectorSplits, Dispatchers,
-    FragmentVnodeMapping, I32Array, StreamNode, TableId, U32Array, VnodeBitmap,
+    FragmentVnodeMapping, StreamNode, TableId, U32Array, VnodeBitmap,
 };
 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
-use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment};
+use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment, PbState};
 use risingwave_pb::meta::PbTableFragments;
 use risingwave_pb::source::PbConnectorSplits;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
-use risingwave_pb::stream_plan::StreamActor;
-use sea_orm::{ActiveModelTrait, EntityTrait};
+use risingwave_pb::stream_plan::{PbStreamEnvironment, StreamActor};
 
 use crate::controller::catalog::CatalogController;
 use crate::MetaResult;
 
 impl CatalogController {
-    pub async fn load_fragment(
-        &self,
-        _fragment_id: crate::model::FragmentId,
-    ) -> MetaResult<PbFragment> {
-        // let inner = self.inner.read().await;
-        //
-        // let all = Fragment::find_by_id(fragment_id)
-        //     .find_also_related(Actor)
-        //     .all(&inner.db)
-        //     .await?;
-        //
-        // fn uname(fragment: Fragment, actors: Vec) -> PbFragment {
-        //     let Fragment {} = fragment;
-        // };
-        //
-        // let actors = vec![];
-        //
-
-        todo!()
-
-        // Ok(ObjectModel(conn, obj.unwrap()).into())
-    }
-
     pub fn extract_fragment_and_actors_from_table_fragments(
         PbTableFragments {
@@ -68,6 +44,8 @@ impl CatalogController {
         }: PbTableFragments,
     ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
         let mut result = vec![];
+        let fragments: BTreeMap<_, _> = fragments.into_iter().collect();
+
         for (_, fragment) in fragments {
             let (fragment, actors) = Self::extract_fragment_and_actors(
                 table_id,
@@ -134,8 +112,6 @@ impl CatalogController {
             }
         });
 
-        assert!(!upstream_actors.is_empty());
-
         let StreamActor {
             actor_id,
            fragment_id,
@@ -166,12 +142,15 @@
                 as _;
 
         assert_eq!(
-            pb_upstream_actor_id,
+            pb_upstream_actor_id
+                .iter()
+                .cloned()
+                .collect::<BTreeSet<_>>(),
             upstream_actors
                 .values()
                 .flatten()
                 .cloned()
-                .collect::<Vec<_>>()
+                .collect::<BTreeSet<_>>()
         );
 
         actors.push(actor::Model {
@@ -211,6 +190,9 @@
     }
 
     pub fn compose_table_fragments(
+        table_id: u32,
+        state: PbState,
+        env: Option<PbStreamEnvironment>,
         fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
     ) -> MetaResult<PbTableFragments> {
         let mut pb_fragments = HashMap::new();
@@ -228,18 +210,19 @@
         }
 
         let table_fragments = PbTableFragments {
-            table_id: 0,
-            state: 0,
+            table_id,
+            state: state as _,
             fragments: pb_fragments,
             actor_status: pb_actor_status,
             actor_splits: pb_actor_splits,
-            env: None,
+            env,
         };
 
         Ok(table_fragments)
     }
 
-    pub fn compose_fragment(
+    #[allow(clippy::type_complexity)]
+    pub(crate) fn compose_fragment(
         fragment: fragment::Model,
         actors: Vec<actor::Model>,
     ) -> MetaResult<(
@@ -347,31 +330,30 @@
     }
 }
 
+#[cfg(test)]
 mod tests {
     use std::collections::{BTreeMap, HashMap};
+    use std::default::Default;
 
     use itertools::Itertools;
     use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};
-    use risingwave_common::range::RangeBoundsExt;
+    use risingwave_common::util::iter_util::ZipEqDebug;
     use risingwave_common::util::stream_graph_visitor::visit_stream_node;
     use risingwave_meta_model_v2::fragment::DistributionType;
     use risingwave_meta_model_v2::{
         actor, fragment, ActorStatus, ActorUpstreamActors, ConnectorSplits, Dispatchers,
         FragmentVnodeMapping, StreamNode, U32Array, VnodeBitmap,
     };
-    use risingwave_pb::common::{ParallelUnit, PbParallelUnit};
-    use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
+    use risingwave_pb::common::ParallelUnit;
     use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
     use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment};
     use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
-    use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+    use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
     use risingwave_pb::stream_plan::{
-        Dispatcher, MergeNode, PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbHashJoinNode,
-        PbNoOpNode, PbStreamActor, PbStreamNode, PbUnionNode, StreamActor,
+        Dispatcher, MergeNode, PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor,
+        PbStreamNode, PbUnionNode, StreamActor,
     };
-    use risingwave_common::util::iter_util::ZipEqDebug;
-
     use crate::controller::catalog::CatalogController;
     use crate::manager::TableId;
     use crate::model::{ActorId, FragmentId};
@@ -388,30 +370,28 @@ mod tests {
     fn generate_parallel_units(count: u32) -> Vec<ParallelUnit> {
         (0..count)
             .map(|parallel_unit_id| ParallelUnit {
-                id: parallel_unit_id as u32,
+                id: parallel_unit_id,
                 ..Default::default()
             })
             .collect_vec()
     }
 
     fn generate_dispatchers_for_actor(actor_id: u32) -> Vec<PbDispatcher> {
-        let mut dispatchers = vec![];
-
-        dispatchers.push(PbDispatcher {
+        vec![PbDispatcher {
             r#type: PbDispatcherType::Hash as _,
-            dist_key_indices: vec![1],
-            output_indices: vec![1],
-            hash_mapping: None,
-            dispatcher_id: (actor_id + 200) as u64,
-            downstream_actor_id: vec![actor_id + 200],
-        });
-
-        dispatchers
+            dispatcher_id: actor_id as u64,
+            downstream_actor_id: vec![actor_id],
+            ..Default::default()
+        }]
     }
 
     fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> BTreeMap<FragmentId, Vec<ActorId>> {
         let mut upstream_actor_ids = BTreeMap::new();
         upstream_actor_ids.insert(TEST_UPSTREAM_FRAGMENT_ID, vec![(actor_id + 100) as ActorId]);
+        upstream_actor_ids.insert(
+            TEST_UPSTREAM_FRAGMENT_ID + 1,
+            vec![(actor_id + 200) as ActorId],
+        );
         upstream_actor_ids
     }
@@ -423,7 +403,7 @@ mod tests {
             input.push(PbStreamNode {
                 node_body: Some(PbNodeBody::Merge(MergeNode {
                     upstream_actor_id: upstream_actor_ids.clone(),
-                    upstream_fragment_id: upstream_fragment_id.clone(),
+                    upstream_fragment_id: *upstream_fragment_id,
                     ..Default::default()
                 })),
                 ..Default::default()
             });
@@ -442,7 +422,7 @@ mod tests {
         let actor_count = 3u32;
         let parallel_units = generate_parallel_units(actor_count);
         let parallel_unit_mapping = ParallelUnitMapping::build(&parallel_units);
-        let mut actor_vnode_bitmaps = parallel_unit_mapping.to_bitmaps();
+        let actor_vnode_bitmaps = parallel_unit_mapping.to_bitmaps();
 
         let upstream_actor_ids: HashMap<ActorId, BTreeMap<FragmentId, Vec<ActorId>>> = (0
             ..actor_count)
@@ -473,7 +453,7 @@ mod tests {
             })
             .collect_vec();
 
-        let mut pb_fragment = PbFragment {
+        let pb_fragment = PbFragment {
             fragment_id: TEST_FRAGMENT_ID,
             fragment_type_mask: PbFragmentTypeFlag::Source as _,
             distribution_type: PbFragmentDistributionType::Hash as _,
@@ -508,12 +488,40 @@ mod tests {
             &pb_actor_splits,
         )?;
 
+        check_fragment_template(fragment.clone(), pb_actors.clone(), &upstream_actor_ids);
         check_fragment(fragment, pb_fragment);
         check_actors(actors, pb_actors, pb_actor_status, pb_actor_splits);
 
         Ok(())
     }
 
+    fn check_fragment_template(
+        fragment: fragment::Model,
+        actors: Vec<PbStreamActor>,
+        upstream_actor_ids: &HashMap<ActorId, BTreeMap<FragmentId, Vec<ActorId>>>,
+    ) {
+        let stream_node_template = fragment.stream_node.clone();
+
+        for PbStreamActor {
+            nodes, actor_id, ..
+        } in actors
+        {
+            let mut template_node = stream_node_template.clone().into_inner();
+            let nodes = nodes.unwrap();
+            let actor_upstream_actor_ids = upstream_actor_ids.get(&actor_id).cloned().unwrap();
+            visit_stream_node(&mut template_node, |body| {
+                if let NodeBody::Merge(m) = body {
+                    m.upstream_actor_id = actor_upstream_actor_ids
+                        .get(&m.upstream_fragment_id)
+                        .cloned()
+                        .unwrap();
+                }
+            });
+
+            assert_eq!(nodes, template_node);
+        }
+    }
+
     #[tokio::test]
     async fn test_compose_fragment() -> MetaResult<()> {
         let actor_count = 3u32;
@@ -521,6 +529,11 @@ mod tests {
         let parallel_unit_mapping = ParallelUnitMapping::build(&parallel_units);
         let mut actor_vnode_bitmaps = parallel_unit_mapping.to_bitmaps();
 
+        let upstream_actor_ids: HashMap<ActorId, BTreeMap<FragmentId, Vec<ActorId>>> = (0
+            ..actor_count)
+            .map(|actor_id| (actor_id, generate_upstream_actor_ids_for_actor(actor_id)))
+            .collect();
+
         let actors = (0..actor_count)
             .map(|actor_id| {
                 let parallel_unit_id = actor_id as ParallelUnitId;
@@ -531,17 +544,17 @@ mod tests {
 
                 let actor_status = ActorStatus(PbActorStatus {
                     parallel_unit: Some(parallel_units[actor_id as usize].clone()),
-                    state: PbActorState::Running as _,
+                    ..Default::default()
                 });
 
                 let actor_splits = Some(ConnectorSplits(PbConnectorSplits {
                     splits: vec![PbConnectorSplit {
                         split_type: "dummy".to_string(),
-                        encoded_split: vec![],
+                        ..Default::default()
                     }],
                 }));
 
-                let upstream_actor_ids = generate_upstream_actor_ids_for_actor(actor_id);
+                let actor_upstream_actor_ids = upstream_actor_ids.get(&actor_id).cloned().unwrap();
                 let dispatchers = generate_dispatchers_for_actor(actor_id);
 
                 actor::Model {
@@ -550,21 +563,24 @@ mod tests {
                     status: actor_status,
                     splits: actor_splits,
                     parallel_unit_id: parallel_unit_id as i32,
-                    upstream_actor_ids: ActorUpstreamActors(upstream_actor_ids),
+                    upstream_actor_ids: ActorUpstreamActors(actor_upstream_actor_ids),
                     dispatchers: Dispatchers(dispatchers),
                     vnode_bitmap,
                 }
             })
             .collect_vec();
-
-
         let stream_node = {
-            let mut mapping = BTreeMap::new();
-            mapping.insert(1 as FragmentId, vec![]);
-            mapping.insert(2 as FragmentId, vec![]);
+            let template_actor = actors.first().cloned().unwrap();
 
-            generate_merger_stream_node(&mapping)
+            let template_upstream_actor_ids = template_actor
+                .upstream_actor_ids
+                .into_inner()
+                .into_keys()
+                .map(|k| (k, vec![]))
+                .collect();
+
+            generate_merger_stream_node(&template_upstream_actor_ids)
         };
 
         let fragment = fragment::Model {
@@ -594,6 +610,7 @@ mod tests {
 
         let pb_actors = pb_fragment.actors.clone();
 
+        check_fragment_template(fragment.clone(), pb_actors.clone(), &upstream_actor_ids);
         check_fragment(fragment, pb_fragment);
         check_actors(actors, pb_actors, pb_actor_status, pb_actor_splits);
 
@@ -620,7 +637,7 @@ mod tests {
             StreamActor {
                 actor_id: pb_actor_id,
                 fragment_id: pb_fragment_id,
-                nodes: mut pb_nodes,
+                nodes: pb_nodes,
                 dispatcher: pb_dispatcher,
                 upstream_actor_id: pb_upstream_actor_id,
                 vnode_bitmap: pb_vnode_bitmap,
@@ -696,7 +713,7 @@ mod tests {
             PbFragmentDistributionType::from(fragment.distribution_type) as i32
         );
         assert_eq!(
-            pb_vnode_mapping.map(|mapping| FragmentVnodeMapping(mapping)),
+            pb_vnode_mapping.map(FragmentVnodeMapping),
             fragment.vnode_mapping
         );
 
diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs
index 561167cba4255..7fe9de46e5742 100644
--- a/src/meta/src/controller/mod.rs
+++ b/src/meta/src/controller/mod.rs
@@ -31,7 +31,7 @@ use crate::MetaError;
 #[allow(dead_code)]
 pub mod catalog;
 pub mod cluster;
-mod fragment;
+pub mod fragment;
 pub mod rename;
 pub mod system_param;
 pub mod utils;
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 8f6e7c0be6915..bea3961790b53 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -473,6 +473,19 @@
             }
         };
 
+        use crate::model::MetadataModel;
+        let x = crate::controller::catalog::CatalogController::extract_fragment_and_actors_from_table_fragments(table_fragments.clone().to_protobuf()).unwrap();
+        let xx = crate::controller::catalog::CatalogController::compose_table_fragments(
+            table_fragments.table_id().table_id,
+            table_fragments.state(),
+            Some(table_fragments.env.to_protobuf()),
+            x.clone(),
+        )
+        .unwrap();
+        let y = crate::controller::catalog::CatalogController::extract_fragment_and_actors_from_table_fragments(xx).unwrap();
+
+        assert_eq!(x, y);
+
         match create_type {
             CreateType::Foreground | CreateType::Unspecified => {
                 self.create_streaming_job_inner(stream_job, table_fragments, ctx, internal_tables)