Skip to content

Commit

Permalink
Merge pull request #152 from vaptu/master
Browse files Browse the repository at this point in the history
feat: add rtmp client session timeout option
  • Loading branch information
harlanc authored Sep 20, 2024
2 parents dffca6e + 68e4655 commit f4a8595
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion protocol/rtmp/src/session/client_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use {
},
indexmap::IndexMap,
std::sync::Arc,
std::time::Duration,
//crate::utils::print::print,
streamhub::define::StreamHubEventSender,
tokio::{net::TcpStream, sync::Mutex},
Expand Down Expand Up @@ -67,6 +68,7 @@ pub enum ClientSessionType {
}
pub struct ClientSession {
io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
timeout: Option<Duration>,
common: Common,
handshaker: SimpleHandshakeClient,
unpacketizer: ChunkUnpacketizer,
Expand Down Expand Up @@ -115,6 +117,7 @@ impl ClientSession {

Self {
io: Arc::clone(&net_io),
timeout: None,
common,
handshaker: SimpleHandshakeClient::new(Arc::clone(&net_io)),
unpacketizer: ChunkUnpacketizer::new(),
Expand All @@ -129,6 +132,10 @@ impl ClientSession {
gop_num,
}
}

pub fn set_timeout(&mut self, timeout: Duration){
self.timeout = Some(timeout)
}

pub async fn run(&mut self) -> Result<(), SessionError> {
loop {
Expand Down Expand Up @@ -169,7 +176,10 @@ impl ClientSession {
ClientSessionState::WaitStateChange => {}
}

let data = self.io.lock().await.read().await?;
let data = match self.timeout {
None => self.io.lock().await.read().await?,
Some(t) => self.io.lock().await.read_timeout(t).await?,
};
self.unpacketizer.extend_data(&data[..]);

loop {
Expand Down

0 comments on commit f4a8595

Please sign in to comment.