diff --git a/protocol/rtsp/src/rtsp.rs b/protocol/rtsp/src/rtsp.rs index 2c9020e9..b0a049b3 100644 --- a/protocol/rtsp/src/rtsp.rs +++ b/protocol/rtsp/src/rtsp.rs @@ -32,7 +32,39 @@ impl RtspServer { RtspServerSession::new(tcp_stream, self.event_producer.clone(), self.auth.clone()); tokio::spawn(async move { if let Err(err) = session.run().await { - log::error!("session run error, err: {}", err); + let session_id = if let Some(id) = session.session_id { + id.to_string() + } else { + "none".to_string() + }; + log::info!( + "session run exit: session id: {} session type: {} , err: {}", + session_id, + session.session_type, + err + ); + + if !session.is_normal_exit { + if let Some(identifier) = session.stream_identifier.clone() { + match session.exit(identifier) { + Err(err) => { + log::error!( + "session exit error: session id: {} session type: {}, error info: {}", + session_id, + session.session_type, + err + ); + } + Ok(()) => { + log::info!( + "session exit successfully: session id: {} session type: {} ", + session_id, + session.session_type, + ); + } + } + } + } } }); } diff --git a/protocol/rtsp/src/session/mod.rs b/protocol/rtsp/src/session/mod.rs index 8f84ea72..5694ddb9 100644 --- a/protocol/rtsp/src/session/mod.rs +++ b/protocol/rtsp/src/session/mod.rs @@ -73,12 +73,15 @@ pub struct RtspServerSession { tracks: HashMap, sdp: Sdp, pub session_id: Option, - session_type: define::SessionType, + pub session_type: define::SessionType, stream_handler: Arc, event_producer: StreamHubEventSender, auth: Option, + + pub stream_identifier: Option, + pub is_normal_exit: bool, } pub struct InterleavedBinaryData { @@ -137,6 +140,8 @@ impl RtspServerSession { event_producer, stream_handler: Arc::new(RtspStreamHandler::new()), auth, + stream_identifier: None, + is_normal_exit: false, } } @@ -279,12 +284,12 @@ impl RtspServerSession { // receiver is used to receive the sdp information let (sender, mut receiver) = mpsc::unbounded_channel(); - let request_event = StreamHubEvent::Request { - identifier: StreamIdentifier::Rtsp { - stream_path: rtsp_request.uri.path.clone(), - }, - sender, + let identifier = StreamIdentifier::Rtsp { + stream_path: rtsp_request.uri.path.clone(), }; + self.stream_identifier = Some(identifier.clone()); + + let request_event = StreamHubEvent::Request { identifier, sender }; if self.event_producer.send(request_event).is_err() { return Err(SessionError { @@ -338,10 +343,13 @@ impl RtspServerSession { let (event_result_sender, event_result_receiver) = oneshot::channel(); + let identifier = StreamIdentifier::Rtsp { + stream_path: rtsp_request.uri.path.clone(), + }; + self.stream_identifier = Some(identifier.clone()); + let publish_event = StreamHubEvent::Publish { - identifier: StreamIdentifier::Rtsp { - stream_path: rtsp_request.uri.path.clone(), - }, + identifier, result_sender: event_result_sender, info: self.get_publisher_info(), stream_handler: self.stream_handler.clone(), @@ -604,21 +612,25 @@ impl RtspServerSession { } async fn handle_record(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { + let status_code = http::StatusCode::OK; + let mut response = Self::gen_response(status_code, rtsp_request); + + //A stream published by gstreamer does not support the Range header + //https://github.com/harlanc/xiu/issues/135 if let Some(range_str) = rtsp_request.headers.get(&String::from("Range")) { if let Some(range) = RtspRange::unmarshal(range_str) { - let status_code = http::StatusCode::OK; - let mut response = Self::gen_response(status_code, rtsp_request); response .headers .insert(String::from("Range"), range.marshal()); - response - .headers - .insert("Session".to_string(), self.session_id.unwrap().to_string()); - - self.send_response(&response).await?; } } + response + .headers + .insert("Session".to_string(), self.session_id.unwrap().to_string()); + + self.send_response(&response).await?; + Ok(()) } @@ -626,23 +638,20 @@ impl RtspServerSession { let identifier = StreamIdentifier::Rtsp { stream_path: rtsp_request.uri.path.clone(), }; + log::info!("handle_teardown..."); + self.exit(identifier) + } + pub fn exit(&mut self, identifier: StreamIdentifier) -> Result<(), SessionError> { let event = match self.session_type { - define::SessionType::Client => { - log::info!("handle_teardown: client"); - - StreamHubEvent::UnSubscribe { - identifier, - info: self.get_subscriber_info(), - } - } - define::SessionType::Server => { - log::info!("handle_teardown: server"); - StreamHubEvent::UnPublish { - identifier, - info: self.get_publisher_info(), - } - } + define::SessionType::Client => StreamHubEvent::UnSubscribe { + identifier, + info: self.get_subscriber_info(), + }, + define::SessionType::Server => StreamHubEvent::UnPublish { + identifier, + info: self.get_publisher_info(), + }, }; let event_json_str = serde_json::to_string(&event).unwrap(); @@ -650,13 +659,14 @@ impl RtspServerSession { let rv = self.event_producer.send(event); match rv { Err(err) => { - log::error!("handle_teardown: send event error: {err} for event: {event_json_str}"); + log::error!("session exit: send event error: {err} for event: {event_json_str}"); Err(SessionError { value: SessionErrorValue::StreamHubEventSendErr, }) } Ok(()) => { - log::info!("handle_teardown: send event success: {event_json_str}"); + self.is_normal_exit = true; + log::info!("session exit: send event success: {event_json_str}"); Ok(()) } }