diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 6c24a6a44e5b4..16228a06d0a9a 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -47,8 +47,8 @@ use risingwave_pb::stream_plan::{ use sea_orm::sea_query::Expr; use sea_orm::ActiveValue::Set; use sea_orm::{ - ColumnTrait, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter, QuerySelect, - RelationTrait, TransactionTrait, Value, + ColumnTrait, DbErr, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter, + QuerySelect, RelationTrait, TransactionTrait, Value, }; use crate::controller::catalog::{CatalogController, CatalogControllerInner}; @@ -1099,19 +1099,7 @@ impl CatalogController { let txn = inner.db.begin().await?; for assignments in split_assignment.values() { for (actor_id, splits) in assignments { - let actor_splits: Option = Actor::find_by_id(*actor_id as ActorId) - .select_only() - .column(actor::Column::Splits) - .into_tuple() - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("actor_id", actor_id))?; - - let mut actor_splits = actor_splits - .map(|splits| splits.to_protobuf().splits) - .unwrap_or_default(); - actor_splits.extend(splits.iter().map(Into::into)); - + let actor_splits = splits.iter().map(Into::into).collect_vec(); Actor::update(actor::ActiveModel { actor_id: Set(*actor_id as _), splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits { @@ -1120,7 +1108,14 @@ impl CatalogController { ..Default::default() }) .exec(&txn) - .await?; + .await + .map_err(|err| { + if err == DbErr::RecordNotUpdated { + MetaError::catalog_id_not_found("actor_id", actor_id) + } else { + err.into() + } + })?; } } txn.commit().await?;