Skip to content

Commit

Permalink
minor fix
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Nov 26, 2024
1 parent a97a3aa commit 64290ea
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
39 changes: 25 additions & 14 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ use crate::controller::utils::{
get_internal_tables_by_id, rebuild_fragment_mapping_from_actors, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{NotificationVersion, StreamingJob};
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
use crate::stream::{self, SplitAssignment};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

impl CatalogController {
Expand Down Expand Up @@ -1015,10 +1015,11 @@ impl CatalogController {
streaming_job: StreamingJob,
) -> MetaResult<(Vec<Relation>, Vec<PbFragmentWorkerSlotMapping>)> {
let original_job_id = streaming_job.id() as ObjectId;
let job_type = streaming_job.job_type();

match streaming_job {
StreamingJob::Table(_source, table, _table_job_type) => {
// The source catalog should be remain unchanged
// The source catalog should remain unchanged

let original_table_catalogs = Table::find_by_id(original_job_id)
.select_only()
Expand All @@ -1038,7 +1039,7 @@ impl CatalogController {
.update(txn)
.await?;
}

// Update the table catalog with the new one. (column catalog is also updated here)
let mut table = table::ActiveModel::from(table);
let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
if let Some(sink_id) = creating_sink_id {
Expand All @@ -1052,10 +1053,13 @@ impl CatalogController {
}

table.incoming_sinks = Set(incoming_sinks.into());
let table = table.update(txn).await?;
table.update(txn).await?;
}
// TODO: support other streaming jobs
_ => unreachable!("invalid streaming job type: {:?}", streaming_job.job_type()),
_ => unreachable!(
"invalid streaming job type: {:?}",
streaming_job.job_type_str()
),
}

// 0. update internal tables
Expand Down Expand Up @@ -1196,14 +1200,21 @@ impl CatalogController {

// 4. update catalogs and notify.
let mut relations = vec![];
let table_obj = table
.find_related(Object)
.one(txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("object", table.table_id))?;
relations.push(PbRelation {
relation_info: Some(PbRelationInfo::Table(ObjectModel(table, table_obj).into())),
});
match job_type {
StreamingJobType::Table => {
let (table, table_obj) = Table::find_by_id(original_job_id)
.find_also_related(Object)
.one(txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
relations.push(PbRelation {
relation_info: Some(PbRelationInfo::Table(
ObjectModel(table, table_obj.unwrap()).into(),
)),
})
}
_ => unreachable!("invalid streaming job type: {:?}", job_type),
}
if let Some(table_col_index_mapping) = table_col_index_mapping {
let expr_rewriter = ReplaceTableExprRewriter {
table_col_index_mapping,
Expand Down
5 changes: 5 additions & 0 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::{MetaError, MetaResult};
// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
// Sink.
#[derive(Debug, Clone, EnumDiscriminants, EnumIs)]
#[strum_discriminants(name(StreamingJobType))]
pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink, Option<(Table, Option<PbSource>)>),
Expand Down Expand Up @@ -260,6 +261,10 @@ impl StreamingJob {
}
}

pub fn job_type(&self) -> StreamingJobType {
self.into()
}

pub fn job_type_str(&self) -> &'static str {
match self {
StreamingJob::MaterializedView(_) => "materialized view",
Expand Down

0 comments on commit 64290ea

Please sign in to comment.