Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): unify DdlType and StreamingJobType #19630

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::barrier::utils::collect_resp_info;
use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{DdlType, StreamingJob};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};

Expand Down Expand Up @@ -157,7 +157,7 @@ pub struct CreateStreamingJobCommandInfo {
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
pub init_split_assignment: SplitAssignment,
pub definition: String,
pub ddl_type: DdlType,
pub job_type: StreamingJobType,
pub create_type: CreateType,
pub streaming_job: StreamingJob,
pub internal_tables: Vec<Table>,
Expand Down
10 changes: 5 additions & 5 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::barrier::info::BarrierInfo;
use crate::barrier::{
Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan,
};
use crate::manager::{DdlType, MetadataManager};
use crate::manager::{MetadataManager, StreamingJobType};
use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments};
use crate::MetaResult;

Expand Down Expand Up @@ -522,14 +522,14 @@ impl CreateMviewProgressTracker {
upstream_root_actors,
dispatchers,
definition,
ddl_type,
job_type,
create_type,
..
} = &info;

let creating_mv_id = table_fragments.stream_job_id();

let (upstream_mv_count, upstream_total_key_count, ddl_type, create_type) = {
let (upstream_mv_count, upstream_total_key_count, job_type, create_type) = {
// Keep track of how many times each upstream MV appears.
let mut upstream_mv_count = HashMap::new();
for (table_id, actors) in upstream_root_actors {
Expand All @@ -547,7 +547,7 @@ impl CreateMviewProgressTracker {
(
upstream_mv_count,
upstream_total_key_count,
ddl_type,
job_type,
create_type,
)
};
Expand All @@ -562,7 +562,7 @@ impl CreateMviewProgressTracker {
upstream_total_key_count,
definition.clone(),
);
if *ddl_type == DdlType::Sink && *create_type == CreateType::Background {
if *job_type == StreamingJobType::Sink && *create_type == CreateType::Background {
// We return the original tracking job immediately.
// This is because sink can be decoupled with backfill progress.
// We don't need to wait for sink to finish backfill.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,7 @@ impl CatalogController {
// 4. update catalogs and notify.
let mut relations = vec![];
match job_type {
StreamingJobType::Table => {
StreamingJobType::Table(_) => {
let (table, table_obj) = Table::find_by_id(original_job_id)
.find_also_related(Object)
.one(txn)
Expand Down
23 changes: 11 additions & 12 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
use risingwave_pb::ddl_service::TableJobType;
use sea_orm::entity::prelude::*;
use sea_orm::{DatabaseTransaction, QuerySelect};
use strum::{EnumDiscriminants, EnumIs};
use strum::EnumIs;

use super::{
get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
Expand All @@ -35,8 +35,7 @@ use crate::{MetaError, MetaResult};

// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
// Sink.
#[derive(Debug, Clone, EnumDiscriminants, EnumIs)]
#[strum_discriminants(name(StreamingJobType))]
#[derive(Debug, Clone, EnumIs)]
pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink, Option<(Table, Option<PbSource>)>),
Expand All @@ -46,33 +45,33 @@ pub enum StreamingJob {
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DdlType {
pub enum StreamingJobType {
MaterializedView,
Sink,
Table(TableJobType),
Index,
Source,
}

impl From<&StreamingJob> for DdlType {
impl From<&StreamingJob> for StreamingJobType {
fn from(job: &StreamingJob) -> Self {
match job {
StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
StreamingJob::Sink(_, _) => DdlType::Sink,
StreamingJob::Table(_, _, ty) => DdlType::Table(*ty),
StreamingJob::Index(_, _) => DdlType::Index,
StreamingJob::Source(_) => DdlType::Source,
StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
StreamingJob::Sink(_, _) => StreamingJobType::Sink,
StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
StreamingJob::Index(_, _) => StreamingJobType::Index,
StreamingJob::Source(_) => StreamingJobType::Source,
}
}
}

#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for DdlType {
impl Default for StreamingJobType {
fn default() -> Self {
// This should not be used by mock services,
// so we can just pick an arbitrary default variant.
DdlType::MaterializedView
StreamingJobType::MaterializedView
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::SinkIntoTableContext;
use crate::error::{bail_invalid_parameter, bail_unavailable};
use crate::manager::{
DdlType, LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob,
IGNORED_NOTIFICATION_VERSION,
LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob,
StreamingJobType, IGNORED_NOTIFICATION_VERSION,
};
use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
use crate::stream::{
Expand Down Expand Up @@ -1709,7 +1709,7 @@ impl DdlController {
definition: stream_job.definition(),
mv_table_id: stream_job.mv_table(),
create_type: stream_job.create_type(),
ddl_type: (&stream_job).into(),
job_type: (&stream_job).into(),
streaming_job: stream_job,
replace_table_job_info,
option: CreateStreamingJobOption {},
Expand Down Expand Up @@ -1753,11 +1753,11 @@ impl DdlController {
.mview_fragment()
.expect("mview fragment not found");

let ddl_type = DdlType::from(stream_job);
let DdlType::Table(table_job_type) = &ddl_type else {
let job_type = StreamingJobType::from(stream_job);
let StreamingJobType::Table(table_job_type) = &job_type else {
bail!(
"only support replacing table streaming job, ddl_type: {:?}",
ddl_type
"only support replacing table streaming job, job_type: {:?}",
job_type
)
};

Expand Down Expand Up @@ -1789,7 +1789,7 @@ impl DdlController {
original_table_fragment.fragment_id,
downstream_fragments,
downstream_actor_location,
ddl_type,
job_type,
)?,

TableJobType::SharedCdcSource => {
Expand All @@ -1806,7 +1806,7 @@ impl DdlController {
original_table_fragment.fragment_id,
downstream_fragments,
downstream_actor_location,
ddl_type,
job_type,
)?
}
TableJobType::Unspecified => {
Expand Down
28 changes: 15 additions & 13 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use risingwave_pb::stream_plan::{
};

use crate::barrier::SnapshotBackfillInfo;
use crate::manager::{DdlType, MetaSrvEnv, StreamingJob};
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, FragmentId};
use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen};
use crate::stream::stream_graph::schedule::Distribution;
Expand Down Expand Up @@ -714,7 +714,7 @@ impl CompleteStreamFragmentGraph {
graph: StreamFragmentGraph,
upstream_root_fragments: HashMap<TableId, Fragment>,
existing_actor_location: HashMap<ActorId, WorkerId>,
ddl_type: DdlType,
job_type: StreamingJobType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -723,7 +723,7 @@ impl CompleteStreamFragmentGraph {
upstream_actor_location: existing_actor_location,
}),
None,
ddl_type,
job_type,
)
}

Expand All @@ -734,7 +734,7 @@ impl CompleteStreamFragmentGraph {
original_table_fragment_id: FragmentId,
downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
existing_actor_location: HashMap<ActorId, WorkerId>,
ddl_type: DdlType,
job_type: StreamingJobType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -744,7 +744,7 @@ impl CompleteStreamFragmentGraph {
downstream_fragments,
downstream_actor_location: existing_actor_location,
}),
ddl_type,
job_type,
)
}

Expand All @@ -756,7 +756,7 @@ impl CompleteStreamFragmentGraph {
original_table_fragment_id: FragmentId,
downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
downstream_actor_location: HashMap<ActorId, WorkerId>,
ddl_type: DdlType,
job_type: StreamingJobType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -769,7 +769,7 @@ impl CompleteStreamFragmentGraph {
downstream_fragments,
downstream_actor_location,
}),
ddl_type,
job_type,
)
}

Expand All @@ -778,7 +778,7 @@ impl CompleteStreamFragmentGraph {
mut graph: StreamFragmentGraph,
upstream_ctx: Option<FragmentGraphUpstreamContext>,
downstream_ctx: Option<FragmentGraphDownstreamContext>,
ddl_type: DdlType,
job_type: StreamingJobType,
) -> MetaResult<Self> {
let mut extra_downstreams = HashMap::new();
let mut extra_upstreams = HashMap::new();
Expand All @@ -794,8 +794,8 @@ impl CompleteStreamFragmentGraph {
for (&id, fragment) in &mut graph.fragments {
let uses_shuffled_backfill = fragment.has_shuffled_backfill();
for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns {
let (up_fragment_id, edge) = match ddl_type {
DdlType::Table(TableJobType::SharedCdcSource) => {
let (up_fragment_id, edge) = match job_type {
StreamingJobType::Table(TableJobType::SharedCdcSource) => {
let source_fragment = upstream_root_fragments
.get(&upstream_table_id)
.context("upstream source fragment not found")?;
Expand Down Expand Up @@ -831,7 +831,9 @@ impl CompleteStreamFragmentGraph {

(source_job_id, edge)
}
DdlType::MaterializedView | DdlType::Sink | DdlType::Index => {
StreamingJobType::MaterializedView
| StreamingJobType::Sink
| StreamingJobType::Index => {
// handle MV on MV/Source

// Build the extra edges between the upstream `Materialize` and the downstream `StreamScan`
Expand Down Expand Up @@ -927,8 +929,8 @@ impl CompleteStreamFragmentGraph {
bail!("the upstream fragment should be a MView or Source, got fragment type: {:b}", upstream_fragment.fragment_type_mask)
}
}
DdlType::Source | DdlType::Table(_) => {
bail!("the streaming job shouldn't have an upstream fragment, ddl_type: {:?}", ddl_type)
StreamingJobType::Source | StreamingJobType::Table(_) => {
bail!("the streaming job shouldn't have an upstream fragment, job_type: {:?}", job_type)
}
};

Expand Down
10 changes: 6 additions & 4 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use crate::barrier::{
ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::error::bail_invalid_parameter;
use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob};
use crate::manager::{
MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{ActorId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -74,7 +76,7 @@ pub struct CreateStreamingJobContext {

pub create_type: CreateType,

pub ddl_type: DdlType,
pub job_type: StreamingJobType,

/// Context provided for potential replace table, typically used when sinking into a table.
pub replace_table_job_info: Option<(StreamingJob, ReplaceStreamJobContext, StreamJobFragments)>,
Expand Down Expand Up @@ -334,7 +336,7 @@ impl GlobalStreamManager {
upstream_root_actors,
definition,
create_type,
ddl_type,
job_type,
replace_table_job_info,
internal_tables,
snapshot_backfill_info,
Expand Down Expand Up @@ -394,7 +396,7 @@ impl GlobalStreamManager {
definition: definition.to_string(),
streaming_job: streaming_job.clone(),
internal_tables: internal_tables.into_values().collect_vec(),
ddl_type,
job_type,
create_type,
};

Expand Down
Loading