Skip to content

Commit

Permalink
refactor(meta): unify DdlType and StreamingJobType
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Dec 2, 2024
1 parent 22b11ef commit 1d67808
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 46 deletions.
4 changes: 2 additions & 2 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::barrier::utils::collect_resp_info;
use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{DdlType, StreamingJob};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};

Expand Down Expand Up @@ -157,7 +157,7 @@ pub struct CreateStreamingJobCommandInfo {
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
pub init_split_assignment: SplitAssignment,
pub definition: String,
pub ddl_type: DdlType,
pub job_type: StreamingJobType,
pub create_type: CreateType,
pub streaming_job: StreamingJob,
pub internal_tables: Vec<Table>,
Expand Down
10 changes: 5 additions & 5 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::barrier::info::BarrierInfo;
use crate::barrier::{
Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan,
};
use crate::manager::{DdlType, MetadataManager};
use crate::manager::{MetadataManager, StreamingJobType};
use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments};
use crate::MetaResult;

Expand Down Expand Up @@ -522,14 +522,14 @@ impl CreateMviewProgressTracker {
upstream_root_actors,
dispatchers,
definition,
ddl_type,
job_type,
create_type,
..
} = &info;

let creating_mv_id = table_fragments.stream_job_id();

let (upstream_mv_count, upstream_total_key_count, ddl_type, create_type) = {
let (upstream_mv_count, upstream_total_key_count, job_type, create_type) = {
// Keep track of how many times each upstream MV appears.
let mut upstream_mv_count = HashMap::new();
for (table_id, actors) in upstream_root_actors {
Expand All @@ -547,7 +547,7 @@ impl CreateMviewProgressTracker {
(
upstream_mv_count,
upstream_total_key_count,
ddl_type,
job_type,
create_type,
)
};
Expand All @@ -562,7 +562,7 @@ impl CreateMviewProgressTracker {
upstream_total_key_count,
definition.clone(),
);
if *ddl_type == DdlType::Sink && *create_type == CreateType::Background {
if *job_type == StreamingJobType::Sink && *create_type == CreateType::Background {
// We return the original tracking job immediately.
// This is because sink can be decoupled with backfill progress.
// We don't need to wait for sink to finish backfill.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ impl CatalogController {
// 4. update catalogs and notify.
let mut relations = vec![];
match job_type {
StreamingJobType::Table => {
StreamingJobType::Table(_) => {
let (table, table_obj) = Table::find_by_id(original_job_id)
.find_also_related(Object)
.one(txn)
Expand Down
23 changes: 11 additions & 12 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
use risingwave_pb::ddl_service::TableJobType;
use sea_orm::entity::prelude::*;
use sea_orm::{DatabaseTransaction, QuerySelect};
use strum::{EnumDiscriminants, EnumIs};
use strum::EnumIs;

use super::{
get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
Expand All @@ -35,8 +35,7 @@ use crate::{MetaError, MetaResult};

// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
// Sink.
#[derive(Debug, Clone, EnumDiscriminants, EnumIs)]
#[strum_discriminants(name(StreamingJobType))]
#[derive(Debug, Clone, EnumIs)]
pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink, Option<(Table, Option<PbSource>)>),
Expand All @@ -46,33 +45,33 @@ pub enum StreamingJob {
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DdlType {
pub enum StreamingJobType {
MaterializedView,
Sink,
Table(TableJobType),
Index,
Source,
}

impl From<&StreamingJob> for DdlType {
impl From<&StreamingJob> for StreamingJobType {
fn from(job: &StreamingJob) -> Self {
match job {
StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
StreamingJob::Sink(_, _) => DdlType::Sink,
StreamingJob::Table(_, _, ty) => DdlType::Table(*ty),
StreamingJob::Index(_, _) => DdlType::Index,
StreamingJob::Source(_) => DdlType::Source,
StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
StreamingJob::Sink(_, _) => StreamingJobType::Sink,
StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
StreamingJob::Index(_, _) => StreamingJobType::Index,
StreamingJob::Source(_) => StreamingJobType::Source,
}
}
}

#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for DdlType {
impl Default for StreamingJobType {
fn default() -> Self {
// This should not be used by mock services,
// so we can just pick an arbitrary default variant.
DdlType::MaterializedView
StreamingJobType::MaterializedView
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::SinkIntoTableContext;
use crate::error::{bail_invalid_parameter, bail_unavailable};
use crate::manager::{
DdlType, LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob,
IGNORED_NOTIFICATION_VERSION,
LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob,
StreamingJobType, IGNORED_NOTIFICATION_VERSION,
};
use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
use crate::stream::{
Expand Down Expand Up @@ -1709,7 +1709,7 @@ impl DdlController {
definition: stream_job.definition(),
mv_table_id: stream_job.mv_table(),
create_type: stream_job.create_type(),
ddl_type: (&stream_job).into(),
job_type: (&stream_job).into(),
streaming_job: stream_job,
replace_table_job_info,
option: CreateStreamingJobOption {},
Expand Down Expand Up @@ -1753,11 +1753,11 @@ impl DdlController {
.mview_fragment()
.expect("mview fragment not found");

let ddl_type = DdlType::from(stream_job);
let DdlType::Table(table_job_type) = &ddl_type else {
let job_type = StreamingJobType::from(stream_job);
let StreamingJobType::Table(table_job_type) = &job_type else {
bail!(
"only support replacing table streaming job, ddl_type: {:?}",
ddl_type
"only support replacing table streaming job, job_type: {:?}",
job_type
)
};

Expand Down Expand Up @@ -1789,7 +1789,7 @@ impl DdlController {
original_table_fragment.fragment_id,
downstream_fragments,
downstream_actor_location,
ddl_type,
job_type,
)?,

TableJobType::SharedCdcSource => {
Expand All @@ -1806,7 +1806,7 @@ impl DdlController {
original_table_fragment.fragment_id,
downstream_fragments,
downstream_actor_location,
ddl_type,
job_type,
)?
}
TableJobType::Unspecified => {
Expand Down
28 changes: 15 additions & 13 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use risingwave_pb::stream_plan::{
};

use crate::barrier::SnapshotBackfillInfo;
use crate::manager::{DdlType, MetaSrvEnv, StreamingJob};
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, FragmentId};
use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen};
use crate::stream::stream_graph::schedule::Distribution;
Expand Down Expand Up @@ -714,7 +714,7 @@ impl CompleteStreamFragmentGraph {
graph: StreamFragmentGraph,
upstream_root_fragments: HashMap<TableId, Fragment>,
existing_actor_location: HashMap<ActorId, WorkerId>,
ddl_type: DdlType,
job_type: StreamingJobType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -723,7 +723,7 @@ impl CompleteStreamFragmentGraph {
upstream_actor_location: existing_actor_location,
}),
None,
ddl_type,
job_type,
)
}

Expand All @@ -734,7 +734,7 @@ impl CompleteStreamFragmentGraph {
original_table_fragment_id: FragmentId,
downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
existing_actor_location: HashMap<ActorId, WorkerId>,
ddl_type: DdlType,
job_type: StreamingJobType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -744,7 +744,7 @@ impl CompleteStreamFragmentGraph {
downstream_fragments,
downstream_actor_location: existing_actor_location,
}),
ddl_type,
job_type,
)
}

Expand All @@ -756,7 +756,7 @@ impl CompleteStreamFragmentGraph {
original_table_fragment_id: FragmentId,
downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
downstream_actor_location: HashMap<ActorId, WorkerId>,
ddl_type: DdlType,
job_type: StreamingJobType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -769,7 +769,7 @@ impl CompleteStreamFragmentGraph {
downstream_fragments,
downstream_actor_location,
}),
ddl_type,
job_type,
)
}

Expand All @@ -778,7 +778,7 @@ impl CompleteStreamFragmentGraph {
mut graph: StreamFragmentGraph,
upstream_ctx: Option<FragmentGraphUpstreamContext>,
downstream_ctx: Option<FragmentGraphDownstreamContext>,
ddl_type: DdlType,
job_type: StreamingJobType,
) -> MetaResult<Self> {
let mut extra_downstreams = HashMap::new();
let mut extra_upstreams = HashMap::new();
Expand All @@ -794,8 +794,8 @@ impl CompleteStreamFragmentGraph {
for (&id, fragment) in &mut graph.fragments {
let uses_shuffled_backfill = fragment.has_shuffled_backfill();
for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns {
let (up_fragment_id, edge) = match ddl_type {
DdlType::Table(TableJobType::SharedCdcSource) => {
let (up_fragment_id, edge) = match job_type {
StreamingJobType::Table(TableJobType::SharedCdcSource) => {
let source_fragment = upstream_root_fragments
.get(&upstream_table_id)
.context("upstream source fragment not found")?;
Expand Down Expand Up @@ -831,7 +831,9 @@ impl CompleteStreamFragmentGraph {

(source_job_id, edge)
}
DdlType::MaterializedView | DdlType::Sink | DdlType::Index => {
StreamingJobType::MaterializedView
| StreamingJobType::Sink
| StreamingJobType::Index => {
// handle MV on MV/Source

// Build the extra edges between the upstream `Materialize` and the downstream `StreamScan`
Expand Down Expand Up @@ -927,8 +929,8 @@ impl CompleteStreamFragmentGraph {
bail!("the upstream fragment should be a MView or Source, got fragment type: {:b}", upstream_fragment.fragment_type_mask)
}
}
DdlType::Source | DdlType::Table(_) => {
bail!("the streaming job shouldn't have an upstream fragment, ddl_type: {:?}", ddl_type)
StreamingJobType::Source | StreamingJobType::Table(_) => {
bail!("the streaming job shouldn't have an upstream fragment, job_type: {:?}", job_type)
}
};

Expand Down
10 changes: 6 additions & 4 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use crate::barrier::{
ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::error::bail_invalid_parameter;
use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob};
use crate::manager::{
MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{ActorId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -74,7 +76,7 @@ pub struct CreateStreamingJobContext {

pub create_type: CreateType,

pub ddl_type: DdlType,
pub job_type: StreamingJobType,

/// Context provided for potential replace table, typically used when sinking into a table.
pub replace_table_job_info: Option<(StreamingJob, ReplaceStreamJobContext, StreamJobFragments)>,
Expand Down Expand Up @@ -334,7 +336,7 @@ impl GlobalStreamManager {
upstream_root_actors,
definition,
create_type,
ddl_type,
job_type,
replace_table_job_info,
internal_tables,
snapshot_backfill_info,
Expand Down Expand Up @@ -394,7 +396,7 @@ impl GlobalStreamManager {
definition: definition.to_string(),
streaming_job: streaming_job.clone(),
internal_tables: internal_tables.into_values().collect_vec(),
ddl_type,
job_type,
create_type,
};

Expand Down

0 comments on commit 1d67808

Please sign in to comment.