Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
mikayla-maki committed Feb 27, 2024
1 parent 1746ddc commit 632780e
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 22 deletions.
2 changes: 0 additions & 2 deletions livekit-api/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ mod async_std {
// #[cfg(any(feature = "native-tls-vendored", feature = "rustls-tls-native-roots", feature = "rustls-tls-webpki-roots", feature = "__rustls-tls"))]
// compile_error!("the async std compatible libraries do not support these features");


#[cfg(any(feature = "signal-client-async", feature = "services-async"))]
pub struct Response(http::Response<isahc::AsyncBody>);

Expand Down Expand Up @@ -48,7 +47,6 @@ mod async_std {
#[cfg(feature = "signal-client-async")]
pub use signal_client::*;


#[cfg(feature = "services-async")]
mod services {
use std::io;
Expand Down
8 changes: 6 additions & 2 deletions livekit-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


#[cfg(feature = "access-token")]
pub mod access_token;

Expand All @@ -22,7 +21,12 @@ pub mod services;
#[cfg(any(feature = "signal-client-tokio", feature = "signal-client-async"))]
pub mod signal_client;

#[cfg(any(feature = "signal-client-tokio", feature = "signal-client-async", feature = "services-tokio", feature = "services-async"))]
#[cfg(any(
feature = "signal-client-tokio",
feature = "signal-client-async",
feature = "services-tokio",
feature = "services-async"
))]
mod http_client;

#[cfg(feature = "webhooks")]
Expand Down
14 changes: 9 additions & 5 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use http::StatusCode;
use livekit_protocol as proto;
use livekit_runtime::{interval, sleep, Instant, JoinHandle};
use parking_lot::Mutex;
use http::StatusCode;
use thiserror::Error;
use livekit_runtime::{JoinHandle, interval, sleep, Instant};
use tokio::sync::{mpsc, Mutex as AsyncMutex, RwLock as AsyncRwLock};

#[cfg(feature = "signal-client-tokio")]
Expand Down Expand Up @@ -120,7 +120,8 @@ impl SignalClient {
SignalInner::connect(url, token, options).await?;

let (emitter, events) = mpsc::unbounded_channel();
let signal_task = livekit_runtime::spawn(signal_task(inner.clone(), emitter.clone(), stream_events));
let signal_task =
livekit_runtime::spawn(signal_task(inner.clone(), emitter.clone(), stream_events));

Ok((Self { inner, emitter, handle: Mutex::new(Some(signal_task)) }, join_response, events))
}
Expand All @@ -131,8 +132,11 @@ impl SignalClient {
self.close().await;

let (reconnect_response, stream_events) = self.inner.restart().await?;
let signal_task =
livekit_runtime::spawn(signal_task(self.inner.clone(), self.emitter.clone(), stream_events));
let signal_task = livekit_runtime::spawn(signal_task(
self.inner.clone(),
self.emitter.clone(),
stream_events,
));

*self.handle.lock() = Some(signal_task);
Ok(reconnect_response)
Expand Down
6 changes: 4 additions & 2 deletions livekit-api/src/signal_client/signal_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ use tokio::sync::{mpsc, oneshot};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};

#[cfg(feature = "signal-client-async")]
use async_tungstenite::{tungstenite::Message, async_std::ClientStream as MaybeTlsStream, WebSocketStream,
async_std::connect_async as connect_async};
use async_tungstenite::{
async_std::connect_async, async_std::ClientStream as MaybeTlsStream, tungstenite::Message,
WebSocketStream,
};

use super::{SignalError, SignalResult};

Expand Down
5 changes: 3 additions & 2 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use std::{
use dashmap::{mapref::one::MappedRef, DashMap};
use downcast_rs::{impl_downcast, Downcast};
use livekit::webrtc::{native::audio_resampler::AudioResampler, prelude::*};
use tokio::task::JoinHandle;
use parking_lot::{deadlock, Mutex};
use tokio::task::JoinHandle;

use crate::{proto, proto::FfiEvent, FfiError, FfiHandleId, FfiResult, INVALID_HANDLE};

Expand Down Expand Up @@ -78,7 +78,8 @@ pub struct FfiServer {

impl Default for FfiServer {
fn default() -> Self {
let async_runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
let async_runtime =
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();

let logger = Box::leak(Box::new(logger::FfiLogger::new(async_runtime.handle().clone())));
log::set_logger(logger).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
use std::{collections::HashSet, slice, sync::Arc, time::Duration};

use livekit::prelude::*;
use tokio::task::JoinHandle;
use parking_lot::Mutex;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex as AsyncMutex};
use tokio::task::JoinHandle;

use super::FfiDataBuffer;
use crate::{
Expand Down
13 changes: 8 additions & 5 deletions livekit-runtime/src/async_std.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::time::Duration;
pub type JoinHandle<T> = async_std::task::JoinHandle<T>;
pub use std::time::Instant;
pub use async_std::future::timeout;
pub use async_std::task::spawn;
pub use async_std::net::TcpStream;
pub use async_std::task::spawn;
use futures::{Future, FutureExt, StreamExt};
pub use std::time::Instant;

pub struct Interval {
duration: Duration,
Expand All @@ -26,7 +26,7 @@ pub fn interval(duration: Duration) -> Interval {
}

pub struct Sleep {
timer: async_io::Timer
timer: async_io::Timer,
}

impl Sleep {
Expand All @@ -42,7 +42,10 @@ pub fn sleep(duration: Duration) -> Sleep {
impl Future for Sleep {
type Output = ();

fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.timer.poll_unpin(cx).map(|_| ())
}
}
}
8 changes: 5 additions & 3 deletions livekit-runtime/src/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

pub use tokio::time::Instant;
pub use tokio::net::TcpStream;
pub use tokio::time::sleep;
pub use tokio::time::timeout;
pub use tokio::net::TcpStream;
pub use tokio::time::Instant;

pub type JoinHandle<T> = TokioJoinHandle<T>;
pub type Interval = tokio::time::Interval;
Expand Down Expand Up @@ -33,7 +33,9 @@ impl<T> Future for TokioJoinHandle<T> {
let this = &mut *self;
let mut handle = &mut this.handle;
match Pin::new(&mut handle).poll(cx) {
std::task::Poll::Ready(value) => std::task::Poll::Ready(value.expect("Tasks should not panic")),
std::task::Poll::Ready(value) => {
std::task::Poll::Ready(value.expect("Tasks should not panic"))
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
Expand Down

0 comments on commit 632780e

Please sign in to comment.