Skip to content

Commit

Permalink
fix: avoid panic when auto scaling hit the maximum number of bind par…
Browse files Browse the repository at this point in the history
…ameters in sql backend (#16750)
  • Loading branch information
yezizp2012 authored May 14, 2024
1 parent 44f990b commit 0971e55
Showing 1 changed file with 10 additions and 16 deletions.
26 changes: 10 additions & 16 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1253,10 +1253,6 @@ impl CatalogController {
.exec(&txn)
.await?;

// newly created actor
let mut new_actors = vec![];
let mut new_actor_dispatchers = vec![];

for (
PbStreamActor {
actor_id,
Expand All @@ -1276,6 +1272,7 @@ impl CatalogController {
) in newly_created_actors
{
let mut actor_upstreams = BTreeMap::<FragmentId, BTreeSet<ActorId>>::new();
let mut new_actor_dispatchers = vec![];

if let Some(nodes) = &mut nodes {
visit_stream_node(nodes, |node| {
Expand Down Expand Up @@ -1314,7 +1311,7 @@ impl CatalogController {
.get(&actor_id)
.map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());

new_actors.push(actor::ActiveModel {
Actor::insert(actor::ActiveModel {
actor_id: Set(actor_id as _),
fragment_id: Set(fragment_id as _),
status: Set(ActorStatus::Running),
Expand All @@ -1324,7 +1321,9 @@ impl CatalogController {
upstream_actor_ids: Set(actor_upstreams),
vnode_bitmap: Set(vnode_bitmap.as_ref().map(|bitmap| bitmap.into())),
expr_context: Set(expr_context.as_ref().unwrap().into()),
});
})
.exec(&txn)
.await?;

for PbDispatcher {
r#type: dispatcher_type,
Expand All @@ -1348,16 +1347,11 @@ impl CatalogController {
downstream_actor_ids: Set(downstream_actor_id.into()),
})
}
}

if !new_actors.is_empty() {
Actor::insert_many(new_actors).exec(&txn).await?;
}

if !new_actor_dispatchers.is_empty() {
ActorDispatcher::insert_many(new_actor_dispatchers)
.exec(&txn)
.await?;
if !new_actor_dispatchers.is_empty() {
ActorDispatcher::insert_many(new_actor_dispatchers)
.exec(&txn)
.await?;
}
}

// actor update
Expand Down

0 comments on commit 0971e55

Please sign in to comment.