diff --git a/Cargo.toml b/Cargo.toml index 3bd9c839..e29ebfe9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,21 +21,21 @@ repository = "https://github.com/cloudwego/volo" license = "MIT OR Apache-2.0" [workspace.dependencies] -pilota = "0.8" -pilota-build = "0.8" -pilota-thrift-parser = "0.4" -# pilota = { git = "https://github.com/cloudwego/pilota", branch = "main" } -# pilota-build = { git = "https://github.com/cloudwego/pilota", branch = "main" } -# pilota-thrift-parser = { git = "https://github.com/cloudwego/pilota", branch = "main" } +pilota = "0.9" +pilota-build = "0.9" +pilota-thrift-parser = "0.9" +#pilota = { git = "https://github.com/cloudwego/pilota", branch = "main" } +#pilota-build = { git = "https://github.com/cloudwego/pilota", branch = "main" } +#pilota-thrift-parser = { git = "https://github.com/cloudwego/pilota", branch = "main" } -motore = "0.3" +motore = "0.4" +#motore = { git = "https://github.com/cloudwego/motore", branch = "main" } metainfo = "0.7" anyhow = "1" -async-broadcast = "0.5" +async-broadcast = "0.6" async-stream = "0.3" -async-trait = "0.1" base64 = "0.13" bytes = "1" chrono = { version = "0.4", default-features = false, features = [ @@ -67,10 +67,10 @@ linked-hash-map = "0.5" log = "0.4" matchit = "0.7" mur3 = "0.1" -nix = "0.26" +nix = "0.27" nom = "7" normpath = "1" -num_enum = "0.6" +num_enum = "0.7" once_cell = "1" parking_lot = "0.12" paste = "1" diff --git a/README-zh_cn.md b/README-zh_cn.md index 46146887..f9848dd7 100644 --- a/README-zh_cn.md +++ b/README-zh_cn.md @@ -11,9 +11,9 @@ [English](README.md) | 中文 -Volo 是字节跳动服务框架团队研发的 **高性能**、**可扩展性强** 的 Rust RPC 框架,使用了 Rust 最新的 GAT 特性。 +Volo 是字节跳动服务框架团队研发的 **高性能**、**可扩展性强** 的 Rust RPC 框架,使用了 Rust 最新的 AFIT 和 RPITIT 特性。 -Volo 使用 [`Motore`][motore] 作为其中间件抽象层, Motore 基于 GAT 设计。 +Volo 使用 [`Motore`][motore] 作为其中间件抽象层, Motore 基于 AFIT 和 RPITIT 设计。 ## 概览 @@ -30,11 +30,11 @@ Volo 主要包含 6 个 crate 库: ### 特点 -#### 使用 GAT 特性 +#### 使用 AFIT 和 RPITIT 特性 -Volo 使用 [`Motore`][motore] 作为其中间件抽象层, Motore 基于 GAT 设计。 +Volo 使用 [`Motore`][motore] 作为其中间件抽象层, Motore 基于 AFIT 和 RPITIT 设计。 -通过 GAT,我们可以避免很多不必要的 Box 内存分配,以及提升易用性,给用户提供更友好的编程接口和更符合人体工程学的编程范式。 +通过 RPITIT,我们可以避免很多不必要的 Box 内存分配,以及提升易用性,给用户提供更友好的编程接口和更符合人体工程学的编程范式。 #### 高性能 @@ -77,7 +77,7 @@ Volo-gRPC: https://www.cloudwego.io/zh/docs/volo/volo-grpc/getting-started/ ## 相关生态 - [Volo-rs][volo-rs]: Volo 的相关生态,包含了 Volo 的许多组件 -- [Motore][motore]: Volo 参考 Tower 设计的,使用了 GAT 的 middleware 抽象层 +- [Motore][motore]: Volo 参考 Tower 设计的,使用了 AFIT 和 RPITIT 的 middleware 抽象层 - [Pilota][pilota]: Volo 使用的 Thrift 与 Protobuf 编译器及编解码的纯 Rust 实现(不依赖 protoc) - [Metainfo][metainfo]: Volo 用于进行元信息透传的组件,期望定义一套元信息透传的标准 diff --git a/README.md b/README.md index dbd971bc..c4611e48 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ English | [中文](README-zh_cn.md) Volo is a **high-performance** and **strong-extensibility** Rust RPC framework that helps developers build microservices. -Volo uses [`Motore`][motore] as its middleware abstraction, which is powered by GAT. +Volo uses [`Motore`][motore] as its middleware abstraction, which is powered by AFIT and RPITIT. ## Overview @@ -30,11 +30,11 @@ Volo mainly consists of six crates: ### Features -#### Powered by GAT +#### Powered by AFIT and RPITIT -Volo uses [`Motore`][motore] as its middleware abstraction, which is powered by GAT. +Volo uses [`Motore`][motore] as its middleware abstraction, which is powered by AFIT and RPITIT. -Through GAT, we can avoid many unnecessary `Box` memory allocations, improve ease of use, and provide users with a more friendly programming interface and a more ergonomic programming paradigm. +Through RPITIT, we can avoid many unnecessary `Box` memory allocations, improve ease of use, and provide users with a more friendly programming interface and a more ergonomic programming paradigm. #### High Performance @@ -77,7 +77,7 @@ See [Examples][examples]. ## Related Projects - [Volo-rs][volo-rs]: The volo ecosystem which contains a lot of useful components. -- [Motore][motore]: Middleware abstraction layer powered by GAT. +- [Motore][motore]: Middleware abstraction layer powered by AFIT and RPITIT. - [Pilota][pilota]: A thrift and protobuf implementation in pure rust with high performance and extensibility. - [Metainfo][metainfo]: Transmissing metainfo across components. diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 03af2a41..45a4afb4 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -76,7 +76,6 @@ path = "src/unknown/thrift_client.rs" [dependencies] anyhow.workspace = true async-stream.workspace = true -async-trait.workspace = true lazy_static.workspace = true metainfo.workspace = true tokio = { workspace = true, features = ["full"] } diff --git a/examples/src/compression/grpc_client.rs b/examples/src/compression/grpc_client.rs index 403d84e6..e7f702c3 100644 --- a/examples/src/compression/grpc_client.rs +++ b/examples/src/compression/grpc_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use lazy_static::lazy_static; diff --git a/examples/src/compression/grpc_server.rs b/examples/src/compression/grpc_server.rs index 6d80bfd1..3fef6380 100644 --- a/examples/src/compression/grpc_server.rs +++ b/examples/src/compression/grpc_server.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use volo_grpc::{ @@ -12,7 +10,6 @@ use volo_grpc::{ pub struct S; -#[volo::async_trait] impl volo_gen::proto_gen::hello::Greeter for S { async fn say_hello( &self, diff --git a/examples/src/hello/grpc_client.rs b/examples/src/hello/grpc_client.rs index 006cdf7e..3e722dfc 100644 --- a/examples/src/hello/grpc_client.rs +++ b/examples/src/hello/grpc_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use lazy_static::lazy_static; diff --git a/examples/src/hello/grpc_server.rs b/examples/src/hello/grpc_server.rs index a44cbed2..a25c460d 100644 --- a/examples/src/hello/grpc_server.rs +++ b/examples/src/hello/grpc_server.rs @@ -1,12 +1,9 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use volo_grpc::server::{Server, ServiceBuilder}; pub struct S; -#[volo::async_trait] impl volo_gen::proto_gen::hello::Greeter for S { async fn say_hello( &self, diff --git a/examples/src/hello/thrift_client.rs b/examples/src/hello/thrift_client.rs index ad1e2af0..379178d0 100644 --- a/examples/src/hello/thrift_client.rs +++ b/examples/src/hello/thrift_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use lazy_static::lazy_static; diff --git a/examples/src/hello/thrift_server.rs b/examples/src/hello/thrift_server.rs index 367c59b5..84fc3ffc 100644 --- a/examples/src/hello/thrift_server.rs +++ b/examples/src/hello/thrift_server.rs @@ -1,10 +1,7 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; pub struct S; -#[volo::async_trait] impl volo_gen::thrift_gen::hello::HelloService for S { async fn hello( &self, diff --git a/examples/src/loadbalance/grpc_client.rs b/examples/src/loadbalance/grpc_client.rs index 21901331..8574309b 100644 --- a/examples/src/loadbalance/grpc_client.rs +++ b/examples/src/loadbalance/grpc_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::{ cell::RefCell, hash::{Hash, Hasher}, diff --git a/examples/src/loadbalance/grpc_server.rs b/examples/src/loadbalance/grpc_server.rs index 5ba8ec93..99c26cf7 100644 --- a/examples/src/loadbalance/grpc_server.rs +++ b/examples/src/loadbalance/grpc_server.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use tokio::task; @@ -16,7 +14,6 @@ impl S { } } -#[volo::async_trait] impl volo_gen::proto_gen::hello::Greeter for S { async fn say_hello( &self, diff --git a/examples/src/multiplex/grpc_client.rs b/examples/src/multiplex/grpc_client.rs index 55a2e57e..7f7cc492 100644 --- a/examples/src/multiplex/grpc_client.rs +++ b/examples/src/multiplex/grpc_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use lazy_static::lazy_static; diff --git a/examples/src/multiplex/grpc_server.rs b/examples/src/multiplex/grpc_server.rs index 7fc4d859..097931ef 100644 --- a/examples/src/multiplex/grpc_server.rs +++ b/examples/src/multiplex/grpc_server.rs @@ -1,12 +1,9 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use volo_grpc::server::{Server, ServiceBuilder}; pub struct G; -#[volo::async_trait] impl volo_gen::proto_gen::hello::Greeter for G { async fn say_hello( &self, @@ -22,7 +19,6 @@ impl volo_gen::proto_gen::hello::Greeter for G { pub struct E; -#[volo::async_trait] impl volo_gen::proto_gen::echo::Echo for E { async fn echo( &self, diff --git a/examples/src/streaming/grpc_client.rs b/examples/src/streaming/grpc_client.rs index 8d0f8a18..01fc9fe8 100644 --- a/examples/src/streaming/grpc_client.rs +++ b/examples/src/streaming/grpc_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use async_stream::stream; diff --git a/examples/src/streaming/grpc_server.rs b/examples/src/streaming/grpc_server.rs index 377ed132..960ab898 100644 --- a/examples/src/streaming/grpc_server.rs +++ b/examples/src/streaming/grpc_server.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use tokio::sync::mpsc; @@ -12,7 +10,6 @@ use volo_grpc::{ pub struct S; -#[volo::async_trait] impl volo_gen::proto_gen::streaming::Streaming for S { async fn unary( &self, diff --git a/examples/src/unknown/thrift_client.rs b/examples/src/unknown/thrift_client.rs index 5a0d1d10..ad08531f 100644 --- a/examples/src/unknown/thrift_client.rs +++ b/examples/src/unknown/thrift_client.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; use lazy_static::lazy_static; diff --git a/examples/src/unknown/thrift_server.rs b/examples/src/unknown/thrift_server.rs index c9bb3fe1..40a6b31b 100644 --- a/examples/src/unknown/thrift_server.rs +++ b/examples/src/unknown/thrift_server.rs @@ -1,10 +1,7 @@ -#![feature(impl_trait_in_assoc_type)] - use std::net::SocketAddr; pub struct S; -#[volo::async_trait] impl volo_gen::thrift_gen::echo_unknown::EchoService for S { async fn hello( &self, diff --git a/examples/volo-gen/Cargo.toml b/examples/volo-gen/Cargo.toml index ed41de4d..959e4d5a 100644 --- a/examples/volo-gen/Cargo.toml +++ b/examples/volo-gen/Cargo.toml @@ -12,7 +12,6 @@ publish = false [dependencies] anyhow.workspace = true -async-trait.workspace = true futures.workspace = true tokio = { workspace = true, features = ["full"] } diff --git a/examples/volo-gen/src/lib.rs b/examples/volo-gen/src/lib.rs index 97e9cdd6..987d7b91 100644 --- a/examples/volo-gen/src/lib.rs +++ b/examples/volo-gen/src/lib.rs @@ -1,5 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] - mod gen { volo::include_service!("thrift_gen.rs"); volo::include_service!("proto_gen.rs"); diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 8e47c085..ecd40730 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,3 @@ [toolchain] -# TODO: we can remove this toolchain file when TAIT hits stable. -# Related issue: https://github.com/rust-lang/rust/issues/63063. +# TODO: we can remove this toolchain file when AFIT and RPITIT hits stable. channel = "nightly" diff --git a/volo-build/Cargo.toml b/volo-build/Cargo.toml index a135e098..9055d4b2 100644 --- a/volo-build/Cargo.toml +++ b/volo-build/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-build" -version = "0.6.2" +version = "0.8.1" edition.workspace = true homepage.workspace = true repository.workspace = true @@ -17,12 +17,11 @@ keywords = ["thrift", "grpc", "protobuf", "volo", "build"] maintenance = { status = "actively-developed" } [dependencies] -volo = { version = "0.5", path = "../volo" } +volo = { version = "0.8", path = "../volo" } pilota-build.workspace = true anyhow.workspace = true -async-trait.workspace = true dirs.workspace = true heck.workspace = true itertools.workspace = true diff --git a/volo-build/src/grpc_backend.rs b/volo-build/src/grpc_backend.rs index 3e82fb64..eee3e306 100644 --- a/volo-build/src/grpc_backend.rs +++ b/volo-build/src/grpc_backend.rs @@ -547,20 +547,14 @@ impl CodegenBackend for VoloGrpcBackend { {{ type Response = ::volo_grpc::Response<{resp_enum_name_send}>; type Error = ::volo_grpc::status::Status; - type Future<'cx> = impl ::std::future::Future> + 'cx; - fn call<'cx, 's>(&'s self, cx: &'cx mut ::volo_grpc::context::ServerContext, req: ::volo_grpc::Request<{req_enum_name_recv}>) -> Self::Future<'cx> - where - 's: 'cx, - {{ + async fn call<'s, 'cx>(&'s self, cx: &'cx mut ::volo_grpc::context::ServerContext, req: ::volo_grpc::Request<{req_enum_name_recv}>) -> ::std::result::Result {{ let inner = self.inner.clone(); - async move {{ - match cx.rpc_info.method().unwrap().as_str() {{ - {req_matches} - path => {{ - let path = path.to_string(); - Err(::volo_grpc::Status::unimplemented(::std::format!("Unimplemented http path: {{}}", path))) - }} + match cx.rpc_info.method().unwrap().as_str() {{ + {req_matches} + path => {{ + let path = path.to_string(); + Err(::volo_grpc::Status::unimplemented(::std::format!("Unimplemented http path: {{}}", path))) }} }} }} @@ -596,7 +590,10 @@ impl CodegenBackend for VoloGrpcBackend { let name = self.cx().rust_name(method.def_id); - format!("async fn {name}(&self, {args}) -> ::std::result::Result<{ret_ty}>;") + format!( + "fn {name}(&self, {args}) -> impl ::std::future::Future> + Send;" + ) } fn codegen_service_method_with_global_path( @@ -614,7 +611,7 @@ impl CodegenBackend for VoloGrpcBackend { let ty = self.trait_input_ty(a.ty.clone(), client_streaming, true); let ident = &a.name; - // args are unsed, add _ to avoid unused variable warning + // args are unused, add _ to avoid unused variable warning format!("_{ident}: {ty}") }) .join(","); diff --git a/volo-build/src/thrift_backend.rs b/volo-build/src/thrift_backend.rs index 28147682..8a4367f0 100644 --- a/volo-build/src/thrift_backend.rs +++ b/volo-build/src/thrift_backend.rs @@ -46,10 +46,27 @@ impl VoloThriftBackend { let res_send_name = format!("{service_name}ResponseSend"); let req_impl = { - let mk_decode = |is_async: bool| { + let mk_decode = |is_async: bool, is_send: bool| { let helper = DecodeHelper::new(is_async); - let decode_variants = helper.codegen_item_decode(); - let match_methods = crate::join_multi_strs!("", |methods_names, variant_names| -> "\"{methods_names}\" => {{ Self::{variant_names}({decode_variants}) }},"); + let mut match_methods = String::new(); + let args_names = if is_send { + args_send_names.clone() + } else { + args_recv_names.clone() + }; + for (methods_names, variant_names, args_name) in itertools::multizip(( + methods_names.iter(), + variant_names.iter(), + args_names.iter(), + )) { + let decode_variants = helper.codegen_item_decode(args_name.clone().into()); + match_methods.push_str(&format!( + "\"{methods_names}\" => {{ Self::{variant_names}({decode_variants}) }}," + )); + } + // let decode_variants = helper.codegen_item_decode(req_recv_name.clone().into()); + // let match_methods = crate::join_multi_strs!("", |methods_names, variant_names| -> + // "\"{methods_names}\" => {{ Self::{variant_names}({decode_variants}) }},"); format! { r#"Ok(match &*msg_ident.name {{ @@ -61,8 +78,10 @@ impl VoloThriftBackend { } }; - let decode = mk_decode(false); - let decode_async = mk_decode(true); + let send_decode = mk_decode(false, true); + let send_decode_async = mk_decode(true, true); + let recv_decode = mk_decode(false, false); + let recv_decode_async = mk_decode(true, false); let mut match_encode = crate::join_multi_strs!(",", |variant_names| -> "Self::{variant_names}(value) => {{::pilota::thrift::Message::encode(value, protocol).map_err(|err| err.into())}}"); let mut match_size = crate::join_multi_strs!(",", |variant_names| -> "Self::{variant_names}(value) => {{::volo_thrift::Message::size(value, protocol)}}"); @@ -73,8 +92,7 @@ impl VoloThriftBackend { } format! { - r#"#[::async_trait::async_trait] - impl ::volo_thrift::EntryMessage for {req_recv_name} {{ + r#"impl ::volo_thrift::EntryMessage for {req_recv_name} {{ fn encode(&self, protocol: &mut T) -> ::core::result::Result<(), ::pilota::thrift::EncodeError> {{ match self {{ {match_encode} @@ -82,7 +100,7 @@ impl VoloThriftBackend { }} fn decode(protocol: &mut T, msg_ident: &::pilota::thrift::TMessageIdentifier) -> ::core::result::Result {{ - {decode} + {recv_decode} }} async fn decode_async( @@ -90,7 +108,7 @@ impl VoloThriftBackend { msg_ident: &::pilota::thrift::TMessageIdentifier ) -> ::core::result::Result {{ - {decode_async} + {recv_decode_async} }} fn size(&self, protocol: &mut T) -> usize {{ @@ -100,7 +118,6 @@ impl VoloThriftBackend { }} }} - #[::async_trait::async_trait] impl ::volo_thrift::EntryMessage for {req_send_name} {{ fn encode(&self, protocol: &mut T) -> ::core::result::Result<(), ::pilota::thrift::EncodeError> {{ match self {{ @@ -109,7 +126,7 @@ impl VoloThriftBackend { }} fn decode(protocol: &mut T, msg_ident: &::pilota::thrift::TMessageIdentifier) -> ::core::result::Result {{ - {decode} + {send_decode} }} async fn decode_async( @@ -117,7 +134,7 @@ impl VoloThriftBackend { msg_ident: &::pilota::thrift::TMessageIdentifier ) -> ::core::result::Result {{ - {decode_async} + {send_decode_async} }} fn size(&self, protocol: &mut T) -> usize {{ @@ -130,11 +147,28 @@ impl VoloThriftBackend { }; let res_impl = { - let mk_decode = |is_async: bool| { + let mk_decode = |is_async: bool, is_send: bool| { let helper = DecodeHelper::new(is_async); - let decode_item = helper.codegen_item_decode(); + let mut match_methods = String::new(); + let args_names = if is_send { + result_send_names.clone() + } else { + result_recv_names.clone() + }; + for (methods_names, variant_names, args_name) in itertools::multizip(( + methods_names.iter(), + variant_names.iter(), + args_names.iter(), + )) { + let decode_item = helper.codegen_item_decode(args_name.clone().into()); + match_methods.push_str(&format!( + "\"{methods_names}\" => {{ Self::{variant_names}({decode_item}) }}," + )); + } + // let decode_item = helper.codegen_item_decode(res_recv_name.clone().into()); - let match_methods = crate::join_multi_strs!("", |methods_names, variant_names| -> "\"{methods_names}\" => {{ Self::{variant_names}({decode_item}) }},"); + // let match_methods = crate::join_multi_strs!("", |methods_names, variant_names| -> + // "\"{methods_names}\" => {{ Self::{variant_names}({decode_item}) }},"); format!( r#"Ok(match &*msg_ident.name {{ @@ -154,11 +188,12 @@ impl VoloThriftBackend { match_size = "_ => unreachable!(),".to_string(); } - let decode = mk_decode(false); - let decode_async = mk_decode(true); + let send_decode = mk_decode(false, true); + let send_decode_async = mk_decode(true, true); + let recv_decode = mk_decode(false, false); + let recv_decode_async = mk_decode(true, false); format! { - r#"#[::async_trait::async_trait] - impl ::volo_thrift::EntryMessage for {res_recv_name} {{ + r#"impl ::volo_thrift::EntryMessage for {res_recv_name} {{ fn encode(&self, protocol: &mut T) -> ::core::result::Result<(), ::pilota::thrift::EncodeError> {{ match self {{ {match_encode} @@ -166,7 +201,7 @@ impl VoloThriftBackend { }} fn decode(protocol: &mut T, msg_ident: &::pilota::thrift::TMessageIdentifier) -> ::core::result::Result {{ - {decode} + {recv_decode} }} async fn decode_async( @@ -174,7 +209,7 @@ impl VoloThriftBackend { msg_ident: &::pilota::thrift::TMessageIdentifier, ) -> ::core::result::Result {{ - {decode_async} + {recv_decode_async} }} fn size(&self, protocol: &mut T) -> usize {{ @@ -184,7 +219,6 @@ impl VoloThriftBackend { }} }} - #[::async_trait::async_trait] impl ::volo_thrift::EntryMessage for {res_send_name} {{ fn encode(&self, protocol: &mut T) -> ::core::result::Result<(), ::pilota::thrift::EncodeError> {{ match self {{ @@ -193,7 +227,7 @@ impl VoloThriftBackend { }} fn decode(protocol: &mut T, msg_ident: &::pilota::thrift::TMessageIdentifier) -> ::core::result::Result {{ - {decode} + {send_decode} }} async fn decode_async( @@ -201,7 +235,7 @@ impl VoloThriftBackend { msg_ident: &::pilota::thrift::TMessageIdentifier, ) -> ::core::result::Result {{ - {decode_async} + {send_decode_async} }} fn size(&self, protocol: &mut T) -> usize {{ @@ -554,13 +588,9 @@ impl pilota_build::CodegenBackend for VoloThriftBackend { type Response = {res_send_name}; type Error = ::anyhow::Error; - type Future<'cx> = impl ::std::future::Future> + 'cx; - - fn call<'cx, 's>(&'s self, _cx: &'cx mut ::volo_thrift::context::ServerContext, req: {req_recv_name}) -> Self::Future<'cx> where 's:'cx {{ - async move {{ - match req {{ - {handler} - }} + async fn call<'s, 'cx>(&'s self, _cx: &'cx mut ::volo_thrift::context::ServerContext, req: {req_recv_name}) -> ::std::result::Result {{ + match req {{ + {handler} }} }} }}"# @@ -597,7 +627,10 @@ impl pilota_build::CodegenBackend for VoloThriftBackend { "::volo_thrift::AnyhowError".into() }; - format!("async fn {name}(&self, {args}) -> ::core::result::Result<{ret_ty}, {exception}>;") + format!( + "fn {name}(&self, {args}) -> impl ::std::future::Future> + Send;" + ) } fn codegen_service_method_with_global_path( diff --git a/volo-build/src/workspace.rs b/volo-build/src/workspace.rs index 771b282b..4732b0a5 100644 --- a/volo-build/src/workspace.rs +++ b/volo-build/src/workspace.rs @@ -64,4 +64,9 @@ where self.pilota_builder = self.pilota_builder.plugin(plugin); self } + + pub fn ignore_unused(mut self, ignore_unused: bool) -> Self { + self.pilota_builder = self.pilota_builder.ignore_unused(ignore_unused); + self + } } diff --git a/volo-cli/Cargo.toml b/volo-cli/Cargo.toml index 6e86a51a..3acfa44c 100644 --- a/volo-cli/Cargo.toml +++ b/volo-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-cli" -version = "0.6.2" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true @@ -22,7 +22,7 @@ keywords = ["thrift", "grpc", "protobuf", "volo", "cli"] maintenance = { status = "actively-developed" } [dependencies] -volo-build = { version = "0.6", path = "../volo-build" } +volo-build = { version = "0.8", path = "../volo-build" } pilota-thrift-parser.workspace = true anyhow.workspace = true diff --git a/volo-cli/src/bin/volo.rs b/volo-cli/src/bin/volo.rs index 345ba92b..f29b2375 100644 --- a/volo-cli/src/bin/volo.rs +++ b/volo-cli/src/bin/volo.rs @@ -25,7 +25,7 @@ fn main() -> Result<()> { error!("{}", e); } - // detech new version and notify the user + // detect new version and notify the user let pkg_name = env!("CARGO_PKG_NAME"); let current_version = env!("CARGO_PKG_VERSION"); diff --git a/volo-cli/src/init.rs b/volo-cli/src/init.rs index b17f4d55..fb3cf2cb 100644 --- a/volo-cli/src/init.rs +++ b/volo-cli/src/init.rs @@ -12,6 +12,7 @@ use crate::command::CliCommand; #[derive(Parser, Debug)] #[command(about = "init your project")] pub struct Init { + #[arg(help = "The name of project")] pub name: String, #[arg( short = 'g', @@ -182,24 +183,21 @@ impl CliCommand for Init { fn run(&self, cx: crate::context::Context) -> anyhow::Result<()> { volo_build::util::with_config(|config| { let mut lock = None; - - if self.git.is_some() { - let r#ref = self.r#ref.as_deref().unwrap_or("HEAD"); - let lock_value = get_repo_latest_commit_id(self.git.as_ref().unwrap(), r#ref)?; - let _ = lock.insert(lock_value); - } let mut idl = Idl::new(); idl.includes = self.includes.clone(); + + // Handling Git-Based Template Creation if let Some(git) = self.git.as_ref() { + let r#ref = self.r#ref.as_deref().unwrap_or("HEAD"); + let lock_value = get_repo_latest_commit_id(git, r#ref)?; + let _ = lock.insert(lock_value); idl.source = Source::Git(GitSource { repo: git.clone(), r#ref: None, lock, }); - idl.path = self.idl.clone(); - } else { - idl.path = self.idl.clone(); } + idl.path = self.idl.clone(); let mut entry = Entry { protocol: idl.protocol(), @@ -212,8 +210,7 @@ impl CliCommand for Init { self.copy_thrift_template(entry.clone())?; } - if self.git.as_ref().is_some() { - } else { + if self.git.as_ref().is_none() { // we will move volo.yml to volo-gen, so we need to add .. to includes and idl path let idl = entry.idls.get_mut(0).unwrap(); if let Some(includes) = &mut idl.includes { diff --git a/volo-cli/src/templates/grpc/rust-toolchain_toml b/volo-cli/src/templates/grpc/rust-toolchain_toml index 8e47c085..ecd40730 100644 --- a/volo-cli/src/templates/grpc/rust-toolchain_toml +++ b/volo-cli/src/templates/grpc/rust-toolchain_toml @@ -1,4 +1,3 @@ [toolchain] -# TODO: we can remove this toolchain file when TAIT hits stable. -# Related issue: https://github.com/rust-lang/rust/issues/63063. +# TODO: we can remove this toolchain file when AFIT and RPITIT hits stable. channel = "nightly" diff --git a/volo-cli/src/templates/grpc/src/bin/server_rs b/volo-cli/src/templates/grpc/src/bin/server_rs index 4e37b629..5d92fed9 100644 --- a/volo-cli/src/templates/grpc/src/bin/server_rs +++ b/volo-cli/src/templates/grpc/src/bin/server_rs @@ -1,4 +1,4 @@ -#![feature(impl_trait_in_assoc_type)] + use std::net::SocketAddr; diff --git a/volo-cli/src/templates/grpc/src/lib_rs b/volo-cli/src/templates/grpc/src/lib_rs index 879cf755..c5fe0d21 100644 --- a/volo-cli/src/templates/grpc/src/lib_rs +++ b/volo-cli/src/templates/grpc/src/lib_rs @@ -1,8 +1,7 @@ -#![feature(impl_trait_in_assoc_type)] + pub struct S; -#[volo::async_trait] impl volo_gen::{service_global_name} for S {{ {methods} }} diff --git a/volo-cli/src/templates/grpc/volo-gen/src/lib_rs b/volo-cli/src/templates/grpc/volo-gen/src/lib_rs index 38503f80..fe51797c 100644 --- a/volo-cli/src/templates/grpc/volo-gen/src/lib_rs +++ b/volo-cli/src/templates/grpc/volo-gen/src/lib_rs @@ -1,4 +1,4 @@ -#![feature(impl_trait_in_assoc_type)] + mod gen {{ volo::include_service!("volo_gen.rs"); diff --git a/volo-cli/src/templates/thrift/rust-toolchain_toml b/volo-cli/src/templates/thrift/rust-toolchain_toml index 8e47c085..ecd40730 100644 --- a/volo-cli/src/templates/thrift/rust-toolchain_toml +++ b/volo-cli/src/templates/thrift/rust-toolchain_toml @@ -1,4 +1,3 @@ [toolchain] -# TODO: we can remove this toolchain file when TAIT hits stable. -# Related issue: https://github.com/rust-lang/rust/issues/63063. +# TODO: we can remove this toolchain file when AFIT and RPITIT hits stable. channel = "nightly" diff --git a/volo-cli/src/templates/thrift/src/bin/server_rs b/volo-cli/src/templates/thrift/src/bin/server_rs index 8089a522..b9b255e3 100644 --- a/volo-cli/src/templates/thrift/src/bin/server_rs +++ b/volo-cli/src/templates/thrift/src/bin/server_rs @@ -1,4 +1,4 @@ -#![feature(impl_trait_in_assoc_type)] + use std::net::SocketAddr; diff --git a/volo-cli/src/templates/thrift/src/lib_rs b/volo-cli/src/templates/thrift/src/lib_rs index 879cf755..c5fe0d21 100644 --- a/volo-cli/src/templates/thrift/src/lib_rs +++ b/volo-cli/src/templates/thrift/src/lib_rs @@ -1,8 +1,7 @@ -#![feature(impl_trait_in_assoc_type)] + pub struct S; -#[volo::async_trait] impl volo_gen::{service_global_name} for S {{ {methods} }} diff --git a/volo-cli/src/templates/thrift/volo-gen/src/lib_rs b/volo-cli/src/templates/thrift/volo-gen/src/lib_rs index 38503f80..fe51797c 100644 --- a/volo-cli/src/templates/thrift/volo-gen/src/lib_rs +++ b/volo-cli/src/templates/thrift/volo-gen/src/lib_rs @@ -1,4 +1,4 @@ -#![feature(impl_trait_in_assoc_type)] + mod gen {{ volo::include_service!("volo_gen.rs"); diff --git a/volo-grpc/Cargo.toml b/volo-grpc/Cargo.toml index b0d028fa..7b5bc6ca 100644 --- a/volo-grpc/Cargo.toml +++ b/volo-grpc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-grpc" -version = "0.6.0" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true @@ -19,13 +19,12 @@ maintenance = { status = "actively-developed" } [dependencies] pilota.workspace = true -volo = { version = "0.5", path = "../volo" } +volo = { version = "0.8", path = "../volo" } motore = { workspace = true, features = ["tower"] } metainfo.workspace = true anyhow.workspace = true async-stream.workspace = true -async-trait.workspace = true base64.workspace = true bytes.workspace = true fxhash.workspace = true diff --git a/volo-grpc/src/client/meta.rs b/volo-grpc/src/client/meta.rs index 49f8a806..45975cf9 100644 --- a/volo-grpc/src/client/meta.rs +++ b/volo-grpc/src/client/meta.rs @@ -1,6 +1,5 @@ use std::{net::SocketAddr, str::FromStr}; -use futures::Future; use metainfo::{Backward, Forward}; use volo::{context::Context, Service}; @@ -36,95 +35,86 @@ where type Error = S::Error; - type Future<'cx> = impl Future> + 'cx; - - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s self, cx: &'cx mut ClientContext, mut volo_req: Request, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let metadata = volo_req.metadata_mut(); - _ = metainfo::METAINFO.with(|metainfo| { - let metainfo = metainfo.borrow_mut(); - - // persistents for multi-hops - if let Some(ap) = metainfo.get_all_persistents() { - for (key, value) in ap { - let key = metainfo::HTTP_PREFIX_PERSISTENT.to_owned() + key; - metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); - } - } - - // transients for one-hop - if let Some(at) = metainfo.get_all_transients() { - for (key, value) in at { - let key = metainfo::HTTP_PREFIX_TRANSIENT.to_owned() + key; - metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); - } + ) -> Result { + let metadata = volo_req.metadata_mut(); + _ = metainfo::METAINFO.with(|metainfo| { + let metainfo = metainfo.borrow_mut(); + + // persistents for multi-hops + if let Some(ap) = metainfo.get_all_persistents() { + for (key, value) in ap { + let key = metainfo::HTTP_PREFIX_PERSISTENT.to_owned() + key; + metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); } + } - // caller - if let Some(caller) = cx.rpc_info.caller.as_ref() { - metadata.insert(SOURCE_SERVICE, caller.service_name().parse()?); + // transients for one-hop + if let Some(at) = metainfo.get_all_transients() { + for (key, value) in at { + let key = metainfo::HTTP_PREFIX_TRANSIENT.to_owned() + key; + metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); } - - // callee - if let Some(callee) = cx.rpc_info.callee.as_ref() { - metadata.insert(DESTINATION_SERVICE, callee.service_name().parse()?); - if let Some(method) = cx.rpc_info.method() { - metadata.insert(DESTINATION_METHOD, method.parse()?); - } + } + + // caller + if let Some(caller) = cx.rpc_info.caller.as_ref() { + metadata.insert(SOURCE_SERVICE, caller.service_name().parse()?); + } + + // callee + if let Some(callee) = cx.rpc_info.callee.as_ref() { + metadata.insert(DESTINATION_SERVICE, callee.service_name().parse()?); + if let Some(method) = cx.rpc_info.method() { + metadata.insert(DESTINATION_METHOD, method.parse()?); } + } - Ok::<(), Status>(()) - }); + Ok::<(), Status>(()) + }); - let mut volo_resp = self.inner.call(cx, volo_req).await?; + let mut volo_resp = self.inner.call(cx, volo_req).await?; - let metadata = volo_resp.metadata_mut(); - _ = metainfo::METAINFO.with(|metainfo| { - let mut metainfo = metainfo.borrow_mut(); + let metadata = volo_resp.metadata_mut(); + _ = metainfo::METAINFO.with(|metainfo| { + let mut metainfo = metainfo.borrow_mut(); - // callee - if let Some(ad) = metadata.remove(HEADER_TRANS_REMOTE_ADDR) { - let maybe_addr = ad.to_str()?.parse::(); - if let (Some(callee), Ok(addr)) = - (cx.rpc_info_mut().callee.as_mut(), maybe_addr) - { - callee.set_address(volo::net::Address::from(addr)); - } + // callee + if let Some(ad) = metadata.remove(HEADER_TRANS_REMOTE_ADDR) { + let maybe_addr = ad.to_str()?.parse::(); + if let (Some(callee), Ok(addr)) = (cx.rpc_info_mut().callee.as_mut(), maybe_addr) { + callee.set_address(volo::net::Address::from(addr)); } - - // backward - let mut vec = Vec::with_capacity(metadata.len()); - for key_and_value in metadata.iter() { - match key_and_value { - KeyAndValueRef::Ascii(k, v) => { - let k = k.as_str(); - let v = v.to_str()?; - if k.starts_with(metainfo::HTTP_PREFIX_BACKWARD) { - vec.push(k.to_owned()); - metainfo.strip_http_prefix_and_set_backward_downstream( - k.to_owned(), - v.to_owned(), - ); - } + } + + // backward + let mut vec = Vec::with_capacity(metadata.len()); + for key_and_value in metadata.iter() { + match key_and_value { + KeyAndValueRef::Ascii(k, v) => { + let k = k.as_str(); + let v = v.to_str()?; + if k.starts_with(metainfo::HTTP_PREFIX_BACKWARD) { + vec.push(k.to_owned()); + metainfo.strip_http_prefix_and_set_backward_downstream( + k.to_owned(), + v.to_owned(), + ); } - _ => unreachable!(), } + _ => unreachable!(), } - for k in vec { - metadata.remove(k); - } + } + for k in vec { + metadata.remove(k); + } - Ok::<(), Status>(()) - }); + Ok::<(), Status>(()) + }); - Ok(volo_resp) - } + Ok(volo_resp) } } diff --git a/volo-grpc/src/client/mod.rs b/volo-grpc/src/client/mod.rs index f2268526..3be9033c 100644 --- a/volo-grpc/src/client/mod.rs +++ b/volo-grpc/src/client/mod.rs @@ -11,7 +11,6 @@ mod meta; use std::{cell::RefCell, marker::PhantomData, sync::Arc, time::Duration}; pub use callopt::CallOpt; -use futures::Future; pub use meta::MetaService; use motore::{ layer::{Identity, Layer, Stack}, @@ -430,13 +429,10 @@ where Service, Response = Response> + 'static + Send + Clone + Sync, <>::Service as Service>>::Error: Into, - for<'cx> <>::Service as Service>>::Future<'cx>: - Send, IL: Layer>>, IL::Service: Service, Response = Response> + 'static + Send + Clone + Sync, >>::Error: Into, - for<'cx> >>::Future<'cx>: Send, OL: Layer< BoxCloneService< @@ -452,7 +448,6 @@ where OL::Service: Service, Response = Response> + 'static + Send + Clone + Sync, >>::Error: Send + Into, - for<'cx> >>::Future<'cx>: Send, T: 'static + Send, { /// Builds a new [`Client`]. @@ -551,17 +546,13 @@ macro_rules! impl_client { { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + 'cx; - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s $self, $cx: &'cx mut crate::context::ClientContext, $req: Req, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { $e } + ) -> Result { + $e } } @@ -578,17 +569,13 @@ macro_rules! impl_client { { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + 'cx; - fn call<'cx>( + async fn call<'cx>( $self, $cx: &'cx mut crate::context::ClientContext, $req: Req, - ) -> Self::Future<'cx> - where - Self: 'cx, - { - async move { $e } + ) -> Result { + $e } } }; diff --git a/volo-grpc/src/layer/cross_origin.rs b/volo-grpc/src/layer/cross_origin.rs index 7dfe2627..304f8903 100644 --- a/volo-grpc/src/layer/cross_origin.rs +++ b/volo-grpc/src/layer/cross_origin.rs @@ -1,4 +1,3 @@ -use futures::Future; use http::{Request, Uri}; use motore::Service; @@ -18,20 +17,18 @@ impl AddOrigin { impl Service> for AddOrigin where - T: Service>, - ReqBody: 'static, + T: Service> + Send + Sync, + ReqBody: Send + 'static, + Cx: Send, { type Response = T::Response; type Error = T::Error; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - Cx: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - 's: 'cx, - { + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Request, + ) -> Result { // split the header and body let (mut head, body) = req.into_parts(); @@ -49,6 +46,6 @@ where let request = Request::from_parts(head, body); // call inner Service - self.inner.call(cx, request) + self.inner.call(cx, request).await } } diff --git a/volo-grpc/src/layer/grpc_timeout.rs b/volo-grpc/src/layer/grpc_timeout.rs index 6c41e47b..55ee019d 100644 --- a/volo-grpc/src/layer/grpc_timeout.rs +++ b/volo-grpc/src/layer/grpc_timeout.rs @@ -88,20 +88,18 @@ fn try_parse_client_timeout( impl Service> for GrpcTimeout where - S: Service, Error = Status>, - ReqBody: 'static, + Cx: Send, + S: Service, Error = Status> + Send + Sync, + ReqBody: 'static + Send, { type Response = S::Response; type Error = Status; - type Future<'cx> = ResponseFuture> + 'cx> - where - Self: 'cx, - Cx: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: hyper::Request) -> Self::Future<'cx> - where - 's: 'cx, - { + + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: hyper::Request, + ) -> Result { // parse the client_timeout let client_timeout = try_parse_client_timeout(req.headers()).unwrap_or_else(|_| { tracing::trace!("[VOLO] error parsing grpc-timeout header"); @@ -126,6 +124,7 @@ where inner: self.inner.call(cx, req), sleep: pined_sleep, } + .await } } diff --git a/volo-grpc/src/layer/loadbalance/mod.rs b/volo-grpc/src/layer/loadbalance/mod.rs index b2c7dab2..598ecb72 100644 --- a/volo-grpc/src/layer/loadbalance/mod.rs +++ b/volo-grpc/src/layer/loadbalance/mod.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, future::Future, sync::Arc}; +use std::{fmt::Debug, sync::Arc}; use motore::Service; use tracing::warn; @@ -79,7 +79,6 @@ where D: Discover, LB: LoadBalance, S: Service> + 'static + Send + Sync, - for<'cx> S::Future<'cx>: Send, LoadBalanceError: Into, S::Error: Debug, T: Send + 'static, @@ -88,49 +87,44 @@ where type Error = S::Error; - type Future<'cx> = impl Future> + Send + 'cx - where - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - 's: 'cx, - { + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Request, + ) -> Result { debug_assert!( cx.rpc_info().callee.is_some(), "must set callee endpoint before load balance service" ); - async move { - let callee = cx.rpc_info().callee().volo_unwrap(); - - let mut picker = match &callee.address { - None => self - .load_balance - .get_picker(callee, &self.discover) - .await - .map_err(|err| err.into())?, - _ => { - return self.service.call(cx, req).await.map_err(Into::into); - } - }; - - if let Some(addr) = picker.next() { - if let Some(callee) = cx.rpc_info_mut().callee_mut() { - callee.address = Some(addr.clone()) - } + let callee = cx.rpc_info().callee().volo_unwrap(); + + let mut picker = match &callee.address { + None => self + .load_balance + .get_picker(callee, &self.discover) + .await + .map_err(|err| err.into())?, + _ => { + return self.service.call(cx, req).await.map_err(Into::into); + } + }; - return match self.service.call(cx, req).await { - Ok(resp) => Ok(resp), - Err(err) => { - warn!("[VOLO] call endpoint: {:?} error: {:?}", addr, err); - Err(err) - } - }; - } else { - warn!("[VOLO] zero call count, call info: {:?}", cx.rpc_info()); + if let Some(addr) = picker.next() { + if let Some(callee) = cx.rpc_info_mut().callee_mut() { + callee.address = Some(addr.clone()) } - Err(LoadBalanceError::Retry).map_err(|err| err.into())? + + return match self.service.call(cx, req).await { + Ok(resp) => Ok(resp), + Err(err) => { + warn!("[VOLO] call endpoint: {:?} error: {:?}", addr, err); + Err(err) + } + }; + } else { + warn!("[VOLO] zero call count, call info: {:?}", cx.rpc_info()); } + Err(LoadBalanceError::Retry).map_err(|err| err.into())? } } diff --git a/volo-grpc/src/layer/user_agent.rs b/volo-grpc/src/layer/user_agent.rs index aecbe0a6..3e6bc796 100644 --- a/volo-grpc/src/layer/user_agent.rs +++ b/volo-grpc/src/layer/user_agent.rs @@ -1,4 +1,3 @@ -use futures::Future; use http::{header::USER_AGENT, HeaderValue, Request}; use motore::Service; @@ -29,24 +28,22 @@ impl UserAgent { impl Service> for UserAgent where - T: Service>, - ReqBody: 'static, + T: Service> + Send + Sync, + ReqBody: Send + 'static, + Cx: Send, { type Response = T::Response; type Error = T::Error; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - Cx: 'cx; - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, mut req: Request) -> Self::Future<'cx> - where - 's: 'cx, - { + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + mut req: Request, + ) -> Result { req.headers_mut() .insert(USER_AGENT, self.user_agent.clone()); - self.inner.call(cx, req) + self.inner.call(cx, req).await } } diff --git a/volo-grpc/src/server/meta.rs b/volo-grpc/src/server/meta.rs index 78794450..0703489b 100644 --- a/volo-grpc/src/server/meta.rs +++ b/volo-grpc/src/server/meta.rs @@ -1,6 +1,5 @@ use std::{cell::RefCell, net::SocketAddr, str::FromStr, sync::Arc}; -use futures::Future; use metainfo::{Backward, Forward}; use volo::{ context::{Context, Endpoint}, @@ -51,112 +50,111 @@ where type Error = Status; - type Future<'cx> = impl Future> + Send + 'cx; - - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s self, cx: &'cx mut ServerContext, req: hyper::Request, - ) -> Self::Future<'cx> - where - 's: 'cx, - { + ) -> Result { let peer_addr = self.peer_addr.clone(); - metainfo::METAINFO.scope(RefCell::new(metainfo::MetaInfo::default()), async move { - cx.rpc_info.method = Some(FastStr::new(req.uri().path())); + metainfo::METAINFO + .scope(RefCell::new(metainfo::MetaInfo::default()), async move { + cx.rpc_info.method = Some(FastStr::new(req.uri().path())); - let mut volo_req = Request::from_http(req); + let mut volo_req = Request::from_http(req); - let metadata = volo_req.metadata_mut(); + let metadata = volo_req.metadata_mut(); - status_to_http!(metainfo::METAINFO.with(|metainfo| { - let mut metainfo = metainfo.borrow_mut(); + status_to_http!(metainfo::METAINFO.with(|metainfo| { + let mut metainfo = metainfo.borrow_mut(); - // caller - if let Some(source_service) = metadata.remove(SOURCE_SERVICE) { - let source_service = Arc::::from(source_service.to_str()?); - let mut caller = Endpoint::new(source_service.into()); - if let Some(ad) = metadata.remove(HEADER_TRANS_REMOTE_ADDR) { - let addr = ad.to_str()?.parse::(); - if let Ok(addr) = addr { - caller.set_address(volo::net::Address::from(addr)); + // caller + if let Some(source_service) = metadata.remove(SOURCE_SERVICE) { + let source_service = Arc::::from(source_service.to_str()?); + let mut caller = Endpoint::new(source_service.into()); + if let Some(ad) = metadata.remove(HEADER_TRANS_REMOTE_ADDR) { + let addr = ad.to_str()?.parse::(); + if let Ok(addr) = addr { + caller.set_address(volo::net::Address::from(addr)); + } } + if caller.address.is_none() { + caller.address = peer_addr; + } + cx.rpc_info_mut().caller = Some(caller); } - if caller.address.is_none() { - caller.address = peer_addr; + + // callee + if let Some(destination_service) = metadata.remove(DESTINATION_SERVICE) { + let destination_service = Arc::::from(destination_service.to_str()?); + let callee = Endpoint::new(destination_service.into()); + cx.rpc_info_mut().callee = Some(callee); } - cx.rpc_info_mut().caller = Some(caller); - } - - // callee - if let Some(destination_service) = metadata.remove(DESTINATION_SERVICE) { - let destination_service = Arc::::from(destination_service.to_str()?); - let callee = Endpoint::new(destination_service.into()); - cx.rpc_info_mut().callee = Some(callee); - } - - // persistent and transient - let mut vec = Vec::with_capacity(metadata.len()); - for key_and_value in metadata.iter() { - match key_and_value { - KeyAndValueRef::Ascii(k, v) => { - let k = k.as_str(); - let v = v.to_str()?; - if k.starts_with(metainfo::HTTP_PREFIX_PERSISTENT) { - vec.push(k.to_owned()); - metainfo.strip_http_prefix_and_set_persistent( - k.to_owned(), - v.to_owned(), - ); - } else if k.starts_with(metainfo::HTTP_PREFIX_TRANSIENT) { - vec.push(k.to_owned()); - metainfo - .strip_http_prefix_and_set_upstream(k.to_owned(), v.to_owned()); + + // persistent and transient + let mut vec = Vec::with_capacity(metadata.len()); + for key_and_value in metadata.iter() { + match key_and_value { + KeyAndValueRef::Ascii(k, v) => { + let k = k.as_str(); + let v = v.to_str()?; + if k.starts_with(metainfo::HTTP_PREFIX_PERSISTENT) { + vec.push(k.to_owned()); + metainfo.strip_http_prefix_and_set_persistent( + k.to_owned(), + v.to_owned(), + ); + } else if k.starts_with(metainfo::HTTP_PREFIX_TRANSIENT) { + vec.push(k.to_owned()); + metainfo.strip_http_prefix_and_set_upstream( + k.to_owned(), + v.to_owned(), + ); + } } + _ => unreachable!(), } - _ => unreachable!(), } - } - for k in vec { - metadata.remove(k); - } - - Ok::<(), Status>(()) - })); - - let volo_resp = match self.inner.call(cx, volo_req).await { - Ok(resp) => resp, - Err(err) => { - return Ok(err.into().to_http()); - } - }; - - let (mut metadata, extensions, message) = volo_resp.into_parts(); - - status_to_http!(metainfo::METAINFO.with(|metainfo| { - let metainfo = metainfo.borrow_mut(); - - // backward - if let Some(at) = metainfo.get_all_backward_transients() { - for (key, value) in at { - let key = metainfo::HTTP_PREFIX_BACKWARD.to_owned() + key; - metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); + for k in vec { + metadata.remove(k); + } + + Ok::<(), Status>(()) + })); + + let volo_resp = match self.inner.call(cx, volo_req).await { + Ok(resp) => resp, + Err(err) => { + return Ok(err.into().to_http()); + } + }; + + let (mut metadata, extensions, message) = volo_resp.into_parts(); + + status_to_http!(metainfo::METAINFO.with(|metainfo| { + let metainfo = metainfo.borrow_mut(); + + // backward + if let Some(at) = metainfo.get_all_backward_transients() { + for (key, value) in at { + let key = metainfo::HTTP_PREFIX_BACKWARD.to_owned() + key; + metadata.insert(MetadataKey::from_str(key.as_str())?, value.parse()?); + } } - } - Ok::<(), Status>(()) - })); + Ok::<(), Status>(()) + })); - let mut resp = hyper::Response::new(message); - *resp.headers_mut() = metadata.into_headers(); - *resp.extensions_mut() = extensions; - resp.headers_mut().insert( - http::header::CONTENT_TYPE, - http::header::HeaderValue::from_static("application/grpc"), - ); + let mut resp = hyper::Response::new(message); + *resp.headers_mut() = metadata.into_headers(); + *resp.extensions_mut() = extensions; + resp.headers_mut().insert( + http::header::CONTENT_TYPE, + http::header::HeaderValue::from_static("application/grpc"), + ); - Ok(resp) - }) + Ok(resp) + }) + .await } } diff --git a/volo-grpc/src/server/router.rs b/volo-grpc/src/server/router.rs index fb84041a..0eb0fa3b 100644 --- a/volo-grpc/src/server/router.rs +++ b/volo-grpc/src/server/router.rs @@ -3,7 +3,6 @@ use std::{ sync::atomic::{AtomicU32, Ordering}, }; -use futures::Future; use fxhash::FxHashMap; use http_body::Body as HttpBody; use motore::{BoxCloneService, Service}; @@ -93,24 +92,20 @@ where { type Response = Response; type Error = Status; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx; - fn call<'cx, 's>(&'s self, cx: &'cx mut ServerContext, req: Request) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let path = cx.rpc_info.method.as_ref().unwrap(); - match self.node.at(path) { - Ok(match_) => { - let id = match_.value; - let route = self.routes.get(id).volo_unwrap().clone(); - route.call(cx, req).await - } - Err(err) => Err(Status::unimplemented(err.to_string())), + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut ServerContext, + req: Request, + ) -> Result { + let path = cx.rpc_info.method.as_ref().unwrap(); + match self.node.at(path) { + Ok(match_) => { + let id = match_.value; + let route = self.routes.get(id).volo_unwrap().clone(); + route.call(cx, req).await } + Err(err) => Err(Status::unimplemented(err.to_string())), } } } diff --git a/volo-grpc/src/server/service.rs b/volo-grpc/src/server/service.rs index 8d3dd212..91481538 100644 --- a/volo-grpc/src/server/service.rs +++ b/volo-grpc/src/server/service.rs @@ -1,6 +1,5 @@ use std::marker::PhantomData; -use futures::Future; use motore::{ layer::{Identity, Layer, Stack}, service::Service, @@ -126,52 +125,44 @@ where { type Response = Response; type Error = Status; - type Future<'cx> = impl Future> + Send + 'cx - where - Self: 'cx; - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s self, cx: &'cx mut ServerContext, req: Request, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let (metadata, extensions, body) = req.into_parts(); - let send_compression = CompressionEncoding::from_accept_encoding_header( - metadata.headers(), - &self.rpc_config.send_compressions, + ) -> Result { + let (metadata, extensions, body) = req.into_parts(); + let send_compression = CompressionEncoding::from_accept_encoding_header( + metadata.headers(), + &self.rpc_config.send_compressions, + ); + + let recv_compression = CompressionEncoding::from_encoding_header( + metadata.headers(), + &self.rpc_config.accept_compressions, + )?; + + let message = T::from_body( + cx.rpc_info.method.as_deref(), + body, + Kind::Request, + recv_compression, + )?; + + let volo_req = Request::from_parts(metadata, extensions, message); + + let volo_resp = self.inner.call(cx, volo_req).await.map_err(Into::into)?; + + let mut resp = volo_resp.map(|message| Body::new(message.into_body(send_compression))); + + if let Some(encoding) = send_compression { + resp.metadata_mut().insert( + ENCODING_HEADER, + MetadataValue::unchecked_from_header_value(encoding.into_header_value()), ); + }; - let recv_compression = CompressionEncoding::from_encoding_header( - metadata.headers(), - &self.rpc_config.accept_compressions, - )?; - - let message = T::from_body( - cx.rpc_info.method.as_deref(), - body, - Kind::Request, - recv_compression, - )?; - - let volo_req = Request::from_parts(metadata, extensions, message); - - let volo_resp = self.inner.call(cx, volo_req).await.map_err(Into::into)?; - - let mut resp = volo_resp.map(|message| Body::new(message.into_body(send_compression))); - - if let Some(encoding) = send_compression { - resp.metadata_mut().insert( - ENCODING_HEADER, - MetadataValue::unchecked_from_header_value(encoding.into_header_value()), - ); - }; - - Ok(resp) - } + Ok(resp) } } diff --git a/volo-grpc/src/transport/client.rs b/volo-grpc/src/transport/client.rs index 9b3e1ee0..d9fc3e0a 100644 --- a/volo-grpc/src/transport/client.rs +++ b/volo-grpc/src/transport/client.rs @@ -1,6 +1,5 @@ use std::{io, marker::PhantomData}; -use futures::Future; use http::{ header::{CONTENT_TYPE, TE}, HeaderValue, @@ -107,103 +106,94 @@ where type Error = Status; - type Future<'cx> = impl Future> + 'cx; - - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s self, cx: &'cx mut ClientContext, volo_req: Request, - ) -> Self::Future<'cx> - where - 's: 'cx, - { + ) -> Result { let mut http_client = self.http_client.clone(); - async move { - // SAFETY: parameters controlled by volo-grpc are guaranteed to be valid. - // get the call address from the context - let target = cx - .rpc_info - .callee() - .volo_unwrap() - .address() - .ok_or_else(|| { - io::Error::new(std::io::ErrorKind::InvalidData, "address is required") - })?; - - let (metadata, extensions, message) = volo_req.into_parts(); - let path = cx.rpc_info.method().volo_unwrap(); - let rpc_config = cx.rpc_info.config().volo_unwrap(); - let accept_compressions = &rpc_config.accept_compressions; - - // select the compression algorithm with the highest priority by user's config - let send_compression = rpc_config - .send_compressions - .as_ref() - .map(|config| config[0]); - - let body = hyper::Body::wrap_stream(message.into_body(send_compression)); - - let mut req = hyper::Request::new(body); - *req.version_mut() = http::Version::HTTP_2; - *req.method_mut() = http::Method::POST; - *req.uri_mut() = build_uri(target, path); - *req.headers_mut() = metadata.into_headers(); - *req.extensions_mut() = extensions; - req.headers_mut() - .insert(TE, HeaderValue::from_static("trailers")); + // SAFETY: parameters controlled by volo-grpc are guaranteed to be valid. + // get the call address from the context + let target = cx + .rpc_info + .callee() + .volo_unwrap() + .address() + .ok_or_else(|| { + io::Error::new(std::io::ErrorKind::InvalidData, "address is required") + })?; + + let (metadata, extensions, message) = volo_req.into_parts(); + let path = cx.rpc_info.method().volo_unwrap(); + let rpc_config = cx.rpc_info.config().volo_unwrap(); + let accept_compressions = &rpc_config.accept_compressions; + + // select the compression algorithm with the highest priority by user's config + let send_compression = rpc_config + .send_compressions + .as_ref() + .map(|config| config[0]); + + let body = hyper::Body::wrap_stream(message.into_body(send_compression)); + + let mut req = hyper::Request::new(body); + *req.version_mut() = http::Version::HTTP_2; + *req.method_mut() = http::Method::POST; + *req.uri_mut() = build_uri(target, path); + *req.headers_mut() = metadata.into_headers(); + *req.extensions_mut() = extensions; + req.headers_mut() + .insert(TE, HeaderValue::from_static("trailers")); + req.headers_mut() + .insert(CONTENT_TYPE, HeaderValue::from_static("application/grpc")); + + // insert compression headers + if let Some(send_compression) = send_compression { req.headers_mut() - .insert(CONTENT_TYPE, HeaderValue::from_static("application/grpc")); - - // insert compression headers - if let Some(send_compression) = send_compression { - req.headers_mut() - .insert(ENCODING_HEADER, send_compression.into_header_value()); - } - if let Some(accept_compressions) = accept_compressions { - if !accept_compressions.is_empty() { - if let Some(header_value) = accept_compressions[0] - .into_accept_encoding_header_value(accept_compressions) - { - req.headers_mut() - .insert(ACCEPT_ENCODING_HEADER, header_value); - } + .insert(ENCODING_HEADER, send_compression.into_header_value()); + } + if let Some(accept_compressions) = accept_compressions { + if !accept_compressions.is_empty() { + if let Some(header_value) = + accept_compressions[0].into_accept_encoding_header_value(accept_compressions) + { + req.headers_mut() + .insert(ACCEPT_ENCODING_HEADER, header_value); } } + } - // call the service through hyper client - let resp = http_client - .ready() - .await - .map_err(|err| Status::from_error(err.into()))? - .call(req) - .await - .map_err(|err| Status::from_error(err.into()))?; - - let status_code = resp.status(); - let headers = resp.headers(); - - if let Some(status) = Status::from_header_map(headers) { - if status.code() != Code::Ok { - return Err(status); - } + // call the service through hyper client + let resp = http_client + .ready() + .await + .map_err(|err| Status::from_error(err.into()))? + .call(req) + .await + .map_err(|err| Status::from_error(err.into()))?; + + let status_code = resp.status(); + let headers = resp.headers(); + + if let Some(status) = Status::from_header_map(headers) { + if status.code() != Code::Ok { + return Err(status); } - - let accept_compression = CompressionEncoding::from_encoding_header( - headers, - &rpc_config.accept_compressions, - )?; - - let (parts, body) = resp.into_parts(); - - let body = U::from_body( - Some(path), - body, - Kind::Response(status_code), - accept_compression, - )?; - let resp = hyper::Response::from_parts(parts, body); - Ok(Response::from_http(resp)) } + + let accept_compression = + CompressionEncoding::from_encoding_header(headers, &rpc_config.accept_compressions)?; + + let (parts, body) = resp.into_parts(); + + let body = U::from_body( + Some(path), + body, + Kind::Response(status_code), + accept_compression, + )?; + let resp = hyper::Response::from_parts(parts, body); + Ok(Response::from_http(resp)) } } diff --git a/volo-macros/Cargo.toml b/volo-macros/Cargo.toml index 2fafc2a1..5fe89b41 100644 --- a/volo-macros/Cargo.toml +++ b/volo-macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-macros" -version = "0.3.0" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true diff --git a/volo-thrift/Cargo.toml b/volo-thrift/Cargo.toml index c29a9ed3..54fe18bc 100644 --- a/volo-thrift/Cargo.toml +++ b/volo-thrift/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-thrift" -version = "0.7.3" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true @@ -18,13 +18,12 @@ keywords = ["async", "rpc", "thrift"] maintenance = { status = "actively-developed" } [dependencies] -volo = { version = "0.5", path = "../volo" } +volo = { version = "0.8", path = "../volo" } pilota.workspace = true motore.workspace = true metainfo.workspace = true anyhow.workspace = true -async-trait.workspace = true bytes.workspace = true chrono.workspace = true futures.workspace = true @@ -47,6 +46,7 @@ tokio = { workspace = true, features = [ tracing.workspace = true [features] +default = [] # multiplex is unstable and we don't provide backward compatibility multiplex = [] # unsafe-codec can achieve better performance for thrift binary protocol, but may cause undefined behavior diff --git a/volo-thrift/src/client/layer/timeout.rs b/volo-thrift/src/client/layer/timeout.rs index 30739cdd..2377506c 100644 --- a/volo-thrift/src/client/layer/timeout.rs +++ b/volo-thrift/src/client/layer/timeout.rs @@ -1,8 +1,6 @@ //! Applies a timeout to request //! if the inner service's call does not complete within specified timeout, the response will be //! aborted. - -use futures::Future; use motore::{layer::Layer, service::Service}; use tracing::warn; @@ -23,39 +21,36 @@ where type Error = crate::Error; - type Future<'cx> = impl Future> + 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut ClientContext, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - if let Some(config) = cx.rpc_info.config() { - match config.rpc_timeout() { - Some(duration) => { - let start = std::time::Instant::now(); - match tokio::time::timeout(duration, self.inner.call(cx, req)).await { - Ok(r) => r.map_err(Into::into), - Err(_) => { - let msg = format!( - "[VOLO] thrift rpc call timeout, rpcinfo: {:?}, elpased: \ - {:?}, timeout config: {:?}", - cx.rpc_info, - start.elapsed(), - duration - ); - warn!(msg); - Err(crate::Error::Transport( - std::io::Error::new(std::io::ErrorKind::TimedOut, msg).into(), - )) - } + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut ClientContext, + req: Req, + ) -> Result { + if let Some(config) = cx.rpc_info.config() { + match config.rpc_timeout() { + Some(duration) => { + let start = std::time::Instant::now(); + match tokio::time::timeout(duration, self.inner.call(cx, req)).await { + Ok(r) => r.map_err(Into::into), + Err(_) => { + let msg = format!( + "[VOLO] thrift rpc call timeout, rpcinfo: {:?}, elpased: {:?}, \ + timeout config: {:?}", + cx.rpc_info, + start.elapsed(), + duration + ); + warn!(msg); + Err(crate::Error::Transport( + std::io::Error::new(std::io::ErrorKind::TimedOut, msg).into(), + )) } } - None => self.inner.call(cx, req).await.map_err(Into::into), } - } else { - unreachable!("rpc_info.config should never be None") + None => self.inner.call(cx, req).await.map_err(Into::into), } + } else { + unreachable!("rpc_info.config should never be None") } } } diff --git a/volo-thrift/src/client/mod.rs b/volo-thrift/src/client/mod.rs index 02e1f4ed..fb2704b1 100644 --- a/volo-thrift/src/client/mod.rs +++ b/volo-thrift/src/client/mod.rs @@ -11,7 +11,6 @@ use std::{ sync::{atomic::AtomicI32, Arc}, }; -use futures::Future; use motore::{ layer::{Identity, Layer, Stack}, service::{BoxCloneService, Service}, @@ -474,21 +473,18 @@ where type Error = Error; - type Future<'cx> = impl Future> + 'cx + Send where Self:'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut ClientContext, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let msg = ThriftMessage::mk_client_msg(cx, Ok(req))?; - let resp = self.inner.call(cx, msg).await; - match resp { - Ok(Some(ThriftMessage { data: Ok(data), .. })) => Ok(Some(data)), - Ok(Some(ThriftMessage { data: Err(e), .. })) => Err(e), - Err(e) => Err(e), - Ok(None) => Ok(None), - } + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut ClientContext, + req: Req, + ) -> Result { + let msg = ThriftMessage::mk_client_msg(cx, Ok(req))?; + let resp = self.inner.call(cx, msg).await; + match resp { + Ok(Some(ThriftMessage { data: Ok(data), .. })) => Ok(Some(data)), + Ok(Some(ThriftMessage { data: Err(e), .. })) => Err(e), + Err(e) => Err(e), + Ok(None) => Ok(None), } } } @@ -512,21 +508,17 @@ where + Send + Clone + Sync, - for<'cx> <>::Service as Service>::Future<'cx>: - Send, Req: EntryMessage + Send + 'static + Sync + Clone, Resp: EntryMessage + Send + 'static, IL: Layer>, IL::Service: Service> + Sync + Clone + Send + 'static, >::Error: Send + Into, - for<'cx> >::Future<'cx>: Send, MkT: MakeTransport, MkC: MakeCodec + Sync, OL: Layer, Error>>, OL::Service: Service> + 'static + Send + Clone + Sync, - for<'cx> >::Future<'cx>: Send, >::Error: Send + Sync + Into, { /// Build volo client. @@ -692,17 +684,13 @@ macro_rules! impl_client { { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + 'cx; - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s $self, $cx: &'cx mut crate::context::ClientContext, $req: Req, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { $e } + ) -> Result { + $e } } @@ -720,17 +708,13 @@ macro_rules! impl_client { { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + 'cx; - fn call<'cx>( + async fn call<'cx>( $self, $cx: &'cx mut crate::context::ClientContext, $req: Req, - ) -> Self::Future<'cx> - where - Self: 'cx, - { - async move { $e } + ) -> Result { + $e } } }; diff --git a/volo-thrift/src/codec/default/framed.rs b/volo-thrift/src/codec/default/framed.rs index 65e8c939..9fa94e79 100644 --- a/volo-thrift/src/codec/default/framed.rs +++ b/volo-thrift/src/codec/default/framed.rs @@ -74,7 +74,6 @@ impl FramedDecoder { /// https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#compatibility pub const HEADER_DETECT_LENGTH: usize = 6; -#[async_trait::async_trait] impl ZeroCopyDecoder for FramedDecoder where D: ZeroCopyDecoder, diff --git a/volo-thrift/src/codec/default/mod.rs b/volo-thrift/src/codec/default/mod.rs index 6d14a6ca..9987d026 100644 --- a/volo-thrift/src/codec/default/mod.rs +++ b/volo-thrift/src/codec/default/mod.rs @@ -26,6 +26,8 @@ //! [Kitex]: https://github.com/cloudwego/kitex //! [TTHeader]: https://www.cloudwego.io/docs/kitex/reference/transport_protocol_ttheader/ //! [Framed]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport +use std::future::Future; + use bytes::Bytes; use linkedbytes::LinkedBytes; use pilota::thrift::{DecodeError, EncodeError, TransportError}; @@ -73,7 +75,6 @@ pub trait ZeroCopyEncoder: Send + Sync + 'static { /// [`ZeroCopyDecoder`] tries to decode a message without copying large data, so the [`Bytes`] in /// the [`decode`] method is not designed to be reused, and the implementation can use /// `Bytes::split_to` to get a [`Bytes`] and hand it to the user directly. -#[async_trait::async_trait] pub trait ZeroCopyDecoder: Send + Sync + 'static { /// If the outer decoder is framed, it can reads all the payload into a [`Bytes`] and /// call this function for better performance. @@ -85,7 +86,7 @@ pub trait ZeroCopyDecoder: Send + Sync + 'static { /// The [`DefaultDecoder`] will always call `decode_async`, so the most outer decoder /// must implement this function. - async fn decode_async< + fn decode_async< Msg: Send + EntryMessage, Cx: ThriftContext, R: AsyncRead + Unpin + Send + Sync, @@ -93,7 +94,7 @@ pub trait ZeroCopyDecoder: Send + Sync + 'static { &mut self, cx: &mut Cx, reader: &mut BufReader, - ) -> Result>, DecodeError>; + ) -> impl Future>, DecodeError>> + Send; } /// [`MakeZeroCopyCodec`] is used to create a [`ZeroCopyEncoder`] and a [`ZeroCopyDecoder`]. @@ -112,7 +113,6 @@ pub struct DefaultEncoder { linked_bytes: LinkedBytes, } -#[async_trait::async_trait] impl Encoder for DefaultEncoder { @@ -184,7 +184,6 @@ pub struct DefaultDecoder { reader: BufReader, } -#[async_trait::async_trait] impl Decoder for DefaultDecoder { diff --git a/volo-thrift/src/codec/default/thrift.rs b/volo-thrift/src/codec/default/thrift.rs index a3cdb046..a94f735a 100644 --- a/volo-thrift/src/codec/default/thrift.rs +++ b/volo-thrift/src/codec/default/thrift.rs @@ -104,7 +104,6 @@ impl Default for ThriftCodec { } } -#[async_trait::async_trait] impl ZeroCopyDecoder for ThriftCodec { #[inline] fn decode( diff --git a/volo-thrift/src/codec/default/ttheader.rs b/volo-thrift/src/codec/default/ttheader.rs index 3c3ec27c..fbbcc42c 100644 --- a/volo-thrift/src/codec/default/ttheader.rs +++ b/volo-thrift/src/codec/default/ttheader.rs @@ -70,7 +70,6 @@ impl TTHeaderDecoder { /// https://www.cloudwego.io/docs/kitex/reference/transport_protocol_ttheader/ pub const HEADER_DETECT_LENGTH: usize = 6; -#[async_trait::async_trait] impl ZeroCopyDecoder for TTHeaderDecoder where D: ZeroCopyDecoder, diff --git a/volo-thrift/src/codec/mod.rs b/volo-thrift/src/codec/mod.rs index e7454c4e..93118cb4 100644 --- a/volo-thrift/src/codec/mod.rs +++ b/volo-thrift/src/codec/mod.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use tokio::io::{AsyncRead, AsyncWrite}; use crate::{context::ThriftContext, EntryMessage, ThriftMessage}; @@ -12,24 +14,22 @@ pub use default::DefaultMakeCodec; /// Returning an Ok(None) indicates the EOF has been reached. /// /// Note: [`Decoder`] should be designed to be ready for reuse. -#[async_trait::async_trait] pub trait Decoder: Send + 'static { - async fn decode( + fn decode( &mut self, cx: &mut Cx, - ) -> Result>, crate::Error>; + ) -> impl Future>, crate::Error>> + Send; } /// [`Encoder`] writes a [`ThriftMessage`] to an [`AsyncWrite`] and flushes the data. /// /// Note: [`Encoder`] should be designed to be ready for reuse. -#[async_trait::async_trait] pub trait Encoder: Send + 'static { - async fn encode( + fn encode( &mut self, cx: &mut Cx, msg: ThriftMessage, - ) -> Result<(), crate::Error>; + ) -> impl Future> + Send; } /// [`MakeCodec`] receives an [`AsyncRead`] and an [`AsyncWrite`] and returns a diff --git a/volo-thrift/src/error.rs b/volo-thrift/src/error.rs index 1fbf154b..6677435a 100644 --- a/volo-thrift/src/error.rs +++ b/volo-thrift/src/error.rs @@ -33,7 +33,6 @@ pub enum Error { #[derive(Debug, Clone, Copy)] pub struct DummyError; -#[async_trait::async_trait] impl Message for DummyError { fn encode(&self, _protocol: &mut T) -> Result<(), EncodeError> { panic!() @@ -121,6 +120,7 @@ impl From for Error { pilota::thrift::ProtocolErrorKind::Unknown, value.to_string(), )), + pilota::thrift::DecodeErrorKind::Unknown => protocol_err!(Unknown), } } } @@ -210,7 +210,6 @@ impl Display for ApplicationError { } } -#[async_trait::async_trait] impl Message for ApplicationError { /// Convert an `ApplicationError` into its wire representation and write /// it to the remote. diff --git a/volo-thrift/src/lib.rs b/volo-thrift/src/lib.rs index 33da6c56..415a65f9 100644 --- a/volo-thrift/src/lib.rs +++ b/volo-thrift/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] #![doc( html_logo_url = "https://github.com/cloudwego/volo/raw/main/.github/assets/logo.png?sanitize=true" )] diff --git a/volo-thrift/src/message.rs b/volo-thrift/src/message.rs index 0c7b7a7b..6864942c 100644 --- a/volo-thrift/src/message.rs +++ b/volo-thrift/src/message.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{future::Future, sync::Arc}; pub use pilota::thrift::Message; use pilota::thrift::{ @@ -6,7 +6,6 @@ use pilota::thrift::{ TMessageIdentifier, TOutputProtocol, }; -#[async_trait::async_trait] pub trait EntryMessage: Sized + Send { fn encode(&self, protocol: &mut T) -> Result<(), EncodeError>; @@ -15,15 +14,14 @@ pub trait EntryMessage: Sized + Send { msg_ident: &TMessageIdentifier, ) -> Result; - async fn decode_async( + fn decode_async( protocol: &mut T, msg_ident: &TMessageIdentifier, - ) -> Result; + ) -> impl Future> + Send; fn size(&self, protocol: &mut T) -> usize; } -#[async_trait::async_trait] impl EntryMessage for Arc where Message: EntryMessage + Sync, diff --git a/volo-thrift/src/message_wrapper.rs b/volo-thrift/src/message_wrapper.rs index d2c1b7ee..f42897be 100644 --- a/volo-thrift/src/message_wrapper.rs +++ b/volo-thrift/src/message_wrapper.rs @@ -24,7 +24,6 @@ pub struct ThriftMessage { pub(crate) struct DummyMessage; -#[async_trait::async_trait] impl EntryMessage for DummyMessage { #[inline] fn encode(&self, _protocol: &mut T) -> Result<(), EncodeError> { @@ -186,7 +185,7 @@ where let res = match msg_ident.message_type { TMessageType::Exception => Err(crate::Error::Application( - Message::decode_async(protocol).await?, + ::decode_async(protocol).await?, )), _ => Ok(U::decode_async(protocol, &msg_ident).await?), }; diff --git a/volo-thrift/src/server.rs b/volo-thrift/src/server.rs index 42b94500..44a5d1d7 100644 --- a/volo-thrift/src/server.rs +++ b/volo-thrift/src/server.rs @@ -176,7 +176,6 @@ impl Server { MkC: MakeCodec, L::Service: Service + Send + 'static + Sync, >::Error: Into + Send, - for<'cx> >::Future<'cx>: Send, S: Service + Send + 'static, S::Error: Into + Send, Req: EntryMessage + Send + 'static, diff --git a/volo-thrift/src/transport/multiplex/client.rs b/volo-thrift/src/transport/multiplex/client.rs index e1068df9..d6acd514 100644 --- a/volo-thrift/src/transport/multiplex/client.rs +++ b/volo-thrift/src/transport/multiplex/client.rs @@ -1,6 +1,5 @@ use std::{io, marker::PhantomData}; -use futures::Future; use motore::service::{Service, UnaryService}; use pilota::thrift::TransportErrorKind; use volo::{ @@ -64,19 +63,16 @@ where { type Response = ThriftTransport; type Error = io::Error; - type Future<'s> = impl Future> + 's; - fn call(&self, target: Address) -> Self::Future<'_> { + async fn call(&self, target: Address) -> Result { let make_transport = self.make_transport.clone(); - async move { - let (rh, wh) = make_transport.make_transport(target.clone()).await?; - Ok(ThriftTransport::new( - rh, - wh, - self.make_codec.clone(), - target, - )) - } + let (rh, wh) = make_transport.make_transport(target.clone()).await?; + Ok(ThriftTransport::new( + rh, + wh, + self.make_codec.clone(), + target, + )) } } @@ -132,39 +128,32 @@ where type Error = crate::Error; - type Future<'cx> = impl Future> + Send + 'cx where Self:'cx; - - fn call<'cx, 's>( + async fn call<'cx, 's>( &'s self, cx: &'cx mut ClientContext, req: ThriftMessage, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let rpc_info = &cx.rpc_info; - let target = rpc_info.callee().volo_unwrap().address().ok_or_else(|| { - let msg = format!("address is required, rpcinfo: {:?}", rpc_info); - crate::Error::Transport(io::Error::new(io::ErrorKind::InvalidData, msg).into()) - })?; - let oneway = cx.message_type == TMessageType::OneWay; - cx.stats.record_make_transport_start_at(); - let transport = self.make_transport.call(target).await?; - cx.stats.record_make_transport_end_at(); - let resp = transport.send(cx, req, oneway).await; - if let Ok(None) = resp { - if !oneway { - return Err(Error::Transport(pilota::thrift::TransportError::new( - TransportErrorKind::EndOfFile, - format!("an unexpected end of file from server, cx: {:?}", cx), - ))); - } + ) -> Result { + let rpc_info = &cx.rpc_info; + let target = rpc_info.callee().volo_unwrap().address().ok_or_else(|| { + let msg = format!("address is required, rpcinfo: {:?}", rpc_info); + crate::Error::Transport(io::Error::new(io::ErrorKind::InvalidData, msg).into()) + })?; + let oneway = cx.message_type == TMessageType::OneWay; + cx.stats.record_make_transport_start_at(); + let transport = self.make_transport.call(target).await?; + cx.stats.record_make_transport_end_at(); + let resp = transport.send(cx, req, oneway).await; + if let Ok(None) = resp { + if !oneway { + return Err(Error::Transport(pilota::thrift::TransportError::new( + TransportErrorKind::EndOfFile, + format!("an unexpected end of file from server, cx: {:?}", cx), + ))); } - if cx.transport.should_reuse && resp.is_ok() { - transport.reuse(); - } - resp } + if cx.transport.should_reuse && resp.is_ok() { + transport.reuse(); + } + resp } } diff --git a/volo-thrift/src/transport/pingpong/client.rs b/volo-thrift/src/transport/pingpong/client.rs index 3aae4fff..498c5dad 100644 --- a/volo-thrift/src/transport/pingpong/client.rs +++ b/volo-thrift/src/transport/pingpong/client.rs @@ -1,6 +1,5 @@ use std::{io, marker::PhantomData}; -use futures::Future; use motore::service::{Service, UnaryService}; use pilota::thrift::{TransportError, TransportErrorKind}; use volo::{ @@ -51,15 +50,12 @@ where { type Response = ThriftTransport; type Error = io::Error; - type Future<'s> = impl Future> + 's; #[inline] - fn call(&self, target: Address) -> Self::Future<'_> { + async fn call(&self, target: Address) -> Result { let make_transport = self.make_transport.clone(); - async move { - let (rh, wh) = make_transport.make_transport(target).await?; - Ok(ThriftTransport::new(rh, wh, self.make_codec.clone())) - } + let (rh, wh) = make_transport.make_transport(target).await?; + Ok(ThriftTransport::new(rh, wh, self.make_codec.clone())) } } @@ -112,47 +108,40 @@ where type Error = crate::Error; - type Future<'cx> = impl Future> + Send + 'cx where Self:'cx; - #[inline] - fn call<'cx, 's>( + async fn call<'s, 'cx>( &'s self, cx: &'cx mut ClientContext, req: ThriftMessage, - ) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - let rpc_info = &cx.rpc_info; - let target = rpc_info.callee().volo_unwrap().address().ok_or_else(|| { - TransportError::from(io::Error::new( - io::ErrorKind::InvalidData, - format!("address is required, rpc_info: {:?}", rpc_info), - )) - })?; - let oneway = cx.message_type == TMessageType::OneWay; - cx.stats.record_make_transport_start_at(); - let mut transport = self.make_transport.call(target).await?; - cx.stats.record_make_transport_end_at(); - let resp = transport.send(cx, req, oneway).await; - if let Ok(None) = resp { - if !oneway { - return Err(crate::Error::Transport( - pilota::thrift::TransportError::new( - TransportErrorKind::EndOfFile, - format!( - "an unexpected end of file from server, rpc_info: {:?}", - cx.rpc_info - ), + ) -> Result { + let rpc_info = &cx.rpc_info; + let target = rpc_info.callee().volo_unwrap().address().ok_or_else(|| { + TransportError::from(io::Error::new( + io::ErrorKind::InvalidData, + format!("address is required, rpc_info: {:?}", rpc_info), + )) + })?; + let oneway = cx.message_type == TMessageType::OneWay; + cx.stats.record_make_transport_start_at(); + let mut transport = self.make_transport.call(target).await?; + cx.stats.record_make_transport_end_at(); + let resp = transport.send(cx, req, oneway).await; + if let Ok(None) = resp { + if !oneway { + return Err(crate::Error::Transport( + pilota::thrift::TransportError::new( + TransportErrorKind::EndOfFile, + format!( + "an unexpected end of file from server, rpc_info: {:?}", + cx.rpc_info ), - )); - } + ), + )); } - if cx.transport.should_reuse && resp.is_ok() { - transport.reuse(); - } - resp } + if cx.transport.should_reuse && resp.is_ok() { + transport.reuse(); + } + resp } } diff --git a/volo-thrift/src/transport/pool/make_transport.rs b/volo-thrift/src/transport/pool/make_transport.rs index d460d445..15c27adf 100644 --- a/volo-thrift/src/transport/pool/make_transport.rs +++ b/volo-thrift/src/transport/pool/make_transport.rs @@ -2,7 +2,6 @@ use std::{fmt::Debug, hash::Hash}; -use futures::Future; use motore::{service::UnaryService, BoxError}; use super::{Pool, Poolable, Pooled}; @@ -55,10 +54,8 @@ where type Error = BoxError; - type Future<'cx> = impl Future> + 'cx; - - fn call(&self, key: Key) -> Self::Future<'_> { + async fn call(&self, key: Key) -> Result { let mt = self.inner.clone(); - async move { self.pool.get(key, mt).await } + self.pool.get(key, mt).await } } diff --git a/volo/Cargo.toml b/volo/Cargo.toml index eb980b17..b7c77457 100644 --- a/volo/Cargo.toml +++ b/volo/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo" -version = "0.5.5" +version = "0.8.0" edition.workspace = true homepage.workspace = true repository.workspace = true @@ -23,7 +23,6 @@ maintenance = { status = "actively-developed" } motore.workspace = true async-broadcast.workspace = true -async-trait.workspace = true dashmap.workspace = true faststr.workspace = true futures.workspace = true @@ -31,7 +30,13 @@ lazy_static.workspace = true libc.workspace = true metainfo.workspace = true mur3.workspace = true -nix.workspace = true +nix = { workspace = true, features = [ + "uio", + "socket", + "process", + "signal", + "feature", +] } once_cell.workspace = true pin-project.workspace = true rand.workspace = true diff --git a/volo/src/client.rs b/volo/src/client.rs index de4c1223..14498406 100644 --- a/volo/src/client.rs +++ b/volo/src/client.rs @@ -26,16 +26,12 @@ pub trait OneShotService { /// Errors produced by the service. type Error; - /// The future response value. - type Future<'cx>: Future> + Send + 'cx - where - Cx: 'cx, - Self: 'cx; - /// Process the request and return the response asynchronously. - fn call<'cx>(self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - Self: 'cx; + fn call<'cx>( + self, + cx: &'cx mut Cx, + req: Request, + ) -> impl Future> + Send; } impl OneShotService for WithOptService @@ -44,22 +40,17 @@ where Opt: 'static + Send + Sync + Apply, Req: 'static + Send, S: Service + 'static + Sync + Send, - for<'cx> S::Future<'cx>: Send, { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + Send + 'cx - where - Cx: 'cx, - Self: 'cx; - #[inline] - fn call<'cx>(self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - Self: 'cx, - { + fn call<'cx>( + self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> + Send { async move { self.opt.apply(cx)?; self.inner.call(cx, req).await diff --git a/volo/src/discovery/mod.rs b/volo/src/discovery/mod.rs index 0f4a2c11..d2e05371 100644 --- a/volo/src/discovery/mod.rs +++ b/volo/src/discovery/mod.rs @@ -31,11 +31,12 @@ pub trait Discover: Send + Sync + 'static { type Key: Hash + PartialEq + Eq + Send + Sync + Clone + 'static; /// `Error` is the discovery error. type Error: Into; - /// `DiscFut` is a Future object which returns a discovery result. - type DiscFut<'future>: Future>, Self::Error>> + Send + 'future; /// `discover` allows to request an endpoint and return a discover future. - fn discover<'s>(&'s self, endpoint: &'s Endpoint) -> Self::DiscFut<'s>; + fn discover<'s>( + &'s self, + endpoint: &'s Endpoint, + ) -> impl Future>, Self::Error>> + Send; /// `key` should return a key suitable for cache. fn key(&self, endpoint: &Endpoint) -> Self::Key; /// `watch` should return a [`async_broadcast::Receiver`] which can be used to subscribe @@ -150,10 +151,9 @@ impl From> for StaticDiscover { impl Discover for StaticDiscover { type Key = (); type Error = Infallible; - type DiscFut<'a> = impl Future>, Self::Error>> + 'a; - fn discover(&self, _: &Endpoint) -> Self::DiscFut<'_> { - async { Ok(self.instances.clone()) } + async fn discover<'s>(&'s self, _: &'s Endpoint) -> Result>, Self::Error> { + Ok(self.instances.clone()) } fn key(&self, _: &Endpoint) -> Self::Key {} @@ -172,10 +172,9 @@ pub struct DummyDiscover; impl Discover for DummyDiscover { type Key = (); type Error = Infallible; - type DiscFut<'a> = impl Future>, Self::Error>> + 'a; - fn discover(&self, _: &Endpoint) -> Self::DiscFut<'_> { - async { Ok(vec![]) } + async fn discover<'s>(&'s self, _: &'s Endpoint) -> Result>, Self::Error> { + Ok(vec![]) } fn key(&self, _: &Endpoint) {} diff --git a/volo/src/lib.rs b/volo/src/lib.rs index bb0edd17..e34118db 100644 --- a/volo/src/lib.rs +++ b/volo/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] #![doc( html_logo_url = "https://github.com/cloudwego/volo/raw/main/.github/assets/logo.png?sanitize=true" )] @@ -8,7 +7,6 @@ #[macro_use] mod cfg; -pub use async_trait::async_trait; pub use motore::{layer, layer::Layer, service, Service}; pub use tokio::main; diff --git a/volo/src/loadbalance/consistent_hash.rs b/volo/src/loadbalance/consistent_hash.rs index 43c6e3b1..4fb80bf4 100644 --- a/volo/src/loadbalance/consistent_hash.rs +++ b/volo/src/loadbalance/consistent_hash.rs @@ -1,4 +1,4 @@ -use std::{cmp::min, collections::HashSet, future::Future, hash::Hash, sync::Arc}; +use std::{cmp::min, collections::HashSet, hash::Hash, sync::Arc}; use dashmap::{mapref::entry::Entry, DashMap}; @@ -249,51 +249,40 @@ where D: Discover, { type InstanceIter = InstancePicker; - - type GetFut<'future> = - impl Future> + Send + 'future - where - Self: 'future; - - fn get_picker<'future>( + async fn get_picker<'future>( &'future self, endpoint: &'future Endpoint, discover: &'future D, - ) -> Self::GetFut<'future> - where - Self: 'future, - { - async move { - let request_hash = metainfo::METAINFO - .try_with(|m| m.borrow().get::().copied()) - .map_err(|_| LoadBalanceError::MissRequestHash)?; - if request_hash.is_none() { - return Err(LoadBalanceError::MissRequestHash); - } - let request_hash = request_hash.unwrap(); - let key = discover.key(endpoint); - let weighted_list = match self.router.entry(key) { - Entry::Occupied(e) => e.get().clone(), - Entry::Vacant(e) => { - let instances = Arc::new( - self.build_weighted_instances( - discover - .discover(endpoint) - .await - .map_err(|err| err.into())?, - ), - ); - e.insert(instances).value().clone() - } - }; - Ok(InstancePicker { - shared_instances: weighted_list, - request_hash, - last_pick: None, - used: HashSet::new(), - replicas: self.option.replicas, - }) + ) -> Result { + let request_hash = metainfo::METAINFO + .try_with(|m| m.borrow().get::().copied()) + .map_err(|_| LoadBalanceError::MissRequestHash)?; + if request_hash.is_none() { + return Err(LoadBalanceError::MissRequestHash); } + let request_hash = request_hash.unwrap(); + let key = discover.key(endpoint); + let weighted_list = match self.router.entry(key) { + Entry::Occupied(e) => e.get().clone(), + Entry::Vacant(e) => { + let instances = Arc::new( + self.build_weighted_instances( + discover + .discover(endpoint) + .await + .map_err(|err| err.into())?, + ), + ); + e.insert(instances).value().clone() + } + }; + Ok(InstancePicker { + shared_instances: weighted_list, + request_hash, + last_pick: None, + used: HashSet::new(), + replicas: self.option.replicas, + }) } fn rebalance(&self, changes: Change<::Key>) { diff --git a/volo/src/loadbalance/layer.rs b/volo/src/loadbalance/layer.rs index a6b22f10..2c5a7e1d 100644 --- a/volo/src/loadbalance/layer.rs +++ b/volo/src/loadbalance/layer.rs @@ -52,20 +52,16 @@ where LoadBalanceError: Into, S::Error: Debug + Retryable, Req: Clone + Send + Sync + 'static, - for<'cx> S::Future<'cx>: Send, { type Response = S::Response; type Error = S::Error; - type Future<'cx> = impl Future> + Send + 'cx - where - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> + Send { debug_assert!( cx.rpc_info().callee.is_some(), "must set callee endpoint before load balance service" diff --git a/volo/src/loadbalance/mod.rs b/volo/src/loadbalance/mod.rs index 84b8ab72..cc0b55d5 100644 --- a/volo/src/loadbalance/mod.rs +++ b/volo/src/loadbalance/mod.rs @@ -23,22 +23,13 @@ where /// `InstanceIter` is an iterator of [`crate::discovery::Instance`]. type InstanceIter: Iterator + Send; - /// `GetFut` is the return type of `get_picker`. - type GetFut<'future>: Future> - + Send - + 'future - where - Self: 'future; - /// `get_picker` allows to get an instance iterator of a specified endpoint from self or /// service discovery. fn get_picker<'future>( &'future self, endpoint: &'future Endpoint, discover: &'future D, - ) -> Self::GetFut<'future> - where - Self: 'future; + ) -> impl Future> + Send; /// `rebalance` is the callback method be used in service discovering subscription. fn rebalance(&self, changes: Change); } diff --git a/volo/src/loadbalance/random.rs b/volo/src/loadbalance/random.rs index 3d3dcced..13ca735e 100644 --- a/volo/src/loadbalance/random.rs +++ b/volo/src/loadbalance/random.rs @@ -1,5 +1,5 @@ use core::cell::OnceCell; -use std::{future::Future, hash::Hash, sync::Arc}; +use std::{hash::Hash, sync::Arc}; use dashmap::{mapref::entry::Entry, DashMap}; use rand::Rng; @@ -117,41 +117,31 @@ where { type InstanceIter = InstancePicker; - type GetFut<'future> = - impl Future> + Send + 'future - where - Self: 'future; - - fn get_picker<'future>( + async fn get_picker<'future>( &'future self, endpoint: &'future Endpoint, discover: &'future D, - ) -> Self::GetFut<'future> - where - Self: 'future, - { - async { - let key = discover.key(endpoint); - let weighted_list = match self.router.entry(key) { - Entry::Occupied(e) => e.get().clone(), - Entry::Vacant(e) => { - let instances = Arc::new(WeightedInstances::from( - discover - .discover(endpoint) - .await - .map_err(|err| err.into())?, - )); - e.insert(instances).value().clone() - } - }; - let sum_of_weights = weighted_list.sum_of_weights; - Ok(InstancePicker { - owned_instances: OnceCell::new(), - last_pick: None, - shared_instances: weighted_list, - sum_of_weights, - }) - } + ) -> Result { + let key = discover.key(endpoint); + let weighted_list = match self.router.entry(key) { + Entry::Occupied(e) => e.get().clone(), + Entry::Vacant(e) => { + let instances = Arc::new(WeightedInstances::from( + discover + .discover(endpoint) + .await + .map_err(|err| err.into())?, + )); + e.insert(instances).value().clone() + } + }; + let sum_of_weights = weighted_list.sum_of_weights; + Ok(InstancePicker { + owned_instances: OnceCell::new(), + last_pick: None, + shared_instances: weighted_list, + sum_of_weights, + }) } fn rebalance(&self, changes: Change) { diff --git a/volo/src/net/dial.rs b/volo/src/net/dial.rs index f0771681..f25d49e1 100644 --- a/volo/src/net/dial.rs +++ b/volo/src/net/dial.rs @@ -1,4 +1,4 @@ -use std::{io, net::SocketAddr}; +use std::{io, net::SocketAddr, future::Future}; use socket2::{Domain, Protocol, Socket, Type}; #[cfg(target_family = "unix")] @@ -15,12 +15,14 @@ use super::{ }; /// [`MakeTransport`] creates an [`AsyncRead`] and an [`AsyncWrite`] for the given [`Address`]. -#[async_trait::async_trait] pub trait MakeTransport: Clone + Send + Sync + 'static { type ReadHalf: AsyncRead + Send + Sync + Unpin + 'static; type WriteHalf: AsyncWrite + Send + Sync + Unpin + 'static; - async fn make_transport(&self, addr: Address) -> io::Result<(Self::ReadHalf, Self::WriteHalf)>; + fn make_transport( + &self, + addr: Address, + ) -> impl Future> + Send; fn set_connect_timeout(&mut self, timeout: Option); fn set_read_timeout(&mut self, timeout: Option); fn set_write_timeout(&mut self, timeout: Option); @@ -73,7 +75,6 @@ impl DefaultMakeTransport { } } -#[async_trait::async_trait] impl MakeTransport for DefaultMakeTransport { type ReadHalf = OwnedReadHalf; @@ -230,7 +231,6 @@ cfg_rustls_or_native_tls! { } } - #[async_trait::async_trait] impl MakeTransport for TlsMakeTransport { type ReadHalf = OwnedReadHalf; diff --git a/volo/src/net/incoming.rs b/volo/src/net/incoming.rs index 3329c8e3..e1750167 100644 --- a/volo/src/net/incoming.rs +++ b/volo/src/net/incoming.rs @@ -1,5 +1,7 @@ use std::{ - fmt, io, + fmt, + future::Future, + io, task::{Context, Poll}, }; @@ -22,7 +24,6 @@ pub enum DefaultIncoming { Unix(#[pin] UnixListenerStream), } -#[async_trait::async_trait] impl MakeIncoming for DefaultIncoming { type Incoming = DefaultIncoming; @@ -44,12 +45,10 @@ impl From for DefaultIncoming { } } -#[async_trait::async_trait] pub trait Incoming: fmt::Debug + Send + 'static { - async fn accept(&mut self) -> io::Result>; + fn accept(&mut self) -> impl Future>> + Send; } -#[async_trait::async_trait] impl Incoming for DefaultIncoming { async fn accept(&mut self) -> io::Result> { if let Some(conn) = self.try_next().await? { @@ -61,15 +60,13 @@ impl Incoming for DefaultIncoming { } } -#[async_trait::async_trait] pub trait MakeIncoming { type Incoming: Incoming; - async fn make_incoming(self) -> io::Result; + fn make_incoming(self) -> impl Future> + Send; } #[cfg(target_family = "unix")] -#[async_trait::async_trait] impl MakeIncoming for Address { type Incoming = DefaultIncoming; @@ -88,7 +85,6 @@ impl MakeIncoming for Address { } #[cfg(not(target_family = "unix"))] -#[async_trait::async_trait] impl MakeIncoming for Address { type Incoming = DefaultIncoming;