Skip to content

Commit

Permalink
Revert "Revert "provide create job obj in finish_streaming_job""
Browse files Browse the repository at this point in the history
This reverts commit c71f181.
  • Loading branch information
kwannoel committed Jun 28, 2024
1 parent fff3849 commit 290544e
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,10 +635,15 @@ impl CatalogController {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

let creating_jobs: Vec<(ObjectId, ObjectType)> = streaming_job::Entity::find()
let creating_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
.select_only()
.column(streaming_job::Column::JobId)
.column(object::Column::ObjType)
.columns([
object::Column::Oid,
object::Column::ObjType,
object::Column::SchemaId,
object::Column::DatabaseId,
])
.join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
.filter(
streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
Expand All @@ -647,13 +652,13 @@ impl CatalogController {
.and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
),
)
.into_tuple()
.into_partial_model()
.all(&txn)
.await?;

let changed = Self::clean_dirty_sink_downstreams(&txn).await?;

if creating_jobs.is_empty() {
if creating_job_objs.is_empty() {
if changed {
txn.commit().await?;
}
Expand All @@ -666,7 +671,9 @@ impl CatalogController {
let mut creating_source_ids = vec![];
let mut creating_sink_ids = vec![];
let mut creating_job_ids = vec![];
for (job_id, job_type) in creating_jobs {
for creating_job_obj in creating_job_objs {
let job_id = creating_job_obj.oid;
let job_type = creating_job_obj.obj_type;
creating_job_ids.push(job_id);
match job_type {
ObjectType::Table | ObjectType::Index => creating_table_ids.push(job_id),
Expand Down

0 comments on commit 290544e

Please sign in to comment.