From 7acaf48b3d14af52247a349f8d3676cdfac31f75 Mon Sep 17 00:00:00 2001 From: fys <1113014250@qq.com> Date: Tue, 1 Aug 2023 10:53:14 +0800 Subject: [PATCH] add test_subscriber_disconnect unit test --- src/meta-srv/src/pubsub/tests.rs | 38 ++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/meta-srv/src/pubsub/tests.rs b/src/meta-srv/src/pubsub/tests.rs index a8ceac814185..0fe088707198 100644 --- a/src/meta-srv/src/pubsub/tests.rs +++ b/src/meta-srv/src/pubsub/tests.rs @@ -69,6 +69,44 @@ async fn test_pubsub() { assert!(may_msg.is_none()); } +#[tokio::test] +async fn test_subscriber_disconnect() { + let manager = Arc::new(DefaultSubscribeManager::default()); + + let (subscriber1, rx1) = mock_subscriber(1, "tidigong"); + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber: subscriber1, + }; + manager.subscribe(req).unwrap(); + + let (subscriber2, rx2) = mock_subscriber(2, "gcrm"); + let req = AddSubRequest { + topic_list: vec![Topic::Heartbeat], + subscriber: subscriber2, + }; + manager.subscribe(req).unwrap(); + + let manager_clone = manager.clone(); + let message_number: usize = 5; + let join = tokio::spawn(async move { + let publisher: DefaultPublish>, Sender> = + DefaultPublish::new(manager_clone); + for _ in 0..message_number { + publisher.send_msg(mock_message()).await; + } + }); + + // Simulate subscriber disconnection. + std::mem::drop(rx1); + std::mem::drop(rx2); + + join.await.unwrap(); + + let subscriber_list = manager.subscriber_list_by_topic(&Topic::Heartbeat); + assert!(subscriber_list.is_empty()); +} + #[test] fn test_message() { let msg = Message::Heartbeat(Box::default());