From 957f8b0bc6c64ebbb33c28a15e64a825f49c78b7 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Mon, 4 Mar 2024 18:32:59 +0800 Subject: [PATCH] init vnode bitmap in new_local --- .../hummock_test/src/compactor_tests.rs | 4 ++- .../hummock_test/src/failpoint_tests.rs | 4 ++- .../src/hummock_read_version_tests.rs | 9 ++++--- .../src/local_state_store_test_utils.rs | 8 +----- .../hummock_test/src/state_store_tests.rs | 14 +++++++--- src/storage/hummock_trace/src/opts.rs | 4 ++- .../event_handler/hummock_event_handler.rs | 5 ++++ src/storage/src/hummock/event_handler/mod.rs | 3 +++ .../src/hummock/store/hummock_storage.rs | 1 + .../hummock/store/local_hummock_storage.rs | 12 +-------- src/storage/src/hummock/store/version.rs | 21 +++++++++------ src/storage/src/mem_table.rs | 11 +++----- src/storage/src/store.rs | 26 ++++++++++++------- .../common/log_store_impl/kv_log_store/mod.rs | 1 + .../log_store_impl/kv_log_store/writer.rs | 4 +-- .../log_store_impl/subscription_log_store.rs | 4 +-- src/stream/src/common/table/state_table.rs | 21 ++++++++++----- src/stream/src/from_proto/subscription.rs | 22 +++++++++------- 18 files changed, 98 insertions(+), 76 deletions(-) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 9d4fdac2439fb..732df9d199081 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -147,7 +147,9 @@ pub(crate) mod tests { value_size: usize, epochs: Vec, ) { - let mut local = storage.new_local(Default::default()).await; + let mut local = storage + .new_local(NewLocalOptions::for_test(TableId::default())) + .await; // 1. add sstables let val = b"0"[..].repeat(value_size); local.init_for_test(epochs[0]).await.unwrap(); diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index 2b2f3da3f15a2..8112e99358d8c 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -61,7 +61,9 @@ async fn test_failpoints_state_store_read_upload() { .await .unwrap(); - let mut local = hummock_storage.new_local(NewLocalOptions::default()).await; + let mut local = hummock_storage + .new_local(NewLocalOptions::for_test(TableId::default())) + .await; let anchor = gen_key_from_str(VirtualNode::ZERO, "aa"); let mut batch1 = vec![ diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 52cb35388cf0b..ed4f68a289e6e 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -46,7 +46,8 @@ async fn test_read_version_basic() { let mut epoch = 1; let table_id = 0; - let mut read_version = HummockReadVersion::new(TableId::from(table_id), pinned_version); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); + let mut read_version = HummockReadVersion::new(TableId::from(table_id), pinned_version, vnodes); { // single imm @@ -263,13 +264,13 @@ async fn test_read_filter_basic() { let epoch = 1; let table_id = 0; + let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); let read_version = Arc::new(RwLock::new(HummockReadVersion::new( TableId::from(table_id), pinned_version, + vnodes.clone(), ))); - read_version - .write() - .update_vnode_bitmap(Arc::new(Bitmap::ones(VirtualNode::COUNT))); + read_version.write().update_vnode_bitmap(vnodes); { // single imm diff --git a/src/storage/hummock_test/src/local_state_store_test_utils.rs b/src/storage/hummock_test/src/local_state_store_test_utils.rs index d9f2a503c39e3..fed253a0488df 100644 --- a/src/storage/hummock_test/src/local_state_store_test_utils.rs +++ b/src/storage/hummock_test/src/local_state_store_test_utils.rs @@ -13,20 +13,14 @@ // limitations under the License. use std::future::Future; -use std::sync::Arc; -use risingwave_common::buffer::Bitmap; -use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::EpochPair; use risingwave_storage::error::StorageResult; use risingwave_storage::store::{InitOptions, LocalStateStore}; pub trait LocalStateStoreTestExt: LocalStateStore { fn init_for_test(&mut self, epoch: u64) -> impl Future> + Send + '_ { - self.init(InitOptions::new( - EpochPair::new_test_epoch(epoch), - Arc::new(Bitmap::ones(VirtualNode::COUNT)), - )) + self.init(InitOptions::new(EpochPair::new_test_epoch(epoch))) } } impl LocalStateStoreTestExt for T {} diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index a71ea8029639c..f8372c484396a 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use bytes::Bytes; use expect_test::expect; use futures::{pin_mut, StreamExt, TryStreamExt}; +use risingwave_common::buffer::Bitmap; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; @@ -127,7 +128,9 @@ async fn test_basic_inner( // Make sure the batch is sorted. batch3.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); - let mut local = hummock_storage.new_local(Default::default()).await; + let mut local = hummock_storage + .new_local(NewLocalOptions::for_test(TableId::default())) + .await; // epoch 0 is reserved by storage service let epoch1: u64 = 1; @@ -1043,7 +1046,9 @@ async fn test_delete_get_inner( StorageValue::new_put("222"), ), ]; - let mut local = hummock_storage.new_local(NewLocalOptions::default()).await; + let mut local = hummock_storage + .new_local(NewLocalOptions::for_test(Default::default())) + .await; local.init_for_test(epoch1).await.unwrap(); local .ingest_batch( @@ -1126,7 +1131,9 @@ async fn test_multiple_epoch_sync_inner( ), ]; - let mut local = hummock_storage.new_local(NewLocalOptions::default()).await; + let mut local = hummock_storage + .new_local(NewLocalOptions::for_test(TableId::default())) + .await; local.init_for_test(epoch1).await.unwrap(); local .ingest_batch( @@ -1382,6 +1389,7 @@ async fn test_replicated_local_hummock_storage() { TableOption { retention_seconds: None, }, + Arc::new(Bitmap::ones(VirtualNode::COUNT)), )) .await; diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index f5b749c83fb51..72454327c33d0 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -16,6 +16,7 @@ use bincode::{Decode, Encode}; use risingwave_common::buffer::Bitmap; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::EpochPair; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::common::PbBuffer; @@ -149,6 +150,7 @@ pub struct TracedNewLocalOptions { pub op_consistency_level: TracedOpConsistencyLevel, pub table_option: TracedTableOption, pub is_replicated: bool, + pub vnodes: TracedBitmap, } #[cfg(test)] @@ -161,6 +163,7 @@ impl TracedNewLocalOptions { retention_seconds: None, }, is_replicated: false, + vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT)), } } } @@ -224,7 +227,6 @@ impl From for EpochPair { #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] pub struct TracedInitOptions { pub epoch: TracedEpochPair, - pub vnodes: TracedBitmap, } #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index f1209775e154a..3cd6cdf9f18ca 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -741,6 +741,7 @@ impl HummockEventHandler { table_id, new_read_version_sender, is_replicated, + vnodes, } => { let pinned_version = self.pinned_version.load(); let basic_read_version = Arc::new(RwLock::new( @@ -748,6 +749,7 @@ impl HummockEventHandler { table_id, (**pinned_version).clone(), is_replicated, + vnodes, ), )); @@ -875,7 +877,9 @@ mod tests { use bytes::Bytes; use futures::FutureExt; use itertools::Itertools; + use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; + use risingwave_common::hash::VirtualNode; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_hummock_sdk::key::TableKey; use risingwave_hummock_sdk::version::HummockVersion; @@ -955,6 +959,7 @@ mod tests { table_id, new_read_version_sender: read_version_tx, is_replicated: false, + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)), }) .unwrap(); let (read_version, guard) = read_version_rx.await.unwrap(); diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 5c2e081c19f72..9bcc54851420d 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use parking_lot::{RwLock, RwLockReadGuard}; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::HummockEpoch; use thiserror_ext::AsReport; @@ -88,6 +89,7 @@ pub enum HummockEvent { table_id: TableId, new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>, is_replicated: bool, + vnodes: Arc, }, DestroyReadVersion { @@ -138,6 +140,7 @@ impl HummockEvent { table_id, new_read_version_sender: _, is_replicated, + vnodes: _, } => format!( "RegisterReadVersion table_id {:?}, is_replicated: {:?}", table_id, is_replicated diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 7c6a28582b30c..76b7b56cd04dd 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -368,6 +368,7 @@ impl HummockStorage { table_id: option.table_id, new_read_version_sender: tx, is_replicated: option.is_replicated, + vnodes: option.vnodes.clone(), }) .unwrap(); diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 9dfcc19ee2850..604a242c66754 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -394,15 +394,6 @@ impl LocalStateStore for LocalHummockStorage { "local state store of table id {:?} is init for more than once", self.table_id ); - let prev_vnodes = self - .read_version - .write() - .update_vnode_bitmap(options.vnodes); - assert!( - prev_vnodes.is_none(), - "Vnode bitmap should be empty during init" - ); - Ok(()) } @@ -449,8 +440,7 @@ impl LocalStateStore for LocalHummockStorage { assert!(read_version.staging().is_empty(), "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update", self.table_id(), self.instance_id() ); - let prev_vnodes = read_version.update_vnode_bitmap(vnodes); - prev_vnodes.expect("Previous vnode bitmap should not be none") + read_version.update_vnode_bitmap(vnodes) } } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 57bdf4a991d53..1be6893144a27 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -219,7 +219,7 @@ pub struct HummockReadVersion { // Vnode bitmap corresponding to the read version // It will be initialized after local state store init - vnodes: Option>, + vnodes: Arc, } impl HummockReadVersion { @@ -227,6 +227,7 @@ impl HummockReadVersion { table_id: TableId, committed_version: CommittedVersion, is_replicated: bool, + vnodes: Arc, ) -> Self { // before build `HummockReadVersion`, we need to get the a initial version which obtained // from meta. want this initialization after version is initialized (now with @@ -246,12 +247,16 @@ impl HummockReadVersion { committed: committed_version, is_replicated, - vnodes: None, + vnodes, } } - pub fn new(table_id: TableId, committed_version: CommittedVersion) -> Self { - Self::new_with_replication_option(table_id, committed_version, false) + pub fn new( + table_id: TableId, + committed_version: CommittedVersion, + vnodes: Arc, + ) -> Self { + Self::new_with_replication_option(table_id, committed_version, false, vnodes) } pub fn table_id(&self) -> TableId { @@ -509,16 +514,16 @@ impl HummockReadVersion { self.is_replicated } - pub fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Option> { - self.vnodes.replace(vnodes) + pub fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { + std::mem::replace(&mut self.vnodes, vnodes) } pub fn contains(&self, vnode: VirtualNode) -> bool { - self.vnodes.as_ref().unwrap().is_set(vnode.to_index()) + self.vnodes.is_set(vnode.to_index()) } pub fn vnodes(&self) -> Arc { - self.vnodes.as_ref().unwrap().clone() + self.vnodes.clone() } } diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 0e94071d492a0..60d794a408191 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -436,7 +436,7 @@ pub struct MemtableLocalStateStore { table_id: TableId, op_consistency_level: OpConsistencyLevel, table_option: TableOption, - vnodes: Option>, + vnodes: Arc, } impl MemtableLocalStateStore { @@ -448,7 +448,7 @@ impl MemtableLocalStateStore { table_id: option.table_id, op_consistency_level: option.op_consistency_level, table_option: option.table_option, - vnodes: None, + vnodes: option.vnodes, } } @@ -601,11 +601,6 @@ impl LocalStateStore for MemtableLocalState "epoch in local state store of table id {:?} is init for more than once", self.table_id ); - assert!( - self.vnodes.replace(options.vnodes).is_none(), - "vnodes in local state store of table id {:?} is init for more than once", - self.table_id - ); Ok(()) } @@ -665,7 +660,7 @@ impl LocalStateStore for MemtableLocalState } fn update_vnode_bitmap(&mut self, vnodes: Arc) -> Arc { - self.vnodes.replace(vnodes).unwrap() + std::mem::replace(&mut self.vnodes, vnodes) } } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index e5b096de47196..96838e1ef25d1 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -25,6 +25,7 @@ use futures_async_stream::try_stream; use prost::Message; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; +use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{Epoch, EpochPair}; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange}; use risingwave_hummock_sdk::table_watermark::{ @@ -32,8 +33,8 @@ use risingwave_hummock_sdk::table_watermark::{ }; use risingwave_hummock_sdk::{HummockReadEpoch, LocalSstableInfo}; use risingwave_hummock_trace::{ - TracedBitmap, TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, - TracedPrefetchOptions, TracedReadOptions, TracedSealCurrentEpochOptions, TracedWriteOptions, + TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedWriteOptions, }; use crate::error::{StorageError, StorageResult}; @@ -447,7 +448,7 @@ impl OpConsistencyLevel { } } -#[derive(Clone, Default)] +#[derive(Clone)] pub struct NewLocalOptions { pub table_id: TableId, /// Whether the operation is consistent. The term `consistent` requires the following: @@ -463,6 +464,9 @@ pub struct NewLocalOptions { /// Indicate if this is replicated. If it is, we should not /// upload its ReadVersions. pub is_replicated: bool, + + /// The vnode bitmap for the local state store instance + pub vnodes: Arc, } impl From for NewLocalOptions { @@ -477,6 +481,7 @@ impl From for NewLocalOptions { }, table_option: value.table_option.into(), is_replicated: value.is_replicated, + vnodes: Arc::new(value.vnodes.into()), } } } @@ -493,6 +498,7 @@ impl From for TracedNewLocalOptions { }, table_option: value.table_option.into(), is_replicated: value.is_replicated, + vnodes: value.vnodes.as_ref().clone().into(), } } } @@ -502,12 +508,14 @@ impl NewLocalOptions { table_id: TableId, op_consistency_level: OpConsistencyLevel, table_option: TableOption, + vnodes: Arc, ) -> Self { NewLocalOptions { table_id, op_consistency_level, table_option, is_replicated: false, + vnodes, } } @@ -515,12 +523,14 @@ impl NewLocalOptions { table_id: TableId, op_consistency_level: OpConsistencyLevel, table_option: TableOption, + vnodes: Arc, ) -> Self { NewLocalOptions { table_id, op_consistency_level, table_option, is_replicated: true, + vnodes, } } @@ -532,6 +542,7 @@ impl NewLocalOptions { retention_seconds: None, }, is_replicated: false, + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)), } } } @@ -539,14 +550,11 @@ impl NewLocalOptions { #[derive(Clone)] pub struct InitOptions { pub epoch: EpochPair, - - /// The vnode bitmap for the local state store instance - pub vnodes: Arc, } impl InitOptions { - pub fn new(epoch: EpochPair, vnodes: Arc) -> Self { - Self { epoch, vnodes } + pub fn new(epoch: EpochPair) -> Self { + Self { epoch } } } @@ -554,7 +562,6 @@ impl From for TracedInitOptions { fn from(value: InitOptions) -> Self { TracedInitOptions { epoch: value.epoch.into(), - vnodes: TracedBitmap::from(value.vnodes.as_ref().clone()), } } } @@ -563,7 +570,6 @@ impl From for InitOptions { fn from(value: TracedInitOptions) -> Self { InitOptions { epoch: value.epoch.into(), - vnodes: Arc::new(Bitmap::from(value.vnodes)), } } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 4accff0f6fe4e..a6a69ae003503 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -365,6 +365,7 @@ impl LogStoreFactory for KvLogStoreFactory { retention_seconds: None, }, is_replicated: false, + vnodes: serde.vnodes().clone(), }) .await; diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index b4f54d17ed1d1..731d8e42126df 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -83,9 +83,7 @@ impl LogWriter for KvLogStoreWriter { epoch: EpochPair, pause_read_on_bootstrap: bool, ) -> LogStoreResult<()> { - self.state_store - .init(InitOptions::new(epoch, self.serde.vnodes().clone())) - .await?; + self.state_store.init(InitOptions::new(epoch)).await?; if pause_read_on_bootstrap { self.pause()?; info!("KvLogStore of {} paused on bootstrap", self.identity); diff --git a/src/stream/src/common/log_store_impl/subscription_log_store.rs b/src/stream/src/common/log_store_impl/subscription_log_store.rs index 999400771a30c..39ada926826d7 100644 --- a/src/stream/src/common/log_store_impl/subscription_log_store.rs +++ b/src/stream/src/common/log_store_impl/subscription_log_store.rs @@ -60,9 +60,7 @@ impl SubscriptionLogStoreWriter { epoch: risingwave_common::util::epoch::EpochPair, _pause_read_on_bootstrap: bool, ) -> LogStoreResult<()> { - self.state_store - .init(InitOptions::new(epoch, self.serde.vnodes().clone())) - .await?; + self.state_store.init(InitOptions::new(epoch)).await?; self.seq_id = FIRST_SEQ_ID; Ok(()) } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 87f61d4874074..dba45d52f1e3d 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -177,9 +177,7 @@ where /// async interface only used for replicated state table, /// as it needs to wait for prev epoch to be committed. pub async fn init_epoch(&mut self, epoch: EpochPair) -> StorageResult<()> { - self.local_store - .init(InitOptions::new(epoch, self.vnodes().clone())) - .await + self.local_store.init(InitOptions::new(epoch)).await } } @@ -195,7 +193,7 @@ where /// No need to `wait_for_epoch`, so it should complete immediately. pub fn init_epoch(&mut self, epoch: EpochPair) { self.local_store - .init(InitOptions::new(epoch, self.vnodes().clone())) + .init(InitOptions::new(epoch)) .now_or_never() .expect("non-replicated state store should start immediately.") .expect("non-replicated state store should not wait_for_epoch, and fail because of it.") @@ -359,9 +357,19 @@ where let table_option = TableOption::new(table_catalog.retention_seconds); let new_local_options = if IS_REPLICATED { - NewLocalOptions::new_replicated(table_id, op_consistency_level, table_option) + NewLocalOptions::new_replicated( + table_id, + op_consistency_level, + table_option, + distribution.vnodes().clone(), + ) } else { - NewLocalOptions::new(table_id, op_consistency_level, table_option) + NewLocalOptions::new( + table_id, + op_consistency_level, + table_option, + distribution.vnodes().clone(), + ) }; let local_state_store = store.new_local(new_local_options).await; @@ -573,6 +581,7 @@ where table_id, op_consistency_level, TableOption::default(), + distribution.vnodes().clone(), )) .await; let row_serde = make_row_serde(); diff --git a/src/stream/src/from_proto/subscription.rs b/src/stream/src/from_proto/subscription.rs index abc30a1967435..a7e8b79b16c10 100644 --- a/src/stream/src/from_proto/subscription.rs +++ b/src/stream/src/from_proto/subscription.rs @@ -35,6 +35,17 @@ impl ExecutorBuilder for SubscriptionExecutorBuilder { ) -> StreamResult { let [input]: [_; 1] = params.input.try_into().unwrap(); let table_id = TableId::new(node.log_store_table.as_ref().unwrap().id); + let vnodes = std::sync::Arc::new( + params + .vnode_bitmap + .expect("vnodes not set for subscription"), + ); + let serde = LogStoreRowSerde::new( + node.log_store_table.as_ref().unwrap(), + Some(vnodes), + &KV_LOG_STORE_V2_INFO, + ); + let local_state_store = state_store .new_local(NewLocalOptions { table_id: TableId { @@ -45,19 +56,10 @@ impl ExecutorBuilder for SubscriptionExecutorBuilder { retention_seconds: None, }, is_replicated: false, + vnodes: serde.vnodes().clone(), }) .await; - let vnodes = std::sync::Arc::new( - params - .vnode_bitmap - .expect("vnodes not set for subscription"), - ); - let serde = LogStoreRowSerde::new( - node.log_store_table.as_ref().unwrap(), - Some(vnodes.clone()), - &KV_LOG_STORE_V2_INFO, - ); let log_store_identity = format!( "subscription[{}]-executor[{}]", node.subscription_catalog.as_ref().unwrap().id,