diff --git a/any_spawner/src/lib.rs b/any_spawner/src/lib.rs index a03196a23b..25128f765c 100644 --- a/any_spawner/src/lib.rs +++ b/any_spawner/src/lib.rs @@ -291,9 +291,10 @@ impl Executor { /// /// Returns `Err(_)` if an executor has already been set. pub fn init_custom_executor( - custom_executor: impl CustomExecutor + 'static, + custom_executor: impl CustomExecutor + Send + Sync + 'static, ) -> Result<(), ExecutorError> { - static EXECUTOR: OnceLock> = OnceLock::new(); + static EXECUTOR: OnceLock> = + OnceLock::new(); EXECUTOR .set(Box::new(custom_executor)) .map_err(|_| ExecutorError::AlreadySet)?; @@ -311,13 +312,46 @@ impl Executor { .map_err(|_| ExecutorError::AlreadySet)?; Ok(()) } + + /// Locally sets a custom executor as the executor used to spawn tasks + /// in the current thread. + /// + /// Returns `Err(_)` if an executor has already been set. + pub fn init_local_custom_executor( + custom_executor: impl CustomExecutor + 'static, + ) -> Result<(), ExecutorError> { + thread_local! { + static EXECUTOR: OnceLock> = OnceLock::new(); + } + EXECUTOR.with(|this| { + this.set(Box::new(custom_executor)) + .map_err(|_| ExecutorError::AlreadySet) + })?; + + SPAWN + .set(|fut| { + EXECUTOR.with(|this| this.get().unwrap().spawn(fut)); + }) + .map_err(|_| ExecutorError::AlreadySet)?; + SPAWN_LOCAL + .set(|fut| { + EXECUTOR.with(|this| this.get().unwrap().spawn_local(fut)); + }) + .map_err(|_| ExecutorError::AlreadySet)?; + POLL_LOCAL + .set(|| { + EXECUTOR.with(|this| this.get().unwrap().poll_local()); + }) + .map_err(|_| ExecutorError::AlreadySet)?; + Ok(()) + } } /// A trait for custom executors. /// Custom executors can be used to integrate with any executor that supports spawning futures. /// /// All methods can be called recursively. -pub trait CustomExecutor: Send + Sync { +pub trait CustomExecutor { /// Spawns a future, usually on a thread pool. fn spawn(&self, fut: PinnedFuture<()>); /// Spawns a local future. May require calling `poll_local` to make progress. diff --git a/leptos_macro/Cargo.toml b/leptos_macro/Cargo.toml index 395dd2e0cf..cc9e37a357 100644 --- a/leptos_macro/Cargo.toml +++ b/leptos_macro/Cargo.toml @@ -48,6 +48,7 @@ experimental-islands = [] trace-component-props = [] actix = ["server_fn_macro/actix"] axum = ["server_fn_macro/axum"] +generic = ["server_fn_macro/generic"] [package.metadata.cargo-all-features] denylist = ["nightly", "tracing", "trace-component-props"] @@ -68,6 +69,14 @@ skip_feature_sets = [ "actix", "axum", ], + [ + "actix", + "generic", + ], + [ + "generic", + "axum", + ], ] [package.metadata.docs.rs] diff --git a/server_fn/Cargo.toml b/server_fn/Cargo.toml index 745ca291f0..b15f58498b 100644 --- a/server_fn/Cargo.toml +++ b/server_fn/Cargo.toml @@ -80,6 +80,7 @@ pin-project-lite = "0.2.14" default = ["json"] axum-no-default = [ "ssr", + "generic", "dep:axum", "dep:hyper", "dep:http-body-util", @@ -110,6 +111,7 @@ default-tls = ["reqwest?/default-tls"] rustls = ["reqwest?/rustls-tls"] reqwest = ["dep:reqwest"] ssr = ["inventory"] +generic = [] [package.metadata.docs.rs] all-features = true @@ -138,6 +140,10 @@ skip_feature_sets = [ "actix", "axum", ], + [ + "actix", + "generic", + ], [ "browser", "actix", @@ -150,6 +156,10 @@ skip_feature_sets = [ "browser", "reqwest", ], + [ + "browser", + "generic", + ], [ "default-tls", "rustls", @@ -166,6 +176,10 @@ skip_feature_sets = [ "axum-no-default", "browser", ], + [ + "axum-no-default", + "generic", + ], [ "rkyv", "json", diff --git a/server_fn/src/lib.rs b/server_fn/src/lib.rs index a3e67ad459..7e91e1a114 100644 --- a/server_fn/src/lib.rs +++ b/server_fn/src/lib.rs @@ -120,6 +120,12 @@ pub use ::actix_web as actix_export; #[cfg(feature = "axum-no-default")] #[doc(hidden)] pub use ::axum as axum_export; +#[cfg(feature = "generic")] +#[doc(hidden)] +pub use ::bytes as bytes_export; +#[cfg(feature = "generic")] +#[doc(hidden)] +pub use ::http as http_export; use client::Client; use codec::{Encoding, FromReq, FromRes, IntoReq, IntoRes}; #[doc(hidden)] diff --git a/server_fn/src/request/generic.rs b/server_fn/src/request/generic.rs new file mode 100644 index 0000000000..da1add07ff --- /dev/null +++ b/server_fn/src/request/generic.rs @@ -0,0 +1,74 @@ +//! This module uses platform-agnostic abstractions +//! allowing users to run server functions on a wide range of +//! platforms. +//! +//! The crates in use in this crate are: +//! +//! * `bytes`: platform-agnostic manipulation of bytes. +//! * `http`: low-dependency HTTP abstractions' *front-end*. +//! +//! # Users +//! +//! * `wasm32-wasip*` integration crate `leptos_wasi` is using this +//! crate under the hood. + +use crate::request::Req; +use bytes::Bytes; +use futures::{ + stream::{self, Stream}, + StreamExt, +}; +use http::Request; +use std::borrow::Cow; + +impl Req for Request +where + CustErr: 'static, +{ + async fn try_into_bytes( + self, + ) -> Result> { + Ok(self.into_body()) + } + + async fn try_into_string( + self, + ) -> Result> { + String::from_utf8(self.into_body().into()).map_err(|err| { + crate::ServerFnError::Deserialization(err.to_string()) + }) + } + + fn try_into_stream( + self, + ) -> Result< + impl Stream> + Send + 'static, + crate::ServerFnError, + > { + Ok(stream::iter(self.into_body()) + .ready_chunks(16) + .map(|chunk| Ok(Bytes::from(chunk)))) + } + + fn to_content_type(&self) -> Option> { + self.headers() + .get(http::header::CONTENT_TYPE) + .map(|val| String::from_utf8_lossy(val.as_bytes())) + } + + fn accepts(&self) -> Option> { + self.headers() + .get(http::header::ACCEPT) + .map(|val| String::from_utf8_lossy(val.as_bytes())) + } + + fn referer(&self) -> Option> { + self.headers() + .get(http::header::REFERER) + .map(|val| String::from_utf8_lossy(val.as_bytes())) + } + + fn as_query(&self) -> Option<&str> { + self.uri().query() + } +} diff --git a/server_fn/src/request/mod.rs b/server_fn/src/request/mod.rs index 2f68c8930f..3a4c71d393 100644 --- a/server_fn/src/request/mod.rs +++ b/server_fn/src/request/mod.rs @@ -12,6 +12,8 @@ pub mod axum; /// Request types for the browser. #[cfg(feature = "browser")] pub mod browser; +#[cfg(feature = "generic")] +pub mod generic; /// Request types for [`reqwest`]. #[cfg(feature = "reqwest")] pub mod reqwest; diff --git a/server_fn/src/response/generic.rs b/server_fn/src/response/generic.rs new file mode 100644 index 0000000000..f9e10b5f4c --- /dev/null +++ b/server_fn/src/response/generic.rs @@ -0,0 +1,105 @@ +//! This module uses platform-agnostic abstractions +//! allowing users to run server functions on a wide range of +//! platforms. +//! +//! The crates in use in this crate are: +//! +//! * `bytes`: platform-agnostic manipulation of bytes. +//! * `http`: low-dependency HTTP abstractions' *front-end*. +//! +//! # Users +//! +//! * `wasm32-wasip*` integration crate `leptos_wasi` is using this +//! crate under the hood. + +use super::Res; +use crate::error::{ + ServerFnError, ServerFnErrorErr, ServerFnErrorSerde, SERVER_FN_ERROR_HEADER, +}; +use bytes::Bytes; +use futures::{Stream, TryStreamExt}; +use http::{header, HeaderValue, Response, StatusCode}; +use std::{ + fmt::{Debug, Display}, + pin::Pin, + str::FromStr, +}; +use throw_error::Error; + +/// The Body of a Response whose *execution model* can be +/// customised using the variants. +pub enum Body { + /// The response body will be written synchronously. + Sync(Bytes), + + /// The response body will be written asynchronously, + /// this execution model is also known as + /// "streaming". + Async(Pin> + Send + 'static>>), +} + +impl From for Body { + fn from(value: String) -> Self { + Body::Sync(Bytes::from(value)) + } +} + +impl Res for Response +where + CustErr: Send + Sync + Debug + FromStr + Display + 'static, +{ + fn try_from_string( + content_type: &str, + data: String, + ) -> Result> { + let builder = http::Response::builder(); + builder + .status(200) + .header(http::header::CONTENT_TYPE, content_type) + .body(data.into()) + .map_err(|e| ServerFnError::Response(e.to_string())) + } + + fn try_from_bytes( + content_type: &str, + data: Bytes, + ) -> Result> { + let builder = http::Response::builder(); + builder + .status(200) + .header(http::header::CONTENT_TYPE, content_type) + .body(Body::Sync(data)) + .map_err(|e| ServerFnError::Response(e.to_string())) + } + + fn try_from_stream( + content_type: &str, + data: impl Stream>> + + Send + + 'static, + ) -> Result> { + let builder = http::Response::builder(); + builder + .status(200) + .header(http::header::CONTENT_TYPE, content_type) + .body(Body::Async(Box::pin( + data.map_err(ServerFnErrorErr::from).map_err(Error::from), + ))) + .map_err(|e| ServerFnError::Response(e.to_string())) + } + + fn error_response(path: &str, err: &ServerFnError) -> Self { + Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .header(SERVER_FN_ERROR_HEADER, path) + .body(err.ser().unwrap_or_else(|_| err.to_string()).into()) + .unwrap() + } + + fn redirect(&mut self, path: &str) { + if let Ok(path) = HeaderValue::from_str(path) { + self.headers_mut().insert(header::LOCATION, path); + *self.status_mut() = StatusCode::FOUND; + } + } +} diff --git a/server_fn/src/response/mod.rs b/server_fn/src/response/mod.rs index 9515d711a6..6a0f60bace 100644 --- a/server_fn/src/response/mod.rs +++ b/server_fn/src/response/mod.rs @@ -4,6 +4,8 @@ pub mod actix; /// Response types for the browser. #[cfg(feature = "browser")] pub mod browser; +#[cfg(feature = "generic")] +pub mod generic; /// Response types for Axum. #[cfg(feature = "axum-no-default")] pub mod http; diff --git a/server_fn_macro/Cargo.toml b/server_fn_macro/Cargo.toml index b14343fb02..97a8137580 100644 --- a/server_fn_macro/Cargo.toml +++ b/server_fn_macro/Cargo.toml @@ -21,6 +21,7 @@ nightly = [] ssr = [] actix = [] axum = [] +generic = [] reqwest = [] [package.metadata.docs.rs] diff --git a/server_fn_macro/src/lib.rs b/server_fn_macro/src/lib.rs index 896f5f2c16..1e80222a8e 100644 --- a/server_fn_macro/src/lib.rs +++ b/server_fn_macro/src/lib.rs @@ -527,12 +527,16 @@ pub fn server_macro_impl( } } else if cfg!(feature = "axum") { quote! { - #server_fn_path::axum_export::http::Request<#server_fn_path::axum_export::body::Body> + #server_fn_path::http_export::Request<#server_fn_path::axum_export::body::Body> } } else if cfg!(feature = "actix") { quote! { #server_fn_path::request::actix::ActixRequest } + } else if cfg!(feature = "generic") { + quote! { + #server_fn_path::http_export::Request<#server_fn_path::bytes_export::Bytes> + } } else if let Some(req_ty) = req_ty { req_ty.to_token_stream() } else if let Some(req_ty) = preset_req { @@ -551,12 +555,16 @@ pub fn server_macro_impl( } } else if cfg!(feature = "axum") { quote! { - #server_fn_path::axum_export::http::Response<#server_fn_path::axum_export::body::Body> + #server_fn_path::http_export::Response<#server_fn_path::axum_export::body::Body> } } else if cfg!(feature = "actix") { quote! { #server_fn_path::response::actix::ActixResponse } + } else if cfg!(feature = "generic") { + quote! { + #server_fn_path::http_export::Response<#server_fn_path::response::generic::Body> + } } else if let Some(res_ty) = res_ty { res_ty.to_token_stream() } else if let Some(res_ty) = preset_res {