diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 0ea5c51904c21..e37848a77ffd5 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -81,10 +81,9 @@ use crate::hummock::metrics_utils::{ trigger_split_stat, trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats, }; use crate::hummock::{CompactorManagerRef, TASK_NORMAL}; -use crate::manager::{ - ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, MetadataManager, TableId, - META_NODE_ID, -}; +#[cfg(any(test, feature = "test"))] +use crate::manager::{ClusterManagerRef, FragmentManagerRef}; +use crate::manager::{IdCategory, MetaSrvEnv, MetadataManager, TableId, META_NODE_ID}; use crate::model::{ BTreeMapEntryTransaction, BTreeMapTransaction, ClusterId, MetadataModel, ValTransaction, VarTransaction, diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index a1071096e218a..659450e83c077 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -22,7 +22,6 @@ use risingwave_connector::sink::{ SinkError, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, }; use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode}; -use risingwave_storage::dispatch_state_store; use super::*; use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; @@ -37,7 +36,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { async fn new_boxed_executor( params: ExecutorParams, node: &Self::Node, - _store: impl StateStore, + state_store: impl StateStore, stream: &mut LocalStreamManagerCore, ) -> StreamResult { let [input_executor]: [_; 1] = params.input.try_into().unwrap(); @@ -150,29 +149,27 @@ impl ExecutorBuilder for SinkExecutorBuilder { connector, ); // TODO: support setting max row count in config - dispatch_state_store!(params.env.state_store(), state_store, { - let factory = KvLogStoreFactory::new( - state_store, - node.table.as_ref().unwrap().clone(), - params.vnode_bitmap.clone().map(Arc::new), - 65536, - metrics, - log_store_identity, - ); - - Ok(Box::new( - SinkExecutor::new( - params.actor_context, - params.info, - input_executor, - sink_write_param, - sink_param, - columns, - factory, - ) - .await?, - )) - }) + let factory = KvLogStoreFactory::new( + state_store, + node.table.as_ref().unwrap().clone(), + params.vnode_bitmap.clone().map(Arc::new), + 65536, + metrics, + log_store_identity, + ); + + Ok(Box::new( + SinkExecutor::new( + params.actor_context, + params.info, + input_executor, + sink_write_param, + sink_param, + columns, + factory, + ) + .await?, + )) } } }