Skip to content

Commit

Permalink
feat: impl pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
fengys1996 committed Aug 1, 2023
1 parent 44f3ed2 commit 44cc5c9
Show file tree
Hide file tree
Showing 14 changed files with 550 additions and 1 deletion.
9 changes: 9 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use tonic::Code;

use crate::pubsub::Message;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
Expand Down Expand Up @@ -461,6 +463,12 @@ pub enum Error {

#[snafu(display("Invalid heartbeat request: {}", err_msg))]
InvalidHeartbeatRequest { err_msg: String, location: Location },

#[snafu(display("Failed to publish message: {:?}", source))]
PublishMessage {
source: SendError<Message>,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -506,6 +514,7 @@ impl ErrorExt for Error {
| Error::UpdateTableMetadata { .. }
| Error::NoEnoughAvailableDatanode { .. }
| Error::ConvertGrpcExpr { .. }
| Error::PublishMessage { .. }
| Error::Join { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub mod node_stat;
mod on_leader_start_handler;
mod persist_stats_handler;
pub(crate) mod region_lease_handler;
pub mod report_handler;
mod response_header_handler;

#[async_trait::async_trait]
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/persist_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ mod tests {
table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
))),
publish: None,
};

let handler = PersistStatsHandler::default();
Expand Down
45 changes: 45 additions & 0 deletions src/meta-srv/src/handler/report_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::pubsub::Message;

pub struct ReportHandler;

#[async_trait]
impl HeartbeatHandler for ReportHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}

async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_: &mut HeartbeatAccumulator,
) -> Result<()> {
let req = Box::new(req.clone());

if let Some(publish) = ctx.publish.as_ref() {
publish.send_msg(Message::Heartbeat(req)).await;
}

Ok(())
}
}
1 change: 1 addition & 0 deletions src/meta-srv/src/handler/response_header_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod tests {
table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
))),
publish: None,
};

