Skip to content

Commit

Permalink
fix error that xiu cannot receive RTSP stream published by gstreamer #…
Browse files Browse the repository at this point in the history
  • Loading branch information
harlanc committed Jul 19, 2024
1 parent a31d775 commit 6085e91
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 34 deletions.
34 changes: 33 additions & 1 deletion protocol/rtsp/src/rtsp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,39 @@ impl RtspServer {
RtspServerSession::new(tcp_stream, self.event_producer.clone(), self.auth.clone());
tokio::spawn(async move {
if let Err(err) = session.run().await {
log::error!("session run error, err: {}", err);
let session_id = if let Some(id) = session.session_id {
id.to_string()
} else {
"none".to_string()
};
log::info!(
"session run exit: session id: {} session type: {} , err: {}",
session_id,
session.session_type,
err
);

if !session.is_normal_exit {
if let Some(identifier) = session.stream_identifier.clone() {
match session.exit(identifier) {
Err(err) => {
log::error!(
"session exit error: session id: {} session type: {}, error info: {}",
session_id,
session.session_type,
err
);
}
Ok(()) => {
log::info!(
"session exit successfully: session id: {} session type: {} ",
session_id,
session.session_type,
);
}
}
}
}
}
});
}
Expand Down
76 changes: 43 additions & 33 deletions protocol/rtsp/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,15 @@ pub struct RtspServerSession {
tracks: HashMap<TrackType, RtspTrack>,
sdp: Sdp,
pub session_id: Option<Uuid>,
session_type: define::SessionType,
pub session_type: define::SessionType,

stream_handler: Arc<RtspStreamHandler>,
event_producer: StreamHubEventSender,

auth: Option<Auth>,

pub stream_identifier: Option<StreamIdentifier>,
pub is_normal_exit: bool,
}

pub struct InterleavedBinaryData {
Expand Down Expand Up @@ -137,6 +140,8 @@ impl RtspServerSession {
event_producer,
stream_handler: Arc::new(RtspStreamHandler::new()),
auth,
stream_identifier: None,
is_normal_exit: false,
}
}

Expand Down Expand Up @@ -279,12 +284,12 @@ impl RtspServerSession {
// receiver is used to receive the sdp information
let (sender, mut receiver) = mpsc::unbounded_channel();

let request_event = StreamHubEvent::Request {
identifier: StreamIdentifier::Rtsp {
stream_path: rtsp_request.uri.path.clone(),
},
sender,
let identifier = StreamIdentifier::Rtsp {
stream_path: rtsp_request.uri.path.clone(),
};
self.stream_identifier = Some(identifier.clone());

let request_event = StreamHubEvent::Request { identifier, sender };

if self.event_producer.send(request_event).is_err() {
return Err(SessionError {
Expand Down Expand Up @@ -338,10 +343,13 @@ impl RtspServerSession {

let (event_result_sender, event_result_receiver) = oneshot::channel();

let identifier = StreamIdentifier::Rtsp {
stream_path: rtsp_request.uri.path.clone(),
};
self.stream_identifier = Some(identifier.clone());

let publish_event = StreamHubEvent::Publish {
identifier: StreamIdentifier::Rtsp {
stream_path: rtsp_request.uri.path.clone(),
},
identifier,
result_sender: event_result_sender,
info: self.get_publisher_info(),
stream_handler: self.stream_handler.clone(),
Expand Down Expand Up @@ -604,59 +612,61 @@ impl RtspServerSession {
}

async fn handle_record(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> {
let status_code = http::StatusCode::OK;
let mut response = Self::gen_response(status_code, rtsp_request);

//A stream published by gstreamer does not support the Range header
//https://github.com/harlanc/xiu/issues/135
if let Some(range_str) = rtsp_request.headers.get(&String::from("Range")) {
if let Some(range) = RtspRange::unmarshal(range_str) {
let status_code = http::StatusCode::OK;
let mut response = Self::gen_response(status_code, rtsp_request);
response
.headers
.insert(String::from("Range"), range.marshal());
response
.headers
.insert("Session".to_string(), self.session_id.unwrap().to_string());

self.send_response(&response).await?;
}
}

response
.headers
.insert("Session".to_string(), self.session_id.unwrap().to_string());

self.send_response(&response).await?;

Ok(())
}

fn handle_teardown(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> {
let identifier = StreamIdentifier::Rtsp {
stream_path: rtsp_request.uri.path.clone(),
};
log::info!("handle_teardown...");
self.exit(identifier)
}

pub fn exit(&mut self, identifier: StreamIdentifier) -> Result<(), SessionError> {
let event = match self.session_type {
define::SessionType::Client => {
log::info!("handle_teardown: client");

StreamHubEvent::UnSubscribe {
identifier,
info: self.get_subscriber_info(),
}
}
define::SessionType::Server => {
log::info!("handle_teardown: server");
StreamHubEvent::UnPublish {
identifier,
info: self.get_publisher_info(),
}
}
define::SessionType::Client => StreamHubEvent::UnSubscribe {
identifier,
info: self.get_subscriber_info(),
},
define::SessionType::Server => StreamHubEvent::UnPublish {
identifier,
info: self.get_publisher_info(),
},
};

let event_json_str = serde_json::to_string(&event).unwrap();

let rv = self.event_producer.send(event);
match rv {
Err(err) => {
log::error!("handle_teardown: send event error: {err} for event: {event_json_str}");
log::error!("session exit: send event error: {err} for event: {event_json_str}");
Err(SessionError {
value: SessionErrorValue::StreamHubEventSendErr,
})
}
Ok(()) => {
log::info!("handle_teardown: send event success: {event_json_str}");
self.is_normal_exit = true;
log::info!("session exit: send event success: {event_json_str}");
Ok(())
}
}
Expand Down

0 comments on commit 6085e91

Please sign in to comment.