From bfa62d758e01ec05f8b639fdce547d7b507ef84f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9o=20Monnom?= Date: Thu, 19 Oct 2023 11:33:05 -0700 Subject: [PATCH] fix: redundant close on SignalClient (#224) --- livekit-api/src/signal_client/mod.rs | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 9a070cdf6..fe77aae58 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -22,9 +22,9 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use thiserror::Error; +use tokio::sync::mpsc; use tokio::sync::Mutex as AsyncMutex; use tokio::sync::RwLock as AsyncRwLock; -use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio::time::{interval, sleep, Instant}; use tokio_tungstenite::tungstenite::Error as WsError; @@ -92,7 +92,7 @@ struct SignalInner { pub struct SignalClient { inner: Arc, emitter: SignalEmitter, - handle: Mutex, oneshot::Sender<()>)>>, + handle: Mutex>>, } impl Debug for SignalClient { @@ -115,20 +115,13 @@ impl SignalClient { SignalInner::connect(url, token, options).await?; let (emitter, events) = mpsc::unbounded_channel(); - let (close_tx, close_rx) = oneshot::channel(); - - let signal_task = tokio::spawn(signal_task( - inner.clone(), - emitter.clone(), - stream_events, - close_rx, - )); + let signal_task = tokio::spawn(signal_task(inner.clone(), emitter.clone(), stream_events)); Ok(( Self { inner, emitter, - handle: Mutex::new(Some((signal_task, close_tx))), + handle: Mutex::new(Some(signal_task)), }, join_response, events, @@ -141,15 +134,13 @@ impl SignalClient { self.close().await; let (reconnect_response, stream_events) = self.inner.restart().await?; - let (close_tx, close_rx) = oneshot::channel(); let signal_task = tokio::spawn(signal_task( self.inner.clone(), self.emitter.clone(), stream_events, - close_rx, )); - *self.handle.lock() = Some((signal_task, close_tx)); + *self.handle.lock() = Some(signal_task); Ok(reconnect_response) } @@ -165,8 +156,7 @@ impl SignalClient { self.inner.close().await; let handle = self.handle.lock().take(); - if let Some((signal_task, close_tx)) = handle { - let _ = close_tx.send(()); + if let Some(signal_task) = handle { let _ = signal_task.await; } } @@ -343,7 +333,6 @@ async fn signal_task( inner: Arc, emitter: SignalEmitter, // Public emitter mut internal_events: mpsc::UnboundedReceiver>, - mut close_rx: oneshot::Receiver<()>, ) { let mut ping_interval = interval(Duration::from_secs( inner.join_response.ping_interval as u64, @@ -400,10 +389,6 @@ async fn signal_task( let _ = emitter.send(SignalEvent::Close("ping timeout".into())); break; } - _ = &mut close_rx => { - let _ = emitter.send(SignalEvent::Close("client closed".into())); - break; - } } }