let req = HeartbeatRequest {
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod metrics;
#[cfg(feature = "mock")]
pub mod mocks;
pub mod procedure;
pub mod pubsub;
pub mod selector;
mod sequence;
pub mod service;
Expand Down
22 changes: 22 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::error::{RecoverProcedureSnafu, Result};
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::metadata_service::MetadataServiceRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::sequence::SequenceRef;
use crate::service::mailbox::MailboxRef;
Expand Down Expand Up @@ -117,6 +118,7 @@ pub struct Context {
pub meta_peer_client: MetaPeerClientRef,
pub mailbox: MailboxRef,
pub election: Option<ElectionRef>,
pub publish: Option<PublishRef>,
pub skip_all: Arc<AtomicBool>,
pub is_infancy: bool,
pub table_metadata_manager: TableMetadataManagerRef,
Expand Down Expand Up @@ -177,6 +179,8 @@ pub struct MetaSrv {
ddl_manager: DdlManagerRef,
table_metadata_manager: TableMetadataManagerRef,
greptimedb_telemerty_task: Arc<GreptimeDBTelemetryTask>,
publish: Option<PublishRef>,
subscribe_manager: Option<SubscribeManagerRef>,
}

impl MetaSrv {
Expand All @@ -196,6 +200,7 @@ impl MetaSrv {
let procedure_manager = self.procedure_manager.clone();
let in_memory = self.in_memory.clone();
let leader_cached_kv_store = self.leader_cached_kv_store.clone();
let subscribe_manager = self.subscribe_manager.clone();
let mut rx = election.subscribe_leader_change();
let task_handler = self.greptimedb_telemerty_task.clone();
let _handle = common_runtime::spawn_bg(async move {
Expand All @@ -219,6 +224,12 @@ impl MetaSrv {
});
}
LeaderChangeMessage::StepDown(leader) => {
if let Some(sub_manager) = subscribe_manager.clone() {
info!("Leader changed, un_subscribe all");
if let Err(e) = sub_manager.un_subscribe_all() {
error!("Failed to un_subscribe all, error: {}", e);
}
}
error!("Leader :{:?} step down", leader);
let _ = task_handler.stop().await.map_err(|e| {
debug!(
Expand Down Expand Up @@ -329,6 +340,14 @@ impl MetaSrv {
&self.table_metadata_manager
}

pub fn publish(&self) -> Option<&PublishRef> {
self.publish.as_ref()
}

pub fn subscribe_manager(&self) -> Option<&SubscribeManagerRef> {
self.subscribe_manager.as_ref()
}

#[inline]
pub fn new_ctx(&self) -> Context {
let server_addr = self.options().server_addr.clone();
Expand All @@ -339,6 +358,8 @@ impl MetaSrv {
let mailbox = self.mailbox.clone();
let election = self.election.clone();
let skip_all = Arc::new(AtomicBool::new(false));
let publish = self.publish.clone();

Context {
server_addr,
in_memory,
Expand All @@ -350,6 +371,7 @@ impl MetaSrv {
skip_all,
is_infancy: false,
table_metadata_manager: self.table_metadata_manager.clone(),
publish,
}
}
}
15 changes: 15 additions & 0 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ use common_grpc::channel_manager::ChannelConfig;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use tokio::sync::mpsc::Sender;

use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::ddl::{DdlManager, DdlManagerRef};
use crate::error::Result;
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::report_handler::ReportHandler;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox,
KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler,
Expand All @@ -40,6 +42,9 @@ use crate::metasrv::{
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
use crate::pubsub::{
DefaultPublish, DefaultSubscribeManager, Message, PublishRef, SubscribeManagerRef,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::mailbox::MailboxRef;
Expand Down Expand Up @@ -169,6 +174,7 @@ impl MetaSrvBuilder {
&table_metadata_manager,
);
let _ = ddl_manager.try_start();
let (publish, sub_manager) = build_publish();

let handler_group = match handler_group {
Some(handler_group) => handler_group,
Expand Down Expand Up @@ -215,6 +221,7 @@ impl MetaSrvBuilder {
}
group.add_handler(RegionLeaseHandler::default()).await;
group.add_handler(PersistStatsHandler::default()).await;
group.add_handler(ReportHandler).await;
group
}
};
Expand All @@ -237,6 +244,8 @@ impl MetaSrvBuilder {
ddl_manager,
table_metadata_manager,
greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await,
publish: Some(publish),
subscribe_manager: Some(sub_manager),
})
}
}
Expand Down Expand Up @@ -315,6 +324,12 @@ fn build_ddl_manager(
))
}

fn build_publish() -> (PublishRef, SubscribeManagerRef) {
let sub_manager = Arc::new(DefaultSubscribeManager::<Sender<Message>>::default());
let publish = Arc::new(DefaultPublish::new(sub_manager.clone()));
(publish, sub_manager)
}

impl Default for MetaSrvBuilder {
fn default() -> Self {
Self::new()
Expand Down
44 changes: 44 additions & 0 deletions src/meta-srv/src/pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::HeartbeatRequest;

mod publish;
mod subscribe_manager;
mod subscriber;
#[cfg(test)]
mod tests;

pub use publish::*;
pub use subscribe_manager::*;
pub use subscriber::*;

/// Subscribed topic type, determined by the ability of meta.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Topic {
Heartbeat,
}

#[derive(Clone, Debug)]
pub enum Message {
Heartbeat(Box<HeartbeatRequest>),
}

impl Message {
pub fn topic(&self) -> Topic {
match self {
Message::Heartbeat(_) => Topic::Heartbeat,
}
}
}
72 changes: 72 additions & 0 deletions src/meta-srv/src/pubsub/publish.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use common_telemetry::error;

use crate::pubsub::{Message, SubscribeManager, Transport, UnSubRequest};

/// This trait provides a `send_msg` method that can be used by other modules
/// of meta to publish [Message].
#[async_trait::async_trait]
pub trait Publish: Send + Sync {
async fn send_msg(&self, message: Message);
}

pub type PublishRef = Arc<dyn Publish>;

/// The default implementation of [Publish]
pub struct DefaultPublish<M, T> {
subscribe_manager: Arc<M>,
_transport: PhantomData<T>,
}

impl<M, T> DefaultPublish<M, T> {
pub fn new(subscribe_manager: Arc<M>) -> Self {
Self {
subscribe_manager,
_transport: PhantomData,
}
}
}

#[async_trait::async_trait]
impl<M, T> Publish for DefaultPublish<M, T>
where
M: SubscribeManager<T>,
T: Transport + Debug,
{
async fn send_msg(&self, message: Message) {
let sub_list = self
.subscribe_manager
.subscriber_list_by_topic(&message.topic());

for sub in sub_list {
if sub.transport_msg(message.clone()).await.is_err() {
// If an error occurs, we consider the subscriber offline,
// so un_subscribe here.
let req = UnSubRequest {
subscriber_id: sub.id(),
};

if let Err(e) = self.subscribe_manager.un_subscribe(req.clone()) {
error!("failed to un_subscribe, req: {:?}, error: {:?}", req, e);
}
}
}
}
}
Loading

0 comments on commit 44cc5c9

Please sign in to comment.