diff --git a/application/xiu/src/service.rs b/application/xiu/src/service.rs index 047e1e40..16b78920 100644 --- a/application/xiu/src/service.rs +++ b/application/xiu/src/service.rs @@ -1,6 +1,6 @@ use commonlib::auth::AuthType; use rtmp::remuxer::RtmpRemuxer; - +use std::sync::Arc; use crate::config::{AuthConfig, AuthSecretConfig}; use { @@ -16,7 +16,7 @@ use { relay::{pull_client::PullClient, push_client::PushClient}, rtmp::RtmpServer, }, - streamhub::{notify::Notifier, StreamsHub}, + streamhub::{notify::Notifier, notify::http::HttpNotifier, StreamsHub}, tokio, xrtsp::rtsp::RtspServer, xwebrtc::webrtc::WebRTCServer, @@ -61,16 +61,16 @@ impl Service { } pub async fn run(&mut self) -> Result<()> { - let notifier = if let Some(httpnotifier) = &self.cfg.httpnotify { + let notifier: Option> = if let Some(httpnotifier) = &self.cfg.httpnotify { if !httpnotifier.enabled { None } else { - Some(Notifier::new( + Some(Arc::new(HttpNotifier::new( httpnotifier.on_publish.clone(), httpnotifier.on_unpublish.clone(), httpnotifier.on_play.clone(), httpnotifier.on_stop.clone(), - )) + ))) } } else { None diff --git a/library/streamhub/src/define.rs b/library/streamhub/src/define.rs index 1b987dd7..ef00f8f0 100644 --- a/library/streamhub/src/define.rs +++ b/library/streamhub/src/define.rs @@ -225,6 +225,27 @@ pub enum PubDataType { Both, } +#[derive(Clone, Serialize)] +pub enum StreamHubEventMessage { + Subscribe { + identifier: StreamIdentifier, + info: SubscriberInfo, + }, + UnSubscribe { + identifier: StreamIdentifier, + info: SubscriberInfo, + }, + Publish { + identifier: StreamIdentifier, + info: PublisherInfo, + }, + UnPublish { + identifier: StreamIdentifier, + info: PublisherInfo, + }, + NotSupport {}, +} + #[derive(Serialize)] pub enum StreamHubEvent { Subscribe { @@ -266,6 +287,28 @@ pub enum StreamHubEvent { }, } +impl StreamHubEvent { + pub fn to_message(&self) -> StreamHubEventMessage { + match self { + StreamHubEvent::Subscribe { identifier, info, result_sender: _result_sender } => { + StreamHubEventMessage::Subscribe { identifier: identifier.clone(), info: info.clone() } + } + StreamHubEvent::UnSubscribe { identifier, info } => { + StreamHubEventMessage::UnSubscribe { identifier: identifier.clone(), info: info.clone() } + } + StreamHubEvent::Publish { identifier, info, result_sender: _result_sender, stream_handler: _stream_handler } => { + StreamHubEventMessage::Publish { identifier: identifier.clone(), info: info.clone() } + } + StreamHubEvent::UnPublish { identifier, info } => { + StreamHubEventMessage::UnPublish { identifier: identifier.clone(), info: info.clone() } + } + _ => { + StreamHubEventMessage::NotSupport {} + } + } + } +} + #[derive(Debug)] pub enum TransceiverEvent { Subscribe { diff --git a/library/streamhub/src/lib.rs b/library/streamhub/src/lib.rs index 156c8970..e8d3e983 100644 --- a/library/streamhub/src/lib.rs +++ b/library/streamhub/src/lib.rs @@ -516,11 +516,11 @@ pub struct StreamsHub { //enable hls hls_enabled: bool, //http notifier on sub/pub event - notifier: Option, + notifier: Option>, } impl StreamsHub { - pub fn new(notifier: Option) -> Self { + pub fn new(notifier: Option>) -> Self { let (event_producer, event_consumer) = mpsc::unbounded_channel(); let (client_producer, _) = broadcast::channel(100); @@ -566,15 +566,9 @@ impl StreamsHub { } pub async fn event_loop(&mut self) { - while let Some(message) = self.hub_event_receiver.recv().await { - let event_serialize_str = if let Ok(data) = serde_json::to_string(&message) { - log::info!("event data: {}", data); - data - } else { - String::from("empty body") - }; - - match message { + while let Some(event) = self.hub_event_receiver.recv().await { + let message = event.to_message(); + match event { StreamHubEvent::Publish { identifier, info, @@ -627,7 +621,7 @@ impl StreamsHub { { Ok(statistic_data_sender) => { if let Some(notifier) = &self.notifier { - notifier.on_publish_notify(event_serialize_str).await; + notifier.on_publish_notify(&message).await; } self.un_pub_sub_events .insert(info.id, StreamHubEvent::UnPublish { identifier, info }); @@ -658,7 +652,7 @@ impl StreamsHub { } if let Some(notifier) = &self.notifier { - notifier.on_unpublish_notify(event_serialize_str).await; + notifier.on_unpublish_notify(&message).await; } } StreamHubEvent::Subscribe { @@ -700,7 +694,7 @@ impl StreamsHub { let rv = match self.subscribe(&identifier, info_clone, sender).await { Ok(statistic_data_sender) => { if let Some(notifier) = &self.notifier { - notifier.on_play_notify(event_serialize_str).await; + notifier.on_play_notify(&message).await; } self.un_pub_sub_events @@ -720,7 +714,7 @@ impl StreamsHub { StreamHubEvent::UnSubscribe { identifier, info } => { if self.unsubscribe(&identifier, info).is_ok() { if let Some(notifier) = &self.notifier { - notifier.on_stop_notify(event_serialize_str).await; + notifier.on_stop_notify(&message).await; } } } @@ -999,7 +993,7 @@ impl StreamsHub { None => { return Err(StreamHubError { value: StreamHubErrorValue::NoAppName, - }) + }); } } diff --git a/library/streamhub/src/notify/http.rs b/library/streamhub/src/notify/http.rs new file mode 100644 index 00000000..e315b8a8 --- /dev/null +++ b/library/streamhub/src/notify/http.rs @@ -0,0 +1,122 @@ +use crate::notify::Notifier; +use reqwest::Client; +use async_trait::async_trait; +use crate::define::{StreamHubEventMessage}; + +macro_rules! serialize_event { + ($message:expr) => {{ + let event_serialize_str = match serde_json::to_string(&$message) { + Ok(data) => { + log::info!("event data: {}", data); + data + } + Err(_) => String::from("empty body"), + }; + event_serialize_str + }}; +} + + +pub struct HttpNotifier { + request_client: Client, + on_publish_url: Option, + on_unpublish_url: Option, + on_play_url: Option, + on_stop_url: Option, +} + +impl HttpNotifier { + pub fn new( + on_publish_url: Option, + on_unpublish_url: Option, + on_play_url: Option, + on_stop_url: Option, + ) -> Self { + Self { + request_client: reqwest::Client::new(), + on_publish_url, + on_unpublish_url, + on_play_url, + on_stop_url, + } + } +} + +#[async_trait] +impl Notifier for HttpNotifier { + async fn on_publish_notify(&self, event: &StreamHubEventMessage) { + if let Some(on_publish_url) = &self.on_publish_url { + match self + .request_client + .post(on_publish_url) + .body(serialize_event!(event)) + .send() + .await + { + Err(err) => { + log::error!("on_publish error: {}", err); + } + Ok(response) => { + log::info!("on_publish success: {:?}", response); + } + } + } + } + + async fn on_unpublish_notify(&self, event: &StreamHubEventMessage) { + if let Some(on_unpublish_url) = &self.on_unpublish_url { + match self + .request_client + .post(on_unpublish_url) + .body(serialize_event!(event)) + .send() + .await + { + Err(err) => { + log::error!("on_unpublish error: {}", err); + } + Ok(response) => { + log::info!("on_unpublish success: {:?}", response); + } + } + } + } + + async fn on_play_notify(&self, event: &StreamHubEventMessage) { + if let Some(on_play_url) = &self.on_play_url { + match self + .request_client + .post(on_play_url) + .body(serialize_event!(event)) + .send() + .await + { + Err(err) => { + log::error!("on_play error: {}", err); + } + Ok(response) => { + log::info!("on_play success: {:?}", response); + } + } + } + } + + async fn on_stop_notify(&self, event: &StreamHubEventMessage) { + if let Some(on_stop_url) = &self.on_stop_url { + match self + .request_client + .post(on_stop_url) + .body(serialize_event!(event)) + .send() + .await + { + Err(err) => { + log::error!("on_stop error: {}", err); + } + Ok(response) => { + log::info!("on_stop success: {:?}", response); + } + } + } + } +} diff --git a/library/streamhub/src/notify/mod.rs b/library/streamhub/src/notify/mod.rs index db8cbeb5..f66e5009 100644 --- a/library/streamhub/src/notify/mod.rs +++ b/library/streamhub/src/notify/mod.rs @@ -1,100 +1,12 @@ -use reqwest::Client; -pub struct Notifier { - request_client: Client, - on_publish_url: Option, - on_unpublish_url: Option, - on_play_url: Option, - on_stop_url: Option, -} - -impl Notifier { - pub fn new( - on_publish_url: Option, - on_unpublish_url: Option, - on_play_url: Option, - on_stop_url: Option, - ) -> Self { - Self { - request_client: reqwest::Client::new(), - on_publish_url, - on_unpublish_url, - on_play_url, - on_stop_url, - } - } - pub async fn on_publish_notify(&self, body: String) { - if let Some(on_publish_url) = &self.on_publish_url { - match self - .request_client - .post(on_publish_url) - .body(body) - .send() - .await - { - Err(err) => { - log::error!("on_publish error: {}", err); - } - Ok(response) => { - log::info!("on_publish success: {:?}", response); - } - } - } - } - - pub async fn on_unpublish_notify(&self, body: String) { - if let Some(on_unpublish_url) = &self.on_unpublish_url { - match self - .request_client - .post(on_unpublish_url) - .body(body) - .send() - .await - { - Err(err) => { - log::error!("on_unpublish error: {}", err); - } - Ok(response) => { - log::info!("on_unpublish success: {:?}", response); - } - } - } - } - - pub async fn on_play_notify(&self, body: String) { - if let Some(on_play_url) = &self.on_play_url { - match self - .request_client - .post(on_play_url) - .body(body) - .send() - .await - { - Err(err) => { - log::error!("on_play error: {}", err); - } - Ok(response) => { - log::info!("on_play success: {:?}", response); - } - } - } - } - - pub async fn on_stop_notify(&self, body: String) { - if let Some(on_stop_url) = &self.on_stop_url { - match self - .request_client - .post(on_stop_url) - .body(body) - .send() - .await - { - Err(err) => { - log::error!("on_stop error: {}", err); - } - Ok(response) => { - log::info!("on_stop success: {:?}", response); - } - } - } - } -} +pub mod http; + +use async_trait::async_trait; +use crate::define::{StreamHubEventMessage}; + +#[async_trait] +pub trait Notifier: Sync + Send { + async fn on_publish_notify(&self, event: &StreamHubEventMessage); + async fn on_unpublish_notify(&self, event: &StreamHubEventMessage); + async fn on_play_notify(&self, event: &StreamHubEventMessage); + async fn on_stop_notify(&self, event: &StreamHubEventMessage); +} \ No newline at end of file