diff --git a/Cargo.lock b/Cargo.lock index d907dc782..3855801a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -214,6 +214,19 @@ version = "4.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" +[[package]] +name = "async-tls" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ae3c9eba89d472a0e4fe1dea433df78fbbe63d2b764addaf2ba3a6bde89a5e" +dependencies = [ + "futures-core", + "futures-io", + "rustls", + "rustls-pemfile", + "webpki-roots 0.22.6", +] + [[package]] name = "async-trait" version = "0.1.74" @@ -233,6 +246,7 @@ checksum = "ef0f8d64ef9351752fbe5462f242c625d9c4910d2bc3f7ec44c43857ca123f5d" dependencies = [ "async-native-tls", "async-std", + "async-tls", "futures-io", "futures-util", "log", @@ -2349,7 +2363,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 0.25.3", "winreg", ] @@ -2916,7 +2930,7 @@ dependencies = [ "tokio-native-tls", "tokio-rustls", "tungstenite 0.20.1", - "webpki-roots", + "webpki-roots 0.25.3", ] [[package]] @@ -3286,6 +3300,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" +dependencies = [ + "webpki", +] + [[package]] name = "webpki-roots" version = "0.25.3" diff --git a/Cargo.toml b/Cargo.toml index 52650e01e..b54a1addb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,10 +14,10 @@ members = [ [workspace.dependencies] libwebrtc = { version = "0.3.7", path = "libwebrtc" } -livekit-api = { version = "0.4.1", path = "livekit-api" } +livekit-api = { version = "0.4.1", path = "livekit-api", default-features = false } livekit-ffi = { version = "0.12.3", path = "livekit-ffi" } livekit-protocol = { version = "0.3.6", path = "livekit-protocol" } -livekit-runtime = { version = "0.3.1", path = "livekit-runtime" } +livekit-runtime = { version = "0.3.1", path = "livekit-runtime", default-features = false } livekit = { version = "0.7.0", path = "livekit" } soxr-sys = { version = "0.1.0", path = "soxr-sys" } webrtc-sys-build = { version = "0.3.5", path = "webrtc-sys/build" } diff --git a/livekit-api/Cargo.toml b/livekit-api/Cargo.toml index 99538ca4a..eab7df111 100644 --- a/livekit-api/Cargo.toml +++ b/livekit-api/Cargo.toml @@ -8,7 +8,7 @@ repository = "https://github.com/livekit/rust-sdks" [features] # By default ws TLS is not enabled -default = ["services-tokio", "access-token", "webhooks"] +default = ["services-tokio", "webhooks"] signal-client-tokio = [ "dep:tokio-tungstenite", @@ -16,39 +16,41 @@ signal-client-tokio = [ "dep:futures-util", "dep:reqwest", "dep:livekit-runtime", - "livekit-runtime/tokio" + "livekit-runtime/tokio", ] signal-client-async = [ - "__signal-client-async-compatible", - "livekit-runtime/async" + "dep:async-tungstenite", + "dep:tokio", # For macros and sync + "dep:futures-util", + "dep:isahc", + "dep:livekit-runtime", + "livekit-runtime/async", ] signal-client-dispatcher = [ - "__signal-client-async-compatible", - "livekit-runtime/dispatcher" -] - -__signal-client-async-compatible = [ "dep:async-tungstenite", - "dep:tokio", # For macros and sync + "dep:tokio", # For macros and sync "dep:futures-util", - "dep:isahc", "dep:livekit-runtime", + "livekit-runtime/dispatcher", + "dep:serde_json", ] - services-tokio = ["dep:reqwest"] +services-dispatcher = ["dep:serde_json"] services-async = ["dep:isahc"] -access-token = ["dep:jsonwebtoken"] -webhooks = ["access-token", "dep:serde_json", "dep:base64"] +webhooks = ["dep:serde_json", "dep:base64"] + +# Deprecated: old feature that does nothing. +access-token = [] # Note that the following features only change the behavior of tokio-tungstenite. # It doesn't change the behavior of libwebrtc/webrtc-sys native-tls = [ "tokio-tungstenite?/native-tls", "async-tungstenite?/async-native-tls", - "reqwest?/native-tls" + "reqwest?/native-tls", ] native-tls-vendored = [ "tokio-tungstenite?/native-tls-vendored", @@ -56,6 +58,7 @@ native-tls-vendored = [ ] rustls-tls-native-roots = [ "tokio-tungstenite?/rustls-tls-native-roots", + "async-tungstenite?/async-tls", "reqwest?/rustls-tls-native-roots", ] rustls-tls-webpki-roots = [ @@ -79,18 +82,47 @@ serde_json = { version = "1.0", optional = true } base64 = { version = "0.21", optional = true } # access_token & services -jsonwebtoken = { version = "9", default-features = false, optional = true } +http = "0.2.1" +scopeguard = "1.2.0" -# signal_client -livekit-runtime = { path = "../livekit-runtime", version = "0.3.0", optional = true} -tokio-tungstenite = { version = "0.20", optional = true } -async-tungstenite = { version = "0.25.0", features = [ "async-std-runtime", "async-native-tls"], optional = true } -tokio = { version = "1", default-features = false, features = ["sync", "macros", "signal"], optional = true } -futures-util = { version = "0.3", default-features = false, features = [ "sink" ], optional = true } +[dependencies.jsonwebtoken] +version = "9" +default-features = false -# This dependency must be kept in sync with reqwest's version -http = "0.2.1" -reqwest = { version = "0.11", default-features = false, features = [ "json" ], optional = true } -isahc = { version = "1.7.2", default-features = false, features = [ "json", "text-decoding" ], optional = true } +[dependencies.livekit-runtime] +version = "0.3.0" +path = "../livekit-runtime" +optional = true -scopeguard = "1.2.0" +[dependencies.tokio-tungstenite] +version = "0.20" +optional = true + +[dependencies.async-tungstenite] +optional = true +version = "0.25.0" +features = ["async-std-runtime", "async-native-tls"] + +[dependencies.futures-util] +version = "0.3" +default-features = false +features = ["sink"] +optional = true + +[dependencies.tokio] +version = "1" +default-features = false +features = ["sync", "macros", "signal"] +optional = true + +[dependencies.reqwest] +version = "0.11" +default-features = false +features = ["json"] +optional = true + +[dependencies.isahc] +features = ["json", "text-decoding"] +optional = true +version = "1.7.2" +default-features = false diff --git a/livekit-api/src/http_client.rs b/livekit-api/src/http_client.rs index 757a71a37..bcaf3aceb 100644 --- a/livekit-api/src/http_client.rs +++ b/livekit-api/src/http_client.rs @@ -10,7 +10,133 @@ mod tokio { #[cfg(any(feature = "services-tokio", feature = "signal-client-tokio"))] pub use tokio::*; -#[cfg(any(feature = "__signal-client-async-compatible", feature = "services-async"))] +#[cfg(all( + any(feature = "signal-client-dispatcher", feature = "signal-client-async"), + any(feature = "native-tls-vendored", feature = "rustls-tls-webpki-roots") +))] +compile_error!("The dispatcher and async signal clients do not support vendored or webpki roots"); + +#[cfg(any(feature = "services-dispatcher", feature = "signal-client-dispatcher"))] +mod dispatcher { + use std::{future::Future, io, pin::Pin, sync::OnceLock}; + + pub struct Response { + pub body: Pin>, + pub status: http::StatusCode, + } + + pub trait HttpClient: 'static + Send + Sync { + fn get(&self, url: &str) -> Pin> + Send>>; + fn send_async( + &self, + request: http::Request>, + ) -> Pin> + Send>>; + } + + impl std::fmt::Debug for dyn HttpClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("dyn HttpClient").finish() + } + } + + static HTTP_CLIENT: OnceLock<&'static dyn HttpClient> = OnceLock::new(); + + pub fn set_http_client(http_client: impl HttpClient) { + let http_client = Box::leak(Box::new(http_client)); + HTTP_CLIENT.set(http_client).ok(); + } + + fn get_http_client() -> &'static dyn HttpClient { + *HTTP_CLIENT.get().expect("Livekit requires a call to set_http_client()") + } + + #[cfg(feature = "signal-client-dispatcher")] + mod signal_client { + use std::io; + + use futures_util::AsyncReadExt; + + use super::Response; + + impl Response { + pub fn status(&self) -> http::StatusCode { + self.status + } + + pub async fn text(mut self) -> io::Result { + let mut string = String::new(); + self.body.read_to_string(&mut string).await?; + Ok(string) + } + } + + pub async fn get(url: &str) -> io::Result { + super::get_http_client().get(url).await + } + } + + #[cfg(feature = "signal-client-dispatcher")] + pub use signal_client::*; + + #[cfg(feature = "services-dispatcher")] + mod services { + use std::io; + + use futures_util::AsyncReadExt; + use prost::bytes::Bytes; + + use super::{get_http_client, HttpClient, Response}; + + use url::Url; + + impl Response { + pub async fn bytes(mut self) -> io::Result { + let mut bytes = Vec::new(); + self.body.read_to_end(&mut bytes).await?; + Ok(bytes.into()) + } + + pub async fn json(self) -> io::Result { + let bytes = self.bytes().await?; + serde_json::from_slice::(&bytes) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + } + } + + #[derive(Debug)] + pub struct Client { + pub(crate) client: &'static dyn HttpClient, + } + + impl Client { + pub fn new() -> Self { + Self { client: get_http_client() } + } + + pub fn post(&self, url: Url) -> RequestBuilder { + RequestBuilder { + body: Vec::new(), + builder: http::request::Builder::new().method("POST").uri(url.as_str()), + client: self.client, + } + } + } + + pub struct RequestBuilder { + pub(crate) builder: http::request::Builder, + pub(crate) body: Vec, + pub(crate) client: &'static dyn HttpClient, + } + } + + #[cfg(feature = "services-dispatcher")] + pub use services::*; +} + +#[cfg(feature = "signal-client-dispatcher")] +pub use dispatcher::*; + +#[cfg(any(feature = "signal-client-async", feature = "services-async"))] mod async_std { #[cfg(any( @@ -21,10 +147,10 @@ mod async_std { ))] compile_error!("the async std compatible libraries do not support these features"); - #[cfg(any(feature = "__signal-client-async-compatible", feature = "services-async"))] + #[cfg(any(feature = "signal-client-async", feature = "services-async"))] pub struct Response(http::Response); - #[cfg(feature = "__signal-client-async-compatible")] + #[cfg(feature = "signal-client-async")] mod signal_client { use std::io; @@ -48,7 +174,7 @@ mod async_std { } } - #[cfg(feature = "__signal-client-async-compatible")] + #[cfg(feature = "signal-client-async")] pub use signal_client::*; #[cfg(feature = "services-async")] @@ -60,15 +186,14 @@ mod async_std { use super::Response; - use http::header::{Entry, OccupiedEntry}; use url::Url; impl Response { - pub async fn bytes(self) -> io::Result { + pub async fn bytes(mut self) -> io::Result { Ok(self.0.bytes().await?.into()) } - pub async fn json(&mut self) -> io::Result { + pub async fn json(mut self) -> io::Result { self.0.json().await.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } } @@ -80,9 +205,7 @@ mod async_std { pub fn new() -> Self { Self(isahc::HttpClient::new().unwrap()) } - } - impl Client { pub fn post(&self, url: Url) -> RequestBuilder { RequestBuilder { body: Vec::new(), @@ -97,51 +220,54 @@ mod async_std { body: Vec, client: isahc::HttpClient, } + } - impl RequestBuilder { - pub fn headers(mut self, headers: http::HeaderMap) -> Self { - // Copied from: https://docs.rs/reqwest/0.11.24/src/reqwest/util.rs.html#62-89 - let self_headers = self.builder.headers_mut().unwrap(); - let mut prev_entry: Option> = None; - for (key, value) in headers { - match key { - Some(key) => match self_headers.entry(key) { - Entry::Occupied(mut e) => { - e.insert(value); - prev_entry = Some(e); - } - Entry::Vacant(e) => { - let e = e.insert_entry(value); - prev_entry = Some(e); - } - }, - None => match prev_entry { - Some(ref mut entry) => { - entry.append(value); - } - None => unreachable!("HeaderMap::into_iter yielded None first"), - }, - } - } - self - } + #[cfg(feature = "services-async")] + pub use services::*; +} - pub fn body(mut self, body: Vec) -> Self { - self.body = body; - self - } +#[cfg(any(feature = "signal-client-async", feature = "services-async"))] +pub use async_std::*; - pub async fn send(self) -> io::Result { - let request = self.builder.body(self.body).unwrap(); - let response = self.client.send_async(request).await?; - Ok(Response(response)) +#[cfg(any(feature = "services-dispatcher", feature = "services-async"))] +impl RequestBuilder { + pub fn headers(mut self, headers: http::HeaderMap) -> Self { + use http::header::{Entry, OccupiedEntry}; + + // Copied from: https://docs.rs/reqwest/0.11.24/src/reqwest/util.rs.html#62-89 + let self_headers = self.builder.headers_mut().unwrap(); + let mut prev_entry: Option> = None; + for (key, value) in headers { + match key { + Some(key) => match self_headers.entry(key) { + Entry::Occupied(mut e) => { + e.insert(value); + prev_entry = Some(e); + } + Entry::Vacant(e) => { + let e = e.insert_entry(value); + prev_entry = Some(e); + } + }, + None => match prev_entry { + Some(ref mut entry) => { + entry.append(value); + } + None => unreachable!("HeaderMap::into_iter yielded None first"), + }, } } + self } - #[cfg(feature = "services-async")] - pub use services::*; -} + pub fn body(mut self, body: Vec) -> Self { + self.body = body; + self + } -#[cfg(any(feature = "__signal-client-async-compatible", feature = "services-async"))] -pub use async_std::*; + pub async fn send(self) -> std::io::Result { + let request = self.builder.body(self.body).unwrap(); + let response = self.client.send_async(request).await?; + Ok(response) + } +} diff --git a/livekit-api/src/lib.rs b/livekit-api/src/lib.rs index b73394bec..5bc16b200 100644 --- a/livekit-api/src/lib.rs +++ b/livekit-api/src/lib.rs @@ -12,10 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[cfg(feature = "access-token")] +#[cfg_attr( + feature = "access-token", + deprecated(note = "access-token feature is no longer optional, and so is deprecated") +)] pub mod access_token; -#[cfg(any(feature = "services-tokio", feature = "services-async"))] +#[cfg(any( + feature = "services-tokio", + feature = "services-async", + feature = "services-dispatcher" +))] pub mod services; #[cfg(any( @@ -30,10 +37,14 @@ pub mod signal_client; feature = "signal-client-async", feature = "signal-client-dispatcher", feature = "services-tokio", - feature = "services-async" + feature = "services-async", + feature = "services-dispatcher", ))] mod http_client; +#[cfg(any(feature = "signal-client-dispatcher", feature = "services-dispatcher",))] +pub use http_client::{set_http_client, HttpClient, Response}; + #[cfg(feature = "webhooks")] pub mod webhooks; diff --git a/livekit-api/src/services/twirp_client.rs b/livekit-api/src/services/twirp_client.rs index 9dd6c5b51..5b6b63082 100644 --- a/livekit-api/src/services/twirp_client.rs +++ b/livekit-api/src/services/twirp_client.rs @@ -30,7 +30,7 @@ pub enum TwirpError { #[cfg(feature = "services-tokio")] #[error("failed to execute the request: {0}")] Request(#[from] reqwest::Error), - #[cfg(feature = "services-async")] + #[cfg(any(feature = "services-async", feature = "services-dispatcher"))] #[error("failed to execute the request: {0}")] Request(#[from] std::io::Error), #[error("twirp error: {0}")] diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 2a8cb16ff..fa9fc30eb 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -32,7 +32,7 @@ use tokio::sync::{mpsc, Mutex as AsyncMutex, RwLock as AsyncRwLock}; #[cfg(feature = "signal-client-tokio")] use tokio_tungstenite::tungstenite::Error as WsError; -#[cfg(feature = "__signal-client-async-compatible")] +#[cfg(any(feature = "signal-client-dispatcher", feature = "signal-client-async"))] use async_tungstenite::tungstenite::Error as WsError; use crate::{http_client, signal_client::signal_stream::SignalStream}; diff --git a/livekit-api/src/signal_client/signal_stream.rs b/livekit-api/src/signal_client/signal_stream.rs index c5ab27951..d738f459a 100644 --- a/livekit-api/src/signal_client/signal_stream.rs +++ b/livekit-api/src/signal_client/signal_stream.rs @@ -30,7 +30,7 @@ use tokio_tungstenite::{ MaybeTlsStream, WebSocketStream, }; -#[cfg(feature = "__signal-client-async-compatible")] +#[cfg(any(feature = "signal-client-dispatcher", feature = "signal-client-async"))] use async_tungstenite::{ async_std::connect_async, async_std::ClientStream as MaybeTlsStream, diff --git a/livekit-protocol/src/livekit.rs b/livekit-protocol/src/livekit.rs index 19181c255..43f5496ed 100644 --- a/livekit-protocol/src/livekit.rs +++ b/livekit-protocol/src/livekit.rs @@ -3837,7 +3837,7 @@ pub struct SendDataRequest { #[prost(string, optional, tag="5")] pub topic: ::core::option::Option<::prost::alloc::string::String>, } -/// + #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SendDataResponse { diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index 29113321a..fc3528592 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -13,6 +13,7 @@ default = ["tokio"] async = ["livekit-api/signal-client-async"] tokio = ["livekit-api/signal-client-tokio"] dispatcher = ["livekit-api/signal-client-dispatcher"] +services-dispatcher = ["livekit-api/services-dispatcher"] # Note that the following features only change the behavior of tokio-tungstenite. @@ -34,9 +35,14 @@ livekit-protocol = { workspace = true } prost = "0.12" serde = { version = "1", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1", default-features = false, features = ["sync", "macros"] } +tokio = { version = "1", default-features = false, features = [ + "sync", + "macros", +] } parking_lot = { version = "0.12" } -futures-util = { version = "0.3", default-features = false, features = ["sink"] } +futures-util = { version = "0.3", default-features = false, features = [ + "sink", +] } thiserror = "1.0" lazy_static = "1.4" log = "0.4" diff --git a/livekit/src/lib.rs b/livekit/src/lib.rs index b63d77abc..cb7eb568e 100644 --- a/livekit/src/lib.rs +++ b/livekit/src/lib.rs @@ -27,7 +27,6 @@ pub mod prelude; #[cfg(feature = "dispatcher")] pub mod dispatcher { - pub use livekit_runtime::set_dispatcher; - pub use livekit_runtime::Dispatcher; - pub use livekit_runtime::Runnable; + pub use livekit_api::{set_http_client, HttpClient, Response}; + pub use livekit_runtime::{set_dispatcher, Dispatcher, Runnable}; }