diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 783c4fcd4..7e642c5c0 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -172,6 +172,7 @@ pub struct RoomOptions { pub dynacast: bool, pub e2ee: Option, pub rtc_config: RtcConfiguration, + pub join_retries: u32, } impl Default for RoomOptions { @@ -188,6 +189,7 @@ impl Default for RoomOptions { continual_gathering_policy: ContinualGatheringPolicy::GatherContinually, ice_transport_type: IceTransportsType::All, }, + join_retries: 3, } } } @@ -228,6 +230,7 @@ impl Room { auto_subscribe: options.auto_subscribe, adaptive_stream: options.adaptive_stream, }, + join_retries: options.join_retries, }, ) .await?; diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index f8b9d417f..27b1b0e26 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -74,6 +74,7 @@ pub enum EngineError { pub struct EngineOptions { pub rtc_config: RtcConfiguration, pub signal_options: SignalOptions, + pub join_retries: u32, } #[derive(Debug)] @@ -258,35 +259,54 @@ impl EngineInner { options: EngineOptions, ) -> EngineResult<(Arc, proto::JoinResponse, EngineEvents)> { let lk_runtime = LkRuntime::instance(); + let max_retries = options.join_retries; + + let try_connect = { + move || { + let options = options.clone(); + let lk_runtime = lk_runtime.clone(); + async move { + let (session, join_response, session_events) = + RtcSession::connect(url, token, options.clone()).await?; + session.wait_pc_connection().await?; + + let (engine_tx, engine_rx) = mpsc::unbounded_channel(); + let inner = Arc::new(Self { + lk_runtime, + engine_tx, + close_notifier: Arc::new(Notify::new()), + running_handle: RwLock::new(EngineHandle { + session: Arc::new(session), + closed: false, + reconnecting: false, + full_reconnect: false, + engine_task: None, + }), + options, + reconnecting_lock: AsyncRwLock::default(), + reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)), + }); - let (session, join_response, session_events) = - RtcSession::connect(url, token, options.clone()).await?; - let (engine_tx, engine_rx) = mpsc::unbounded_channel(); - - session.wait_pc_connection().await?; - - let inner = Arc::new(Self { - lk_runtime, - engine_tx, - close_notifier: Arc::new(Notify::new()), - running_handle: RwLock::new(EngineHandle { - session: Arc::new(session), - closed: false, - reconnecting: false, - full_reconnect: false, - engine_task: None, - }), - options, - reconnecting_lock: AsyncRwLock::default(), - reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)), - }); + // Start initial tasks + let (close_tx, close_rx) = oneshot::channel(); + let session_task = + tokio::spawn(Self::engine_task(inner.clone(), session_events, close_rx)); + inner.running_handle.write().engine_task = Some((session_task, close_tx)); - // Start initial tasks - let (close_tx, close_rx) = oneshot::channel(); - let session_task = tokio::spawn(Self::engine_task(inner.clone(), session_events, close_rx)); - inner.running_handle.write().engine_task = Some((session_task, close_tx)); + Ok((inner, join_response, engine_rx)) + } + } + }; + + let mut last_error = None; + for _ in 0..(max_retries + 1) { + match try_connect().await { + Ok(res) => return Ok(res), + Err(e) => last_error = Some(e), + } + } - Ok((inner, join_response, engine_rx)) + Err(last_error.unwrap()) } async fn engine_task(