From 950e10e79d7e186f2134cc1cd1bf050d62114ada Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 11 Jan 2024 16:00:40 +0800 Subject: [PATCH] chore: add some comments for creating streaming job --- proto/ddl_service.proto | 11 +++++++- src/meta/src/manager/catalog/fragment.rs | 1 + src/meta/src/manager/streaming_job.rs | 1 + src/meta/src/rpc/ddl_controller.rs | 4 ++- src/meta/src/stream/stream_graph/fragment.rs | 27 +++++++++++++++----- 5 files changed, 35 insertions(+), 9 deletions(-) diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index db910930b5bee..4799150956286 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -137,10 +137,19 @@ message DropViewResponse { uint64 version = 2; } -// An enum to distinguish different types of the Table streaming job. +// An enum to distinguish different types of the `Table` streaming job. // - GENERAL: Table streaming jobs w/ or w/o a connector // - SHARED_CDC_SOURCE: The table streaming job is created based on a shared CDC source job (risingwavelabs/rfcs#73). +// // And one may add other types to support Table jobs that based on other backfill-able sources (risingwavelabs/rfcs#72). +// +// Currently, its usages include: +// - When creating the streaming actor graph, different table jobs may need different treatment. +// + CDC table and table w/ backfill-able source connector have an upstream `Source` fragment. +// + Other tables don't have an upstream fragment. +// - Some adhoc validation when creating the streaming job. e.g., `validate_cdc_table`. +// +// It's not included in `catalog.Table`, and thus not persisted. It's only used in the `CreateTableRequest`. 
enum TableJobType { TABLE_JOB_TYPE_UNSPECIFIED = 0; // table streaming jobs excepts the `SHARED_CDC_SOURCE` type diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index d359c3fa453c9..a73623e78dd90 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -1412,6 +1412,7 @@ impl FragmentManager { .get(&table_id) .with_context(|| format!("table_fragment not exist: id={}", table_id))?; match table_job_type.as_ref() { + // TODO: Some(TableJobType::SharedCdcSource) => { if let Some(fragment) = table_fragments.source_fragment() { fragments.insert(table_id, fragment); diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index b5d63256ccb6e..7bfca33a59579 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -259,6 +259,7 @@ impl StreamingJob { } } + /// If the `StreamingJob` is a `Table`, returns its sub-type. Otherwise, returns `None`. pub fn table_job_type(&self) -> Option { if let Self::Table(.., sub_type) = self { Some(*sub_type) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index c613385735125..53164b11271fa 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1255,6 +1255,7 @@ impl DdlController { } /// Builds the actor graph: + /// - Add the upstream fragments to the fragment graph /// - Schedule the fragments based on their distribution /// - Expand each fragment into one or several actors pub(crate) async fn build_stream_job( @@ -1975,7 +1976,8 @@ impl DdlController { } } -/// Fill in necessary information for table stream graph. +/// Fill in necessary information for `Table` stream graph. +/// e.g., fill source id for table with connector, fill external table id for CDC table. 
pub fn fill_table_stream_graph_info( source: &mut PbSource, table: &mut PbTable, diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index e4b2b03004fe8..09bff7909d1e8 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -261,6 +261,10 @@ impl StreamFragmentEdge { /// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`] /// from the frontend. +/// +/// This only includes nodes and edges of the current job itself. It will be converted to [`CompleteStreamFragmentGraph`] later, +/// that contains the additional information of pre-existing +/// fragments, which are connected to the graph's top-most or bottom-most fragments. #[derive(Default)] pub struct StreamFragmentGraph { /// stores all the fragments in the graph. @@ -484,8 +488,8 @@ pub(super) enum EitherFragment { Existing(Fragment), } -/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of existing -/// fragments, which is connected to the graph's top-most or bottom-most fragments. +/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing +/// fragments, which are connected to the graph's top-most or bottom-most fragments. /// /// For example, /// - if we're going to build a mview on an existing mview, the upstream fragment containing the @@ -530,7 +534,7 @@ impl CompleteStreamFragmentGraph { } } - /// Create a new [`CompleteStreamFragmentGraph`] for MV on MV or Table on CDC Source, with the upstream existing + /// Create a new [`CompleteStreamFragmentGraph`] for MV on MV and CDC/Source Table with the upstream existing /// `Materialize` or `Source` fragments. pub fn with_upstreams( graph: StreamFragmentGraph, @@ -565,6 +569,7 @@ impl CompleteStreamFragmentGraph { ) } + /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments. 
fn build_helper( mut graph: StreamFragmentGraph, upstream_ctx: Option, @@ -579,8 +584,6 @@ impl CompleteStreamFragmentGraph { upstream_root_fragments, }) = upstream_ctx { - // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` - // of the new materialized view. for (&id, fragment) in &mut graph.fragments { for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns { let (up_fragment_id, edge) = match table_job_type.as_ref() { @@ -620,8 +623,18 @@ impl CompleteStreamFragmentGraph { (source_job_id, edge) } - _ => { - // handle other kinds of streaming graph, normally MV on MV + None | Some(TableJobType::General | TableJobType::Unspecified) => { + if let Some(table_job_type) = table_job_type { + // This isn't normal. But don't panic... + tracing::warn!( + ?table_job_type, + "table job shouldn't have upstream fragments", + ); + } + // handle MV on MV + + // Build the extra edges between the upstream `Materialize` and the downstream `StreamScan` + // of the new materialized view. let mview_fragment = upstream_root_fragments .get(&upstream_table_id) .context("upstream materialized view fragment not found")?;