Skip to content

Commit

Permalink
feat: add parallelism column in rw_fragments (#12901)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Oct 17, 2023
1 parent e713fb9 commit 1f151cd
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 0 deletions.
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ message ListFragmentDistributionResponse {
repeated uint32 state_table_ids = 4;
repeated uint32 upstream_fragment_ids = 5;
uint32 fragment_type_mask = 6;
uint32 parallelism = 7;
}
repeated FragmentDistribution distributions = 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub static RW_FRAGMENTS_COLUMNS: LazyLock<Vec<SystemCatalogColumnsDef<'_>>> = La
"upstream_fragment_ids",
),
(DataType::List(Box::new(DataType::Varchar)), "flags"),
(DataType::Int32, "parallelism"),
]
});

Expand Down Expand Up @@ -93,6 +94,7 @@ impl SysCatalogReaderImpl {
.map(|t| Some(ScalarImpl::Utf8(t.into())))
.collect_vec(),
))),
Some(ScalarImpl::Int32(distribution.parallelism as i32)),
])
})
.collect_vec())
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl StreamManagerService for StreamServiceImpl {
state_table_ids: fragment.state_table_ids.clone(),
upstream_fragment_ids: fragment.upstream_fragment_ids.clone(),
fragment_type_mask: fragment.fragment_type_mask,
parallelism: fragment.actors.len() as _,
}
})
})
Expand Down

0 comments on commit 1f151cd

Please sign in to comment.