From 1f151cd4a8ae51c3c405662f491e06d0251996a0 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 17 Oct 2023 15:16:13 +0800 Subject: [PATCH] feat: add parallelism column in rw_fragments (#12901) --- proto/meta.proto | 1 + .../src/catalog/system_catalog/rw_catalog/rw_fragments.rs | 2 ++ src/meta/src/rpc/service/stream_service.rs | 1 + 3 files changed, 4 insertions(+) diff --git a/proto/meta.proto b/proto/meta.proto index cad0b97f6d2be..f2375eed7653a 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -221,6 +221,7 @@ message ListFragmentDistributionResponse { repeated uint32 state_table_ids = 4; repeated uint32 upstream_fragment_ids = 5; uint32 fragment_type_mask = 6; + uint32 parallelism = 7; } repeated FragmentDistribution distributions = 1; } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs index 1f31948139135..a4eda730d8941 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs @@ -35,6 +35,7 @@ pub static RW_FRAGMENTS_COLUMNS: LazyLock>> = La "upstream_fragment_ids", ), (DataType::List(Box::new(DataType::Varchar)), "flags"), + (DataType::Int32, "parallelism"), ] }); @@ -93,6 +94,7 @@ impl SysCatalogReaderImpl { .map(|t| Some(ScalarImpl::Utf8(t.into()))) .collect_vec(), ))), + Some(ScalarImpl::Int32(distribution.parallelism as i32)), ]) }) .collect_vec()) diff --git a/src/meta/src/rpc/service/stream_service.rs b/src/meta/src/rpc/service/stream_service.rs index b2ed1ec916b08..ef232d9b04ffd 100644 --- a/src/meta/src/rpc/service/stream_service.rs +++ b/src/meta/src/rpc/service/stream_service.rs @@ -207,6 +207,7 @@ impl StreamManagerService for StreamServiceImpl { state_table_ids: fragment.state_table_ids.clone(), upstream_fragment_ids: fragment.upstream_fragment_ids.clone(), fragment_type_mask: fragment.fragment_type_mask, + parallelism: fragment.actors.len() as _, } }) })