diff --git a/livekit-api/src/http_client.rs b/livekit-api/src/http_client.rs index 3f87d919..b82f291a 100644 --- a/livekit-api/src/http_client.rs +++ b/livekit-api/src/http_client.rs @@ -16,7 +16,6 @@ mod async_std { // #[cfg(any(feature = "native-tls-vendored", feature = "rustls-tls-native-roots", feature = "rustls-tls-webpki-roots", feature = "__rustls-tls"))] // compile_error!("the async std compatible libraries do not support these features"); - #[cfg(any(feature = "signal-client-async", feature = "services-async"))] pub struct Response(http::Response); @@ -48,7 +47,6 @@ mod async_std { #[cfg(feature = "signal-client-async")] pub use signal_client::*; - #[cfg(feature = "services-async")] mod services { use std::io; diff --git a/livekit-api/src/lib.rs b/livekit-api/src/lib.rs index 3c7a1c11..1f5dc88c 100644 --- a/livekit-api/src/lib.rs +++ b/livekit-api/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. - #[cfg(feature = "access-token")] pub mod access_token; @@ -22,7 +21,12 @@ pub mod services; #[cfg(any(feature = "signal-client-tokio", feature = "signal-client-async"))] pub mod signal_client; -#[cfg(any(feature = "signal-client-tokio", feature = "signal-client-async", feature = "services-tokio", feature = "services-async"))] +#[cfg(any( + feature = "signal-client-tokio", + feature = "signal-client-async", + feature = "services-tokio", + feature = "services-async" +))] mod http_client; #[cfg(feature = "webhooks")] diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 2a8bd0bb..1cda63d7 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -22,11 +22,11 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; +use http::StatusCode; use livekit_protocol as proto; +use livekit_runtime::{interval, sleep, Instant, JoinHandle}; use parking_lot::Mutex; -use http::StatusCode; use thiserror::Error; -use livekit_runtime::{JoinHandle, interval, sleep, Instant}; use tokio::sync::{mpsc, Mutex as AsyncMutex, RwLock as AsyncRwLock}; #[cfg(feature = "signal-client-tokio")] @@ -120,7 +120,8 @@ impl SignalClient { SignalInner::connect(url, token, options).await?; let (emitter, events) = mpsc::unbounded_channel(); - let signal_task = livekit_runtime::spawn(signal_task(inner.clone(), emitter.clone(), stream_events)); + let signal_task = + livekit_runtime::spawn(signal_task(inner.clone(), emitter.clone(), stream_events)); Ok((Self { inner, emitter, handle: Mutex::new(Some(signal_task)) }, join_response, events)) } @@ -131,8 +132,11 @@ impl SignalClient { self.close().await; let (reconnect_response, stream_events) = self.inner.restart().await?; - let signal_task = - livekit_runtime::spawn(signal_task(self.inner.clone(), self.emitter.clone(), stream_events)); + let signal_task = livekit_runtime::spawn(signal_task( + self.inner.clone(), + self.emitter.clone(), + stream_events, + )); *self.handle.lock() = Some(signal_task); Ok(reconnect_response) diff --git a/livekit-api/src/signal_client/signal_stream.rs b/livekit-api/src/signal_client/signal_stream.rs index 4448ea56..d31a8aac 100644 --- a/livekit-api/src/signal_client/signal_stream.rs +++ b/livekit-api/src/signal_client/signal_stream.rs @@ -26,8 +26,10 @@ use tokio::sync::{mpsc, oneshot}; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; #[cfg(feature = "signal-client-async")] -use async_tungstenite::{tungstenite::Message, async_std::ClientStream as MaybeTlsStream, WebSocketStream, - async_std::connect_async as connect_async}; +use async_tungstenite::{ + async_std::connect_async, async_std::ClientStream as MaybeTlsStream, tungstenite::Message, + WebSocketStream, +}; use super::{SignalError, SignalResult}; diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index eaf35211..2d3cf608 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -25,8 +25,8 @@ use std::{ use dashmap::{mapref::one::MappedRef, DashMap}; use downcast_rs::{impl_downcast, Downcast}; use livekit::webrtc::{native::audio_resampler::AudioResampler, prelude::*}; -use tokio::task::JoinHandle; use parking_lot::{deadlock, Mutex}; +use tokio::task::JoinHandle; use crate::{proto, proto::FfiEvent, FfiError, FfiHandleId, FfiResult, INVALID_HANDLE}; @@ -78,7 +78,8 @@ pub struct FfiServer { impl Default for FfiServer { fn default() -> Self { - let async_runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); + let async_runtime = + tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); let logger = Box::leak(Box::new(logger::FfiLogger::new(async_runtime.handle().clone()))); log::set_logger(logger).unwrap(); diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 71cf936a..732cab2b 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -15,9 +15,9 @@ use std::{collections::HashSet, slice, sync::Arc, time::Duration}; use livekit::prelude::*; -use tokio::task::JoinHandle; use parking_lot::Mutex; use tokio::sync::{broadcast, mpsc, oneshot, Mutex as AsyncMutex}; +use tokio::task::JoinHandle; use super::FfiDataBuffer; use crate::{ diff --git a/livekit-runtime/src/async_std.rs b/livekit-runtime/src/async_std.rs index e74f2491..3230fdd6 100644 --- a/livekit-runtime/src/async_std.rs +++ b/livekit-runtime/src/async_std.rs @@ -1,10 +1,10 @@ use std::time::Duration; pub type JoinHandle = async_std::task::JoinHandle; -pub use std::time::Instant; pub use async_std::future::timeout; -pub use async_std::task::spawn; pub use async_std::net::TcpStream; +pub use async_std::task::spawn; use futures::{Future, FutureExt, StreamExt}; +pub use std::time::Instant; pub struct Interval { duration: Duration, @@ -26,7 +26,7 @@ pub fn interval(duration: Duration) -> Interval { } pub struct Sleep { - timer: async_io::Timer + timer: async_io::Timer, } impl Sleep { @@ -42,7 +42,10 @@ pub fn sleep(duration: Duration) -> Sleep { impl Future for Sleep { type Output = (); - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { self.timer.poll_unpin(cx).map(|_| ()) } -} \ No newline at end of file +} diff --git a/livekit-runtime/src/tokio.rs b/livekit-runtime/src/tokio.rs index 6317bbe9..efdbff0b 100644 --- a/livekit-runtime/src/tokio.rs +++ b/livekit-runtime/src/tokio.rs @@ -2,10 +2,10 @@ use std::future::Future; use std::pin::Pin; use std::time::Duration; -pub use tokio::time::Instant; +pub use tokio::net::TcpStream; pub use tokio::time::sleep; pub use tokio::time::timeout; -pub use tokio::net::TcpStream; +pub use tokio::time::Instant; pub type JoinHandle = TokioJoinHandle; pub type Interval = tokio::time::Interval; @@ -33,7 +33,9 @@ impl Future for TokioJoinHandle { let this = &mut *self; let mut handle = &mut this.handle; match Pin::new(&mut handle).poll(cx) { - std::task::Poll::Ready(value) => std::task::Poll::Ready(value.expect("Tasks should not panic")), + std::task::Poll::Ready(value) => { + std::task::Poll::Ready(value.expect("Tasks should not panic")) + } std::task::Poll::Pending => std::task::Poll::Pending, } }