Skip to content

Commit

Permalink
refactor(meta): unify DdlType and StreamingJobType
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Dec 2, 2024
1 parent 22b11ef commit dce34e9
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 31 deletions.
4 changes: 2 additions & 2 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::barrier::utils::collect_resp_info;
use crate::barrier::InflightSubscriptionInfo;
use crate::controller::fragment::InflightFragmentInfo;
use crate::hummock::{CommitEpochInfo, NewTableFragmentInfo};
use crate::manager::{DdlType, StreamingJob};
use crate::manager::{StreamingJob, StreamingJobType};
use crate::model::{ActorId, DispatcherId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};

Expand Down Expand Up @@ -157,7 +157,7 @@ pub struct CreateStreamingJobCommandInfo {
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
pub init_split_assignment: SplitAssignment,
pub definition: String,
pub ddl_type: DdlType,
pub ddl_type: StreamingJobType,
pub create_type: CreateType,
pub streaming_job: StreamingJob,
pub internal_tables: Vec<Table>,
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::barrier::info::BarrierInfo;
use crate::barrier::{
Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceStreamJobPlan,
};
use crate::manager::{DdlType, MetadataManager};
use crate::manager::{MetadataManager, StreamingJobType};
use crate::model::{ActorId, BackfillUpstreamType, StreamJobFragments};
use crate::MetaResult;

Expand Down Expand Up @@ -562,7 +562,7 @@ impl CreateMviewProgressTracker {
upstream_total_key_count,
definition.clone(),
);
if *ddl_type == DdlType::Sink && *create_type == CreateType::Background {
if *ddl_type == StreamingJobType::Sink && *create_type == CreateType::Background {
// We return the original tracking job immediately.
// This is because sink can be decoupled with backfill progress.
// We don't need to wait for sink to finish backfill.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ impl CatalogController {
// 4. update catalogs and notify.
let mut relations = vec![];
match job_type {
StreamingJobType::Table => {
StreamingJobType::Table(_) => {
let (table, table_obj) = Table::find_by_id(original_job_id)
.find_also_related(Object)
.one(txn)
Expand Down
23 changes: 11 additions & 12 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
use risingwave_pb::ddl_service::TableJobType;
use sea_orm::entity::prelude::*;
use sea_orm::{DatabaseTransaction, QuerySelect};
use strum::{EnumDiscriminants, EnumIs};
use strum::EnumIs;

use super::{
get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
Expand All @@ -35,8 +35,7 @@ use crate::{MetaError, MetaResult};

// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
// Sink.
#[derive(Debug, Clone, EnumDiscriminants, EnumIs)]
#[strum_discriminants(name(StreamingJobType))]
#[derive(Debug, Clone, EnumIs)]
pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink, Option<(Table, Option<PbSource>)>),
Expand All @@ -46,33 +45,33 @@ pub enum StreamingJob {
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DdlType {
pub enum StreamingJobType {
MaterializedView,
Sink,
Table(TableJobType),
Index,
Source,
}

impl From<&StreamingJob> for DdlType {
impl From<&StreamingJob> for StreamingJobType {
fn from(job: &StreamingJob) -> Self {
match job {
StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
StreamingJob::Sink(_, _) => DdlType::Sink,
StreamingJob::Table(_, _, ty) => DdlType::Table(*ty),
StreamingJob::Index(_, _) => DdlType::Index,
StreamingJob::Source(_) => DdlType::Source,
StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
StreamingJob::Sink(_, _) => StreamingJobType::Sink,
StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
StreamingJob::Index(_, _) => StreamingJobType::Index,
StreamingJob::Source(_) => StreamingJobType::Source,
}
}
}

#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for DdlType {
impl Default for StreamingJobType {
fn default() -> Self {
// This should not be used by mock services,
// so we can just pick an arbitrary default variant.
DdlType::MaterializedView
StreamingJobType::MaterializedView
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::SinkIntoTableContext;
use crate::error::{bail_invalid_parameter, bail_unavailable};
use crate::manager::{
DdlType, LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob,
IGNORED_NOTIFICATION_VERSION,
LocalNotification, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob,
StreamingJobType, IGNORED_NOTIFICATION_VERSION,
};
use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
use crate::stream::{
Expand Down Expand Up @@ -1753,8 +1753,8 @@ impl DdlController {
.mview_fragment()
.expect("mview fragment not found");

let ddl_type = DdlType::from(stream_job);
let DdlType::Table(table_job_type) = &ddl_type else {
let ddl_type = StreamingJobType::from(stream_job);
let StreamingJobType::Table(table_job_type) = &ddl_type else {
bail!(
"only support replacing table streaming job, ddl_type: {:?}",
ddl_type
Expand Down
18 changes: 10 additions & 8 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use risingwave_pb::stream_plan::{
};

use crate::barrier::SnapshotBackfillInfo;
use crate::manager::{DdlType, MetaSrvEnv, StreamingJob};
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, FragmentId};
use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen};
use crate::stream::stream_graph::schedule::Distribution;
Expand Down Expand Up @@ -714,7 +714,7 @@ impl CompleteStreamFragmentGraph {
graph: StreamFragmentGraph,
upstream_root_fragments: HashMap<TableId, Fragment>,
existing_actor_location: HashMap<ActorId, WorkerId>,
ddl_type: DdlType,
ddl_type: StreamingJobType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -734,7 +734,7 @@ impl CompleteStreamFragmentGraph {
original_table_fragment_id: FragmentId,
downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
existing_actor_location: HashMap<ActorId, WorkerId>,
ddl_type: DdlType,
ddl_type: StreamingJobType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -756,7 +756,7 @@ impl CompleteStreamFragmentGraph {
original_table_fragment_id: FragmentId,
downstream_fragments: Vec<(DispatchStrategy, Fragment)>,
downstream_actor_location: HashMap<ActorId, WorkerId>,
ddl_type: DdlType,
ddl_type: StreamingJobType,
) -> MetaResult<Self> {
Self::build_helper(
graph,
Expand All @@ -778,7 +778,7 @@ impl CompleteStreamFragmentGraph {
mut graph: StreamFragmentGraph,
upstream_ctx: Option<FragmentGraphUpstreamContext>,
downstream_ctx: Option<FragmentGraphDownstreamContext>,
ddl_type: DdlType,
ddl_type: StreamingJobType,
) -> MetaResult<Self> {
let mut extra_downstreams = HashMap::new();
let mut extra_upstreams = HashMap::new();
Expand All @@ -795,7 +795,7 @@ impl CompleteStreamFragmentGraph {
let uses_shuffled_backfill = fragment.has_shuffled_backfill();
for (&upstream_table_id, output_columns) in &fragment.upstream_table_columns {
let (up_fragment_id, edge) = match ddl_type {
DdlType::Table(TableJobType::SharedCdcSource) => {
StreamingJobType::Table(TableJobType::SharedCdcSource) => {
let source_fragment = upstream_root_fragments
.get(&upstream_table_id)
.context("upstream source fragment not found")?;
Expand Down Expand Up @@ -831,7 +831,9 @@ impl CompleteStreamFragmentGraph {

(source_job_id, edge)
}
DdlType::MaterializedView | DdlType::Sink | DdlType::Index => {
StreamingJobType::MaterializedView
| StreamingJobType::Sink
| StreamingJobType::Index => {
// handle MV on MV/Source

// Build the extra edges between the upstream `Materialize` and the downstream `StreamScan`
Expand Down Expand Up @@ -927,7 +929,7 @@ impl CompleteStreamFragmentGraph {
bail!("the upstream fragment should be a MView or Source, got fragment type: {:b}", upstream_fragment.fragment_type_mask)
}
}
DdlType::Source | DdlType::Table(_) => {
StreamingJobType::Source | StreamingJobType::Table(_) => {
bail!("the streaming job shouldn't have an upstream fragment, ddl_type: {:?}", ddl_type)
}
};
Expand Down
6 changes: 4 additions & 2 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use crate::barrier::{
ReplaceStreamJobPlan, SnapshotBackfillInfo,
};
use crate::error::bail_invalid_parameter;
use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob};
use crate::manager::{
MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{ActorId, FragmentId, StreamJobFragments, TableParallelism};
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -74,7 +76,7 @@ pub struct CreateStreamingJobContext {

pub create_type: CreateType,

pub ddl_type: DdlType,
pub ddl_type: StreamingJobType,

/// Context provided for potential replace table, typically used when sinking into a table.
pub replace_table_job_info: Option<(StreamingJob, ReplaceStreamJobContext, StreamJobFragments)>,
Expand Down

0 comments on commit dce34e9

Please sign in to comment.