Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 2, 2024
1 parent 87d06d2 commit db7c3d0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 29 deletions.
7 changes: 3 additions & 4 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ use crate::hummock::metrics_utils::{
trigger_split_stat, trigger_sst_stat, trigger_version_stat, trigger_write_stop_stats,
};
use crate::hummock::{CompactorManagerRef, TASK_NORMAL};
use crate::manager::{
ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, MetadataManager, TableId,
META_NODE_ID,
};
#[cfg(any(test, feature = "test"))]
use crate::manager::{ClusterManagerRef, FragmentManagerRef};
use crate::manager::{IdCategory, MetaSrvEnv, MetadataManager, TableId, META_NODE_ID};
use crate::model::{
BTreeMapEntryTransaction, BTreeMapTransaction, ClusterId, MetadataModel, ValTransaction,
VarTransaction,
Expand Down
47 changes: 22 additions & 25 deletions src/stream/src/from_proto/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use risingwave_connector::sink::{
SinkError, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION,
};
use risingwave_pb::stream_plan::{SinkLogStoreType, SinkNode};
use risingwave_storage::dispatch_state_store;

use super::*;
use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
Expand All @@ -37,7 +36,7 @@ impl ExecutorBuilder for SinkExecutorBuilder {
async fn new_boxed_executor(
params: ExecutorParams,
node: &Self::Node,
_store: impl StateStore,
state_store: impl StateStore,
stream: &mut LocalStreamManagerCore,
) -> StreamResult<BoxedExecutor> {
let [input_executor]: [_; 1] = params.input.try_into().unwrap();
Expand Down Expand Up @@ -150,29 +149,27 @@ impl ExecutorBuilder for SinkExecutorBuilder {
connector,
);
// TODO: support setting max row count in config
dispatch_state_store!(params.env.state_store(), state_store, {
let factory = KvLogStoreFactory::new(
state_store,
node.table.as_ref().unwrap().clone(),
params.vnode_bitmap.clone().map(Arc::new),
65536,
metrics,
log_store_identity,
);

Ok(Box::new(
SinkExecutor::new(
params.actor_context,
params.info,
input_executor,
sink_write_param,
sink_param,
columns,
factory,
)
.await?,
))
})
let factory = KvLogStoreFactory::new(
state_store,
node.table.as_ref().unwrap().clone(),
params.vnode_bitmap.clone().map(Arc::new),
65536,
metrics,
log_store_identity,
);

Ok(Box::new(
SinkExecutor::new(
params.actor_context,
params.info,
input_executor,
sink_write_param,
sink_param,
columns,
factory,
)
.await?,
))
}
}
}
Expand Down

0 comments on commit db7c3d0

Please sign in to comment.