Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Apr 7, 2024
1 parent c525e25 commit 5ac5d67
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 15 deletions.
16 changes: 2 additions & 14 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ impl CatalogController {
pub async fn get_running_actors_and_upstream_of_fragment(
&self,
fragment_id: FragmentId,
) -> MetaResult<Vec<(ActorId, Vec<ActorId>)>> {
) -> MetaResult<Vec<(ActorId, ActorUpstreamActors)>> {
let inner = self.inner.read().await;
let actors: Vec<(ActorId, ActorUpstreamActors)> = Actor::find()
.select_only()
Expand All @@ -1105,19 +1105,7 @@ impl CatalogController {
.into_tuple()
.all(&inner.db)
.await?;
Ok(actors
.into_iter()
.map(|(id, upstream_ids)| {
(
id,
upstream_ids
.into_inner()
.into_iter()
.flat_map(|(_, ids)| ids.into_iter())
.collect(),
)
})
.collect())
Ok(actors)
}

pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
Expand Down
6 changes: 5 additions & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,11 @@ impl MetadataManager {
.map(|(id, actors)| {
(
id as ActorId,
actors.into_iter().map(|id| id as ActorId).collect(),
actors
.into_inner()
.into_iter()
.flat_map(|(_, ids)| ids.into_iter().map(|id| id as ActorId))
.collect(),
)
})
.collect())
Expand Down

0 comments on commit 5ac5d67

Please sign in to comment.