Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom committed Nov 14, 2023
1 parent 21e2367 commit 4128e8e
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 26 deletions.
3 changes: 3 additions & 0 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ pub struct RoomOptions {
pub dynacast: bool,
pub e2ee: Option<E2eeOptions>,
pub rtc_config: RtcConfiguration,
pub join_retries: u32,
}

impl Default for RoomOptions {
Expand All @@ -188,6 +189,7 @@ impl Default for RoomOptions {
continual_gathering_policy: ContinualGatheringPolicy::GatherContinually,
ice_transport_type: IceTransportsType::All,
},
join_retries: 3,
}
}
}
Expand Down Expand Up @@ -228,6 +230,7 @@ impl Room {
auto_subscribe: options.auto_subscribe,
adaptive_stream: options.adaptive_stream,
},
join_retries: options.join_retries,
},
)
.await?;
Expand Down
72 changes: 46 additions & 26 deletions livekit/src/rtc_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub enum EngineError {
pub struct EngineOptions {
pub rtc_config: RtcConfiguration,
pub signal_options: SignalOptions,
pub join_retries: u32,
}

#[derive(Debug)]
Expand Down Expand Up @@ -258,35 +259,54 @@ impl EngineInner {
options: EngineOptions,
) -> EngineResult<(Arc<Self>, proto::JoinResponse, EngineEvents)> {
let lk_runtime = LkRuntime::instance();
let max_retries = options.join_retries;

let try_connect = {
move || {
let options = options.clone();
let lk_runtime = lk_runtime.clone();
async move {
let (session, join_response, session_events) =
RtcSession::connect(url, token, options.clone()).await?;
session.wait_pc_connection().await?;

let (engine_tx, engine_rx) = mpsc::unbounded_channel();
let inner = Arc::new(Self {
lk_runtime,
engine_tx,
close_notifier: Arc::new(Notify::new()),
running_handle: RwLock::new(EngineHandle {
session: Arc::new(session),
closed: false,
reconnecting: false,
full_reconnect: false,
engine_task: None,
}),
options,
reconnecting_lock: AsyncRwLock::default(),
reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)),
});

let (session, join_response, session_events) =
RtcSession::connect(url, token, options.clone()).await?;
let (engine_tx, engine_rx) = mpsc::unbounded_channel();

session.wait_pc_connection().await?;

let inner = Arc::new(Self {
lk_runtime,
engine_tx,
close_notifier: Arc::new(Notify::new()),
running_handle: RwLock::new(EngineHandle {
session: Arc::new(session),
closed: false,
reconnecting: false,
full_reconnect: false,
engine_task: None,
}),
options,
reconnecting_lock: AsyncRwLock::default(),
reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)),
});
// Start initial tasks
let (close_tx, close_rx) = oneshot::channel();
let session_task =
tokio::spawn(Self::engine_task(inner.clone(), session_events, close_rx));
inner.running_handle.write().engine_task = Some((session_task, close_tx));

// Start initial tasks
let (close_tx, close_rx) = oneshot::channel();
let session_task = tokio::spawn(Self::engine_task(inner.clone(), session_events, close_rx));
inner.running_handle.write().engine_task = Some((session_task, close_tx));
Ok((inner, join_response, engine_rx))
}
}
};

let mut last_error = None;
for _ in 0..(max_retries + 1) {
match try_connect().await {
Ok(res) => return Ok(res),
Err(e) => last_error = Some(e),
}
}

Ok((inner, join_response, engine_rx))
Err(last_error.unwrap())
}

async fn engine_task(
Expand Down

0 comments on commit 4128e8e

Please sign in to comment.