refactor(meta): refactor how upstream fragment is handled when creating stream job (#14510)
xxchan authored Jan 12, 2024
1 parent 9cd7f64 commit 681c226
Showing 8 changed files with 94 additions and 65 deletions.
proto/ddl_service.proto (9 changes: 8 additions & 1 deletion)

@@ -137,10 +137,17 @@ message DropViewResponse {
uint64 version = 2;
}

// An enum to distinguish different types of the Table streaming job.
// An enum to distinguish different types of the `Table` streaming job.
// - GENERAL: Table streaming jobs w/ or w/o a connector
// - SHARED_CDC_SOURCE: The table streaming job is created based on a shared CDC source job (risingwavelabs/rfcs#73).
//
// And one may add other types to support `Table` jobs that are based on other backfill-able sources (risingwavelabs/rfcs#72).
//
// Currently, its usages include:
// - When creating the streaming actor graph, different table jobs may need different treatment.
// - Some ad hoc validation when creating the streaming job, e.g., `validate_cdc_table`.
//
// It's not included in `catalog.Table`, and thus not persisted. It's only used in the `CreateTableRequest`.
enum TableJobType {
TABLE_JOB_TYPE_UNSPECIFIED = 0;
// table streaming jobs except the `SHARED_CDC_SOURCE` type
proto/stream_plan.proto (3 changes: 2 additions & 1 deletion)

@@ -839,6 +839,7 @@ message StreamActor {
plan_common.ExprContext expr_context = 10;
}

// Indicates whether the fragment contains certain special kinds of nodes.
enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_FRAGMENT_UNSPECIFIED = 0;
FRAGMENT_TYPE_FLAG_SOURCE = 1;
@@ -864,7 +865,7 @@ message StreamFragmentGraph {
uint32 fragment_id = 1;
// root stream node in this fragment.
StreamNode node = 2;
// Bitwise-OR of FragmentTypeFlags
// Bitwise-OR of `FragmentTypeFlag`s
uint32 fragment_type_mask = 3;
// Mark whether this fragment requires exactly one actor.
// Note: if this is `false`, the fragment may still be a singleton according to the scheduler.
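The mask convention documented above is worth making concrete: each `FragmentTypeFlag` value is a distinct bit, a fragment's `fragment_type_mask` is the bitwise OR of the flags for the node kinds it contains, and membership is tested with bitwise AND, exactly as the meta-service code below does with `PbFragmentTypeFlag::Mview` and `PbFragmentTypeFlag::Source`. A minimal standalone sketch (only `SOURCE = 1` is visible in this hunk; the `MVIEW` value is assumed for illustration):

```rust
// Standalone sketch of the `fragment_type_mask` convention.
const FLAG_SOURCE: i32 = 1; // FRAGMENT_TYPE_FLAG_SOURCE, shown in the hunk
const FLAG_MVIEW: i32 = 2; // assumed value, for illustration only

fn main() {
    // A fragment containing both a source and a materialize node carries both bits.
    let fragment_type_mask = FLAG_SOURCE | FLAG_MVIEW;

    // Membership tests mirror `f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0`.
    assert_ne!(fragment_type_mask & FLAG_SOURCE, 0);
    assert_ne!(fragment_type_mask & FLAG_MVIEW, 0);
    assert_eq!(fragment_type_mask & (1 << 2), 0); // any other flag is absent
}
```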
src/meta/src/controller/fragment.rs (24 changes: 11 additions & 13 deletions)

@@ -27,7 +27,6 @@ use risingwave_meta_model_v2::{
StreamNode, TableId, VnodeBitmap, WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::ddl_service::PbTableJobType;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Operation as NotificationOperation,
};
@@ -1037,31 +1036,30 @@ impl CatalogController {
Ok(actors)
}

/// Get and filter the upstream `Materialize` or `Source` fragments of the specified relations.
pub async fn get_upstream_root_fragments(
&self,
upstream_job_ids: Vec<ObjectId>,
job_type: Option<PbTableJobType>,
) -> MetaResult<HashMap<ObjectId, PbFragment>> {
let inner = self.inner.read().await;

let mut fragments = Fragment::find()
let all_upstream_fragments = Fragment::find()
.filter(fragment::Column::JobId.is_in(upstream_job_ids))
.all(&inner.db)
.await?;
fragments.retain(|f| match job_type {
Some(PbTableJobType::SharedCdcSource) => {
f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0
}
// MV on MV, and other kinds of table job
None | Some(PbTableJobType::General) | Some(PbTableJobType::Unspecified) => {
f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0
// job_id -> fragment
let mut fragments = HashMap::<ObjectId, fragment::Model>::new();
for fragment in all_upstream_fragments {
if fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 {
_ = fragments.insert(fragment.job_id, fragment);
} else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
// look for Source fragment if there's no MView fragment
_ = fragments.try_insert(fragment.job_id, fragment);
}
});
}

let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
let mut root_fragments = HashMap::new();
for fragment in fragments {
for (_, fragment) in fragments {
let actors = fragment.find_related(Actor).all(&inner.db).await?;
let actor_dispatchers = get_actor_dispatchers(
&inner.db,
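The loop above is where the old `TableJobType`-based filtering gives way to a uniform precedence rule: a `Mview` fragment always claims its job's slot (plain `insert`, overwriting any `Source` fragment recorded earlier), while a `Source` fragment only fills an empty slot (`try_insert`, a nightly-only `HashMap` API; the stable entry API behaves the same here). The v1 `FragmentManager` below applies the same rule with plain `if let`s. A self-contained sketch of that precedence, with a simplified `Frag` standing in for `fragment::Model`:

```rust
use std::collections::HashMap;

// Simplified stand-in for `fragment::Model`, for illustration only.
#[derive(Debug)]
struct Frag {
    job_id: i32,
    is_mview: bool,
}

/// Pick one root fragment per job: Mview wins, Source is the fallback.
fn pick_root_fragments(all_upstream_fragments: Vec<Frag>) -> HashMap<i32, Frag> {
    let mut fragments = HashMap::new();
    for frag in all_upstream_fragments {
        if frag.is_mview {
            // Mview always takes the slot, even if a Source got there first.
            fragments.insert(frag.job_id, frag);
        } else {
            // Source only fills the slot if nothing is there yet
            // (stable-Rust equivalent of `HashMap::try_insert`).
            fragments.entry(frag.job_id).or_insert(frag);
        }
    }
    fragments
}

fn main() {
    let picked = pick_root_fragments(vec![
        Frag { job_id: 1, is_mview: false },
        Frag { job_id: 1, is_mview: true }, // arrives later, still wins
        Frag { job_id: 2, is_mview: false },
    ]);
    assert!(picked[&1].is_mview); // table/MV job: Mview root
    assert!(!picked[&2].is_mview); // shared source job: Source root
}
```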
src/meta/src/manager/catalog/fragment.rs (21 changes: 6 additions & 15 deletions)

@@ -24,7 +24,6 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping}
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model_v2::SourceId;
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
@@ -1398,11 +1397,9 @@ impl FragmentManager {
.mview_actor_ids())
}

/// Get and filter the upstream `Materialize` or `Source` fragments of the specified relations.
pub async fn get_upstream_root_fragments(
&self,
upstream_table_ids: &HashSet<TableId>,
table_job_type: Option<TableJobType>,
) -> MetaResult<HashMap<TableId, Fragment>> {
let map = &self.core.read().await.table_fragments;
let mut fragments = HashMap::new();
@@ -1411,18 +1408,12 @@
let table_fragments = map
.get(&table_id)
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;
match table_job_type.as_ref() {
Some(TableJobType::SharedCdcSource) => {
if let Some(fragment) = table_fragments.source_fragment() {
fragments.insert(table_id, fragment);
}
}
// MV on MV, and other kinds of table job
None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => {
if let Some(fragment) = table_fragments.mview_fragment() {
fragments.insert(table_id, fragment);
}
}

if let Some(fragment) = table_fragments.mview_fragment() {
fragments.insert(table_id, fragment);
} else if let Some(fragment) = table_fragments.source_fragment() {
// look for Source fragment if there's no MView fragment
fragments.insert(table_id, fragment);
}
}

src/meta/src/manager/metadata.rs (19 changes: 15 additions & 4 deletions)

@@ -19,7 +19,6 @@ use risingwave_meta_model_v2::SourceId;
use risingwave_pb::catalog::PbSource;
use risingwave_pb::common::worker_node::{PbResource, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::table_fragments::Fragment;
use risingwave_pb::stream_plan::PbStreamActor;
@@ -175,15 +174,28 @@ impl MetadataManager {
}
}

/// Get and filter the "**root**" fragments of the specified relations.
/// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
///
/// ## What can be the root fragment
/// - For MV, it should have one `MView` fragment.
/// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
/// - For source, it should have one `Source` fragment.
///
/// In other words, it's the `MView` fragment if it exists, otherwise it's the `Source` fragment.
///
/// ## What do we expect to get when creating different streaming jobs
/// - MV/Sink/Index should have MV upstream fragments for upstream MV/Tables, and Source upstream fragments for upstream backfill-able sources.
/// - CDC Table has a Source upstream fragment.
/// - Sources and other Tables shouldn't have an upstream fragment.
pub async fn get_upstream_root_fragments(
&self,
upstream_table_ids: &HashSet<TableId>,
table_job_type: Option<TableJobType>,
) -> MetaResult<HashMap<TableId, Fragment>> {
match self {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.get_upstream_root_fragments(upstream_table_ids, table_job_type)
.get_upstream_root_fragments(upstream_table_ids)
.await
}
MetadataManager::V2(mgr) => {
@@ -194,7 +206,6 @@
.iter()
.map(|id| id.table_id as _)
.collect(),
table_job_type,
)
.await?;
Ok(upstream_root_fragments
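The expectation list in the new doc comment above maps each kind of creating job to the upstream root kinds it should see. A toy illustration (not RisingWave code), with local enums standing in for the real job and fragment types:

```rust
// Which root-fragment kinds may each creating job's upstreams contribute?
enum CreatingJob { Mv, Sink, Index, CdcTable, GeneralTable, Source }

enum RootKind { Mview, Source }

fn expected_upstream_roots(job: &CreatingJob) -> &'static [RootKind] {
    match job {
        // MV/Sink/Index: Mview roots for upstream MVs/tables, plus Source
        // roots for upstream backfill-able sources.
        CreatingJob::Mv | CreatingJob::Sink | CreatingJob::Index => {
            &[RootKind::Mview, RootKind::Source]
        }
        // A CDC table backfills from the shared source job's Source fragment.
        CreatingJob::CdcTable => &[RootKind::Source],
        // Plain sources and other tables have no upstream fragment at all.
        CreatingJob::GeneralTable | CreatingJob::Source => &[],
    }
}

fn main() {
    assert!(matches!(
        expected_upstream_roots(&CreatingJob::CdcTable),
        [RootKind::Source]
    ));
    assert!(expected_upstream_roots(&CreatingJob::Source).is_empty());
}
```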
src/meta/src/manager/streaming_job.rs (33 changes: 22 additions & 11 deletions)

@@ -26,8 +26,6 @@ use crate::model::FragmentId;
// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
// Sink.
#[derive(Debug, Clone, EnumDiscriminants)]
#[strum_discriminants(name(DdlType))]
#[strum_discriminants(vis(pub))]
pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink, Option<(Table, Option<PbSource>)>),
@@ -36,13 +34,34 @@ pub enum StreamingJob {
Source(PbSource),
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DdlType {
MaterializedView,
Sink,
Table(TableJobType),
Index,
Source,
}

impl From<&StreamingJob> for DdlType {
fn from(job: &StreamingJob) -> Self {
match job {
StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
StreamingJob::Sink(_, _) => DdlType::Sink,
StreamingJob::Table(_, _, ty) => DdlType::Table(*ty),
StreamingJob::Index(_, _) => DdlType::Index,
StreamingJob::Source(_) => DdlType::Source,
}
}
}

#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for DdlType {
fn default() -> Self {
// This should not be used by mock services,
// so we can just pick an arbitrary default variant.
DdlType::Table
DdlType::MaterializedView
}
}

@@ -259,14 +278,6 @@ impl StreamingJob {
}
}

pub fn table_job_type(&self) -> Option<TableJobType> {
if let Self::Table(.., sub_type) = self {
Some(*sub_type)
} else {
None
}
}

// TODO: record all objects instead.
pub fn dependent_relations(&self) -> Vec<u32> {
match self {
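With the `table_job_type()` accessor gone, the job-type hint reaches graph construction as a `DdlType` derived through the new `From<&StreamingJob>` impl; where the old `Option<TableJobType>` made `None` do double duty for MV/Sink/Index jobs, every job class is now explicit. A compilable sketch with local stand-ins for the real types (illustration only, not the actual meta-service code):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum TableJobType { General, SharedCdcSource }

#[derive(Debug, Clone, Copy, PartialEq)]
enum DdlType { MaterializedView, Sink, Table(TableJobType), Index, Source }

// Local stand-in for the real `StreamingJob`; catalog payloads elided.
enum StreamingJob { MaterializedView, Sink, Table(TableJobType), Index, Source }

impl From<&StreamingJob> for DdlType {
    fn from(job: &StreamingJob) -> Self {
        match job {
            StreamingJob::MaterializedView => DdlType::MaterializedView,
            StreamingJob::Sink => DdlType::Sink,
            StreamingJob::Table(ty) => DdlType::Table(*ty),
            StreamingJob::Index => DdlType::Index,
            StreamingJob::Source => DdlType::Source,
        }
    }
}

fn main() {
    // Call sites stay terse, mirroring `stream_job.into()` in `ddl_controller.rs` below.
    let job = StreamingJob::Table(TableJobType::SharedCdcSource);
    let ddl_type: DdlType = (&job).into();
    assert_eq!(ddl_type, DdlType::Table(TableJobType::SharedCdcSource));
}
```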
src/meta/src/rpc/ddl_controller.rs (12 changes: 6 additions & 6 deletions)

@@ -1259,6 +1259,7 @@ impl DdlController {
}

/// Builds the actor graph:
/// - Add the upstream fragments to the fragment graph
/// - Schedule the fragments based on their distribution
/// - Expand each fragment into one or several actors
pub(crate) async fn build_stream_job(
@@ -1278,10 +1279,7 @@

let upstream_root_fragments = self
.metadata_manager
.get_upstream_root_fragments(
fragment_graph.dependent_table_ids(),
stream_job.table_job_type(),
)
.get_upstream_root_fragments(fragment_graph.dependent_table_ids())
.await?;

let upstream_actors: HashMap<_, _> = upstream_root_fragments
@@ -1297,7 +1295,7 @@
let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
fragment_graph,
upstream_root_fragments,
stream_job.table_job_type(),
stream_job.into(),
)?;

// 2. Build the actor graph.
@@ -1717,6 +1715,7 @@ impl DdlController {
fragment_graph,
original_table_fragment.fragment_id,
downstream_fragments,
stream_job.into(),
)?;

// 2. Build the actor graph.
@@ -1979,7 +1978,8 @@
}
}

/// Fill in necessary information for table stream graph.
/// Fill in necessary information for `Table` stream graph.
/// e.g., fill the source id for a table with a connector, and the external table id for a CDC table.
pub fn fill_table_stream_graph_info(
source: &mut Option<PbSource>,
table: &mut PbTable,