Skip to content

Commit

Permalink
no need to manually set 1 for singleton
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 25, 2024
1 parent a191add commit e804afd
Showing 1 changed file with 21 additions and 31 deletions.
52 changes: 21 additions & 31 deletions src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::stream_plan::FragmentTypeFlag;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
Expand Down Expand Up @@ -53,36 +52,27 @@ async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragmen

Ok(distributions
.into_iter()
.map(|distribution| {
let distribution_type = distribution.distribution_type();
let max_parallelism = match distribution_type {
FragmentDistributionType::Single => 1,
FragmentDistributionType::Hash => distribution.vnode_count as i32,
FragmentDistributionType::Unspecified => unreachable!(),
};

RwFragment {
fragment_id: distribution.fragment_id as i32,
table_id: distribution.table_id as i32,
distribution_type: distribution.distribution_type().as_str_name().into(),
state_table_ids: distribution
.state_table_ids
.into_iter()
.map(|id| id as i32)
.collect(),
upstream_fragment_ids: distribution
.upstream_fragment_ids
.into_iter()
.map(|id| id as i32)
.collect(),
flags: extract_fragment_type_flag(distribution.fragment_type_mask)
.into_iter()
.flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_"))
.map(|s| s.into())
.collect(),
parallelism: distribution.parallelism as i32,
max_parallelism,
}
.map(|distribution| RwFragment {
fragment_id: distribution.fragment_id as i32,
table_id: distribution.table_id as i32,
distribution_type: distribution.distribution_type().as_str_name().into(),
state_table_ids: distribution
.state_table_ids
.into_iter()
.map(|id| id as i32)
.collect(),
upstream_fragment_ids: distribution
.upstream_fragment_ids
.into_iter()
.map(|id| id as i32)
.collect(),
flags: extract_fragment_type_flag(distribution.fragment_type_mask)
.into_iter()
.flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_"))
.map(|s| s.into())
.collect(),
parallelism: distribution.parallelism as i32,
max_parallelism: distribution.vnode_count as i32,
})
.collect())
}
Expand Down

0 comments on commit e804afd

Please sign in to comment.