Skip to content

Commit

Permalink
fix: filter out creating associated source catalog in notification sn…
Browse files Browse the repository at this point in the history
…apshot (#18874)
  • Loading branch information
yezizp2012 authored Oct 11, 2024
1 parent 9ef40ba commit 7bdc37e
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3251,7 +3251,7 @@ impl CatalogControllerInner {

/// `list_sources` return all sources and `CREATED` ones if contains any streaming jobs.
async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
let source_objs = Source::find()
let mut source_objs = Source::find()
.find_also_related(Object)
.join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
.filter(
Expand All @@ -3261,7 +3261,27 @@ impl CatalogControllerInner {
)
.all(&self.db)
.await?;
// TODO: filter out inner connector source that are still under creating.

// filter out inner connector sources that are still under creating.
let created_table_ids: HashSet<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.join(JoinType::InnerJoin, table::Relation::Object1.def())
.join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
.filter(
table::Column::OptionalAssociatedSourceId
.is_not_null()
.and(streaming_job::Column::JobStatus.eq(JobStatus::Created)),
)
.into_tuple()
.all(&self.db)
.await?
.into_iter()
.collect();
source_objs.retain_mut(|(source, _)| {
source.optional_associated_table_id.is_none()
|| created_table_ids.contains(&source.optional_associated_table_id.unwrap())
});

Ok(source_objs
.into_iter()
Expand Down

0 comments on commit 7bdc37e

Please sign in to comment.