Skip to content

Commit

Permalink
add ddl type
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 27, 2023
1 parent c247597 commit eb9b821
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 2 deletions.
9 changes: 9 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ enum StreamJobStatus {
STREAM_JOB_STATUS_CREATED = 2;
}

enum DdlType {
DDL_TYPE_UNSPECIFIED = 0;
DDL_TYPE_TABLE = 1;
DDL_TYPE_SINK = 2;
DDL_TYPE_INDEX = 3;
DDL_TYPE_SOURCE = 4;
DDL_TYPE_MATERIALIZED_VIEW = 5;
}

// How the stream job was created will determine
// whether they are persisted.
enum CreateType {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::catalog::DdlType;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::{BarrierKind, Mutation};
Expand Down Expand Up @@ -117,6 +118,7 @@ pub enum Command {
dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
init_split_assignment: SplitAssignment,
definition: String,
ddl_type: DdlType,
},
/// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given
/// table fragment.
Expand Down
14 changes: 13 additions & 1 deletion src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;

use risingwave_common::catalog::TableVersionId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
use risingwave_pb::catalog::{CreateType, DdlType, Index, PbSource, Sink, Table};
use risingwave_pb::ddl_service::TableJobType;

use crate::model::FragmentId;
Expand All @@ -32,6 +32,18 @@ pub enum StreamingJob {
Source(PbSource),
}

impl From<&StreamingJob> for DdlType {
fn from(job: &StreamingJob) -> Self {
match job {
StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
StreamingJob::Sink(_) => DdlType::Sink,
StreamingJob::Table(_, _, _) => DdlType::Table,
StreamingJob::Index(_, _) => DdlType::Index,
StreamingJob::Source(_) => DdlType::Source,
}
}
}

impl StreamingJob {
pub fn mark_created(&mut self) {
let created_at_epoch = Some(Epoch::now().0);
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ impl DdlController {
definition: stream_job.definition(),
mv_table_id: stream_job.mv_table(),
create_type: stream_job.create_type(),
ddl_type: stream_job.into(),
};

// 4. Mark creating tables, including internal tables and the table of the stream job.
Expand Down
6 changes: 5 additions & 1 deletion src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use futures::future::{join_all, try_join_all, BoxFuture};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::catalog::{CreateType, DdlType, Table};
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::Dispatcher;
use risingwave_pb::stream_service::{
Expand Down Expand Up @@ -69,6 +69,8 @@ pub struct CreateStreamingJobContext {
pub mv_table_id: Option<u32>,

pub create_type: CreateType,

pub ddl_type: DdlType,
}

impl CreateStreamingJobContext {
Expand Down Expand Up @@ -440,6 +442,7 @@ impl GlobalStreamManager {
mv_table_id,
internal_tables,
create_type,
ddl_type,
}: CreateStreamingJobContext,
) -> MetaResult<()> {
// Register to compaction group beforehand.
Expand Down Expand Up @@ -483,6 +486,7 @@ impl GlobalStreamManager {
dispatchers,
init_split_assignment,
definition: definition.to_string(),
ddl_type,
})
.await
{
Expand Down

0 comments on commit eb9b821

Please sign in to comment.