chore: add some comments for creating streaming job
xxchan committed Jan 11, 2024
1 parent b03a641 commit 950e10e
Showing 5 changed files with 35 additions and 9 deletions.
11 changes: 10 additions & 1 deletion proto/ddl_service.proto
Expand Up @@ -137,10 +137,19 @@ message DropViewResponse {
uint64 version = 2;
}

// An enum to distinguish different types of the Table streaming job.
// An enum to distinguish different types of the `Table` streaming job.
// - GENERAL: Table streaming jobs w/ or w/o a connector
// - SHARED_CDC_SOURCE: The table streaming job is created based on a shared CDC source job (risingwavelabs/rfcs#73).
//
// And one may add other types to support Table jobs that are based on other backfill-able sources (risingwavelabs/rfcs#72).
//
// Currently, its usages include:
// - When creating the streaming actor graph, different table jobs may need different treatment.
// + CDC table and table w/ backfill-able source connector have an upstream `Source` fragment.
// + Other tables don't have an upstream fragment.
// - Some adhoc validation when creating the streaming job. e.g., `validate_cdc_table`.
//
// It's not included in `catalog.Table`, and thus not persisted. It's only used in the `CreateTableRequest`.
enum TableJobType {
TABLE_JOB_TYPE_UNSPECIFIED = 0;
// table streaming jobs except the `SHARED_CDC_SOURCE` type
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/catalog/fragment.rs
Expand Up @@ -1412,6 +1412,7 @@ impl FragmentManager {
.get(&table_id)
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;
match table_job_type.as_ref() {
// TODO:
Some(TableJobType::SharedCdcSource) => {
if let Some(fragment) = table_fragments.source_fragment() {
fragments.insert(table_id, fragment);
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/streaming_job.rs
Expand Up @@ -259,6 +259,7 @@ impl StreamingJob {
}
}

/// If the `StreamingJob` is a `Table`, returns its sub-type. Otherwise, returns `None`.
pub fn table_job_type(&self) -> Option<TableJobType> {
if let Self::Table(.., sub_type) = self {
Some(*sub_type)
Expand Down
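
The accessor documented in this hunk can be tried in isolation with a minimal sketch. It assumes a trimmed-down `StreamingJob` whose variants carry placeholder string payloads (the real variants hold catalog objects), and a hand-written stand-in for the generated `TableJobType` enum. Only `table_job_type` itself mirrors the diff; everything else is hypothetical.

```rust
/// Simplified stand-in for the generated `TableJobType` enum (assumption:
/// only the three variants that appear in this commit are modeled).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableJobType {
    Unspecified,
    General,
    SharedCdcSource,
}

/// Hypothetical, trimmed-down `StreamingJob`. The string payloads are
/// placeholders for the real catalog objects.
pub enum StreamingJob {
    MaterializedView(&'static str),
    Sink(&'static str),
    Table(&'static str, TableJobType),
}

impl StreamingJob {
    /// If the `StreamingJob` is a `Table`, returns its sub-type.
    /// Otherwise, returns `None`.
    pub fn table_job_type(&self) -> Option<TableJobType> {
        // `..` skips every field before the trailing sub-type.
        if let Self::Table(.., sub_type) = self {
            Some(*sub_type)
        } else {
            None
        }
    }
}
```

Because the pattern uses `..`, the accessor keeps compiling if more fields are added in front of the sub-type, as long as the sub-type stays last.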
4 changes: 3 additions & 1 deletion src/meta/src/rpc/ddl_controller.rs
Expand Up @@ -1255,6 +1255,7 @@ impl DdlController {
}

/// Builds the actor graph:
/// - Add the upstream fragments to the fragment graph
/// - Schedule the fragments based on their distribution
/// - Expand each fragment into one or several actors
pub(crate) async fn build_stream_job(
Expand Down Expand Up @@ -1975,7 +1976,8 @@ impl DdlController {
}
}

/// Fill in necessary information for table stream graph.
/// Fill in necessary information for `Table` stream graph.
/// e.g., fill source id for table with connector, fill external table id for CDC table.
pub fn fill_table_stream_graph_info(
source: &mut PbSource,
table: &mut PbTable,
Expand Down
27 changes: 20 additions & 7 deletions src/meta/src/stream/stream_graph/fragment.rs
Expand Up @@ -261,6 +261,10 @@ impl StreamFragmentEdge {

/// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`]
/// from the frontend.
///
/// This only includes nodes and edges of the current job itself. It will be converted to [`CompleteStreamFragmentGraph`] later,
/// which contains the additional information of pre-existing
/// fragments, which are connected to the graph's top-most or bottom-most fragments.
#[derive(Default)]
pub struct StreamFragmentGraph {
/// stores all the fragments in the graph.
Expand Down Expand Up @@ -484,8 +488,8 @@ pub(super) enum EitherFragment {
Existing(Fragment),
}

/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of existing
/// fragments, which is connected to the graph's top-most or bottom-most fragments.
/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing
/// fragments, which are connected to the graph's top-most or bottom-most fragments.
///
/// For example,
/// - if we're going to build a mview on an existing mview, the upstream fragment containing the
Expand Down Expand Up @@ -530,7 +534,7 @@ impl CompleteStreamFragmentGraph {
}
}

/// Create a new [`CompleteStreamFragmentGraph`] for MV on MV or Table on CDC Source, with the upstream existing
/// Create a new [`CompleteStreamFragmentGraph`] for MV on MV and CDC/Source Table with the existing upstream
/// `Materialize` or `Source` fragments.
pub fn with_upstreams(
graph: StreamFragmentGraph,
Expand Down Expand Up @@ -565,6 +569,7 @@ impl CompleteStreamFragmentGraph {
)
}

/// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments.
fn build_helper(
mut graph: StreamFragmentGraph,
upstream_ctx: Option<FragmentGraphUpstreamContext>,
Expand All @@ -579,8 +584,6 @@ impl CompleteStreamFragmentGraph {
upstream_root_fragments,
}) = upstream_ctx
{
// Build the extra edges between the upstream `Materialize` and the downstream `StreamScan`
// of the new materialized view.
for (&id, fragment) in &mut graph.fragments {
for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns {
let (up_fragment_id, edge) = match table_job_type.as_ref() {
Expand Down Expand Up @@ -620,8 +623,18 @@ impl CompleteStreamFragmentGraph {

(source_job_id, edge)
}
_ => {
// handle other kinds of streaming graph, normally MV on MV
None | Some(TableJobType::General | TableJobType::Unspecified) => {
if let Some(table_job_type) = table_job_type {
// This isn't normal. But don't panic...
tracing::warn!(
?table_job_type,
"table job shouldn't have upstream fragments",
);
}
// handle MV on MV

// Build the extra edges between the upstream `Materialize` and the downstream `StreamScan`
// of the new materialized view.
let mview_fragment = upstream_root_fragments
.get(&upstream_table_id)
.context("upstream materialized view fragment not found")?;
Expand Down
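
The reworked `match` in `build_helper` replaces a catch-all `_` arm with explicit patterns and logs a warning when a table job unexpectedly reaches the MV-on-MV path. Below is a minimal, dependency-free sketch of that dispatch. `UpstreamRoot` and `upstream_root_for` are hypothetical names introduced for illustration, and the warning is returned as a `String` rather than emitted through `tracing::warn!`, so the behavior can be checked without the `tracing` crate.

```rust
/// Simplified stand-in for the generated `TableJobType` enum (assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableJobType {
    Unspecified,
    General,
    SharedCdcSource,
}

/// Hypothetical label for the kind of upstream root fragment to connect to.
#[derive(Debug, PartialEq, Eq)]
pub enum UpstreamRoot {
    Source,
    Materialize,
}

/// Returns the upstream root kind, plus a warning message when a table job
/// ends up on the MV-on-MV path (the diff logs this instead of panicking).
pub fn upstream_root_for(
    table_job_type: Option<TableJobType>,
) -> (UpstreamRoot, Option<String>) {
    match table_job_type {
        // A table on a shared CDC source connects to the `Source` fragment.
        Some(TableJobType::SharedCdcSource) => (UpstreamRoot::Source, None),
        // Every other case is listed explicitly instead of using `_`.
        None | Some(TableJobType::General | TableJobType::Unspecified) => {
            // `None` means the job is not a table, i.e. the normal MV on MV case.
            let warning = table_job_type
                .map(|ty| format!("table job shouldn't have upstream fragments: {ty:?}"));
            (UpstreamRoot::Materialize, warning)
        }
    }
}
```

Listing the variants rather than matching on `_` means that adding a new variant to `TableJobType` makes this `match` fail to compile until the new case is handled, which fits the note in `ddl_service.proto` that more job types may be added later.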
