Skip to content

Commit

Permalink
Update stream scaling logic & logging
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jan 29, 2024
1 parent 4abe76c commit 8575cc9
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 14 deletions.
3 changes: 3 additions & 0 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2435,12 +2435,15 @@ impl GlobalStreamManager {
.await;
}));

tracing::debug!("pausing tick lock in source manager");
let _source_pause_guard = self.source_manager.paused.lock().await;

self.barrier_scheduler
.run_config_change_command_with_pause(command)
.await?;

tracing::info!("reschedule done");

Ok(())
}

Expand Down
52 changes: 38 additions & 14 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,27 +755,51 @@ impl GlobalStreamManager {
.map(|node| node.id)
.collect::<BTreeSet<_>>();

let reschedules = if deferred {
HashMap::new()
} else {
let table_parallelism_assignment = HashMap::from([(TableId::new(table_id), parallelism)]);

if deferred {
tracing::debug!(
"deferred mode enabled for job {}, set the parallelism directly to {:?}",
table_id,
parallelism
);
self.scale_controller
.as_ref()
.unwrap()
.post_apply_reschedule(&HashMap::new(), &table_parallelism_assignment)
.await?;
} else {
let reschedules = self
.scale_controller
.as_ref()
.unwrap()
.generate_table_resize_plan(TableResizePolicy {
worker_ids,
table_parallelisms: vec![(table_id, parallelism)].into_iter().collect(),
table_parallelisms: table_parallelism_assignment
.iter()
.map(|(id, parallelism)| (id.table_id, *parallelism))
.collect(),
})
.await?
};
.await?;

self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: false,
},
Some(HashMap::from([(TableId::new(table_id), parallelism)])),
)
.await?;
if reschedules.is_empty() {
tracing::debug!("empty reschedule plan generated for job {}, set the parallelism directly to {:?}", table_id, parallelism);
self.scale_controller
.as_ref()
.unwrap()
.post_apply_reschedule(&HashMap::new(), &table_parallelism_assignment)
.await?;
} else {
self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: false,
},
Some(table_parallelism_assignment),
)
.await?;
}
};

Ok(())
}
Expand Down

0 comments on commit 8575cc9

Please sign in to comment.