From b5c248d91a8bc1a4e30b8045d86e7e0b3c6d7062 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 11 Jan 2024 16:00:40 +0800 Subject: [PATCH] chore: add some comments for creating streaming job --- proto/ddl_service.proto | 11 +++++- src/meta/src/controller/fragment.rs | 13 ++----- src/meta/src/manager/catalog/fragment.rs | 20 ++++------- src/meta/src/manager/metadata.rs | 10 +++--- src/meta/src/manager/streaming_job.rs | 33 +++++++++++------ src/meta/src/rpc/ddl_controller.rs | 12 +++---- src/meta/src/stream/stream_graph/fragment.rs | 38 ++++++++++++-------- 7 files changed, 77 insertions(+), 60 deletions(-) diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index db910930b5be..479915095628 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -137,10 +137,19 @@ message DropViewResponse { uint64 version = 2; } -// An enum to distinguish different types of the Table streaming job. +// An enum to distinguish different types of the `Table` streaming job. // - GENERAL: Table streaming jobs w/ or w/o a connector // - SHARED_CDC_SOURCE: The table streaming job is created based on a shared CDC source job (risingwavelabs/rfcs#73). +// // And one may add other types to support Table jobs that based on other backfill-able sources (risingwavelabs/rfcs#72). +// +// Currently, its usages include: +// - When creating the streaming actor graph, different table jobs may need different treatment. +// + CDC tables and tables w/ a backfill-able source connector have an upstream `Source` fragment. +// + Other tables don't have an upstream fragment. +// - Some ad-hoc validation when creating the streaming job, e.g., `validate_cdc_table`. +// +// It's not included in `catalog.Table`, and thus not persisted. It's only used in the `CreateTableRequest`.
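To make the treatment described in the comment above concrete, here is a minimal, self-contained Rust sketch. The types are local stand-ins, not the generated prost code, and `UpstreamRootKind` / `expected_upstream_root` are hypothetical names: only a table created from a shared CDC source expects an upstream `Source` fragment, while other table jobs expect no upstream fragment at all.

```rust
// Illustrative stand-in for the proto enum; not the generated prost type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableJobType {
    Unspecified,
    General,
    SharedCdcSource,
}

// Hypothetical helper type, for illustration only.
#[derive(Debug, PartialEq, Eq)]
enum UpstreamRootKind {
    /// The job scans an upstream `Source` fragment of the shared CDC source job.
    SourceFragment,
    /// The job has no upstream fragment (its connector, if any, lives in its own graph).
    None,
}

fn expected_upstream_root(job_type: TableJobType) -> UpstreamRootKind {
    match job_type {
        TableJobType::SharedCdcSource => UpstreamRootKind::SourceFragment,
        TableJobType::General | TableJobType::Unspecified => UpstreamRootKind::None,
    }
}

fn main() {
    assert_eq!(
        expected_upstream_root(TableJobType::SharedCdcSource),
        UpstreamRootKind::SourceFragment
    );
    assert_eq!(
        expected_upstream_root(TableJobType::General),
        UpstreamRootKind::None
    );
}
```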
enum TableJobType { TABLE_JOB_TYPE_UNSPECIFIED = 0; // table streaming jobs excepts the `SHARED_CDC_SOURCE` type diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index e1849f8407a4..2b72a676d3a3 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -27,7 +27,6 @@ use risingwave_meta_model_v2::{ TableId, VnodeBitmap, WorkerId, }; use risingwave_pb::common::PbParallelUnit; -use risingwave_pb::ddl_service::PbTableJobType; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Operation as NotificationOperation, }; @@ -1038,7 +1037,6 @@ impl CatalogController { pub async fn get_upstream_root_fragments( &self, upstream_job_ids: Vec, - job_type: Option, ) -> MetaResult> { let inner = self.inner.read().await; @@ -1046,14 +1044,9 @@ impl CatalogController { .filter(fragment::Column::JobId.is_in(upstream_job_ids)) .all(&inner.db) .await?; - fragments.retain(|f| match job_type { - Some(PbTableJobType::SharedCdcSource) => { - f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 - } - // MV on MV, and other kinds of table job - None | Some(PbTableJobType::General) | Some(PbTableJobType::Unspecified) => { - f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 - } + fragments.retain(|f| { + f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 + || f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 }); let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?; diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index d359c3fa453c..ae6ba74844f3 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -24,7 +24,6 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping} use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont}; use risingwave_connector::source::SplitImpl; use risingwave_meta_model_v2::SourceId; -use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; @@ -1402,7 +1401,6 @@ impl FragmentManager { pub async fn get_upstream_root_fragments( &self, upstream_table_ids: &HashSet, - table_job_type: Option, ) -> MetaResult> { let map = &self.core.read().await.table_fragments; let mut fragments = HashMap::new(); @@ -1411,18 +1409,12 @@ impl FragmentManager { let table_fragments = map .get(&table_id) .with_context(|| format!("table_fragment not exist: id={}", table_id))?; - match table_job_type.as_ref() { - Some(TableJobType::SharedCdcSource) => { - if let Some(fragment) = table_fragments.source_fragment() { - fragments.insert(table_id, fragment); - } - } - // MV on MV, and other kinds of table job - None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => { - if let Some(fragment) = table_fragments.mview_fragment() { - fragments.insert(table_id, fragment); - } - } + + if let Some(fragment) = table_fragments.mview_fragment() { + fragments.insert(table_id, fragment); + } + if let Some(fragment) = table_fragments.source_fragment() { + fragments.insert(table_id, fragment); } } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 67f077f55316..f85e7ee73960 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -18,7 +18,6 @@ use 
risingwave_common::catalog::{TableId, TableOption}; use risingwave_pb::catalog::PbSource; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType}; -use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::table_fragments::Fragment; use risingwave_pb::stream_plan::PbStreamActor; @@ -174,15 +173,19 @@ impl MetadataManager { } } + /// Get and filter the upstream fragments of the specified relations. + /// + /// - MV/Sink/Index should have MV upstream fragments for upstream MV/Tables, and Source upstream fragments for upstream backfill-able sources. + /// - CDC Table has a Source upstream fragment. + /// - Sources and other Tables shouldn't have an upstream fragment. pub async fn get_upstream_root_fragments( &self, upstream_table_ids: &HashSet, - table_job_type: Option, ) -> MetaResult> { match self { MetadataManager::V1(mgr) => { mgr.fragment_manager - .get_upstream_root_fragments(upstream_table_ids, table_job_type) + .get_upstream_root_fragments(upstream_table_ids) .await } MetadataManager::V2(mgr) => { @@ -193,7 +196,6 @@ impl MetadataManager { .iter() .map(|id| id.table_id as _) .collect(), - table_job_type, ) .await?; Ok(upstream_root_fragments diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index b5d63256ccb6..54dfdb9a22ab 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -26,8 +26,6 @@ use crate::model::FragmentId; // This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and // Sink. #[derive(Debug, Clone, EnumDiscriminants)] -#[strum_discriminants(name(DdlType))] -#[strum_discriminants(vis(pub))] pub enum StreamingJob { MaterializedView(Table), Sink(Sink, Option<(Table, Option)>), @@ -36,13 +34,34 @@ pub enum StreamingJob { Source(PbSource), } +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum DdlType { + MaterializedView, + Sink, + Table(TableJobType), + Index, + Source, +} + +impl From<&StreamingJob> for DdlType { + fn from(job: &StreamingJob) -> Self { + match job { + StreamingJob::MaterializedView(_) => DdlType::MaterializedView, + StreamingJob::Sink(_, _) => DdlType::Sink, + StreamingJob::Table(_, _, ty) => DdlType::Table(*ty), + StreamingJob::Index(_, _) => DdlType::Index, + StreamingJob::Source(_) => DdlType::Source, + } + } +} + #[cfg(test)] #[allow(clippy::derivable_impls)] impl Default for DdlType { fn default() -> Self { // This should not be used by mock services, // so we can just pick an arbitrary default variant. - DdlType::Table + DdlType::MaterializedView } } @@ -259,14 +278,6 @@ impl StreamingJob { } } - pub fn table_job_type(&self) -> Option { - if let Self::Table(.., sub_type) = self { - Some(*sub_type) - } else { - None - } - } - // TODO: record all objects instead. 
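For reference, a minimal sketch of the fragment-type-mask filtering that the refactored `get_upstream_root_fragments` relies on: it now returns every root fragment that is either a `Source` or a `Materialize` fragment, and the caller picks the one it needs based on the job's `DdlType`. The flag values and types below are assumptions for illustration only; the real flags come from `PbFragmentTypeFlag` in the generated proto code.

```rust
const FRAGMENT_TYPE_SOURCE: i32 = 1 << 0; // assumed value, for illustration only
const FRAGMENT_TYPE_MVIEW: i32 = 1 << 1; // assumed value, for illustration only

// Simplified stand-in for a fragment row; not the real meta-store model.
struct FragmentRow {
    fragment_id: u32,
    fragment_type_mask: i32,
}

/// Keep only fragments that can act as an upstream root: either a `Source` fragment
/// (CDC table / backfill-able source) or a `Materialize` fragment (MV on MV).
fn retain_upstream_roots(fragments: &mut Vec<FragmentRow>) {
    fragments.retain(|f| {
        f.fragment_type_mask & FRAGMENT_TYPE_SOURCE != 0
            || f.fragment_type_mask & FRAGMENT_TYPE_MVIEW != 0
    });
}

fn main() {
    let mut fragments = vec![
        FragmentRow { fragment_id: 1, fragment_type_mask: FRAGMENT_TYPE_SOURCE },
        FragmentRow { fragment_id: 2, fragment_type_mask: 0 },
        FragmentRow { fragment_id: 3, fragment_type_mask: FRAGMENT_TYPE_MVIEW },
    ];
    retain_upstream_roots(&mut fragments);
    let kept: Vec<u32> = fragments.iter().map(|f| f.fragment_id).collect();
    assert_eq!(kept, vec![1, 3]);
}
```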
pub fn dependent_relations(&self) -> Vec { match self { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index c61338573512..dba3f261df47 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1255,6 +1255,7 @@ impl DdlController { } /// Builds the actor graph: + /// - Add the upstream fragments to the fragment graph /// - Schedule the fragments based on their distribution /// - Expand each fragment into one or several actors pub(crate) async fn build_stream_job( @@ -1274,10 +1275,7 @@ impl DdlController { let upstream_root_fragments = self .metadata_manager - .get_upstream_root_fragments( - fragment_graph.dependent_table_ids(), - stream_job.table_job_type(), - ) + .get_upstream_root_fragments(fragment_graph.dependent_table_ids()) .await?; let upstream_actors: HashMap<_, _> = upstream_root_fragments @@ -1293,7 +1291,7 @@ impl DdlController { let complete_graph = CompleteStreamFragmentGraph::with_upstreams( fragment_graph, upstream_root_fragments, - stream_job.table_job_type(), + stream_job.into(), )?; // 2. Build the actor graph. @@ -1713,6 +1711,7 @@ impl DdlController { fragment_graph, original_table_fragment.fragment_id, downstream_fragments, + stream_job.into(), )?; // 2. Build the actor graph. @@ -1975,7 +1974,8 @@ impl DdlController { } } -/// Fill in necessary information for table stream graph. +/// Fill in necessary information for the `Table` stream graph. +/// e.g., fill the source id for a table with a connector, and the external table id for a CDC table. pub fn fill_table_stream_graph_info( source: &mut PbSource, table: &mut PbTable, diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index e4b2b03004fe..0e04df72fe54 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -38,7 +38,7 @@ use risingwave_pb::stream_plan::{ StreamFragmentGraph as StreamFragmentGraphProto, }; -use crate::manager::{MetaSrvEnv, StreamingJob}; +use crate::manager::{DdlType, MetaSrvEnv, StreamingJob}; use crate::model::FragmentId; use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen}; use crate::stream::stream_graph::schedule::Distribution; @@ -261,6 +261,10 @@ impl StreamFragmentEdge { /// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`] /// from the frontend. +/// +/// This only includes nodes and edges of the current job itself. It will be converted to [`CompleteStreamFragmentGraph`] later, +/// which contains the additional information of pre-existing +/// fragments, which are connected to the graph's top-most or bottom-most fragments. #[derive(Default)] pub struct StreamFragmentGraph { /// stores all the fragments in the graph. @@ -484,8 +488,8 @@ pub(super) enum EitherFragment { Existing(Fragment), } -/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of existing -/// fragments, which is connected to the graph's top-most or bottom-most fragments. +/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing +/// fragments, which are connected to the graph's top-most or bottom-most fragments.
/// /// For example, /// - if we're going to build a mview on an existing mview, the upstream fragment containing the @@ -530,12 +534,12 @@ impl CompleteStreamFragmentGraph { } } - /// Create a new [`CompleteStreamFragmentGraph`] for MV on MV or Table on CDC Source, with the upstream existing + /// Create a new [`CompleteStreamFragmentGraph`] for MV on MV and CDC/Source Table with the upstream existing /// `Materialize` or `Source` fragments. pub fn with_upstreams( graph: StreamFragmentGraph, upstream_root_fragments: HashMap, - table_job_type: Option, + ddl_type: DdlType, ) -> MetaResult { Self::build_helper( graph, @@ -543,7 +547,7 @@ impl CompleteStreamFragmentGraph { upstream_root_fragments, }), None, - table_job_type, + ddl_type, ) } @@ -553,6 +557,7 @@ impl CompleteStreamFragmentGraph { graph: StreamFragmentGraph, original_table_fragment_id: FragmentId, downstream_fragments: Vec<(DispatchStrategy, Fragment)>, + ddl_type: DdlType, ) -> MetaResult { Self::build_helper( graph, @@ -561,15 +566,16 @@ impl CompleteStreamFragmentGraph { original_table_fragment_id, downstream_fragments, }), - None, + ddl_type, ) } + /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments. fn build_helper( mut graph: StreamFragmentGraph, upstream_ctx: Option, downstream_ctx: Option, - table_job_type: Option, + ddl_type: DdlType, ) -> MetaResult { let mut extra_downstreams = HashMap::new(); let mut extra_upstreams = HashMap::new(); @@ -579,12 +585,10 @@ impl CompleteStreamFragmentGraph { upstream_root_fragments, }) = upstream_ctx { - // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` - // of the new materialized view. for (&id, fragment) in &mut graph.fragments { for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns { - let (up_fragment_id, edge) = match table_job_type.as_ref() { - Some(TableJobType::SharedCdcSource) => { + let (up_fragment_id, edge) = match ddl_type { + DdlType::Table(TableJobType::SharedCdcSource) => { let source_fragment = upstream_root_fragments .get(&upstream_table_id) .context("upstream source fragment not found")?; @@ -620,8 +624,11 @@ impl CompleteStreamFragmentGraph { (source_job_id, edge) } - _ => { - // handle other kinds of streaming graph, normally MV on MV + DdlType::MaterializedView | DdlType::Sink | DdlType::Index => { + // handle MV on MV + + // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` + // of the new materialized view. let mview_fragment = upstream_root_fragments .get(&upstream_table_id) .context("upstream materialized view fragment not found")?; @@ -668,6 +675,9 @@ impl CompleteStreamFragmentGraph { (mview_id, edge) } + DdlType::Source | DdlType::Table(_) => { + bail!("the streaming job shouldn't have an upstream fragment, ddl_type: {:?}", ddl_type) + } }; // put the edge into the extra edges
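To summarize the role of the extra edges added by `build_helper`, here is a hedged, self-contained sketch. All names are hypothetical, not the actual `CompleteStreamFragmentGraph` fields: the complete graph is the job's own edges plus the edges wired to pre-existing upstream and downstream fragments.

```rust
use std::collections::HashSet;

// Heavily simplified, illustrative model; not the real meta-service types.
type FragmentId = u32;

#[derive(Default)]
struct CompleteGraphSketch {
    /// Edges between fragments of the new job itself (from `StreamFragmentGraph`).
    internal_edges: HashSet<(FragmentId, FragmentId)>,
    /// Extra edges from pre-existing upstream root fragments (`Materialize` or `Source`)
    /// into the new job's top-most fragments.
    extra_upstream_edges: HashSet<(FragmentId, FragmentId)>,
    /// Extra edges from the new job's bottom-most fragments into pre-existing downstream
    /// fragments (used when replacing a table plan).
    extra_downstream_edges: HashSet<(FragmentId, FragmentId)>,
}

impl CompleteGraphSketch {
    /// All edges the scheduler has to consider, internal and external alike.
    fn all_edges(&self) -> impl Iterator<Item = (FragmentId, FragmentId)> + '_ {
        self.internal_edges
            .iter()
            .chain(&self.extra_upstream_edges)
            .chain(&self.extra_downstream_edges)
            .copied()
    }
}

fn main() {
    let mut g = CompleteGraphSketch::default();
    // The new job: a StreamScan fragment (100) feeding a Materialize fragment (101).
    g.internal_edges.insert((100, 101));
    // A pre-existing upstream root fragment (7) feeding the new StreamScan fragment.
    g.extra_upstream_edges.insert((7, 100));
    assert_eq!(g.all_edges().count(), 2);
}
```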