Skip to content

Commit

Permalink
fix: reject concurrent altering table column
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Jun 4, 2024
1 parent db27ab9 commit 925192c
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,12 +541,32 @@ impl CatalogController {
return Err(MetaError::permission_denied("table version is stale"));
}

// 2. check concurrent replace.
let replacing_cnt = ObjectDependency::find()
.join(
JoinType::InnerJoin,
object_dependency::Relation::Object1.def(),
)
.join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
.filter(
object_dependency::Column::Oid
.eq(id as ObjectId)
.and(object::Column::ObjType.eq(ObjectType::Table))
.and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
)
.count(&txn)
.await?;
if replacing_cnt != 0 {
assert_eq!(replacing_cnt, 1);
return Err(MetaError::permission_denied("table is being altered"));
}

let parallelism = match specified_parallelism {
None => StreamingParallelism::Adaptive,
Some(n) => StreamingParallelism::Fixed(n.get() as _),
};

// 2. create streaming object for new replace table.
// 3. create streaming object for new replace table.
let obj_id = Self::create_streaming_job_obj(
&txn,
ObjectType::Table,
Expand All @@ -559,7 +579,7 @@ impl CatalogController {
)
.await?;

// 3. record dependency for new replace table.
// 4. record dependency for new replace table.
ObjectDependency::insert(object_dependency::ActiveModel {
oid: Set(id as _),
used_by: Set(obj_id as _),
Expand Down

0 comments on commit 925192c

Please sign in to comment.