Skip to content

Commit

Permalink
temp revert scale service change
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Oct 24, 2024
1 parent 4452f84 commit 4dc0998
Showing 1 changed file with 41 additions and 47 deletions.
88 changes: 41 additions & 47 deletions src/meta/service/src/scale_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,54 +118,48 @@ impl ScaleService for ScaleServiceImpl {
} = request.into_inner();

let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;
for (database_id, worker_reschedules) in self

let streaming_job_ids = self
.metadata_manager
.split_fragment_map_by_database(worker_reschedules)
.await?
{
let database_id = DatabaseId::new(database_id as _);
let streaming_job_ids = self
.metadata_manager
.catalog_controller
.get_fragment_job_id(
worker_reschedules
.keys()
.map(|id| *id as FragmentId)
.collect(),
)
.await?;

let table_parallelisms = streaming_job_ids
.into_iter()
.map(|id| (TableId::new(id as _), TableParallelism::Custom))
.collect();

self.stream_manager
.reschedule_actors(
database_id,
worker_reschedules
.into_iter()
.map(|(fragment_id, reschedule)| {
let PbWorkerReschedule { worker_actor_diff } = reschedule;
(
fragment_id,
WorkerReschedule {
worker_actor_diff: worker_actor_diff
.into_iter()
.map(|(worker_id, diff)| (worker_id as _, diff as _))
.collect(),
},
)
})
.collect(),
RescheduleOptions {
resolve_no_shuffle_upstream,
skip_create_new_actors: false,
},
Some(table_parallelisms),
)
.await?;
}
.catalog_controller
.get_fragment_job_id(
worker_reschedules
.keys()
.map(|id| *id as FragmentId)
.collect(),
)
.await?;

let table_parallelisms = streaming_job_ids
.into_iter()
.map(|id| (TableId::new(id as _), TableParallelism::Custom))
.collect();

self.stream_manager
.reschedule_actors(
DatabaseId::new(0),
worker_reschedules
.into_iter()
.map(|(fragment_id, reschedule)| {
let PbWorkerReschedule { worker_actor_diff } = reschedule;
(
fragment_id,
WorkerReschedule {
worker_actor_diff: worker_actor_diff
.into_iter()
.map(|(worker_id, diff)| (worker_id as _, diff as _))
.collect(),
},
)
})
.collect(),
RescheduleOptions {
resolve_no_shuffle_upstream,
skip_create_new_actors: false,
},
Some(table_parallelisms),
)
.await?;

Ok(Response::new(RescheduleResponse {
success: true,
Expand Down

0 comments on commit 4dc0998

Please sign in to comment.