Skip to content

Commit

Permalink
cr
Browse files Browse the repository at this point in the history
  • Loading branch information
fengys1996 committed Aug 3, 2023
1 parent 233c4e1 commit 659c375
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use common_grpc::channel_manager::ChannelConfig;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use tokio::sync::mpsc::Sender;

use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::ddl::{DdlManager, DdlManagerRef};
Expand All @@ -42,9 +41,7 @@ use crate::metasrv::{
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
use crate::pubsub::{
DefaultPublish, DefaultSubscribeManager, Message, PublishRef, SubscribeManagerRef,
};
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::mailbox::MailboxRef;
Expand All @@ -65,6 +62,7 @@ pub struct MetaSrvBuilder {
lock: Option<DistLockRef>,
metadata_service: Option<MetadataServiceRef>,
datanode_clients: Option<Arc<DatanodeClients>>,
pubsub: Option<(PublishRef, SubscribeManagerRef)>,
}

impl MetaSrvBuilder {
Expand All @@ -80,6 +78,7 @@ impl MetaSrvBuilder {
lock: None,
metadata_service: None,
datanode_clients: None,
pubsub: None,
}
}

Expand Down Expand Up @@ -133,6 +132,11 @@ impl MetaSrvBuilder {
self
}

pub fn pubsub(mut self, publish: PublishRef, subscribe_manager: SubscribeManagerRef) -> Self {
self.pubsub = Some((publish, subscribe_manager));
self
}

pub async fn build(self) -> Result<MetaSrv> {
let started = Arc::new(AtomicBool::new(false));

Expand All @@ -147,6 +151,7 @@ impl MetaSrvBuilder {
lock,
metadata_service,
datanode_clients,
pubsub,
} = self;

let options = options.unwrap_or_default();
Expand Down Expand Up @@ -174,7 +179,6 @@ impl MetaSrvBuilder {
&table_metadata_manager,
);
let _ = ddl_manager.try_start();
let (publish, sub_manager) = build_publish();

let handler_group = match handler_group {
Some(handler_group) => handler_group,
Expand Down Expand Up @@ -221,9 +225,11 @@ impl MetaSrvBuilder {
}
group.add_handler(RegionLeaseHandler::default()).await;
group.add_handler(PersistStatsHandler::default()).await;
group
.add_handler(PublishHeartbeatHandler::new(publish.clone()))
.await;
if let Some((publish, _)) = pubsub.as_ref() {
group
.add_handler(PublishHeartbeatHandler::new(publish.clone()))
.await;
}
group
}
};
Expand All @@ -246,7 +252,7 @@ impl MetaSrvBuilder {
ddl_manager,
table_metadata_manager,
greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await,
pubsub: Some((publish, sub_manager)),
pubsub,
})
}
}
Expand Down Expand Up @@ -325,12 +331,6 @@ fn build_ddl_manager(
))
}

fn build_publish() -> (PublishRef, SubscribeManagerRef) {
let sub_manager = Arc::new(DefaultSubscribeManager::<Sender<Message>>::default());
let publish = Arc::new(DefaultPublish::new(sub_manager.clone()));
(publish, sub_manager)
}

impl Default for MetaSrvBuilder {
fn default() -> Self {
Self::new()
Expand Down

0 comments on commit 659c375

Please sign in to comment.