refactor(meta): refactor how upstream fragment is handled when creating stream job #14510

Merged
merged 2 commits on Jan 12, 2024
Changes from 1 commit
chore: add some comments for creating streaming job
xxchan committed Jan 12, 2024
commit 2aaa7f52bd8db3aa11ac45145a4cabf9169333a8
9 changes: 8 additions & 1 deletion proto/ddl_service.proto
@@ -137,10 +137,17 @@ message DropViewResponse {
uint64 version = 2;
}

// An enum to distinguish different types of the Table streaming job.
// An enum to distinguish different types of the `Table` streaming job.
// - GENERAL: Table streaming jobs w/ or w/o a connector
// - SHARED_CDC_SOURCE: The table streaming job is created based on a shared CDC source job (risingwavelabs/rfcs#73).
//
// And one may add other types to support Table jobs that are based on other backfill-able sources (risingwavelabs/rfcs#72).
//
// Currently, its usages include:
// - When creating the streaming actor graph, different table jobs may need different treatment.
// - Some ad-hoc validation when creating the streaming job, e.g., `validate_cdc_table`.
//
// It's not included in `catalog.Table`, and thus not persisted. It's only used in the `CreateTableRequest`.
enum TableJobType {
TABLE_JOB_TYPE_UNSPECIFIED = 0;
// table streaming jobs except the `SHARED_CDC_SOURCE` type
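To make the refactor easier to follow, here is a minimal, self-contained Rust sketch (not part of this diff; all types are simplified stand-ins for the real proto/meta types) of the pre-refactor pattern that this PR removes: callers threaded `TableJobType` down to the upstream-fragment lookup, which branched on it. The hunks below replace this with a uniform rule that prefers the `MView` fragment and falls back to the `Source` fragment, so the job type no longer needs to be passed around.

// Illustrative sketch only; not part of the diff.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
enum TableJobType {
    Unspecified,
    General,
    SharedCdcSource,
}

struct TableFragments {
    mview_fragment: Option<String>, // stand-in for the real `Fragment`
    source_fragment: Option<String>,
}

/// Pre-refactor style: the caller must pass the job type and the lookup branches on it.
fn pick_root_fragment_by_job_type(
    tf: &TableFragments,
    job_type: Option<TableJobType>,
) -> Option<String> {
    match job_type {
        // CDC table: the upstream root is the shared `Source` fragment.
        Some(TableJobType::SharedCdcSource) => tf.source_fragment.clone(),
        // MV on MV, and other kinds of table job: the upstream root is the `MView` fragment.
        None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => {
            tf.mview_fragment.clone()
        }
    }
}

fn main() {
    let cdc_source_job = TableFragments {
        mview_fragment: None,
        source_fragment: Some("shared cdc source fragment".into()),
    };
    // Pre-refactor, the caller must already know it is creating a CDC table.
    let root =
        pick_root_fragment_by_job_type(&cdc_source_job, Some(TableJobType::SharedCdcSource));
    assert_eq!(root.as_deref(), Some("shared cdc source fragment"));
}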
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
@@ -839,6 +839,7 @@ message StreamActor {
plan_common.ExprContext expr_context = 10;
}

// Indicates whether the fragment contains some special kinds of nodes.
enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_FRAGMENT_UNSPECIFIED = 0;
FRAGMENT_TYPE_FLAG_SOURCE = 1;
@@ -864,7 +865,7 @@ message StreamFragmentGraph {
uint32 fragment_id = 1;
// root stream node in this fragment.
StreamNode node = 2;
// Bitwise-OR of FragmentTypeFlags
// Bitwise-OR of `FragmentTypeFlag`s
uint32 fragment_type_mask = 3;
// Mark whether this fragment requires exactly one actor.
// Note: if this is `false`, the fragment may still be a singleton according to the scheduler.
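As a side note for readers unfamiliar with the mask: `fragment_type_mask` is a bitwise-OR of `FragmentTypeFlag` bits, and the meta code tests membership with a bitwise-AND (as seen in the fragment.rs hunks below). A minimal Rust sketch, using local stand-in flag values rather than the generated proto types:

// Local stand-ins for `FragmentTypeFlag`; in the real proto every flag is a
// distinct power of two, so one mask can record several node kinds at once.
#[derive(Clone, Copy)]
#[repr(i32)]
enum FragmentTypeFlag {
    Source = 1,
    Mview = 2,
}

fn contains(fragment_type_mask: i32, flag: FragmentTypeFlag) -> bool {
    // The same membership test the meta code performs on `fragment_type_mask`.
    fragment_type_mask & flag as i32 != 0
}

fn main() {
    // A fragment that contains both a Source node and a Materialize node.
    let mask = FragmentTypeFlag::Source as i32 | FragmentTypeFlag::Mview as i32;
    assert!(contains(mask, FragmentTypeFlag::Source));
    assert!(contains(mask, FragmentTypeFlag::Mview));
}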
24 changes: 11 additions & 13 deletions src/meta/src/controller/fragment.rs
@@ -27,7 +27,6 @@ use risingwave_meta_model_v2::{
TableId, VnodeBitmap, WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::ddl_service::PbTableJobType;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Operation as NotificationOperation,
};
@@ -1034,31 +1033,30 @@ impl CatalogController {
Ok(actors)
}

/// Get and filter the upstream `Materialize` or `Source` fragments of the specified relations.
pub async fn get_upstream_root_fragments(
&self,
upstream_job_ids: Vec<ObjectId>,
job_type: Option<PbTableJobType>,
) -> MetaResult<HashMap<ObjectId, PbFragment>> {
let inner = self.inner.read().await;

let mut fragments = Fragment::find()
let all_upstream_fragments = Fragment::find()
.filter(fragment::Column::JobId.is_in(upstream_job_ids))
.all(&inner.db)
.await?;
fragments.retain(|f| match job_type {
Some(PbTableJobType::SharedCdcSource) => {
f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0
}
// MV on MV, and other kinds of table job
yezizp2012 marked this conversation as resolved.
None | Some(PbTableJobType::General) | Some(PbTableJobType::Unspecified) => {
f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0
// job_id -> fragment
let mut fragments = HashMap::<ObjectId, fragment::Model>::new();
for fragment in all_upstream_fragments {
if fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 {
_ = fragments.insert(fragment.job_id, fragment);
} else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
// look for Source fragment if there's no MView fragment
_ = fragments.try_insert(fragment.job_id, fragment);
Comment on lines +1052 to +1053
Member Author: I thought I made a mistake, then I finally noticed it is try_insert 😅

}
});
}

let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
let mut root_fragments = HashMap::new();
for fragment in fragments {
for (_, fragment) in fragments {
let actors = fragment.find_related(Actor).all(&inner.db).await?;
let actor_dispatchers = get_actor_dispatchers(
&inner.db,
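For context on the `try_insert` remark above, here is a self-contained Rust sketch (not from the PR; types are simplified stand-ins) of the insertion order the new loop relies on: an `MView` fragment overwrites whatever was recorded for a job, while a `Source` fragment is only recorded if nothing is there yet. Since `HashMap::try_insert` is a nightly-only API, the sketch expresses the same "insert only if absent" behavior with the stable entry API:

use std::collections::HashMap;

type JobId = i32;

#[derive(Debug, Clone, Copy, PartialEq)]
enum FragmentKind {
    Mview,
    Source,
}

fn collect_root_fragments(all: Vec<(JobId, FragmentKind)>) -> HashMap<JobId, FragmentKind> {
    let mut fragments = HashMap::new();
    for (job_id, kind) in all {
        match kind {
            // An MView fragment always wins, even if a Source fragment was seen first.
            FragmentKind::Mview => {
                fragments.insert(job_id, FragmentKind::Mview);
            }
            // A Source fragment is only recorded if no fragment was picked yet
            // (the stable equivalent of the `try_insert` call in the PR).
            FragmentKind::Source => {
                fragments.entry(job_id).or_insert(FragmentKind::Source);
            }
        }
    }
    fragments
}

fn main() {
    // Regardless of iteration order, job 1 ends up with its MView fragment
    // and job 2 (a source-rooted job) with its Source fragment.
    let picked = collect_root_fragments(vec![
        (1, FragmentKind::Source),
        (1, FragmentKind::Mview),
        (2, FragmentKind::Source),
    ]);
    assert_eq!(picked[&1], FragmentKind::Mview);
    assert_eq!(picked[&2], FragmentKind::Source);
}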
21 changes: 6 additions & 15 deletions src/meta/src/manager/catalog/fragment.rs
@@ -24,7 +24,6 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping}
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model_v2::SourceId;
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
@@ -1398,11 +1397,9 @@ impl FragmentManager {
.mview_actor_ids())
}

/// Get and filter the upstream `Materialize` or `Source` fragments of the specified relations.
pub async fn get_upstream_root_fragments(
&self,
upstream_table_ids: &HashSet<TableId>,
table_job_type: Option<TableJobType>,
) -> MetaResult<HashMap<TableId, Fragment>> {
let map = &self.core.read().await.table_fragments;
let mut fragments = HashMap::new();
@@ -1411,18 +1408,12 @@ impl FragmentManager {
let table_fragments = map
.get(&table_id)
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;
match table_job_type.as_ref() {
Some(TableJobType::SharedCdcSource) => {
if let Some(fragment) = table_fragments.source_fragment() {
fragments.insert(table_id, fragment);
}
}
// MV on MV, and other kinds of table job
None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => {
if let Some(fragment) = table_fragments.mview_fragment() {
fragments.insert(table_id, fragment);
}
}

if let Some(fragment) = table_fragments.mview_fragment() {
fragments.insert(table_id, fragment);
} else if let Some(fragment) = table_fragments.source_fragment() {
// look for Source fragment if there's no MView fragment
fragments.insert(table_id, fragment);
Comment on lines +1415 to +1416
Member: This is quite counter-intuitive 😄. May add some comments to show that this is for Source job?

Member: Oh, it's already on `pub async fn get_upstream_root_fragments`.

}
}

19 changes: 15 additions & 4 deletions src/meta/src/manager/metadata.rs
@@ -18,7 +18,6 @@ use risingwave_common::catalog::{TableId, TableOption};
use risingwave_pb::catalog::PbSource;
use risingwave_pb::common::worker_node::{PbResource, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::table_fragments::Fragment;
use risingwave_pb::stream_plan::PbStreamActor;
@@ -174,15 +173,28 @@ impl MetadataManager {
}
}

/// Get and filter the "**root**" fragments of the specified relations.
/// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
///
/// ## What can be the root fragment
/// - For MV, it should have one `MView` fragment.
/// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
/// - For source, it should have one `Source` fragment.
///
/// In other words, it's the `MView` fragment if it exists, otherwise it's the `Source` fragment.
///
/// ## What do we expect to get for different creating streaming jobs
/// - MV/Sink/Index should have MV upstream fragments for upstream MV/Tables, and Source upstream fragments for upstream backfill-able sources.
/// - CDC Table has a Source upstream fragment.
/// - Sources and other Tables shouldn't have an upstream fragment.
pub async fn get_upstream_root_fragments(
&self,
upstream_table_ids: &HashSet<TableId>,
table_job_type: Option<TableJobType>,
) -> MetaResult<HashMap<TableId, Fragment>> {
match self {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.get_upstream_root_fragments(upstream_table_ids, table_job_type)
.get_upstream_root_fragments(upstream_table_ids)
.await
}
MetadataManager::V2(mgr) => {
@@ -193,7 +205,6 @@ impl MetadataManager {
.iter()
.map(|id| id.table_id as _)
.collect(),
table_job_type,
)
.await?;
Ok(upstream_root_fragments
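The doc comment above spells out which upstream root fragments each kind of creating job is expected to see. A small illustrative Rust sketch (not part of the PR; names and types are stand-ins, not the real meta-service API) that encodes those expectations as a check:

#[derive(Debug, Clone, Copy, PartialEq)]
enum RootFragmentKind {
    Mview,
    Source,
}

#[derive(Debug, Clone, Copy)]
enum CreatingJob {
    MvSinkOrIndex,
    CdcTable,
    OtherTableOrSource,
}

/// Returns true if the resolved upstream root fragments match the doc comment's expectations.
fn upstreams_look_sane(job: CreatingJob, upstream_roots: &[RootFragmentKind]) -> bool {
    match job {
        // MV/Sink/Index: MView roots for upstream MVs/tables, Source roots for
        // upstream backfill-able sources; any mix of the two is expected.
        CreatingJob::MvSinkOrIndex => true,
        // CDC table: a single Source root (the shared CDC source fragment).
        CreatingJob::CdcTable => {
            upstream_roots.len() == 1 && upstream_roots[0] == RootFragmentKind::Source
        }
        // Plain sources and other tables have no upstream fragments at all.
        CreatingJob::OtherTableOrSource => upstream_roots.is_empty(),
    }
}

fn main() {
    assert!(upstreams_look_sane(
        CreatingJob::MvSinkOrIndex,
        &[RootFragmentKind::Mview, RootFragmentKind::Source],
    ));
    assert!(upstreams_look_sane(CreatingJob::CdcTable, &[RootFragmentKind::Source]));
    assert!(upstreams_look_sane(CreatingJob::OtherTableOrSource, &[]));
    assert!(!upstreams_look_sane(CreatingJob::CdcTable, &[RootFragmentKind::Mview]));
}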
33 changes: 22 additions & 11 deletions src/meta/src/manager/streaming_job.rs
@@ -26,8 +26,6 @@ use crate::model::FragmentId;
// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
// Sink.
#[derive(Debug, Clone, EnumDiscriminants)]
#[strum_discriminants(name(DdlType))]
#[strum_discriminants(vis(pub))]
Comment on lines -29 to -30
Member Author: A step back... 🤣

pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink, Option<(Table, Option<PbSource>)>),
@@ -36,13 +34,34 @@ pub enum StreamingJob {
Source(PbSource),
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DdlType {
MaterializedView,
Sink,
Table(TableJobType),
Index,
Source,
}

impl From<&StreamingJob> for DdlType {
fn from(job: &StreamingJob) -> Self {
match job {
StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
StreamingJob::Sink(_, _) => DdlType::Sink,
StreamingJob::Table(_, _, ty) => DdlType::Table(*ty),
StreamingJob::Index(_, _) => DdlType::Index,
StreamingJob::Source(_) => DdlType::Source,
}
}
}

#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for DdlType {
fn default() -> Self {
// This should not be used by mock services,
// so we can just pick an arbitrary default variant.
DdlType::Table
DdlType::MaterializedView
}
}

@@ -259,14 +278,6 @@ impl StreamingJob {
}
}

pub fn table_job_type(&self) -> Option<TableJobType> {
if let Self::Table(.., sub_type) = self {
Some(*sub_type)
} else {
None
}
}

// TODO: record all objects instead.
pub fn dependent_relations(&self) -> Vec<u32> {
match self {
12 changes: 6 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1255,6 +1255,7 @@ impl DdlController {
}

/// Builds the actor graph:
/// - Add the upstream fragments to the fragment graph
/// - Schedule the fragments based on their distribution
/// - Expand each fragment into one or several actors
pub(crate) async fn build_stream_job(
@@ -1274,10 +1275,7 @@ impl DdlController {

let upstream_root_fragments = self
.metadata_manager
.get_upstream_root_fragments(
fragment_graph.dependent_table_ids(),
stream_job.table_job_type(),
)
.get_upstream_root_fragments(fragment_graph.dependent_table_ids())
.await?;

let upstream_actors: HashMap<_, _> = upstream_root_fragments
@@ -1293,7 +1291,7 @@ impl DdlController {
let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
fragment_graph,
upstream_root_fragments,
stream_job.table_job_type(),
stream_job.into(),
)?;

// 2. Build the actor graph.
@@ -1713,6 +1711,7 @@ impl DdlController {
fragment_graph,
original_table_fragment.fragment_id,
downstream_fragments,
stream_job.into(),
)?;

// 2. Build the actor graph.
@@ -1975,7 +1974,8 @@ impl DdlController {
}
}

/// Fill in necessary information for table stream graph.
/// Fill in necessary information for `Table` stream graph.
/// e.g., fill source id for table with connector, fill external table id for CDC table.
pub fn fill_table_stream_graph_info(
source: &mut PbSource,
table: &mut PbTable,