Skip to content

Commit

Permalink
Revert "provide create job obj in finish_streaming_job"
Browse files Browse the repository at this point in the history
This reverts commit 99427d2.
  • Loading branch information
kwannoel committed Jun 28, 2024
1 parent 339f089 commit c71f181
Showing 1 changed file with 5 additions and 12 deletions.
17 changes: 5 additions & 12 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,15 +635,10 @@ impl CatalogController {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

let creating_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
let creating_jobs: Vec<(ObjectId, ObjectType)> = streaming_job::Entity::find()
.select_only()
.column(streaming_job::Column::JobId)
.columns([
object::Column::Oid,
object::Column::ObjType,
object::Column::SchemaId,
object::Column::DatabaseId,
])
.column(object::Column::ObjType)
.join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
.filter(
streaming_job::Column::JobStatus.eq(JobStatus::Initial).or(
Expand All @@ -652,13 +647,13 @@ impl CatalogController {
.and(streaming_job::Column::CreateType.eq(CreateType::Foreground)),
),
)
.into_partial_model()
.into_tuple()
.all(&txn)
.await?;

let changed = Self::clean_dirty_sink_downstreams(&txn).await?;

if creating_job_objs.is_empty() {
if creating_jobs.is_empty() {
if changed {
txn.commit().await?;
}
Expand All @@ -671,9 +666,7 @@ impl CatalogController {
let mut creating_source_ids = vec![];
let mut creating_sink_ids = vec![];
let mut creating_job_ids = vec![];
for creating_job_obj in creating_job_objs {
let job_id = creating_job_obj.oid;
let job_type = creating_job_obj.obj_type;
for (job_id, job_type) in creating_jobs {
creating_job_ids.push(job_id);
match job_type {
ObjectType::Table | ObjectType::Index => creating_table_ids.push(job_id),
Expand Down

0 comments on commit c71f181

Please sign in to comment.