Skip to content

Commit

Permalink
Merge pull request #132 from vaptu/master
Browse files Browse the repository at this point in the history
feat: abstract streamhub message notifications
  • Loading branch information
harlanc authored May 29, 2024
2 parents 50b31e9 + d020792 commit 2be56a7
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 121 deletions.
10 changes: 5 additions & 5 deletions application/xiu/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use commonlib::auth::AuthType;
use rtmp::remuxer::RtmpRemuxer;

use std::sync::Arc;
use crate::config::{AuthConfig, AuthSecretConfig};

use {
Expand All @@ -16,7 +16,7 @@ use {
relay::{pull_client::PullClient, push_client::PushClient},
rtmp::RtmpServer,
},
streamhub::{notify::Notifier, StreamsHub},
streamhub::{notify::Notifier, notify::http::HttpNotifier, StreamsHub},
tokio,
xrtsp::rtsp::RtspServer,
xwebrtc::webrtc::WebRTCServer,
Expand Down Expand Up @@ -61,16 +61,16 @@ impl Service {
}

pub async fn run(&mut self) -> Result<()> {
let notifier = if let Some(httpnotifier) = &self.cfg.httpnotify {
let notifier: Option<Arc<dyn Notifier>> = if let Some(httpnotifier) = &self.cfg.httpnotify {
if !httpnotifier.enabled {
None
} else {
Some(Notifier::new(
Some(Arc::new(HttpNotifier::new(
httpnotifier.on_publish.clone(),
httpnotifier.on_unpublish.clone(),
httpnotifier.on_play.clone(),
httpnotifier.on_stop.clone(),
))
)))
}
} else {
None
Expand Down
43 changes: 43 additions & 0 deletions library/streamhub/src/define.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,27 @@ pub enum PubDataType {
Both,
}

#[derive(Clone, Serialize)]
pub enum StreamHubEventMessage {
Subscribe {
identifier: StreamIdentifier,
info: SubscriberInfo,
},
UnSubscribe {
identifier: StreamIdentifier,
info: SubscriberInfo,
},
Publish {
identifier: StreamIdentifier,
info: PublisherInfo,
},
UnPublish {
identifier: StreamIdentifier,
info: PublisherInfo,
},
NotSupport {},
}

#[derive(Serialize)]
pub enum StreamHubEvent {
Subscribe {
Expand Down Expand Up @@ -266,6 +287,28 @@ pub enum StreamHubEvent {
},
}

impl StreamHubEvent {
pub fn to_message(&self) -> StreamHubEventMessage {
match self {
StreamHubEvent::Subscribe { identifier, info, result_sender: _result_sender } => {
StreamHubEventMessage::Subscribe { identifier: identifier.clone(), info: info.clone() }
}
StreamHubEvent::UnSubscribe { identifier, info } => {
StreamHubEventMessage::UnSubscribe { identifier: identifier.clone(), info: info.clone() }
}
StreamHubEvent::Publish { identifier, info, result_sender: _result_sender, stream_handler: _stream_handler } => {
StreamHubEventMessage::Publish { identifier: identifier.clone(), info: info.clone() }
}
StreamHubEvent::UnPublish { identifier, info } => {
StreamHubEventMessage::UnPublish { identifier: identifier.clone(), info: info.clone() }
}
_ => {
StreamHubEventMessage::NotSupport {}
}
}
}
}

#[derive(Debug)]
pub enum TransceiverEvent {
Subscribe {
Expand Down
26 changes: 10 additions & 16 deletions library/streamhub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,11 +516,11 @@ pub struct StreamsHub {
//enable hls
hls_enabled: bool,
//http notifier on sub/pub event
notifier: Option<Notifier>,
notifier: Option<Arc<dyn Notifier>>,
}

impl StreamsHub {
pub fn new(notifier: Option<Notifier>) -> Self {
pub fn new(notifier: Option<Arc<dyn Notifier>>) -> Self {
let (event_producer, event_consumer) = mpsc::unbounded_channel();
let (client_producer, _) = broadcast::channel(100);

Expand Down Expand Up @@ -566,15 +566,9 @@ impl StreamsHub {
}

pub async fn event_loop(&mut self) {
while let Some(message) = self.hub_event_receiver.recv().await {
let event_serialize_str = if let Ok(data) = serde_json::to_string(&message) {
log::info!("event data: {}", data);
data
} else {
String::from("empty body")
};

match message {
while let Some(event) = self.hub_event_receiver.recv().await {
let message = event.to_message();
match event {
StreamHubEvent::Publish {
identifier,
info,
Expand Down Expand Up @@ -627,7 +621,7 @@ impl StreamsHub {
{
Ok(statistic_data_sender) => {
if let Some(notifier) = &self.notifier {
notifier.on_publish_notify(event_serialize_str).await;
notifier.on_publish_notify(&message).await;
}
self.un_pub_sub_events
.insert(info.id, StreamHubEvent::UnPublish { identifier, info });
Expand Down Expand Up @@ -658,7 +652,7 @@ impl StreamsHub {
}

if let Some(notifier) = &self.notifier {
notifier.on_unpublish_notify(event_serialize_str).await;
notifier.on_unpublish_notify(&message).await;
}
}
StreamHubEvent::Subscribe {
Expand Down Expand Up @@ -700,7 +694,7 @@ impl StreamsHub {
let rv = match self.subscribe(&identifier, info_clone, sender).await {
Ok(statistic_data_sender) => {
if let Some(notifier) = &self.notifier {
notifier.on_play_notify(event_serialize_str).await;
notifier.on_play_notify(&message).await;
}

self.un_pub_sub_events
Expand All @@ -720,7 +714,7 @@ impl StreamsHub {
StreamHubEvent::UnSubscribe { identifier, info } => {
if self.unsubscribe(&identifier, info).is_ok() {
if let Some(notifier) = &self.notifier {
notifier.on_stop_notify(event_serialize_str).await;
notifier.on_stop_notify(&message).await;
}
}
}
Expand Down Expand Up @@ -999,7 +993,7 @@ impl StreamsHub {
None => {
return Err(StreamHubError {
value: StreamHubErrorValue::NoAppName,
})
});
}
}

Expand Down
122 changes: 122 additions & 0 deletions library/streamhub/src/notify/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use crate::notify::Notifier;
use reqwest::Client;
use async_trait::async_trait;
use crate::define::{StreamHubEventMessage};

macro_rules! serialize_event {
($message:expr) => {{
let event_serialize_str = match serde_json::to_string(&$message) {
Ok(data) => {
log::info!("event data: {}", data);
data
}
Err(_) => String::from("empty body"),
};
event_serialize_str
}};
}


pub struct HttpNotifier {
request_client: Client,
on_publish_url: Option<String>,
on_unpublish_url: Option<String>,
on_play_url: Option<String>,
on_stop_url: Option<String>,
}

impl HttpNotifier {
pub fn new(
on_publish_url: Option<String>,
on_unpublish_url: Option<String>,
on_play_url: Option<String>,
on_stop_url: Option<String>,
) -> Self {
Self {
request_client: reqwest::Client::new(),
on_publish_url,
on_unpublish_url,
on_play_url,
on_stop_url,
}
}
}

#[async_trait]
impl Notifier for HttpNotifier {
async fn on_publish_notify(&self, event: &StreamHubEventMessage) {
if let Some(on_publish_url) = &self.on_publish_url {
match self
.request_client
.post(on_publish_url)
.body(serialize_event!(event))
.send()
.await
{
Err(err) => {
log::error!("on_publish error: {}", err);
}
Ok(response) => {
log::info!("on_publish success: {:?}", response);
}
}
}
}

async fn on_unpublish_notify(&self, event: &StreamHubEventMessage) {
if let Some(on_unpublish_url) = &self.on_unpublish_url {
match self
.request_client
.post(on_unpublish_url)
.body(serialize_event!(event))
.send()
.await
{
Err(err) => {
log::error!("on_unpublish error: {}", err);
}
Ok(response) => {
log::info!("on_unpublish success: {:?}", response);
}
}
}
}

async fn on_play_notify(&self, event: &StreamHubEventMessage) {
if let Some(on_play_url) = &self.on_play_url {
match self
.request_client
.post(on_play_url)
.body(serialize_event!(event))
.send()
.await
{
Err(err) => {
log::error!("on_play error: {}", err);
}
Ok(response) => {
log::info!("on_play success: {:?}", response);
}
}
}
}

async fn on_stop_notify(&self, event: &StreamHubEventMessage) {
if let Some(on_stop_url) = &self.on_stop_url {
match self
.request_client
.post(on_stop_url)
.body(serialize_event!(event))
.send()
.await
{
Err(err) => {
log::error!("on_stop error: {}", err);
}
Ok(response) => {
log::info!("on_stop success: {:?}", response);
}
}
}
}
}
Loading

0 comments on commit 2be56a7

Please sign in to comment.