chore: add some comments for creating streaming job
xxchan committed Jan 11, 2024
1 parent b03a641 commit b5c248d
Showing 7 changed files with 77 additions and 60 deletions.
11 changes: 10 additions & 1 deletion proto/ddl_service.proto
@@ -137,10 +137,19 @@ message DropViewResponse {
uint64 version = 2;
}

// An enum to distinguish different types of the Table streaming job.
// An enum to distinguish different types of the `Table` streaming job.
// - GENERAL: Table streaming jobs w/ or w/o a connector
// - SHARED_CDC_SOURCE: The table streaming job is created based on a shared CDC source job (risingwavelabs/rfcs#73).
//
// And one may add other types to support Table jobs that are based on other backfill-able sources (risingwavelabs/rfcs#72).
//
// Currently, its usages include:
// - When creating the streaming actor graph, different table jobs may need different treatment.
// + CDC tables and tables w/ a backfill-able source connector have an upstream `Source` fragment.
// + Other tables don't have an upstream fragment.
// - Some ad-hoc validation when creating the streaming job, e.g., `validate_cdc_table`.
//
// It's not included in `catalog.Table`, and thus not persisted. It's only used in the `CreateTableRequest`.
enum TableJobType {
TABLE_JOB_TYPE_UNSPECIFIED = 0;
// table streaming jobs except the `SHARED_CDC_SOURCE` type
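For reference, a minimal sketch of the Rust binding that this enum compiles to, assuming prost-style codegen: the variant names match the `PbTableJobType` usages elsewhere in this commit, while the derive list is an assumption.

```rust
// Sketch of the generated binding (assumed shape, not actual prost output).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PbTableJobType {
    /// Fallback for an unset field.
    Unspecified = 0,
    /// Table streaming jobs except the `SHARED_CDC_SOURCE` type.
    General = 1,
    /// Table jobs created on top of a shared CDC source job (risingwavelabs/rfcs#73).
    SharedCdcSource = 2,
}
```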
13 changes: 3 additions & 10 deletions src/meta/src/controller/fragment.rs
@@ -27,7 +27,6 @@ use risingwave_meta_model_v2::{
TableId, VnodeBitmap, WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::ddl_service::PbTableJobType;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Operation as NotificationOperation,
};
@@ -1038,22 +1037,16 @@ impl CatalogController {
pub async fn get_upstream_root_fragments(
&self,
upstream_job_ids: Vec<ObjectId>,
job_type: Option<PbTableJobType>,
) -> MetaResult<HashMap<ObjectId, PbFragment>> {
let inner = self.inner.read().await;

let mut fragments = Fragment::find()
.filter(fragment::Column::JobId.is_in(upstream_job_ids))
.all(&inner.db)
.await?;
fragments.retain(|f| match job_type {
Some(PbTableJobType::SharedCdcSource) => {
f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0
}
// MV on MV, and other kinds of table job
None | Some(PbTableJobType::General) | Some(PbTableJobType::Unspecified) => {
f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0
}
fragments.retain(|f| {
f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0
|| f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0
});

let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
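The new `retain` no longer branches on the job type: it keeps any fragment whose type mask has the `Source` or `Mview` bit set, and leaves the job-type-specific choice to graph building (see `build_helper` in the last file of this commit). A self-contained sketch of the bitmask check, with illustrative flag values standing in for the real `PbFragmentTypeFlag` constants:

```rust
// Illustrative stand-ins for `PbFragmentTypeFlag::Source` / `::Mview`;
// the real values come from the generated proto code.
const SOURCE: i32 = 1 << 0;
const MVIEW: i32 = 1 << 1;

// A fragment qualifies as an upstream root if it is a `Source` or `Mview`
// fragment; which of the two is relevant is decided later, per job type.
fn is_upstream_root(fragment_type_mask: i32) -> bool {
    fragment_type_mask & SOURCE != 0 || fragment_type_mask & MVIEW != 0
}

fn main() {
    assert!(is_upstream_root(SOURCE));
    assert!(is_upstream_root(MVIEW | (1 << 5))); // extra flag bits don't matter
    assert!(!is_upstream_root(0));
}
```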
20 changes: 6 additions & 14 deletions src/meta/src/manager/catalog/fragment.rs
@@ -24,7 +24,6 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping}
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model_v2::SourceId;
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
@@ -1402,7 +1401,6 @@ impl FragmentManager {
pub async fn get_upstream_root_fragments(
&self,
upstream_table_ids: &HashSet<TableId>,
table_job_type: Option<TableJobType>,
) -> MetaResult<HashMap<TableId, Fragment>> {
let map = &self.core.read().await.table_fragments;
let mut fragments = HashMap::new();
@@ -1411,18 +1409,12 @@
let table_fragments = map
.get(&table_id)
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;
match table_job_type.as_ref() {
Some(TableJobType::SharedCdcSource) => {
if let Some(fragment) = table_fragments.source_fragment() {
fragments.insert(table_id, fragment);
}
}
// MV on MV, and other kinds of table job
None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => {
if let Some(fragment) = table_fragments.mview_fragment() {
fragments.insert(table_id, fragment);
}
}

if let Some(fragment) = table_fragments.mview_fragment() {
fragments.insert(table_id, fragment);
}
if let Some(fragment) = table_fragments.source_fragment() {
fragments.insert(table_id, fragment);
}
}

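The V1 `FragmentManager` above applies the same loosening by simply trying both lookups. A toy, self-contained model of that collection step (all types are stand-ins for the real meta-model ones); since both candidates go in under the same key, the `Source` fragment would overwrite the `Mview` one if a job ever exposed both:

```rust
use std::collections::{HashMap, HashSet};

// Stand-in types; the real ones live in the meta model.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct TableId(u32);
#[derive(Clone, Debug)]
struct Fragment(&'static str);

struct TableFragments {
    mview: Option<Fragment>,
    source: Option<Fragment>,
}

fn get_upstream_root_fragments(
    map: &HashMap<TableId, TableFragments>,
    upstream_table_ids: &HashSet<TableId>,
) -> HashMap<TableId, Fragment> {
    let mut fragments = HashMap::new();
    for table_id in upstream_table_ids {
        if let Some(tf) = map.get(table_id) {
            // Try both roots, mirroring the new V1 logic above.
            if let Some(f) = &tf.mview {
                fragments.insert(*table_id, f.clone());
            }
            if let Some(f) = &tf.source {
                fragments.insert(*table_id, f.clone());
            }
        }
    }
    fragments
}

fn main() {
    let mut map = HashMap::new();
    map.insert(
        TableId(1),
        TableFragments { mview: Some(Fragment("mview")), source: None },
    );
    let ids = HashSet::from([TableId(1)]);
    assert_eq!(get_upstream_root_fragments(&map, &ids)[&TableId(1)].0, "mview");
}
```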
10 changes: 6 additions & 4 deletions src/meta/src/manager/metadata.rs
@@ -18,7 +18,6 @@ use risingwave_common::catalog::{TableId, TableOption};
use risingwave_pb::catalog::PbSource;
use risingwave_pb::common::worker_node::{PbResource, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::table_fragments::Fragment;
use risingwave_pb::stream_plan::PbStreamActor;
@@ -174,15 +173,19 @@
}
}

/// Get and filter the upstream fragments of the specified relations.
///
/// - MV/Sink/Index should have MV upstream fragments for their upstream MVs/Tables, and Source upstream fragments for their upstream backfill-able sources.
/// - CDC Table has a Source upstream fragment.
/// - Sources and other Tables shouldn't have an upstream fragment.
pub async fn get_upstream_root_fragments(
&self,
upstream_table_ids: &HashSet<TableId>,
table_job_type: Option<TableJobType>,
) -> MetaResult<HashMap<TableId, Fragment>> {
match self {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.get_upstream_root_fragments(upstream_table_ids, table_job_type)
.get_upstream_root_fragments(upstream_table_ids)
.await
}
MetadataManager::V2(mgr) => {
@@ -193,7 +196,6 @@
.iter()
.map(|id| id.table_id as _)
.collect(),
table_job_type,
)
.await?;
Ok(upstream_root_fragments
33 changes: 22 additions & 11 deletions src/meta/src/manager/streaming_job.rs
@@ -26,8 +26,6 @@ use crate::model::FragmentId;
// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
// Sink.
#[derive(Debug, Clone, EnumDiscriminants)]
#[strum_discriminants(name(DdlType))]
#[strum_discriminants(vis(pub))]
pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink, Option<(Table, Option<PbSource>)>),
@@ -36,13 +34,34 @@ pub enum StreamingJob {
Source(PbSource),
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DdlType {
MaterializedView,
Sink,
Table(TableJobType),
Index,
Source,
}

impl From<&StreamingJob> for DdlType {
fn from(job: &StreamingJob) -> Self {
match job {
StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
StreamingJob::Sink(_, _) => DdlType::Sink,
StreamingJob::Table(_, _, ty) => DdlType::Table(*ty),
StreamingJob::Index(_, _) => DdlType::Index,
StreamingJob::Source(_) => DdlType::Source,
}
}
}

#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for DdlType {
fn default() -> Self {
// This should not be used by mock services,
// so we can just pick an arbitrary default variant.
DdlType::Table
DdlType::MaterializedView
}
}

@@ -259,14 +278,6 @@ impl StreamingJob {
}
}

pub fn table_job_type(&self) -> Option<TableJobType> {
if let Self::Table(.., sub_type) = self {
Some(*sub_type)
} else {
None
}
}

// TODO: record all objects instead.
pub fn dependent_relations(&self) -> Vec<u32> {
match self {
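The handwritten `DdlType` above replaces the `strum`-derived discriminant so that the `Table` variant can carry its `TableJobType`. A reduced, self-contained sketch of the conversion pattern, with payload types stubbed out:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum TableJobType {
    General,
    SharedCdcSource,
}

// Payloads dropped; only the discriminant-relevant shape is kept.
enum StreamingJob {
    MaterializedView,
    Table(TableJobType),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum DdlType {
    MaterializedView,
    Table(TableJobType),
}

impl From<&StreamingJob> for DdlType {
    fn from(job: &StreamingJob) -> Self {
        match job {
            StreamingJob::MaterializedView => DdlType::MaterializedView,
            // The table sub-type travels along into the discriminant.
            StreamingJob::Table(ty) => DdlType::Table(*ty),
        }
    }
}

fn main() {
    // This is what `stream_job.into()` resolves to at the call sites below.
    let table = StreamingJob::Table(TableJobType::SharedCdcSource);
    assert_eq!(DdlType::from(&table), DdlType::Table(TableJobType::SharedCdcSource));

    let mv = StreamingJob::MaterializedView;
    assert_eq!(DdlType::from(&mv), DdlType::MaterializedView);
}
```

This is also why the ad-hoc `table_job_type()` accessor removed above is no longer needed: the sub-type is already part of the discriminant.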
12 changes: 6 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1255,6 +1255,7 @@ impl DdlController {
}

/// Builds the actor graph:
/// - Add the upstream fragments to the fragment graph
/// - Schedule the fragments based on their distribution
/// - Expand each fragment into one or several actors
pub(crate) async fn build_stream_job(
@@ -1274,10 +1275,7 @@

let upstream_root_fragments = self
.metadata_manager
.get_upstream_root_fragments(
fragment_graph.dependent_table_ids(),
stream_job.table_job_type(),
)
.get_upstream_root_fragments(fragment_graph.dependent_table_ids())
.await?;

let upstream_actors: HashMap<_, _> = upstream_root_fragments
@@ -1293,7 +1291,7 @@
let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
fragment_graph,
upstream_root_fragments,
stream_job.table_job_type(),
stream_job.into(),
)?;

// 2. Build the actor graph.
@@ -1713,6 +1711,7 @@ impl DdlController {
fragment_graph,
original_table_fragment.fragment_id,
downstream_fragments,
stream_job.into(),
)?;

// 2. Build the actor graph.
@@ -1975,7 +1974,8 @@
}
}

/// Fill in necessary information for table stream graph.
/// Fill in necessary information for the `Table` stream graph,
/// e.g., fill the source id for a table with a connector, and the external table id for a CDC table.
pub fn fill_table_stream_graph_info(
source: &mut PbSource,
table: &mut PbTable,
38 changes: 24 additions & 14 deletions src/meta/src/stream/stream_graph/fragment.rs
@@ -38,7 +38,7 @@ use risingwave_pb::stream_plan::{
StreamFragmentGraph as StreamFragmentGraphProto,
};

use crate::manager::{MetaSrvEnv, StreamingJob};
use crate::manager::{DdlType, MetaSrvEnv, StreamingJob};
use crate::model::FragmentId;
use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen};
use crate::stream::stream_graph::schedule::Distribution;
@@ -261,6 +261,10 @@ impl StreamFragmentEdge {

/// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`]
/// from the frontend.
///
/// This only includes the nodes and edges of the current job itself. It will later be converted
/// to a [`CompleteStreamFragmentGraph`], which contains the additional information of pre-existing
/// fragments that are connected to the graph's top-most or bottom-most fragments.
#[derive(Default)]
pub struct StreamFragmentGraph {
/// stores all the fragments in the graph.
@@ -484,8 +488,8 @@ pub(super) enum EitherFragment {
Existing(Fragment),
}

/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of existing
/// fragments, which is connected to the graph's top-most or bottom-most fragments.
/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing
/// fragments, which are connected to the graph's top-most or bottom-most fragments.
///
/// For example,
/// - if we're going to build a mview on an existing mview, the upstream fragment containing the
@@ -530,20 +534,20 @@ impl CompleteStreamFragmentGraph {
}
}

/// Create a new [`CompleteStreamFragmentGraph`] for MV on MV or Table on CDC Source, with the upstream existing
/// Create a new [`CompleteStreamFragmentGraph`] for MV on MV or for a CDC/Source Table, with the existing upstream
/// `Materialize` or `Source` fragments.
pub fn with_upstreams(
graph: StreamFragmentGraph,
upstream_root_fragments: HashMap<TableId, Fragment>,
table_job_type: Option<TableJobType>,
ddl_type: DdlType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Some(FragmentGraphUpstreamContext {
upstream_root_fragments,
}),
None,
table_job_type,
ddl_type,
)
}

@@ -553,6 +557,7 @@
graph: StreamFragmentGraph,
original_table_fragment_id: FragmentId,
downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
ddl_type: DdlType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
@@ -561,15 +566,16 @@
original_table_fragment_id,
downstream_fragments,
}),
None,
ddl_type,
)
}

/// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments.
fn build_helper(
mut graph: StreamFragmentGraph,
upstream_ctx: Option<FragmentGraphUpstreamContext>,
downstream_ctx: Option<FragmentGraphDownstreamContext>,
table_job_type: Option<TableJobType>,
ddl_type: DdlType,
) -> MetaResult<Self> {
let mut extra_downstreams = HashMap::new();
let mut extra_upstreams = HashMap::new();
@@ -579,12 +585,10 @@
upstream_root_fragments,
}) = upstream_ctx
{
// Build the extra edges between the upstream `Materialize` and the downstream `StreamScan`
// of the new materialized view.
for (&id, fragment) in &mut graph.fragments {
for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns {
let (up_fragment_id, edge) = match table_job_type.as_ref() {
Some(TableJobType::SharedCdcSource) => {
let (up_fragment_id, edge) = match ddl_type {
DdlType::Table(TableJobType::SharedCdcSource) => {
let source_fragment = upstream_root_fragments
.get(&upstream_table_id)
.context("upstream source fragment not found")?;
@@ -620,8 +624,11 @@

(source_job_id, edge)
}
_ => {
// handle other kinds of streaming graph, normally MV on MV
DdlType::MaterializedView | DdlType::Sink | DdlType::Index => {
// handle MV on MV

// Build the extra edges between the upstream `Materialize` and the downstream `StreamScan`
// of the new materialized view.
let mview_fragment = upstream_root_fragments
.get(&upstream_table_id)
.context("upstream materialized view fragment not found")?;
@@ -668,6 +675,9 @@

(mview_id, edge)
}
DdlType::Source | DdlType::Table(_) => {
bail!("the streaming job shouldn't have an upstream fragment, ddl_type: {:?}", ddl_type)
}
};

// put the edge into the extra edges
Expand Down
