From bf901d1665530926cd78797e01e96c78123ed2ed Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 12 Jan 2024 15:44:10 +0800 Subject: [PATCH] refactor(meta): refactor how upstream fragment is handled when creating stream job (#14510) --- proto/ddl_service.proto | 9 ++++- proto/stream_plan.proto | 3 +- src/meta/src/controller/fragment.rs | 24 ++++++------- src/meta/src/manager/catalog/fragment.rs | 21 ++++------- src/meta/src/manager/metadata.rs | 19 +++++++--- src/meta/src/manager/streaming_job.rs | 33 +++++++++++------ src/meta/src/rpc/ddl_controller.rs | 12 +++---- src/meta/src/stream/stream_graph/fragment.rs | 38 ++++++++++++-------- 8 files changed, 94 insertions(+), 65 deletions(-) diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index db910930b5bee..1b584a7df78e1 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -137,10 +137,17 @@ message DropViewResponse { uint64 version = 2; } -// An enum to distinguish different types of the Table streaming job. +// An enum to distinguish different types of the `Table` streaming job. // - GENERAL: Table streaming jobs w/ or w/o a connector // - SHARED_CDC_SOURCE: The table streaming job is created based on a shared CDC source job (risingwavelabs/rfcs#73). +// // And one may add other types to support Table jobs that based on other backfill-able sources (risingwavelabs/rfcs#72). +// +// Currently, it's usages include: +// - When creating the streaming actor graph, different table jobs may need different treatment. +// - Some adhoc validation when creating the streaming job. e.g., `validate_cdc_table`. +// +// It's not included in `catalog.Table`, and thus not persisted. It's only used in the `CreateTableRequest`. enum TableJobType { TABLE_JOB_TYPE_UNSPECIFIED = 0; // table streaming jobs excepts the `SHARED_CDC_SOURCE` type diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index a168ea163f5b5..e69a712c9e3d8 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -839,6 +839,7 @@ message StreamActor { plan_common.ExprContext expr_context = 10; } +// Indicates whether the fragment contains some special kind of nodes. enum FragmentTypeFlag { FRAGMENT_TYPE_FLAG_FRAGMENT_UNSPECIFIED = 0; FRAGMENT_TYPE_FLAG_SOURCE = 1; @@ -864,7 +865,7 @@ message StreamFragmentGraph { uint32 fragment_id = 1; // root stream node in this fragment. StreamNode node = 2; - // Bitwise-OR of FragmentTypeFlags + // Bitwise-OR of `FragmentTypeFlag`s uint32 fragment_type_mask = 3; // Mark whether this fragment requires exactly one actor. // Note: if this is `false`, the fragment may still be a singleton according to the scheduler. diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index d0c39694692b1..bec61ff0f2166 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -27,7 +27,6 @@ use risingwave_meta_model_v2::{ StreamNode, TableId, VnodeBitmap, WorkerId, }; use risingwave_pb::common::PbParallelUnit; -use risingwave_pb::ddl_service::PbTableJobType; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Operation as NotificationOperation, }; @@ -1037,31 +1036,30 @@ impl CatalogController { Ok(actors) } - /// Get and filter the upstream `Materialize` or `Source` fragments of the specified relations. pub async fn get_upstream_root_fragments( &self, upstream_job_ids: Vec, - job_type: Option, ) -> MetaResult> { let inner = self.inner.read().await; - let mut fragments = Fragment::find() + let all_upstream_fragments = Fragment::find() .filter(fragment::Column::JobId.is_in(upstream_job_ids)) .all(&inner.db) .await?; - fragments.retain(|f| match job_type { - Some(PbTableJobType::SharedCdcSource) => { - f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 - } - // MV on MV, and other kinds of table job - None | Some(PbTableJobType::General) | Some(PbTableJobType::Unspecified) => { - f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 + // job_id -> fragment + let mut fragments = HashMap::::new(); + for fragment in all_upstream_fragments { + if fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 { + _ = fragments.insert(fragment.job_id, fragment); + } else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 { + // look for Source fragment if there's no MView fragment + _ = fragments.try_insert(fragment.job_id, fragment); } - }); + } let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?; let mut root_fragments = HashMap::new(); - for fragment in fragments { + for (_, fragment) in fragments { let actors = fragment.find_related(Actor).all(&inner.db).await?; let actor_dispatchers = get_actor_dispatchers( &inner.db, diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index d359c3fa453c9..89ea2de7148a7 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -24,7 +24,6 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping} use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont}; use risingwave_connector::source::SplitImpl; use risingwave_meta_model_v2::SourceId; -use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; @@ -1398,11 +1397,9 @@ impl FragmentManager { .mview_actor_ids()) } - /// Get and filter the upstream `Materialize` or `Source` fragments of the specified relations. pub async fn get_upstream_root_fragments( &self, upstream_table_ids: &HashSet, - table_job_type: Option, ) -> MetaResult> { let map = &self.core.read().await.table_fragments; let mut fragments = HashMap::new(); @@ -1411,18 +1408,12 @@ impl FragmentManager { let table_fragments = map .get(&table_id) .with_context(|| format!("table_fragment not exist: id={}", table_id))?; - match table_job_type.as_ref() { - Some(TableJobType::SharedCdcSource) => { - if let Some(fragment) = table_fragments.source_fragment() { - fragments.insert(table_id, fragment); - } - } - // MV on MV, and other kinds of table job - None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => { - if let Some(fragment) = table_fragments.mview_fragment() { - fragments.insert(table_id, fragment); - } - } + + if let Some(fragment) = table_fragments.mview_fragment() { + fragments.insert(table_id, fragment); + } else if let Some(fragment) = table_fragments.source_fragment() { + // look for Source fragment if there's no MView fragment + fragments.insert(table_id, fragment); } } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 0d50f7e1dc8c4..450a920c379d0 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -19,7 +19,6 @@ use risingwave_meta_model_v2::SourceId; use risingwave_pb::catalog::PbSource; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType}; -use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::table_fragments::Fragment; use risingwave_pb::stream_plan::PbStreamActor; @@ -175,15 +174,28 @@ impl MetadataManager { } } + /// Get and filter the "**root**" fragments of the specified relations. + /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`. + /// + /// ## What can be the root fragment + /// - For MV, it should have one `MView` fragment. + /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root. + /// - For source, it should have one `Source` fragment. + /// + /// In other words, it's the `MView` fragment if it exists, otherwise it's the `Source` fragment. + /// + /// ## What do we expect to get for different creating streaming job + /// - MV/Sink/Index should have MV upstream fragments for upstream MV/Tables, and Source upstream fragments for upstream backfill-able sources. + /// - CDC Table has a Source upstream fragment. + /// - Sources and other Tables shouldn't have an upstream fragment. pub async fn get_upstream_root_fragments( &self, upstream_table_ids: &HashSet, - table_job_type: Option, ) -> MetaResult> { match self { MetadataManager::V1(mgr) => { mgr.fragment_manager - .get_upstream_root_fragments(upstream_table_ids, table_job_type) + .get_upstream_root_fragments(upstream_table_ids) .await } MetadataManager::V2(mgr) => { @@ -194,7 +206,6 @@ impl MetadataManager { .iter() .map(|id| id.table_id as _) .collect(), - table_job_type, ) .await?; Ok(upstream_root_fragments diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index b5d63256ccb6e..54dfdb9a22abb 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -26,8 +26,6 @@ use crate::model::FragmentId; // This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and // Sink. #[derive(Debug, Clone, EnumDiscriminants)] -#[strum_discriminants(name(DdlType))] -#[strum_discriminants(vis(pub))] pub enum StreamingJob { MaterializedView(Table), Sink(Sink, Option<(Table, Option)>), @@ -36,13 +34,34 @@ pub enum StreamingJob { Source(PbSource), } +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum DdlType { + MaterializedView, + Sink, + Table(TableJobType), + Index, + Source, +} + +impl From<&StreamingJob> for DdlType { + fn from(job: &StreamingJob) -> Self { + match job { + StreamingJob::MaterializedView(_) => DdlType::MaterializedView, + StreamingJob::Sink(_, _) => DdlType::Sink, + StreamingJob::Table(_, _, ty) => DdlType::Table(*ty), + StreamingJob::Index(_, _) => DdlType::Index, + StreamingJob::Source(_) => DdlType::Source, + } + } +} + #[cfg(test)] #[allow(clippy::derivable_impls)] impl Default for DdlType { fn default() -> Self { // This should not be used by mock services, // so we can just pick an arbitrary default variant. - DdlType::Table + DdlType::MaterializedView } } @@ -259,14 +278,6 @@ impl StreamingJob { } } - pub fn table_job_type(&self) -> Option { - if let Self::Table(.., sub_type) = self { - Some(*sub_type) - } else { - None - } - } - // TODO: record all objects instead. pub fn dependent_relations(&self) -> Vec { match self { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 3937944e66d9b..1ab3aad2ddf3d 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1259,6 +1259,7 @@ impl DdlController { } /// Builds the actor graph: + /// - Add the upstream fragments to the fragment graph /// - Schedule the fragments based on their distribution /// - Expand each fragment into one or several actors pub(crate) async fn build_stream_job( @@ -1278,10 +1279,7 @@ impl DdlController { let upstream_root_fragments = self .metadata_manager - .get_upstream_root_fragments( - fragment_graph.dependent_table_ids(), - stream_job.table_job_type(), - ) + .get_upstream_root_fragments(fragment_graph.dependent_table_ids()) .await?; let upstream_actors: HashMap<_, _> = upstream_root_fragments @@ -1297,7 +1295,7 @@ impl DdlController { let complete_graph = CompleteStreamFragmentGraph::with_upstreams( fragment_graph, upstream_root_fragments, - stream_job.table_job_type(), + stream_job.into(), )?; // 2. Build the actor graph. @@ -1717,6 +1715,7 @@ impl DdlController { fragment_graph, original_table_fragment.fragment_id, downstream_fragments, + stream_job.into(), )?; // 2. Build the actor graph. @@ -1979,7 +1978,8 @@ impl DdlController { } } -/// Fill in necessary information for table stream graph. +/// Fill in necessary information for `Table` stream graph. +/// e.g., fill source id for table with connector, fill external table id for CDC table. pub fn fill_table_stream_graph_info( source: &mut Option, table: &mut PbTable, diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 4edd3743a1cee..925b01c8cbdcf 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -38,7 +38,7 @@ use risingwave_pb::stream_plan::{ StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanType, }; -use crate::manager::{MetaSrvEnv, StreamingJob}; +use crate::manager::{DdlType, MetaSrvEnv, StreamingJob}; use crate::model::FragmentId; use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen}; use crate::stream::stream_graph::schedule::Distribution; @@ -283,6 +283,10 @@ impl StreamFragmentEdge { /// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`] /// from the frontend. +/// +/// This only includes nodes and edges of the current job itself. It will be converted to [`CompleteStreamFragmentGraph`] later, +/// that contains the additional information of pre-existing +/// fragments, which are connected to the graph's top-most or bottom-most fragments. #[derive(Default)] pub struct StreamFragmentGraph { /// stores all the fragments in the graph. @@ -514,8 +518,8 @@ pub(super) enum EitherFragment { Existing(Fragment), } -/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of existing -/// fragments, which is connected to the graph's top-most or bottom-most fragments. +/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing +/// fragments, which are connected to the graph's top-most or bottom-most fragments. /// /// For example, /// - if we're going to build a mview on an existing mview, the upstream fragment containing the @@ -560,12 +564,12 @@ impl CompleteStreamFragmentGraph { } } - /// Create a new [`CompleteStreamFragmentGraph`] for MV on MV or Table on CDC Source, with the upstream existing + /// Create a new [`CompleteStreamFragmentGraph`] for MV on MV and CDC/Source Table with the upstream existing /// `Materialize` or `Source` fragments. pub fn with_upstreams( graph: StreamFragmentGraph, upstream_root_fragments: HashMap, - table_job_type: Option, + ddl_type: DdlType, ) -> MetaResult { Self::build_helper( graph, @@ -573,7 +577,7 @@ impl CompleteStreamFragmentGraph { upstream_root_fragments, }), None, - table_job_type, + ddl_type, ) } @@ -583,6 +587,7 @@ impl CompleteStreamFragmentGraph { graph: StreamFragmentGraph, original_table_fragment_id: FragmentId, downstream_fragments: Vec<(DispatchStrategy, Fragment)>, + ddl_type: DdlType, ) -> MetaResult { Self::build_helper( graph, @@ -591,15 +596,16 @@ impl CompleteStreamFragmentGraph { original_table_fragment_id, downstream_fragments, }), - None, + ddl_type, ) } + /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments. fn build_helper( mut graph: StreamFragmentGraph, upstream_ctx: Option, downstream_ctx: Option, - table_job_type: Option, + ddl_type: DdlType, ) -> MetaResult { let mut extra_downstreams = HashMap::new(); let mut extra_upstreams = HashMap::new(); @@ -609,13 +615,11 @@ impl CompleteStreamFragmentGraph { upstream_root_fragments, }) = upstream_ctx { - // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` - // of the new materialized view. for (&id, fragment) in &mut graph.fragments { let uses_arrangement_backfill = fragment.has_arrangement_backfill(); for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns { - let (up_fragment_id, edge) = match table_job_type.as_ref() { - Some(TableJobType::SharedCdcSource) => { + let (up_fragment_id, edge) = match ddl_type { + DdlType::Table(TableJobType::SharedCdcSource) => { let source_fragment = upstream_root_fragments .get(&upstream_table_id) .context("upstream source fragment not found")?; @@ -651,8 +655,11 @@ impl CompleteStreamFragmentGraph { (source_job_id, edge) } - _ => { - // handle other kinds of streaming graph, normally MV on MV + DdlType::MaterializedView | DdlType::Sink | DdlType::Index => { + // handle MV on MV + + // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` + // of the new materialized view. let mview_fragment = upstream_root_fragments .get(&upstream_table_id) .context("upstream materialized view fragment not found")?; @@ -724,6 +731,9 @@ impl CompleteStreamFragmentGraph { (mview_id, edge) } + DdlType::Source | DdlType::Table(_) => { + bail!("the streaming job shouldn't have an upstream fragment, ddl_type: {:?}", ddl_type) + } }; // put the edge into the extra edges