diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 3ec1a699acdd..d2d0be501720 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -21,6 +21,8 @@ use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; use tonic::Code; +use crate::pubsub::Message; + #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { @@ -461,6 +463,12 @@ pub enum Error { #[snafu(display("Invalid heartbeat request: {}", err_msg))] InvalidHeartbeatRequest { err_msg: String, location: Location }, + + #[snafu(display("Failed to publish message: {:?}", source))] + PublishMessage { + source: SendError, + location: Location, + }, } pub type Result = std::result::Result; @@ -506,6 +514,7 @@ impl ErrorExt for Error { | Error::UpdateTableMetadata { .. } | Error::NoEnoughAvailableDatanode { .. } | Error::ConvertGrpcExpr { .. } + | Error::PublishMessage { .. } | Error::Join { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index f675bc5fedf0..10a7b0124d04 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -55,6 +55,7 @@ pub mod node_stat; mod on_leader_start_handler; mod persist_stats_handler; pub(crate) mod region_lease_handler; +pub mod report_handler; mod response_header_handler; #[async_trait::async_trait] diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index db8d73362abb..412b2a57a145 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -181,6 +181,7 @@ mod tests { table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( kv_store.clone(), ))), + publish: None, }; let handler = PersistStatsHandler::default(); diff --git a/src/meta-srv/src/handler/report_handler.rs b/src/meta-srv/src/handler/report_handler.rs new file mode 100644 index 000000000000..1df11b0a3e45 --- /dev/null +++ b/src/meta-srv/src/handler/report_handler.rs @@ -0,0 +1,45 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{HeartbeatRequest, Role}; +use async_trait::async_trait; + +use crate::error::Result; +use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; +use crate::pubsub::Message; + +pub struct ReportHandler; + +#[async_trait] +impl HeartbeatHandler for ReportHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + + async fn handle( + &self, + req: &HeartbeatRequest, + ctx: &mut Context, + _: &mut HeartbeatAccumulator, + ) -> Result<()> { + let req = Box::new(req.clone()); + + if let Some(publish) = ctx.publish.as_ref() { + publish.send_msg(Message::Heartbeat(req)).await; + } + + Ok(()) + } +} diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 25e3ff5bd621..65aa8acedb81 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -89,6 +89,7 @@ mod tests { table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( kv_store.clone(), ))), + publish: None, }; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 2058de1fa50d..3b23152a67d9 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -32,6 +32,7 @@ mod metrics; #[cfg(feature = "mock")] pub mod mocks; pub mod procedure; +pub mod pubsub; pub mod selector; mod sequence; pub mod service; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 94fb59325882..d0ea76e27476 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -38,6 +38,7 @@ use crate::error::{RecoverProcedureSnafu, Result}; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; use crate::metadata_service::MetadataServiceRef; +use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::{Selector, SelectorType}; use crate::sequence::SequenceRef; use crate::service::mailbox::MailboxRef; @@ -117,6 +118,7 @@ pub struct Context { pub meta_peer_client: MetaPeerClientRef, pub mailbox: MailboxRef, pub election: Option, + pub publish: Option, pub skip_all: Arc, pub is_infancy: bool, pub table_metadata_manager: TableMetadataManagerRef, @@ -177,6 +179,8 @@ pub struct MetaSrv { ddl_manager: DdlManagerRef, table_metadata_manager: TableMetadataManagerRef, greptimedb_telemerty_task: Arc, + publish: Option, + subscribe_manager: Option, } impl MetaSrv { @@ -196,6 +200,7 @@ impl MetaSrv { let procedure_manager = self.procedure_manager.clone(); let in_memory = self.in_memory.clone(); let leader_cached_kv_store = self.leader_cached_kv_store.clone(); + let subscribe_manager = self.subscribe_manager.clone(); let mut rx = election.subscribe_leader_change(); let task_handler = self.greptimedb_telemerty_task.clone(); let _handle = common_runtime::spawn_bg(async move { @@ -219,6 +224,12 @@ impl MetaSrv { }); } LeaderChangeMessage::StepDown(leader) => { + if let Some(sub_manager) = subscribe_manager.clone() { + info!("Leader changed, un_subscribe all"); + if let Err(e) = sub_manager.un_subscribe_all() { + error!("Failed to un_subscribe all, error: {}", e); + } + } error!("Leader :{:?} step down", leader); let _ = task_handler.stop().await.map_err(|e| { debug!( @@ -329,6 +340,14 @@ impl MetaSrv { &self.table_metadata_manager } + pub fn publish(&self) -> Option<&PublishRef> { + self.publish.as_ref() + } + + pub fn subscribe_manager(&self) -> Option<&SubscribeManagerRef> { + self.subscribe_manager.as_ref() + } + #[inline] pub fn new_ctx(&self) -> Context { let server_addr = self.options().server_addr.clone(); @@ -339,6 +358,8 @@ impl MetaSrv { let mailbox = self.mailbox.clone(); let election = self.election.clone(); let skip_all = Arc::new(AtomicBool::new(false)); + let publish = self.publish.clone(); + Context { server_addr, in_memory, @@ -350,6 +371,7 @@ impl MetaSrv { skip_all, is_infancy: false, table_metadata_manager: self.table_metadata_manager.clone(), + publish, } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index fe562cc0ae17..cbab704c7faa 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -21,12 +21,14 @@ use common_grpc::channel_manager::ChannelConfig; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; +use tokio::sync::mpsc::Sender; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::ddl::{DdlManager, DdlManagerRef}; use crate::error::Result; use crate::handler::mailbox_handler::MailboxHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; +use crate::handler::report_handler::ReportHandler; use crate::handler::{ CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox, KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler, @@ -40,6 +42,9 @@ use crate::metasrv::{ }; use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::state_store::MetaStateStore; +use crate::pubsub::{ + DefaultPublish, DefaultSubscribeManager, Message, PublishRef, SubscribeManagerRef, +}; use crate::selector::lease_based::LeaseBasedSelector; use crate::sequence::Sequence; use crate::service::mailbox::MailboxRef; @@ -169,6 +174,7 @@ impl MetaSrvBuilder { &table_metadata_manager, ); let _ = ddl_manager.try_start(); + let (publish, sub_manager) = build_publish(); let handler_group = match handler_group { Some(handler_group) => handler_group, @@ -215,6 +221,7 @@ impl MetaSrvBuilder { } group.add_handler(RegionLeaseHandler::default()).await; group.add_handler(PersistStatsHandler::default()).await; + group.add_handler(ReportHandler).await; group } }; @@ -237,6 +244,8 @@ impl MetaSrvBuilder { ddl_manager, table_metadata_manager, greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await, + publish: Some(publish), + subscribe_manager: Some(sub_manager), }) } } @@ -315,6 +324,12 @@ fn build_ddl_manager( )) } +fn build_publish() -> (PublishRef, SubscribeManagerRef) { + let sub_manager = Arc::new(DefaultSubscribeManager::>::default()); + let publish = Arc::new(DefaultPublish::new(sub_manager.clone())); + (publish, sub_manager) +} + impl Default for MetaSrvBuilder { fn default() -> Self { Self::new() diff --git a/src/meta-srv/src/pubsub.rs b/src/meta-srv/src/pubsub.rs new file mode 100644 index 000000000000..8badcb50258b --- /dev/null +++ b/src/meta-srv/src/pubsub.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::HeartbeatRequest; + +mod publish; +mod subscribe_manager; +mod subscriber; +#[cfg(test)] +mod tests; + +pub use publish::*; +pub use subscribe_manager::*; +pub use subscriber::*; + +/// Subscribed topic type, determined by the ability of meta. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub enum Topic { + Heartbeat, +} + +#[derive(Clone, Debug)] +pub enum Message { + Heartbeat(Box), +} + +impl Message { + pub fn topic(&self) -> Topic { + match self { + Message::Heartbeat(_) => Topic::Heartbeat, + } + } +} diff --git a/src/meta-srv/src/pubsub/publish.rs b/src/meta-srv/src/pubsub/publish.rs new file mode 100644 index 000000000000..d5172919ad8c --- /dev/null +++ b/src/meta-srv/src/pubsub/publish.rs @@ -0,0 +1,72 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::marker::PhantomData; +use std::sync::Arc; + +use common_telemetry::error; + +use crate::pubsub::{Message, SubscribeManager, Transport, UnSubRequest}; + +/// This trait provides a `send_msg` method that can be used by other modules +/// of meta to publish [Message]. +#[async_trait::async_trait] +pub trait Publish: Send + Sync { + async fn send_msg(&self, message: Message); +} + +pub type PublishRef = Arc; + +/// The default implementation of [Publish] +pub struct DefaultPublish { + subscribe_manager: Arc, + _transport: PhantomData, +} + +impl DefaultPublish { + pub fn new(subscribe_manager: Arc) -> Self { + Self { + subscribe_manager, + _transport: PhantomData, + } + } +} + +#[async_trait::async_trait] +impl Publish for DefaultPublish +where + M: SubscribeManager, + T: Transport + Debug, +{ + async fn send_msg(&self, message: Message) { + let sub_list = self + .subscribe_manager + .subscriber_list_by_topic(&message.topic()); + + for sub in sub_list { + if sub.transport_msg(message.clone()).await.is_err() { + // If an error occurs, we consider the subscriber offline, + // so un_subscribe here. + let req = UnSubRequest { + subscriber_id: sub.id(), + }; + + if let Err(e) = self.subscribe_manager.un_subscribe(req.clone()) { + error!("failed to un_subscribe, req: {:?}, error: {:?}", req, e); + } + } + } + } +} diff --git a/src/meta-srv/src/pubsub/subscribe_manager.rs b/src/meta-srv/src/pubsub/subscribe_manager.rs new file mode 100644 index 000000000000..2e6263590b88 --- /dev/null +++ b/src/meta-srv/src/pubsub/subscribe_manager.rs @@ -0,0 +1,116 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_telemetry::info; +use dashmap::DashMap; +use tokio::sync::mpsc::Sender; + +use crate::error::Result; +use crate::pubsub::{Message, Subscriber, SubscriberRef, Topic, Transport}; + +pub trait SubscribeQuery: Send + Sync { + fn subscriber_list_by_topic(&self, topic: &Topic) -> Vec>; +} + +pub trait SubscribeManager: SubscribeQuery { + fn subscribe(&self, req: AddSubRequest) -> Result<()>; + + fn un_subscribe(&self, req: UnSubRequest) -> Result<()>; + + fn un_subscribe_all(&self) -> Result<()>; +} + +pub type SubscribeManagerRef = Arc>>; + +pub struct AddSubRequest { + pub topic_list: Vec, + pub subscriber: Subscriber, +} + +#[derive(Debug, Clone)] +pub struct UnSubRequest { + pub subscriber_id: u32, +} +pub struct DefaultSubscribeManager { + topic2sub: DashMap>>>, +} + +impl Default for DefaultSubscribeManager { + fn default() -> Self { + Self { + topic2sub: DashMap::new(), + } + } +} + +impl SubscribeQuery for DefaultSubscribeManager +where + T: Transport, +{ + fn subscriber_list_by_topic(&self, topic: &Topic) -> Vec> { + self.topic2sub + .get(topic) + .map(|list_ref| list_ref.clone()) + .unwrap_or_else(Vec::new) + } +} + +impl SubscribeManager for DefaultSubscribeManager +where + T: Transport, +{ + fn subscribe(&self, req: AddSubRequest) -> Result<()> { + let AddSubRequest { + topic_list, + subscriber, + } = req; + + info!( + "Add a subscription, subscriber_id: {}, subscriber_name: {}, topic list: {:?}", + subscriber.id(), + subscriber.name(), + topic_list + ); + + let subscriber = Arc::new(subscriber); + + for topic in topic_list { + let mut entry = self.topic2sub.entry(topic).or_insert_with(Vec::new); + entry.push(subscriber.clone()); + } + + Ok(()) + } + + fn un_subscribe(&self, req: UnSubRequest) -> Result<()> { + let UnSubRequest { subscriber_id } = req; + + info!("Add a un_subscription, subscriber_id: {}", subscriber_id); + + for entry in self.topic2sub.iter_mut() { + let mut sub_list = entry; + sub_list.retain(|subscriber| subscriber.id() != subscriber_id) + } + + Ok(()) + } + + fn un_subscribe_all(&self) -> Result<()> { + self.topic2sub.clear(); + + Ok(()) + } +} diff --git a/src/meta-srv/src/pubsub/subscriber.rs b/src/meta-srv/src/pubsub/subscriber.rs new file mode 100644 index 000000000000..6b4eaa4357fa --- /dev/null +++ b/src/meta-srv/src/pubsub/subscriber.rs @@ -0,0 +1,75 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use snafu::ResultExt; +use tokio::sync::mpsc::Sender; + +use crate::error::{self, Result}; +use crate::pubsub::Message; + +#[derive(Debug)] +pub struct Subscriber { + /// Subscriber's id, globally unique, assigned by leader meta. + id: u32, + /// Subscriber's name, passed in by subscriber. + name: String, + /// Transport channel from meta to subscriber. + transporter: T, +} + +pub type SubscriberRef = Arc>; + +impl Subscriber { + pub fn new(id: u32, name: impl Into, transporter: T) -> Self { + let name = name.into(); + + Self { + id, + name, + transporter, + } + } + + pub fn id(&self) -> u32 { + self.id + } + + pub fn name(&self) -> &str { + &self.name + } +} + +impl Subscriber +where + T: Transport, +{ + pub async fn transport_msg(&self, message: Message) -> Result<()> { + self.transporter.transport_msg(message).await + } +} + +/// This trait defines how messages are delivered from meta to the subscriber. +#[async_trait::async_trait] +pub trait Transport: Send + Sync { + async fn transport_msg(&self, msg: Message) -> Result<()>; +} + +#[async_trait::async_trait] +impl Transport for Sender { + async fn transport_msg(&self, msg: Message) -> Result<()> { + self.send(msg).await.context(error::PublishMessageSnafu) + } +} diff --git a/src/meta-srv/src/pubsub/tests.rs b/src/meta-srv/src/pubsub/tests.rs new file mode 100644 index 000000000000..a8ceac814185 --- /dev/null +++ b/src/meta-srv/src/pubsub/tests.rs @@ -0,0 +1,147 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::meta::HeartbeatRequest; +use tokio::sync::mpsc::{Receiver, Sender}; + +use super::DefaultSubscribeManager; +use crate::pubsub::{ + AddSubRequest, DefaultPublish, Message, Publish, SubscribeManager, SubscribeQuery, Subscriber, + Topic, UnSubRequest, +}; + +#[tokio::test] +async fn test_pubsub() { + let manager = Arc::new(DefaultSubscribeManager::default()); + + let (subscriber1, mut rx1) = mock_subscriber(1, "tidigong"); + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber: subscriber1, + }; + manager.subscribe(req).unwrap(); + + let (subscriber2, mut rx2) = mock_subscriber(2, "gcrm"); + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber: subscriber2, + }; + manager.subscribe(req).unwrap(); + + let manager_clone = manager.clone(); + let message_number: usize = 5; + tokio::spawn(async move { + let publisher: DefaultPublish>, Sender> = + DefaultPublish::new(manager_clone); + for _ in 0..message_number { + publisher.send_msg(mock_message()).await; + } + }); + + for _ in 0..message_number { + let msg = rx1.recv().await.unwrap(); + check_message(msg); + let msg = rx2.recv().await.unwrap(); + check_message(msg); + } + + manager + .un_subscribe(UnSubRequest { subscriber_id: 1 }) + .unwrap(); + let may_msg = rx1.recv().await; + assert!(may_msg.is_none()); + + manager.un_subscribe_all().unwrap(); + let may_msg = rx2.recv().await; + assert!(may_msg.is_none()); +} + +#[test] +fn test_message() { + let msg = Message::Heartbeat(Box::default()); + assert_eq!(Topic::Heartbeat, msg.topic()); +} + +#[test] +fn test_sub_manager() { + let manager = DefaultSubscribeManager::default(); + + let subscriber = mock_subscriber(1, "tidigong").0; + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber, + }; + manager.subscribe(req).unwrap(); + let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat); + assert_eq!(1, ret.len()); + + let subscriber = mock_subscriber(2, "gcrm").0; + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber, + }; + manager.subscribe(req).unwrap(); + let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat); + assert_eq!(2, ret.len()); + + let req = UnSubRequest { subscriber_id: 1 }; + manager.un_subscribe(req).unwrap(); + let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat); + assert_eq!(1, ret.len()); + + let req = UnSubRequest { subscriber_id: 2 }; + manager.un_subscribe(req).unwrap(); + let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat); + assert_eq!(0, ret.len()); +} + +#[tokio::test] +async fn test_subscriber() { + let (subscriber, mut rx) = mock_subscriber(1, "tudigong"); + assert_eq!(1, subscriber.id()); + assert_eq!("tudigong", subscriber.name()); + + subscriber.transport_msg(mock_message()).await.unwrap(); + + let may_msg = rx.recv().await; + assert!(may_msg.is_some()); + match may_msg.unwrap() { + Message::Heartbeat(hb) => { + assert_eq!(123, hb.duration_since_epoch); + } + } +} + +fn mock_subscriber(id: u32, name: &str) -> (Subscriber>, Receiver) { + let (tx, rx) = tokio::sync::mpsc::channel(1024); + let sub = Subscriber::new(id, name, tx); + (sub, rx) +} + +fn mock_message() -> Message { + Message::Heartbeat(Box::new(HeartbeatRequest { + duration_since_epoch: 123, + ..Default::default() + })) +} + +fn check_message(message: Message) { + match message { + Message::Heartbeat(hb) => { + assert_eq!(123, hb.duration_since_epoch); + } + } +} diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 857423660765..22dc50ccd8f3 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -73,7 +73,7 @@ schemars = "0.8" secrecy = { version = "0.8", features = ["serde", "alloc"] } serde.workspace = true serde_json = "1.0" -session = { path = "../session" } +session = { path = "../session", features = ["testing"] } sha1 = "0.10" snafu = { version = "0.7", features = ["backtraces"] } snap = "1"