
Commit b160d0b
big change
Signed-off-by: Shanicky Chen <[email protected]>
shanicky committed Dec 7, 2023
1 parent a9501cf commit b160d0b
Showing 5 changed files with 118 additions and 155 deletions.
10 changes: 9 additions & 1 deletion src/meta/service/src/ddl_service.rs
@@ -292,7 +292,15 @@ impl DdlService for DdlServiceImpl {
self.validate_connection(connection_id).await?;
}

let mut stream_job = StreamingJob::Sink(sink);
let mut stream_job = match &affected_table_change {
None => StreamingJob::Sink(sink, None),
Some(change) => {
let table = change.table.clone().unwrap();
let source = change.source.clone();
StreamingJob::Sink(sink, Some((table, source)))
}
};

let id = self.gen_unique_id::<{ IdCategory::Table }>().await?;

stream_job.set_id(id);
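The hunk above is the construction side of the change: when the request carries an affected_table_change, the table being replaced (and its optional associated source) is attached to the sink job. Below is a minimal standalone sketch of that shape; Table, PbSource, and Sink are placeholder stand-ins for the real RisingWave protobuf types, and the request's affected_table_change is simplified to a tuple.

// Sketch only: placeholder stand-ins for the real RisingWave catalog types;
// the request's affected_table_change is simplified to a tuple here.
struct Table;
struct PbSource;
struct Sink;

// The Sink variant now optionally carries the table being replaced together
// with its optional associated source (the "sink into table" case).
enum StreamingJob {
    Sink(Sink, Option<(Table, Option<PbSource>)>),
    // ...other variants elided...
}

// Mirrors the match in ddl_service.rs: attach the payload only when the
// request includes an affected table change.
fn build_sink_job(
    sink: Sink,
    affected_table_change: Option<(Table, Option<PbSource>)>,
) -> StreamingJob {
    match affected_table_change {
        None => StreamingJob::Sink(sink, None),
        Some((table, source)) => StreamingJob::Sink(sink, Some((table, source))),
    }
}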
27 changes: 22 additions & 5 deletions src/meta/src/manager/catalog/mod.rs
@@ -34,8 +34,8 @@ use risingwave_common::catalog::{
use risingwave_common::{bail, ensure};
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, TableType};
use risingwave_pb::catalog::{
Comment, Connection, CreateType, Database, Function, Index, PbStreamJobStatus, Schema, Sink,
Source, StreamJobStatus, Table, View,
Comment, Connection, CreateType, Database, Function, Index, PbSource, PbStreamJobStatus,
Schema, Sink, Source, StreamJobStatus, Table, View,
};
use risingwave_pb::ddl_service::{alter_owner_request, alter_set_schema_request};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
@@ -646,7 +646,7 @@ impl CatalogManager {
self.start_create_table_procedure(table, internal_tables)
.await
}
StreamingJob::Sink(sink) => self.start_create_sink_procedure(sink).await,
StreamingJob::Sink(sink, _) => self.start_create_sink_procedure(sink).await,
StreamingJob::Index(index, index_table) => {
self.start_create_index_procedure(index, index_table).await
}
@@ -2811,7 +2811,11 @@ impl CatalogManager {
Ok(version)
}

pub async fn cancel_create_sink_procedure(&self, sink: &Sink) {
pub async fn cancel_create_sink_procedure(
&self,
sink: &Sink,
target_table: &Option<(Table, Option<PbSource>)>,
) {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let user_core = &mut core.user;
@@ -2828,6 +2832,10 @@
}
user_core.decrease_ref(sink.owner);
refcnt_dec_connection(database_core, sink.connection_id);

if let Some((table, source)) = target_table {
Self::cancel_replace_table_procedure_inner(source, table, core);
}
}

/// This is used for `ALTER TABLE ADD/DROP COLUMN`.
Expand Down Expand Up @@ -2974,6 +2982,16 @@ impl CatalogManager {
unreachable!("unexpected job: {stream_job:?}")
};
let core = &mut *self.core.lock().await;

Self::cancel_replace_table_procedure_inner(source, table, core);
Ok(())
}

fn cancel_replace_table_procedure_inner(
source: &Option<PbSource>,
table: &Table,
core: &mut CatalogManagerCore,
) {
let database_core = &mut core.database;
let key = (table.database_id, table.schema_id, table.name.clone());

@@ -2999,7 +3017,6 @@
// TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must
// occur after it's created. We may need to add a new tracker for `alter` procedure.s
database_core.unmark_creating(&key);
Ok(())
}

pub async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
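The catalog-side change follows a small refactoring pattern: the rollback body previously inlined in cancel_replace_table_procedure is extracted into cancel_replace_table_procedure_inner, so cancelling a sink that bundles a table replacement can reuse it. A rough sketch of the pattern follows; it omits the async signatures, the core lock handling, the ref-count bookkeeping, and the MetaResult plumbing of the real code, and all types are placeholders.

// Sketch only: placeholder types; the real method is async, returns through
// MetaResult plumbing, and takes the core lock internally rather than as an
// argument.
struct Table;
struct PbSource;
struct Sink;
struct CatalogManagerCore;

struct CatalogManager;

impl CatalogManager {
    // Shared rollback: undo the bookkeeping started by the replace-table
    // procedure (e.g. unmark the target table as "creating").
    fn cancel_replace_table_procedure_inner(
        _source: &Option<PbSource>,
        _table: &Table,
        _core: &mut CatalogManagerCore,
    ) {
        // ...unmark_creating and related cleanup elided...
    }

    // Cancelling a sink also rolls back the bundled table replacement when
    // one was attached to the job.
    fn cancel_create_sink_procedure(
        &self,
        _sink: &Sink,
        target_table: &Option<(Table, Option<PbSource>)>,
        core: &mut CatalogManagerCore,
    ) {
        // ...sink-specific cleanup (ref-counts, tracking) elided...
        if let Some((table, source)) = target_table {
            Self::cancel_replace_table_procedure_inner(source, table, core);
        }
    }
}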
32 changes: 16 additions & 16 deletions src/meta/src/manager/streaming_job.rs
@@ -26,7 +26,7 @@ use crate::model::FragmentId;
#[derive(Debug, Clone)]
pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink),
Sink(Sink, Option<(Table, Option<PbSource>)>),
Table(Option<PbSource>, Table, TableJobType),
Index(Index, Table),
Source(PbSource),
@@ -55,7 +55,7 @@ impl From<&StreamingJob> for DdlType {
fn from(job: &StreamingJob) -> Self {
match job {
StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
StreamingJob::Sink(_) => DdlType::Sink,
StreamingJob::Sink(_, _) => DdlType::Sink,
StreamingJob::Table(_, _, _) => DdlType::Table,
StreamingJob::Index(_, _) => DdlType::Index,
StreamingJob::Source(_) => DdlType::Source,
@@ -68,7 +68,7 @@ impl StreamingJob {
let created_at_epoch = Some(Epoch::now().0);
match self {
StreamingJob::MaterializedView(table) => table.created_at_epoch = created_at_epoch,
StreamingJob::Sink(table) => table.created_at_epoch = created_at_epoch,
StreamingJob::Sink(table, _) => table.created_at_epoch = created_at_epoch,
StreamingJob::Table(source, table, ..) => {
table.created_at_epoch = created_at_epoch;
if let Some(source) = source {
@@ -90,7 +90,7 @@
StreamingJob::MaterializedView(table) => {
table.initialized_at_epoch = initialized_at_epoch
}
StreamingJob::Sink(table) => table.initialized_at_epoch = initialized_at_epoch,
StreamingJob::Sink(table, _) => table.initialized_at_epoch = initialized_at_epoch,
StreamingJob::Table(source, table, ..) => {
table.initialized_at_epoch = initialized_at_epoch;
if let Some(source) = source {
@@ -111,7 +111,7 @@ impl StreamingJob {
pub fn set_id(&mut self, id: u32) {
match self {
Self::MaterializedView(table) => table.id = id,
Self::Sink(sink) => sink.id = id,
Self::Sink(sink, _) => sink.id = id,
Self::Table(_, table, ..) => table.id = id,
Self::Index(index, index_table) => {
index.id = id;
@@ -132,7 +132,7 @@ impl StreamingJob {
Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
table.fragment_id = id;
}
Self::Sink(_) | Self::Source(_) => {}
Self::Sink(_, _) | Self::Source(_) => {}
}
}

@@ -142,15 +142,15 @@ impl StreamingJob {
Self::Table(_, table, ..) => {
table.dml_fragment_id = id;
}
Self::MaterializedView(_) | Self::Index(_, _) | Self::Sink(_) => {}
Self::MaterializedView(_) | Self::Index(_, _) | Self::Sink(_, _) => {}
Self::Source(_) => {}
}
}

pub fn id(&self) -> u32 {
match self {
Self::MaterializedView(table) => table.id,
Self::Sink(sink) => sink.id,
Self::Sink(sink, _) => sink.id,
Self::Table(_, table, ..) => table.id,
Self::Index(index, _) => index.id,
Self::Source(source) => source.id,
@@ -160,7 +160,7 @@ impl StreamingJob {
pub fn mv_table(&self) -> Option<u32> {
match self {
Self::MaterializedView(table) => Some(table.id),
Self::Sink(_sink) => None,
Self::Sink(_, _) => None,
Self::Table(_, table, ..) => Some(table.id),
Self::Index(_, table) => Some(table.id),
Self::Source(_) => None,
@@ -173,14 +173,14 @@ impl StreamingJob {
Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
Some(table)
}
Self::Sink(_) | Self::Source(_) => None,
Self::Sink(_, _) | Self::Source(_) => None,
}
}

pub fn schema_id(&self) -> u32 {
match self {
Self::MaterializedView(table) => table.schema_id,
Self::Sink(sink) => sink.schema_id,
Self::Sink(sink, _) => sink.schema_id,
Self::Table(_, table, ..) => table.schema_id,
Self::Index(index, _) => index.schema_id,
Self::Source(source) => source.schema_id,
@@ -190,7 +190,7 @@
pub fn database_id(&self) -> u32 {
match self {
Self::MaterializedView(table) => table.database_id,
Self::Sink(sink) => sink.database_id,
Self::Sink(sink, _) => sink.database_id,
Self::Table(_, table, ..) => table.database_id,
Self::Index(index, _) => index.database_id,
Self::Source(source) => source.database_id,
@@ -200,7 +200,7 @@
pub fn name(&self) -> String {
match self {
Self::MaterializedView(table) => table.name.clone(),
Self::Sink(sink) => sink.name.clone(),
Self::Sink(sink, _) => sink.name.clone(),
Self::Table(_, table, ..) => table.name.clone(),
Self::Index(index, _) => index.name.clone(),
Self::Source(source) => source.name.clone(),
@@ -210,7 +210,7 @@
pub fn owner(&self) -> u32 {
match self {
StreamingJob::MaterializedView(mv) => mv.owner,
StreamingJob::Sink(sink) => sink.owner,
StreamingJob::Sink(sink, _) => sink.owner,
StreamingJob::Table(_, table, ..) => table.owner,
StreamingJob::Index(index, _) => index.owner,
StreamingJob::Source(source) => source.owner,
@@ -222,15 +222,15 @@
Self::MaterializedView(table) => table.definition.clone(),
Self::Table(_, table, ..) => table.definition.clone(),
Self::Index(_, table) => table.definition.clone(),
Self::Sink(sink) => sink.definition.clone(),
Self::Sink(sink, _) => sink.definition.clone(),
Self::Source(source) => source.definition.clone(),
}
}

pub fn properties(&self) -> HashMap<String, String> {
match self {
Self::MaterializedView(table) => table.properties.clone(),
Self::Sink(sink) => sink.properties.clone(),
Self::Sink(sink, _) => sink.properties.clone(),
Self::Table(_, table, ..) => table.properties.clone(),
Self::Index(_, index_table) => index_table.properties.clone(),
Self::Source(source) => source.properties.clone(),
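The rest of this file is mechanical: because Sink is a tuple variant, widening it forces every match arm to spell out the extra field, so arms that only need the sink itself become Sink(sink, _) or Sink(_, _). If a getter for the new payload were wanted, it could look like the following hypothetical helper, which is not part of this commit and again uses placeholder types.

// Sketch only, not part of this commit: a hypothetical accessor for the new
// payload, with placeholder types.
struct Table;
struct PbSource;
struct Sink;

enum StreamingJob {
    Sink(Sink, Option<(Table, Option<PbSource>)>),
}

impl StreamingJob {
    // Expose the bundled table replacement, if any.
    fn target_table_change(&self) -> Option<&(Table, Option<PbSource>)> {
        match self {
            StreamingJob::Sink(_, change) => change.as_ref(),
        }
    }
}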