diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index b02b3bc9..19d8c4d0 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -8,7 +8,6 @@ on: pull_request: branches: ["*"] - workflow_dispatch: jobs: check: diff --git a/application.yaml b/application.yaml index d357db14..902b0efd 100644 --- a/application.yaml +++ b/application.yaml @@ -21,4 +21,37 @@ dubbo: references: GreeterClientImpl: url: tri://localhost:20000 - protocol: tri \ No newline at end of file + protocol: tri + routers: + consumer: + - service: "org.apache.dubbo.sample.tri.Greeter" + url: triple://localhost:20000 + protocol: triple + nacos: + addr: "127.0.0.1:8848" + namespace: "" + app: "" + conditions: + - scope: "service" + force: false + runtime: true + enabled: true + key: "org.apache.dubbo.sample.tri.Greeter" + conditions: + - method=greet => port=8889 + - scope: "service" + force: true + runtime: true + enabled: true + key: "user.UserService" + conditions: + - method=get_s => port=2003 + tags: + force: true + enabled: true + key: shop-detail + tags: + - name: gray + matches: + - key: env + value: gray \ No newline at end of file diff --git a/config/src/config.rs b/config/src/config.rs index 646873d1..cf4e4415 100644 --- a/config/src/config.rs +++ b/config/src/config.rs @@ -17,7 +17,7 @@ use std::{collections::HashMap, env, path::PathBuf}; -use crate::{protocol::Protocol, registry::RegistryConfig}; +use crate::{protocol::Protocol, registry::RegistryConfig, router::RouterConfig}; use dubbo_logger::tracing; use dubbo_utils::yaml_util::yaml_file_parser; use once_cell::sync::OnceCell; @@ -44,6 +44,9 @@ pub struct RootConfig { #[serde(default)] pub registries: HashMap, + #[serde(default)] + pub routers: RouterConfig, + #[serde(default)] pub data: HashMap, } @@ -63,6 +66,7 @@ impl RootConfig { protocols: HashMap::new(), registries: HashMap::new(), provider: ProviderConfig::new(), + routers: RouterConfig::default(), data: HashMap::new(), } } diff --git a/config/src/lib.rs b/config/src/lib.rs index 0748c667..6fa38801 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -21,4 +21,5 @@ pub mod config; pub mod protocol; pub mod provider; pub mod registry; +pub mod router; pub mod service; diff --git a/config/src/router.rs b/config/src/router.rs new file mode 100644 index 00000000..98aa14a6 --- /dev/null +++ b/config/src/router.rs @@ -0,0 +1,67 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] +pub struct ConditionRouterConfig { + pub scope: String, + pub force: bool, + pub runtime: bool, + pub enabled: bool, + pub key: String, + pub conditions: Vec, +} + +#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)] +pub struct TagRouterConfig { + pub force: bool, + pub enabled: bool, + pub key: String, + pub tags: Vec, +} + +#[derive(Serialize, Deserialize, Clone, PartialEq, Default, Debug)] +pub struct ConsumerConfig { + pub service: String, + pub url: String, + pub protocol: String, +} + +#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)] +pub struct Tag { + pub name: String, + pub matches: Vec, +} + +#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)] +pub struct TagMatchRule { + pub key: String, + pub value: String, +} + +impl ConditionRouterConfig { + pub fn new(config: &String) -> Self { + serde_yaml::from_str(config).expect("parse error") + } +} + +#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)] +pub struct EnableAuth { + pub auth_username: String, + pub auth_password: String, +} + +#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)] +pub struct NacosConfig { + pub addr: String, + pub namespace: String, + pub app: String, + pub enable_auth: Option, + pub enable_auth_plugin_http: Option, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] +pub struct RouterConfig { + pub consumer: Option>, + pub nacos: Option, + pub conditions: Option>, + pub tags: Option, +} diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml index d0ab2a4d..97c48992 100644 --- a/dubbo/Cargo.toml +++ b/dubbo/Cargo.toml @@ -14,7 +14,7 @@ hyper = { version = "0.14.26", features = ["full"] } http = "0.2" tower-service.workspace = true http-body = "0.4.4" -tower = { workspace = true, features = ["timeout"] } +tower = { workspace = true, features = ["timeout", "ready-cache"] } futures-util = "0.3.23" futures-core ="0.3.23" argh = "0.1" @@ -40,8 +40,14 @@ urlencoding.workspace = true lazy_static.workspace = true dubbo-base.workspace = true dubbo-logger.workspace = true +once_cell.workspace = true dubbo-config = { path = "../config", version = "0.3.0" } #对象存储 state = { version = "0.5", features = ["tls"] } + +thiserror = "1.0.48" +regex = "1.9.1" +nacos-sdk = { version = "0.3.0", features = ["default"] } +serde_yaml = "0.9.22" diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs index afe9657b..06b840f0 100644 --- a/dubbo/src/cluster/directory.rs +++ b/dubbo/src/cluster/directory.rs @@ -24,7 +24,6 @@ use std::{ use crate::{ codegen::TripleInvoker, - invocation::{Invocation, RpcInvocation}, protocol::BoxInvoker, registry::{memory_registry::MemoryNotifyListener, BoxRegistry}, }; @@ -51,7 +50,7 @@ impl StaticDirectory { panic!("http uri parse error: {}, host: {}", err, host) } }; - StaticDirectory { uri: uri } + StaticDirectory { uri } } pub fn from_uri(uri: &http::Uri) -> StaticDirectory { @@ -60,12 +59,12 @@ impl StaticDirectory { } impl Directory for StaticDirectory { - fn list(&self, invocation: Arc) -> Vec { + fn list(&self, service_name: String) -> Vec { let url = Url::from_url(&format!( "tri://{}:{}/{}", self.uri.host().unwrap(), self.uri.port().unwrap(), - invocation.get_target_service_unique_name(), + service_name, )) .unwrap(); let invoker = Box::new(TripleInvoker::new(url)); @@ -89,8 +88,7 @@ impl RegistryDirectory { } impl Directory for RegistryDirectory { - fn list(&self, invocation: Arc) -> Vec { - let service_name = invocation.get_target_service_unique_name(); + fn list(&self, service_name: String) -> Vec { let url = Url::from_url(&format!( "triple://{}:{}/{}", diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs index d1f96f95..7411a675 100644 --- a/dubbo/src/cluster/mod.rs +++ b/dubbo/src/cluster/mod.rs @@ -15,28 +15,29 @@ * limitations under the License. */ -use std::{collections::HashMap, fmt::Debug, sync::Arc, task::Poll}; +use std::{fmt::Debug, sync::Arc}; -use aws_smithy_http::body::SdkBody; use dubbo_base::Url; -use dyn_clone::DynClone; +use thiserror::Error; +use tower::{ready_cache::ReadyCache, ServiceExt}; +use tower_service::Service; use crate::{ - empty_body, - invocation::RpcInvocation, - protocol::{BoxInvoker, Invoker}, + codegen::RpcInvocation, + invocation::Invocation, + protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker, Invoker}, + triple::client::replay::ClonedBody, + StdError, }; pub mod directory; pub mod loadbalance; +pub mod router; -pub trait Directory: Debug + DynClone { - fn list(&self, invocation: Arc) -> Vec; - // fn is_empty(&self) -> bool; +pub trait Directory: Debug { + fn list(&self, service_name: String) -> Vec; } -dyn_clone::clone_trait_object!(Directory); - type BoxDirectory = Box; pub trait Cluster { @@ -51,18 +52,62 @@ impl Cluster for MockCluster { Box::new(FailoverCluster::new(dir)) } } -#[derive(Clone, Debug)] + +// 在Cluster上进行缓存Service +#[derive(Debug)] pub struct FailoverCluster { dir: Arc, + caches: ReadyCache>, } impl FailoverCluster { pub fn new(dir: BoxDirectory) -> FailoverCluster { - Self { dir: Arc::new(dir) } + Self { + dir: Arc::new(dir), + caches: ReadyCache::default(), + } } } -impl Invoker> for FailoverCluster { +#[derive(Error, Debug)] +#[error("no available service for {0}")] +pub struct NoAvailableServiceErr(String); + +#[derive(Error, Debug)] +#[error("invalid service name {0}")] +pub struct InvalidServiceNameErr(String); + +impl FailoverCluster { + async fn invoke( + req: http::Request, + mut invoker: BoxInvoker, + ) -> Result, (StdError, http::Request)> { + let clone_request = FailoverCluster::clone_request(&req); + let invoker = invoker + .ready() + .await + .map_err(|e| (e, FailoverCluster::clone_request(&req)))?; + let ret = invoker.call(req).await.map_err(|e| (e, clone_request))?; + + Ok(ret) + } + + fn clone_request(req: &http::Request) -> http::Request { + let mut clone = http::Request::new(req.body().clone()); + *clone.method_mut() = req.method().clone(); + *clone.uri_mut() = req.uri().clone(); + *clone.headers_mut() = req.headers().clone(); + *clone.version_mut() = req.version(); + + if let Some(inv) = req.extensions().get::().cloned() { + clone.extensions_mut().insert(inv); + } + + clone + } +} + +impl Service> for FailoverCluster { type Response = http::Response; type Error = crate::Error; @@ -71,109 +116,346 @@ impl Invoker> for FailoverCluster { fn poll_ready( &mut self, - _cx: &mut std::task::Context<'_>, + cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - // if self.dir.is_empty() return err - Poll::Ready(Ok(())) + self.caches.poll_pending(cx).map_err(|e| e.into()) } - fn call(&mut self, req: http::Request) -> Self::Future { - // let clone_body = req.body().try_clone().unwrap(); - // let mut clone_req = http::Request::builder() - // .uri(req.uri().clone()) - // .method(req.method().clone()); - // *clone_req.headers_mut().unwrap() = req.headers().clone(); - // let r = clone_req.body(clone_body).unwrap(); - let invokers = self.dir.list( - RpcInvocation::default() - .with_service_unique_name("hello".to_string()) - .into(), - ); - for mut invoker in invokers { - let fut = async move { - let res = invoker.call(req).await; - return res; - }; - return Box::pin(fut); + fn call(&mut self, req: http::Request) -> Self::Future { + let inv = req.extensions().get::(); + if inv.is_none() { + return Box::pin(async move { + return Err( + InvalidServiceNameErr("service name must not be null".to_owned()).into(), + ); + }); } + + let inv = inv.unwrap(); + let service_name = inv.get_target_service_unique_name(); + + let invokers = self.dir.list(service_name.clone()); + Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) + let mut current_req = req; + let mut last_err = None; + + let is_empty = invokers.is_empty(); + if is_empty { + return Err(NoAvailableServiceErr(service_name).into()); + } + + for invoker in invokers { + match FailoverCluster::invoke(current_req, invoker).await { + Ok(resp) => return Ok(resp), + Err((e, cloned_request)) => { + current_req = cloned_request; + last_err = Some(e); + } + } + } + + if last_err.is_none() { + return Err(NoAvailableServiceErr(service_name).into()); + } + + return Err(last_err.unwrap()); }) } +} +impl Invoker> for FailoverCluster { fn get_url(&self) -> dubbo_base::Url { Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap() } } -#[derive(Debug, Default, Clone)] +#[derive(Debug, Default)] pub struct MockDirectory { // router_chain: RouterChain, - invokers: Vec, } impl MockDirectory { - pub fn new(invokers: Vec) -> MockDirectory { + pub fn new() -> MockDirectory { + // let router_chain = get_global_router_manager().read().unwrap().get_router_chain(invocation); Self { - // router_chain: RouterChain::default(), - invokers, + // router_chain } } } impl Directory for MockDirectory { - fn list(&self, _invo: Arc) -> Vec { + fn list(&self, service_name: String) -> Vec { // tracing::info!("MockDirectory: {}", meta); - let _u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap(); - // vec![Box::new(TripleInvoker::new(u))] + let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap(); + vec![Box::new(TripleInvoker::new(u))] // self.router_chain.route(u, invo); - self.invokers.clone() } - - // fn is_empty(&self) -> bool { - // false - // } } -#[derive(Debug, Default)] -pub struct RouterChain { - router: HashMap, - invokers: Vec, -} -impl RouterChain { - pub fn route(&self, url: Url, invo: Arc) -> Vec { - let r = self.router.get("mock").unwrap(); - r.route(self.invokers.clone(), url, invo) +// #[derive(Debug, Default)] +// pub struct RouterChain { +// router: HashMap, +// invokers: Arc>, +// } + +// impl RouterChain { +// pub fn route(&mut self, url: Url, invo: Arc) -> Arc> { +// let r = self.router.get("mock").unwrap(); +// r.route(self.invokers.clone(), url, invo) +// } +// } + +// pub trait Router: Debug { +// fn route( +// &self, +// invokers: Arc>, +// url: Url, +// invo: Arc, +// ) -> Arc>; +// } + +// pub type BoxRouter = Box; + +#[cfg(test)] +pub mod tests { + use std::task::Poll; + + use bytes::{Buf, BufMut, BytesMut}; + use dubbo_base::Url; + use futures_util::future::poll_fn; + use http::StatusCode; + use http_body::Body; + use thiserror::Error; + use tower::ServiceExt; + use tower_service::Service; + + use crate::{ + boxed, + cluster::FailoverCluster, + codegen::{Invoker, RpcInvocation}, + empty_body, + invocation::Invocation, + triple::client::replay::ClonedBody, + }; + + use super::Directory; + + #[derive(Error, Debug)] + #[error("{0}")] + struct NoResponseErr(String); + + #[derive(Debug)] + struct MockDirectory; + + impl Directory for MockDirectory { + fn list(&self, service_name: String) -> Vec { + println!("get invoker list for {}", service_name); + + vec![ + Box::new(MockInvoker(1)), + Box::new(MockInvoker(2)), + Box::new(MockInvoker(3)), + Box::new(MockInvoker(4)), + Box::new(MockInvoker(5)), + ] + } } -} -pub trait Router: Debug { - fn route( - &self, - invokers: Vec, - url: Url, - invo: Arc, - ) -> Vec; -} + #[derive(Debug)] + struct MockInvoker(u8); -pub type BoxRouter = Box; + impl Invoker> for MockInvoker { + fn get_url(&self) -> dubbo_base::Url { + let str = format!( + "triple://127.0.0.1:8888/failover_cluster_service/{}", + self.0 + ); + Url::from_url(str.as_str()).unwrap() + } + } -#[derive(Debug, Default)] -pub struct MockRouter {} - -impl Router for MockRouter { - fn route( - &self, - invokers: Vec, - _url: Url, - _invo: Arc, - ) -> Vec { - invokers + impl Service> for MockInvoker { + type Response = http::Response; + + type Error = crate::Error; + + type Future = crate::BoxFuture, crate::Error>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + let inv = req.extensions().get::(); + if inv.is_none() { + return Box::pin(async move { + let response = http::Response::builder() + .status(StatusCode::OK) + .body(empty_body()) + .unwrap(); + + return Ok(response); + }); + } + + let inv = inv.unwrap(); + let method_name = inv.get_method_name(); + if method_name.eq("invoker_request") { + return Box::pin(async move { + let response = http::Response::builder() + .status(StatusCode::OK) + .body(boxed("invoker response".to_owned())) + .unwrap(); + + return Ok(response); + }); + } + + let self_index = self.0; + if method_name.eq("failover_request") { + return Box::pin(async move { + let body = req.into_body(); + let mut pin_body = Box::pin(body); + let ret = poll_fn(|cx| pin_body.as_mut().poll_data(cx)).await; + + if ret.is_none() { + #[derive(Error, Debug)] + #[error("{0}")] + struct BodyIsNoneErr(&'static str); + + return Err(BodyIsNoneErr("body must not be null").into()); + } + + let ret = ret.unwrap(); + + if ret.is_err() { + #[derive(Error, Debug)] + #[error("{0}")] + struct BodyIsErr(&'static str); + return Err(BodyIsErr("body must be ok").into()); + } + + let mut ret = ret.unwrap(); + + let index = ret.get_u8(); + + if index == self_index { + let ret_msg = format!("failover cluster index: {} was invoked", self_index); + let response = http::Response::builder() + .status(StatusCode::OK) + .body(boxed(ret_msg)) + .unwrap(); + + return Ok(response); + } + + #[derive(Error, Debug)] + #[error("{0}")] + struct NotTargetInvoker(String); + + let ret_msg = format!( + "failover cluster index: {} was invoked, but is not target invoker {}", + self_index, index + ); + + println!("{}", ret_msg); + return Err(NotTargetInvoker(ret_msg).into()); + }); + } + + return Box::pin(async move { return Err(NoResponseErr(method_name).into()) }); + } + } + + #[tokio::test] + async fn test_failover_cluster() { + let mut cluster = FailoverCluster::new(Box::new(MockDirectory)); + let cluster = cluster.ready().await; + assert!(cluster.is_ok()); + + let cluster = cluster.unwrap(); + + let empty_stream = futures::stream::empty(); + let cloned_body = ClonedBody::new(empty_stream); + + let rpc_inv = RpcInvocation::default() + .with_service_unique_name("failover_cluster_service".to_owned()) + .with_method_name("invoker_request".to_owned()); + + let req = http::Request::builder() + .extension(rpc_inv) + .body(cloned_body) + .unwrap(); + + let ret = cluster.call(req).await; + assert!(ret.is_ok()); + + let ret = ret.unwrap(); + + assert_eq!(ret.status(), StatusCode::OK); + + let body = ret.into_body(); + + let mut pin = Box::pin(body); + let data = poll_fn(|cx| pin.as_mut().poll_data(cx)).await; + assert!(data.is_some()); + let data = data.unwrap(); + assert!(data.is_ok()); + let data = data.unwrap(); + + assert_eq!( + String::from_utf8(data.to_vec()).unwrap(), + "invoker response" + ) + } + + #[tokio::test] + async fn test_failover_request() { + let mut cluster = FailoverCluster::new(Box::new(MockDirectory)); + let cluster = cluster.ready().await; + assert!(cluster.is_ok()); + + let cluster = cluster.unwrap(); + + let once_stream = futures::stream::once(async { + let mut mut_bytes = BytesMut::default(); + mut_bytes.put_u8(5); + return Ok(mut_bytes.freeze()); + }); + let cloned_body = ClonedBody::new(once_stream); + + let rpc_inv = RpcInvocation::default() + .with_service_unique_name("failover_cluster_service".to_owned()) + .with_method_name("failover_request".to_owned()); + + let req = http::Request::builder() + .extension(rpc_inv) + .body(cloned_body) + .unwrap(); + + let ret = cluster.call(req).await; + assert!(ret.is_ok()); + + let ret = ret.unwrap(); + + assert_eq!(ret.status(), StatusCode::OK); + + let body = ret.into_body(); + + let mut pin = Box::pin(body); + let data = poll_fn(|cx| pin.as_mut().poll_data(cx)).await; + assert!(data.is_some()); + let data = data.unwrap(); + assert!(data.is_ok()); + let data = data.unwrap(); + + let resp_str = String::from_utf8(data.to_vec()).unwrap(); + println!("{}", resp_str); + assert_eq!(resp_str, "failover cluster index: 5 was invoked") } } diff --git a/dubbo/src/cluster/router/condition/condition_router.rs b/dubbo/src/cluster/router/condition/condition_router.rs new file mode 100644 index 00000000..f39cd1c7 --- /dev/null +++ b/dubbo/src/cluster/router/condition/condition_router.rs @@ -0,0 +1,60 @@ +use crate::{ + cluster::router::{condition::single_router::ConditionSingleRouter, Router}, + codegen::RpcInvocation, +}; +use dubbo_base::Url; +use std::{ + fmt::Debug, + sync::{Arc, RwLock}, +}; + +#[derive(Default, Debug, Clone)] +pub struct ConditionRouter { + //condition router for service scope + pub service_routers: Option>>, + //condition router for application scope + pub application_routers: Option>>, +} + +impl Router for ConditionRouter { + fn route(&self, invokers: Vec, url: Url, invo: Arc) -> Vec { + let mut invokers_result = invokers.clone(); + if let Some(routers) = self.application_routers.clone() { + for router in &routers.read().unwrap().routers { + invokers_result = router.route(invokers_result, url.clone(), invo.clone()) + } + } + if let Some(routers) = self.service_routers.clone() { + for router in &routers.read().unwrap().routers { + invokers_result = router.route(invokers_result, url.clone(), invo.clone()) + } + } + invokers_result + } +} + +impl ConditionRouter { + pub fn new( + service_routers: Option>>, + application_routers: Option>>, + ) -> Self { + Self { + service_routers, + application_routers, + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct ConditionSingleRouters { + pub routers: Vec, +} + +impl ConditionSingleRouters { + pub fn new(routers: Vec) -> Self { + Self { routers } + } + pub fn is_null(&self) -> bool { + self.routers.is_empty() + } +} diff --git a/dubbo/src/cluster/router/condition/matcher.rs b/dubbo/src/cluster/router/condition/matcher.rs new file mode 100644 index 00000000..8c9177e8 --- /dev/null +++ b/dubbo/src/cluster/router/condition/matcher.rs @@ -0,0 +1,94 @@ +use crate::codegen::RpcInvocation; +use regex::Regex; +use std::{collections::HashSet, error::Error, option::Option, sync::Arc}; + +#[derive(Clone, Debug, Default)] +pub struct ConditionMatcher { + _key: String, + matches: HashSet, + mismatches: HashSet, +} + +impl ConditionMatcher { + pub fn new(_key: String) -> Self { + ConditionMatcher { + _key, + matches: HashSet::new(), + mismatches: HashSet::new(), + } + } + + pub fn is_match( + &self, + value: Option, + invocation: Arc, + _is_when: bool, + ) -> Result> { + match value { + None => { + // if key does not present in whichever of url, invocation or attachment based on the matcher type, then return false. + Ok(false) + } + Some(val) => { + if !self.matches.is_empty() && self.mismatches.is_empty() { + for match_ in self.matches.iter() { + if self.do_pattern_match(match_, &val, invocation.clone())? { + return Ok(true); + } + } + Ok(false) + } else if !self.mismatches.is_empty() && self.matches.is_empty() { + for mismatch in self.mismatches.iter() { + if self.do_pattern_match(mismatch, &val, invocation.clone())? { + return Ok(false); + } + } + Ok(true) + } else if !self.matches.is_empty() && !self.mismatches.is_empty() { + for mismatch in self.mismatches.iter() { + if self.do_pattern_match(mismatch, &val, invocation.clone())? { + return Ok(false); + } + } + for match_ in self.matches.iter() { + if self.do_pattern_match(match_, &val, invocation.clone())? { + return Ok(true); + } + } + Ok(false) + } else { + Ok(false) + } + } + } + } + pub fn get_matches(&mut self) -> &mut HashSet { + &mut self.matches + } + pub fn get_mismatches(&mut self) -> &mut HashSet { + &mut self.mismatches + } + + fn do_pattern_match( + &self, + pattern: &String, + value: &String, + _invocation: Arc, + ) -> Result> { + if pattern.contains("*") { + return Ok(star_matcher(pattern, value)); + } + Ok(pattern.eq(value)) + } +} + +pub fn star_matcher(pattern: &str, input: &str) -> bool { + // 将*替换为任意字符的正则表达式 + let pattern = pattern.replace("*", ".*"); + let regex = Regex::new(&pattern).unwrap(); + regex.is_match(input) +} + +pub fn _range_matcher(val: i32, min: i32, max: i32) -> bool { + min <= val && val <= max +} diff --git a/dubbo/src/cluster/router/condition/mod.rs b/dubbo/src/cluster/router/condition/mod.rs new file mode 100644 index 00000000..7285b88f --- /dev/null +++ b/dubbo/src/cluster/router/condition/mod.rs @@ -0,0 +1,3 @@ +pub mod condition_router; +pub mod matcher; +pub mod single_router; diff --git a/dubbo/src/cluster/router/condition/single_router.rs b/dubbo/src/cluster/router/condition/single_router.rs new file mode 100644 index 00000000..e290b150 --- /dev/null +++ b/dubbo/src/cluster/router/condition/single_router.rs @@ -0,0 +1,225 @@ +use dubbo_base::Url; +use dubbo_logger::tracing::info; +use regex::Regex; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use crate::{ + cluster::router::{condition::matcher::ConditionMatcher, utils::to_original_map, Router}, + codegen::RpcInvocation, + invocation::Invocation, +}; + +#[derive(Debug, Clone, Default)] +pub struct ConditionSingleRouter { + pub name: String, + pub when_condition: HashMap>>, + pub then_condition: HashMap>>, + pub enabled: bool, + pub force: bool, +} + +impl Router for ConditionSingleRouter { + fn route(&self, invokers: Vec, url: Url, invocation: Arc) -> Vec { + if !self.enabled { + return invokers; + }; + let mut result = Vec::new(); + if self.match_when(url.clone(), invocation.clone()) { + for invoker in &invokers.clone() { + if self.match_then(invoker.clone(), invocation.clone()) { + result.push(invoker.clone()); + } + } + if result.is_empty() && self.force == false { + invokers + } else { + result + } + } else { + invokers + } + } +} + +impl ConditionSingleRouter { + pub fn new(rule: String, force: bool, enabled: bool) -> Self { + let mut router = Self { + name: "condition".to_string(), + when_condition: HashMap::new(), + then_condition: HashMap::new(), + enabled, + force, + }; + if enabled { + router.init(rule).expect("parse rule error"); + } + router + } + + fn init(&mut self, rule: String) -> Result<(), Box> { + match rule.trim().is_empty() { + true => Err("Illegal route rule!".into()), + false => { + let r = rule.replace("consumer.", "").replace("provider.", ""); + let i = r.find("=>").unwrap_or_else(|| r.len()); + let when_rule = r[..i].trim().to_string(); + let then_rule = r[(i + 2)..].trim().to_string(); + let when = if when_rule.is_empty() || when_rule == "true" { + HashMap::new() + } else { + self.parse_rule(&when_rule)? + }; + let then = if then_rule.is_empty() || then_rule == "false" { + HashMap::new() + } else { + self.parse_rule(&then_rule)? + }; + self.when_condition = when; + self.then_condition = then; + Ok(()) + } + } + } + + fn parse_rule( + &mut self, + rule: &str, + ) -> Result>>, Box> { + let mut conditions: HashMap>> = HashMap::new(); + let mut current_matcher: Option>> = None; + let regex = Regex::new(r"([&!=,]*)\s*([^&!=,\s]+)").unwrap(); + for cap in regex.captures_iter(rule) { + let separator = &cap[1]; + let content = &cap[2]; + + match separator { + "" => { + let current_key = content.to_string(); + current_matcher = + Some(Arc::new(RwLock::new(self.get_matcher(current_key.clone())))); + conditions.insert( + current_key.clone(), + current_matcher.as_ref().unwrap().clone(), + ); + } + "&" => { + let current_key = content.to_string(); + current_matcher = conditions.get(¤t_key).cloned(); + if current_matcher.is_none() { + let matcher = Arc::new(RwLock::new(self.get_matcher(current_key.clone()))); + conditions.insert(current_key.clone(), matcher.clone()); + current_matcher = Some(matcher); + } + } + "=" => { + if let Some(matcher) = ¤t_matcher { + let mut matcher_borrowed = matcher.write().unwrap(); + matcher_borrowed + .get_matches() + .insert(content.to_string().clone()); + } else { + return Err("Error: ...".into()); + } + } + "!=" => { + if let Some(matcher) = ¤t_matcher { + let mut matcher_borrowed = matcher.write().unwrap(); + matcher_borrowed + .get_mismatches() + .insert(content.to_string().clone()); + } else { + return Err("Error: ...".into()); + } + } + "," => { + if let Some(matcher) = ¤t_matcher { + let mut matcher_borrowed = matcher.write().unwrap(); + if matcher_borrowed.get_matches().is_empty() + && matcher_borrowed.get_mismatches().is_empty() + { + return Err("Error: ...".into()); + } + drop(matcher_borrowed); + let mut matcher_borrowed_mut = matcher.write().unwrap(); + matcher_borrowed_mut + .get_matches() + .insert(content.to_string().clone()); + } else { + return Err("Error: ...".into()); + } + } + _ => { + return Err("Error: ...".into()); + } + } + } + Ok(conditions) + } + + // pub fn is_runtime(&self) -> bool { + // // same as the Java version + // } + + pub fn get_matcher(&self, key: String) -> ConditionMatcher { + ConditionMatcher::new(key) + } + + pub fn match_when(&self, url: Url, invocation: Arc) -> bool { + if self.when_condition.is_empty() { + true + } else { + false + }; + self.do_match(url, &self.when_condition, invocation, true) + } + + pub fn match_then(&self, url: Url, invocation: Arc) -> bool { + if self.when_condition.is_empty() { + true + } else { + false + }; + self.do_match(url, &self.then_condition, invocation, false) + } + + pub fn do_match( + &self, + url: Url, + conditions: &HashMap>>, + invocation: Arc, + is_when: bool, + ) -> bool { + let sample: HashMap = to_original_map(url); + for (key, condition_matcher) in conditions { + let matcher = condition_matcher.read().unwrap(); + let value = get_value(key, &sample, invocation.clone()); + match matcher.is_match(value, invocation.clone(), is_when) { + Ok(result) => { + if !result { + return false; + } + } + Err(error) => { + info!("Error occurred: {:?}", error); + return false; + } + } + } + true + } +} + +fn get_value( + key: &String, + sample: &HashMap, + invocation: Arc, +) -> Option { + if key == "method" { + let method_param = invocation.get_method_name(); + return Some(method_param); + } + sample.get(key).cloned() +} diff --git a/dubbo/src/cluster/router/manager/condition_manager.rs b/dubbo/src/cluster/router/manager/condition_manager.rs new file mode 100644 index 00000000..4207b79b --- /dev/null +++ b/dubbo/src/cluster/router/manager/condition_manager.rs @@ -0,0 +1,75 @@ +use crate::cluster::router::condition::{ + condition_router::{ConditionRouter, ConditionSingleRouters}, + single_router::ConditionSingleRouter, +}; +use dubbo_config::router::ConditionRouterConfig; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +#[derive(Debug, Clone, Default)] +pub struct ConditionRouterManager { + //Application-level routing applies globally, while service-level routing only affects a specific service. + pub routers_service: HashMap>>, + pub routers_application: Arc>, +} + +impl ConditionRouterManager { + pub fn get_router(&self, service_name: String) -> Option { + let routers_service = self.routers_service.get(&service_name); + match routers_service { + Some(routers_service) => { + if self.routers_application.read().unwrap().is_null() { + return Some(ConditionRouter::new(Some(routers_service.clone()), None)); + } + Some(ConditionRouter::new( + Some(routers_service.clone()), + Some(self.routers_application.clone()), + )) + } + None => { + if self.routers_application.read().unwrap().is_null() { + return None; + } + Some(ConditionRouter::new( + None, + Some(self.routers_application.clone()), + )) + } + } + } + + pub fn update(&mut self, config: ConditionRouterConfig) { + let force = config.force; + let scope = config.scope; + let key = config.key; + let enable = config.enabled; + let mut routers = Vec::new(); + for condition in config.conditions { + routers.push(ConditionSingleRouter::new(condition, force, enable)); + } + match scope.as_str() { + "application" => { + self.routers_application.write().unwrap().routers = routers; + } + "service" => { + if let Some(x) = self.routers_service.get(&key) { + x.write().unwrap().routers = routers + } else { + self.routers_service.insert( + key, + Arc::new(RwLock::new(ConditionSingleRouters::new(routers))), + ); + } + } + _ => {} + } + } + + pub fn _parse_rules(&mut self, configs: Vec) { + for config in configs { + self.update(config) + } + } +} diff --git a/dubbo/src/cluster/router/manager/mod.rs b/dubbo/src/cluster/router/manager/mod.rs new file mode 100644 index 00000000..025f6c16 --- /dev/null +++ b/dubbo/src/cluster/router/manager/mod.rs @@ -0,0 +1,3 @@ +mod condition_manager; +pub mod router_manager; +mod tag_manager; diff --git a/dubbo/src/cluster/router/manager/router_manager.rs b/dubbo/src/cluster/router/manager/router_manager.rs new file mode 100644 index 00000000..a2a66586 --- /dev/null +++ b/dubbo/src/cluster/router/manager/router_manager.rs @@ -0,0 +1,161 @@ +use crate::{ + cluster::router::{ + manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager}, + nacos_config_center::nacos_client::NacosClient, + router_chain::RouterChain, + }, + invocation::{Invocation, RpcInvocation}, +}; +use dubbo_base::Url; +use dubbo_config::{ + get_global_config, + router::{ConditionRouterConfig, NacosConfig, TagRouterConfig}, +}; +use dubbo_logger::tracing::{info, trace}; +use once_cell::sync::OnceCell; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +pub static GLOBAL_ROUTER_MANAGER: OnceCell>> = OnceCell::new(); + +pub struct RouterManager { + pub condition_router_manager: ConditionRouterManager, + pub tag_router_manager: TagRouterManager, + pub nacos: Option, + pub consumer: HashMap, +} + +impl RouterManager { + pub fn get_router_chain(&self, invocation: Arc) -> RouterChain { + let service = invocation.get_target_service_unique_name().clone(); + let condition_router = self.condition_router_manager.get_router(service.clone()); + let tag_router = self.tag_router_manager.get_router(); + let mut chain = RouterChain::new(); + match self.consumer.get(service.as_str()) { + None => {} + Some(url) => { + chain.set_condition_router(condition_router); + chain.set_tag_router(tag_router); + chain.self_url = url.clone(); + } + } + chain + } + + pub fn notify(&mut self, event: RouterConfigChangeEvent) { + match event.router_kind.as_str() { + "condition" => { + let config: ConditionRouterConfig = + serde_yaml::from_str(event.content.as_str()).unwrap(); + self.condition_router_manager.update(config) + } + "tag" => { + let config: TagRouterConfig = serde_yaml::from_str(event.content.as_str()).unwrap(); + self.tag_router_manager.update(config) + } + _ => { + info!("other router change event") + } + } + } + + pub fn init_nacos(&mut self, config: NacosConfig) { + self.nacos = Some(NacosClient::new_init_client(config)); + self.init_router_managers_for_nacos(); + } + + pub fn init_router_managers_for_nacos(&mut self) { + let config = self + .nacos + .as_ref() + .unwrap() + .get_tag_config("application".to_string()); + match config { + None => {} + Some(tag_config) => { + self.tag_router_manager.init(); + self.tag_router_manager.update(tag_config) + } + } + for (service_name, _) in &self.consumer { + let config = self + .nacos + .as_ref() + .unwrap() + .get_condition_config(service_name.clone()); + match config { + None => {} + Some(condition_config) => self.condition_router_manager.update(condition_config), + } + } + } + + pub fn init(&mut self) { + let config = get_global_config().routers.clone(); + let consumer_configs = get_global_config() + .routers + .consumer + .clone() + .unwrap_or(Vec::new()); + for consumer_config in consumer_configs { + self.consumer.insert( + consumer_config.service.clone(), + Url::from_url( + format!("{}/{}", consumer_config.url, consumer_config.service).as_str(), + ) + .expect("consumer配置出错!Url生成错误"), + ); + } + match &config.nacos { + None => { + trace!("Nacos not configured, using local YAML configuration for routing"); + let condition = config.conditions.clone(); + match condition { + None => { + info!("Unconfigured Condition Router") + } + Some(cons) => { + for con in cons { + self.condition_router_manager.update(con) + } + } + } + let tag = config.tags.clone(); + match tag { + None => { + info!("Unconfigured Tag Router") + } + Some(ta) => { + self.tag_router_manager.init(); + self.tag_router_manager.update(ta) + } + } + } + Some(config) => { + self.init_nacos(config.clone()); + } + } + } +} + +pub fn get_global_router_manager() -> &'static Arc> { + GLOBAL_ROUTER_MANAGER.get_or_init(|| { + let mut router_manager = RouterManager { + condition_router_manager: ConditionRouterManager::default(), + tag_router_manager: TagRouterManager::default(), + nacos: None, + consumer: HashMap::new(), + }; + router_manager.init(); + return Arc::new(RwLock::new(router_manager)); + }) +} + +#[derive(Debug, Default, Clone)] +pub struct RouterConfigChangeEvent { + pub service_name: String, + pub router_kind: String, + pub content: String, +} diff --git a/dubbo/src/cluster/router/manager/tag_manager.rs b/dubbo/src/cluster/router/manager/tag_manager.rs new file mode 100644 index 00000000..ce30c92e --- /dev/null +++ b/dubbo/src/cluster/router/manager/tag_manager.rs @@ -0,0 +1,27 @@ +use crate::cluster::router::tag::tag_router::TagRouter; +use dubbo_config::router::TagRouterConfig; +use std::sync::{Arc, RwLock}; + +#[derive(Debug, Clone, Default)] +pub struct TagRouterManager { + pub tag_router: Option>>, +} + +impl TagRouterManager { + pub fn init(&mut self) { + self.tag_router = Some(Arc::new(RwLock::new(TagRouter::default()))) + } + + pub fn get_router(&self) -> Option>> { + self.tag_router.clone() + } + + pub fn update(&mut self, config: TagRouterConfig) { + self.tag_router + .as_ref() + .unwrap() + .write() + .unwrap() + .parse_config(config) + } +} diff --git a/dubbo/src/cluster/router/mod.rs b/dubbo/src/cluster/router/mod.rs new file mode 100644 index 00000000..84ceae15 --- /dev/null +++ b/dubbo/src/cluster/router/mod.rs @@ -0,0 +1,25 @@ +pub mod condition; +pub mod manager; +pub mod nacos_config_center; +pub mod router_chain; +pub mod tag; +pub mod utils; + +use crate::invocation::RpcInvocation; +use dubbo_base::Url; +use std::{fmt::Debug, sync::Arc}; + +pub trait Router: Debug + Clone { + fn route(&self, invokers: Vec, url: Url, invo: Arc) -> Vec; +} + +// pub type BoxRouter = Box; + +#[derive(Debug, Default, Clone)] +pub struct MockRouter {} + +impl Router for MockRouter { + fn route(&self, invokers: Vec, _url: Url, _invo: Arc) -> Vec { + invokers + } +} diff --git a/dubbo/src/cluster/router/nacos_config_center/mod.rs b/dubbo/src/cluster/router/nacos_config_center/mod.rs new file mode 100644 index 00000000..7878fa9f --- /dev/null +++ b/dubbo/src/cluster/router/nacos_config_center/mod.rs @@ -0,0 +1 @@ +pub mod nacos_client; diff --git a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs new file mode 100644 index 00000000..1600e9b0 --- /dev/null +++ b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs @@ -0,0 +1,132 @@ +use crate::cluster::router::manager::router_manager::{ + get_global_router_manager, RouterConfigChangeEvent, +}; +use dubbo_config::router::{ConditionRouterConfig, NacosConfig, TagRouterConfig}; +use dubbo_logger::{tracing, tracing::info}; +use nacos_sdk::api::{ + config::{ConfigChangeListener, ConfigResponse, ConfigService, ConfigServiceBuilder}, + props::ClientProps, +}; +use std::sync::{Arc, RwLock}; + +pub struct NacosClient { + pub client: Arc>, +} + +unsafe impl Send for NacosClient {} + +unsafe impl Sync for NacosClient {} + +pub struct ConfigChangeListenerImpl; + +impl NacosClient { + pub fn new_init_client(config: NacosConfig) -> Self { + let server_addr = config.addr; + let namespace = config.namespace; + let app = config.app; + match config.enable_auth { + None => { + info!("disable nacos auth!"); + info!("nacos init,addr:{}", server_addr); + let client = Arc::new(RwLock::new( + ConfigServiceBuilder::new( + ClientProps::new() + .server_addr(server_addr) + .namespace(namespace) + .app_name(app), + ) + .build() + .expect("NacosClient build failed!Please check NacosConfig"), + )); + Self { client } + } + Some(auth) => { + info!("enable nacos auth!"); + info!("nacos init,addr:{}", server_addr); + let client = Arc::new(RwLock::new( + ConfigServiceBuilder::new( + ClientProps::new() + .server_addr(server_addr) + .namespace(namespace) + .app_name(app) + .auth_username(auth.auth_username) + .auth_password(auth.auth_password), + ) + // .enable_auth_plugin_http() + .build() + .expect("NacosClient build failed!Please check NacosConfig"), + )); + return Self { client }; + } + } + } + pub fn get_condition_config(&self, data_id: String) -> Option { + let config_resp = self + .client + .read() + .unwrap() + .get_config(data_id.clone(), "condition".to_string()); + return match config_resp { + Ok(config_resp) => { + self.add_listener(data_id.clone(), "condition".to_string()); + let string = config_resp.content(); + Some(serde_yaml::from_str(string).unwrap()) + } + Err(_err) => None, + }; + } + + pub fn get_tag_config(&self, data_id: String) -> Option { + let config_resp = self + .client + .read() + .unwrap() + .get_config(data_id.clone(), "tag".to_string()); + return match config_resp { + Ok(config_resp) => { + self.add_listener(data_id.clone(), "tag".to_string()); + let string = config_resp.content(); + let result = serde_yaml::from_str(string); + match result { + Ok(config) => { + info!("success to get TagRouter config and parse success"); + Some(config) + } + _ => { + info!("failed to parse TagRouter rule"); + None + } + } + } + Err(_err) => None, + }; + } + pub fn add_listener(&self, data_id: String, group: String) { + let res_listener = self + .client + .write() + .expect("failed to create nacos config listener") + .add_listener(data_id, group, Arc::new(ConfigChangeListenerImpl {})); + match res_listener { + Ok(_) => { + info!("listening the config success"); + } + Err(err) => tracing::error!("listen config error {:?}", err), + } + } +} + +impl ConfigChangeListener for ConfigChangeListenerImpl { + fn notify(&self, config_resp: ConfigResponse) { + let content_type = config_resp.content_type(); + let event = RouterConfigChangeEvent { + service_name: config_resp.data_id().clone(), + router_kind: config_resp.group().clone(), + content: config_resp.content().clone(), + }; + if content_type == "yaml" { + get_global_router_manager().write().unwrap().notify(event); + } + info!("notify config={:?}", config_resp); + } +} diff --git a/dubbo/src/cluster/router/router_chain.rs b/dubbo/src/cluster/router/router_chain.rs new file mode 100644 index 00000000..930f0f29 --- /dev/null +++ b/dubbo/src/cluster/router/router_chain.rs @@ -0,0 +1,74 @@ +use crate::{ + cluster::router::{ + condition::condition_router::ConditionRouter, tag::tag_router::TagRouter, Router, + }, + invocation::RpcInvocation, +}; +use dubbo_base::Url; +use std::sync::{Arc, RwLock}; + +#[derive(Debug, Default, Clone)] +pub struct RouterChain { + pub condition_router: Option, + pub tag_router: Option>>, + pub self_url: Url, +} + +impl RouterChain { + pub fn new() -> Self { + RouterChain { + condition_router: None, + tag_router: None, + self_url: Url::new(), + } + } + pub fn set_condition_router(&mut self, router: Option) { + self.condition_router = router; + } + pub fn set_tag_router(&mut self, router: Option>>) { + self.tag_router = router; + } + pub fn route(&self, invokers: Vec, invocation: Arc) -> Vec { + let mut result = invokers.clone(); + match &self.tag_router { + None => {} + Some(router) => { + result = + router + .read() + .unwrap() + .route(result, self.self_url.clone(), invocation.clone()) + } + } + match &self.condition_router { + Some(router) => { + result = router.route(result, self.self_url.clone(), invocation.clone()) + } + None => {} + } + result + } +} + +#[test] +fn test() { + use crate::cluster::router::manager::router_manager::get_global_router_manager; + + let u1 = Url::from_url("triple://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let u2 = Url::from_url("triple://127.0.0.1:8889/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let u3 = Url::from_url("triple://127.0.0.1:8800/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let u4 = Url::from_url("triple://127.0.2.1:880/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let u5 = Url::from_url("triple://127.0.1.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap(); + let invos = vec![u1, u2, u3, u4, u5]; + let invo = Arc::new( + RpcInvocation::default() + .with_method_name("greet".to_string()) + .with_service_unique_name("org.apache.dubbo.sample.tri.Greeter".to_string()), + ); + let x = get_global_router_manager() + .read() + .unwrap() + .get_router_chain(invo.clone()); + let result = x.route(invos, invo.clone()); + dbg!(result); +} diff --git a/dubbo/src/cluster/router/tag/mod.rs b/dubbo/src/cluster/router/tag/mod.rs new file mode 100644 index 00000000..6ac5b218 --- /dev/null +++ b/dubbo/src/cluster/router/tag/mod.rs @@ -0,0 +1 @@ +pub mod tag_router; diff --git a/dubbo/src/cluster/router/tag/tag_router.rs b/dubbo/src/cluster/router/tag/tag_router.rs new file mode 100644 index 00000000..d1a962e7 --- /dev/null +++ b/dubbo/src/cluster/router/tag/tag_router.rs @@ -0,0 +1,70 @@ +use crate::{ + cluster::router::{utils::to_original_map, Router}, + codegen::RpcInvocation, +}; +use dubbo_base::Url; +use dubbo_config::router::TagRouterConfig; +use std::{collections::HashMap, fmt::Debug, sync::Arc}; + +#[derive(Debug, Clone, Default)] +pub struct TagRouter { + pub tag_rules: HashMap>, + pub force: bool, + pub enabled: bool, +} + +impl TagRouter { + pub fn parse_config(&mut self, config: TagRouterConfig) { + self.tag_rules = HashMap::new(); + self.force = config.force; + self.enabled = config.enabled; + for tag in &config.tags { + let mut tags = HashMap::new(); + for rule in &tag.matches { + tags.insert(rule.key.clone(), rule.value.clone()); + } + self.tag_rules.insert(tag.name.clone(), tags); + } + } + + pub fn match_tag(&self, params: HashMap) -> Option { + let mut tag_result = None; + for (tag, tag_rules) in &self.tag_rules { + for (key, value) in tag_rules { + match params.get(key.as_str()) { + None => {} + Some(val) => { + if val == value { + tag_result = Some(tag.clone()) + } + } + } + } + } + tag_result + } +} + +impl Router for TagRouter { + fn route(&self, invokers: Vec, url: Url, _invocation: Arc) -> Vec { + if !self.enabled { + return invokers; + }; + let self_param = to_original_map(url); + let invocation_tag = self.match_tag(self_param); + let mut invokers_result = Vec::new(); + for invoker in &invokers { + let invoker_param = to_original_map(invoker.clone()); + let invoker_tag = self.match_tag(invoker_param); + if invoker_tag == invocation_tag { + invokers_result.push(invoker.clone()) + } + } + if invokers_result.is_empty() { + if !self.force { + return invokers; + } + } + invokers_result + } +} diff --git a/dubbo/src/cluster/router/utils.rs b/dubbo/src/cluster/router/utils.rs new file mode 100644 index 00000000..2ca50fcc --- /dev/null +++ b/dubbo/src/cluster/router/utils.rs @@ -0,0 +1,16 @@ +use dubbo_base::Url; +use std::{collections::HashMap, string::String}; + +pub fn to_original_map(url: Url) -> HashMap { + let mut result: HashMap = HashMap::new(); + result.insert("scheme".parse().unwrap(), url.scheme); + result.insert("location".parse().unwrap(), url.location); + result.insert("ip".parse().unwrap(), url.ip); + result.insert("port".parse().unwrap(), url.port); + result.insert("service_name".parse().unwrap(), url.service_name); + result.insert("service_key".parse().unwrap(), url.service_key); + for (key, value) in url.params { + result.insert(key, value); + } + result +} diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs index bdd152be..d3eb8ad0 100644 --- a/dubbo/src/invocation.rs +++ b/dubbo/src/invocation.rs @@ -196,7 +196,7 @@ pub trait Invocation { fn get_method_name(&self) -> String; } -#[derive(Default)] +#[derive(Default,Clone)] pub struct RpcInvocation { target_service_unique_name: String, method_name: String, @@ -224,4 +224,4 @@ impl Invocation for RpcInvocation { fn get_method_name(&self) -> String { self.method_name.clone() } -} +} \ No newline at end of file diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs index 145bcc8e..50080555 100644 --- a/dubbo/src/protocol/mod.rs +++ b/dubbo/src/protocol/mod.rs @@ -17,17 +17,17 @@ use std::{ fmt::Debug, - future::Future, task::{Context, Poll}, }; use async_trait::async_trait; use aws_smithy_http::body::SdkBody; -use dyn_clone::DynClone; use tower_service::Service; use dubbo_base::Url; +use crate::triple::client::replay::{ClonedBytesStream, ClonedBody}; + pub mod server_desc; pub mod triple; @@ -44,40 +44,20 @@ pub trait Exporter { fn unexport(&self); } -pub trait Invoker: Debug + DynClone { - type Response; - - type Error; - - type Future: Future>; - +pub trait Invoker: Debug + Service { fn get_url(&self) -> Url; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; - - fn call(&mut self, req: ReqBody) -> Self::Future; } pub type BoxExporter = Box; pub type BoxInvoker = Box< dyn Invoker< - http::Request, + http::Request, Response = http::Response, Error = crate::Error, Future = crate::BoxFuture, crate::Error>, - > + Send - + Sync, + > + Send, >; -dyn_clone::clone_trait_object!( - Invoker< - http::Request, - Response = http::Response, - Error = crate::Error, - Future = crate::BoxFuture, crate::Error>, - > -); - pub struct WrapperInvoker(T); impl Service> for WrapperInvoker diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs index fb661f9e..af174451 100644 --- a/dubbo/src/protocol/triple/triple_invoker.rs +++ b/dubbo/src/protocol/triple/triple_invoker.rs @@ -15,7 +15,6 @@ * limitations under the License. */ -use aws_smithy_http::body::SdkBody; use dubbo_base::Url; use std::{ fmt::{Debug, Formatter}, @@ -25,7 +24,7 @@ use tower_service::Service; use crate::{ protocol::Invoker, - triple::{client::builder::ClientBoxService, transport::connection::Connection}, + triple::{client::{builder::ClientBoxService, replay::ClonedBody}, transport::connection::Connection}, utils::boxed_clone::BoxCloneService, }; @@ -51,18 +50,14 @@ impl Debug for TripleInvoker { } } -impl Invoker> for TripleInvoker { +impl Service> for TripleInvoker { type Response = http::Response; type Error = crate::Error; type Future = crate::BoxFuture; - fn get_url(&self) -> Url { - self.url.clone() - } - - fn call(&mut self, req: http::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { self.conn.call(req) } @@ -73,3 +68,9 @@ impl Invoker> for TripleInvoker { self.conn.poll_ready(cx) } } + +impl Invoker> for TripleInvoker { + fn get_url(&self) -> Url { + self.url.clone() + } +} diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs index 06ecd627..5b2702fb 100644 --- a/dubbo/src/triple/client/builder.rs +++ b/dubbo/src/triple/client/builder.rs @@ -21,23 +21,22 @@ use crate::{ cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory}, codegen::{RegistryDirectory, RpcInvocation, TripleInvoker}, protocol::BoxInvoker, - triple::compression::CompressionEncoding, utils::boxed_clone::BoxCloneService, }; -use aws_smithy_http::body::SdkBody; use dubbo_base::Url; -use super::TripleClient; +use super::replay::ClonedBody; pub type ClientBoxService = - BoxCloneService, http::Response, crate::Error>; + BoxCloneService, http::Response, crate::Error>; +#[allow(dead_code)] #[derive(Clone, Debug, Default)] pub struct ClientBuilder { pub timeout: Option, pub connector: &'static str, - directory: Option>, + directory: Option>>, pub direct: bool, host: String, } @@ -57,9 +56,9 @@ impl ClientBuilder { Self { timeout: None, connector: "", - directory: Some(Box::new(StaticDirectory::new(&host))), + directory: Some(Arc::new(Box::new(StaticDirectory::new(&host)))), direct: true, - host: host.clone().to_string(), + host: host.to_string(), } } @@ -73,21 +72,21 @@ impl ClientBuilder { /// host: http://0.0.0.0:8888 pub fn with_directory(self, directory: Box) -> Self { Self { - directory: Some(directory), + directory: Some(Arc::new(directory)), ..self } } pub fn with_registry_directory(self, registry: RegistryDirectory) -> Self { Self { - directory: Some(Box::new(registry)), + directory: Some(Arc::new(Box::new(registry))), ..self } } pub fn with_host(self, host: &'static str) -> Self { Self { - directory: Some(Box::new(StaticDirectory::new(&host))), + directory: Some(Arc::new(Box::new(StaticDirectory::new(&host)))), ..self } } @@ -103,30 +102,14 @@ impl ClientBuilder { Self { direct, ..self } } - pub(crate) fn direct_build(self) -> TripleClient { - let mut cli = TripleClient { - send_compression_encoding: Some(CompressionEncoding::Gzip), - builder: Some(self.clone()), - invoker: None, - }; - cli.invoker = Some(Box::new(TripleInvoker::new( - Url::from_url(&self.host).unwrap(), - ))); - return cli; - } - - pub fn build(self, invocation: Arc) -> Option { + pub fn build(self) -> Option { if self.direct { return Some(Box::new(TripleInvoker::new( Url::from_url(&self.host).unwrap(), ))); } - let invokers = match self.directory { - Some(v) => v.list(invocation), - None => panic!("use direct connection"), - }; - let cluster = MockCluster::default().join(Box::new(MockDirectory::new(invokers))); + let cluster = MockCluster::default().join(Box::new(MockDirectory::new())); return Some(cluster); } diff --git a/dubbo/src/triple/client/mod.rs b/dubbo/src/triple/client/mod.rs index 7a8e0131..97be85eb 100644 --- a/dubbo/src/triple/client/mod.rs +++ b/dubbo/src/triple/client/mod.rs @@ -17,5 +17,5 @@ pub mod builder; pub mod triple; - +pub mod replay; pub use triple::TripleClient; diff --git a/dubbo/src/triple/client/replay.rs b/dubbo/src/triple/client/replay.rs new file mode 100644 index 00000000..4ba8b930 --- /dev/null +++ b/dubbo/src/triple/client/replay.rs @@ -0,0 +1,441 @@ +use std::{ + collections::VecDeque, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use futures_core::{ready, stream::BoxStream, Stream}; +use futures_util::StreamExt; +use http_body::Body; +use pin_project::pin_project; + +use crate::status::Status; + +type BoxBytesStream = BoxStream<'static, Result>; +pub struct ClonedBytesStream { + shared: Arc>>, + owned: Option, + replay: bool, +} + +pub struct OwnedBytesStream { + bytes_stream: BoxBytesStream, + buf: InnerBuffer, +} + +#[derive(Clone)] +pub struct InnerBuffer { + bufs: VecDeque, + capacity: usize, +} + +impl ClonedBytesStream { + pub fn new(buffer_capacity: usize, stream: BoxBytesStream) -> Self { + Self { + shared: Arc::new(Mutex::new(None)), + owned: Some(OwnedBytesStream { + bytes_stream: stream, + buf: InnerBuffer { + bufs: Default::default(), + capacity: buffer_capacity, + }, + }), + replay: false, + } + } +} + +impl Clone for ClonedBytesStream { + fn clone(&self) -> Self { + Self { + shared: self.shared.clone(), + owned: None, + replay: true, + } + } +} + +impl Drop for ClonedBytesStream { + fn drop(&mut self) { + if let Some(owned) = self.owned.take() { + let lock = self.shared.lock(); + if let Ok(mut lock) = lock { + *lock = Some(owned); + } + } + } +} + +impl InnerBuffer { + pub fn push_bytes(&mut self, bytes: Bytes) -> Bytes { + self.bufs.push_back(bytes.clone()); + bytes + } + + pub fn is_capped(&self) -> bool { + self.capacity == 0 + } +} + +impl Buf for InnerBuffer { + fn remaining(&self) -> usize { + self.bufs.iter().map(|bytes| bytes.remaining()).sum() + } + + fn chunk(&self) -> &[u8] { + self.bufs.front().map(Buf::chunk).unwrap_or(&[]) + } + + fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { + if dst.is_empty() { + return 0; + } + + let mut filled = 0; + + for bytes in self.bufs.iter() { + filled += bytes.chunks_vectored(&mut dst[filled..]) + } + + filled + } + + fn advance(&mut self, mut cnt: usize) { + while cnt > 0 { + let first = self.bufs.front_mut(); + if first.is_none() { + break; + } + let first = first.unwrap(); + let first_remaining = first.remaining(); + if first_remaining > cnt { + first.advance(cnt); + break; + } + + first.advance(first_remaining); + cnt = cnt - first_remaining; + self.bufs.pop_front(); + } + } + + fn copy_to_bytes(&mut self, len: usize) -> bytes::Bytes { + match self.bufs.front_mut() { + Some(buf) if len <= buf.remaining() => { + let bytes = buf.copy_to_bytes(len); + if buf.remaining() == 0 { + self.bufs.pop_front(); + } + bytes + } + _ => { + let mut bytes = BytesMut::with_capacity(len); + bytes.put(self.take(len)); + bytes.freeze() + } + } + } +} + +pub enum BytesData { + BufferedBytes(InnerBuffer), + OriginBytes(Bytes), +} + +impl Buf for BytesData { + fn remaining(&self) -> usize { + match self { + BytesData::BufferedBytes(bytes) => bytes.remaining(), + BytesData::OriginBytes(bytes) => bytes.remaining(), + } + } + + fn chunk(&self) -> &[u8] { + match self { + BytesData::BufferedBytes(bytes) => bytes.chunk(), + BytesData::OriginBytes(bytes) => bytes.chunk(), + } + } + + fn advance(&mut self, cnt: usize) { + match self { + BytesData::BufferedBytes(bytes) => bytes.advance(cnt), + BytesData::OriginBytes(bytes) => bytes.advance(cnt), + } + } + + fn copy_to_bytes(&mut self, len: usize) -> bytes::Bytes { + match self { + BytesData::BufferedBytes(bytes) => bytes.copy_to_bytes(len), + BytesData::OriginBytes(bytes) => bytes.copy_to_bytes(len), + } + } +} + +impl Stream for ClonedBytesStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut_self = self.get_mut(); + + let owned_bytes_stream = mut_self.owned.get_or_insert_with(|| { + let lock = mut_self.shared.lock(); + if let Err(e) = lock { + panic!("bytes streams get shared data lock failed. {}", e); + } + let mut data = lock.unwrap(); + + data.take().expect("cannot get shared bytes streams.") + }); + + if mut_self.replay { + mut_self.replay = false; + if owned_bytes_stream.buf.has_remaining() { + return Poll::Ready(Some(Ok(BytesData::BufferedBytes( + owned_bytes_stream.buf.clone(), + )))); + } + } + + let next = owned_bytes_stream.bytes_stream.poll_next_unpin(cx); + + let next = match ready!(next) { + Some(next) => match next { + Ok(next) => next, + Err(e) => return Poll::Ready(Some(Err(e))), + }, + _ => return Poll::Ready(None), + }; + + let len = next.len(); + owned_bytes_stream.buf.capacity = owned_bytes_stream.buf.capacity.saturating_sub(len); + let next = if owned_bytes_stream.buf.is_capped() { + if owned_bytes_stream.buf.has_remaining() { + owned_bytes_stream.buf.bufs = VecDeque::default(); + } + next + } else { + owned_bytes_stream.buf.push_bytes(next) + }; + + return Poll::Ready(Some(Ok(BytesData::OriginBytes(next)))); + } +} + +#[pin_project] +#[derive(Clone)] +pub struct ClonedBody(#[pin] ClonedBytesStream); + +impl ClonedBody { + pub fn new(inner_body: T) -> Self + where + T: Stream> + Send + 'static, + { + let inner_body = Box::pin(inner_body); + let inner_body = ClonedBytesStream::new(1024 * 64, inner_body); + ClonedBody(inner_body) + } +} + +impl Body for ClonedBody { + type Data = BytesData; + + type Error = Status; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.project().0.poll_next(cx) + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } + + fn size_hint(&self) -> http_body::SizeHint { + http_body::SizeHint::default() + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + + #[tokio::test] + async fn test_cloned_bytes_stream() { + let bytes1 = Ok(Bytes::from("hello".as_bytes())); + let bytes2 = Ok(Bytes::from(" world".as_bytes())); + let bytes3 = Ok(Bytes::from(", end test!".as_bytes())); + + let stream = futures_util::stream::iter(vec![bytes1, bytes2, bytes3]); + + let mut origin_stream = ClonedBytesStream::new(64 * 1024, Box::pin(stream)); + let hello_bytes = origin_stream.next().await; + + assert!(hello_bytes.is_some()); + + let hello_bytes = hello_bytes.unwrap(); + + assert!(hello_bytes.is_ok()); + + assert_eq!(bytes_data_to_str(hello_bytes.unwrap()), "hello"); + + let world_bytes = origin_stream.next().await; + + assert!(world_bytes.is_some()); + + let world_bytes = world_bytes.unwrap(); + + assert!(world_bytes.is_ok()); + + assert_eq!(bytes_data_to_str(world_bytes.unwrap()), " world"); + + let end_bytes = origin_stream.next().await; + + assert!(end_bytes.is_some()); + + let end_bytes = end_bytes.unwrap(); + + assert!(end_bytes.is_ok()); + + assert_eq!(bytes_data_to_str(end_bytes.unwrap()), ", end test!"); + + let none_bytes = origin_stream.next().await; + + assert!(none_bytes.is_none()); + } + + #[tokio::test] + async fn test_cloned_bytes_stream_and_replay() { + let bytes1 = Ok(Bytes::from("hello".as_bytes())); + let bytes2 = Ok(Bytes::from(" world".as_bytes())); + let bytes3 = Ok(Bytes::from(", end test!".as_bytes())); + + let stream = futures_util::stream::iter(vec![bytes1, bytes2, bytes3]); + + let mut origin_stream = ClonedBytesStream::new(64 * 1024, Box::pin(stream)); + origin_stream.next().await; + origin_stream.next().await; + origin_stream.next().await; + + let none_bytes = origin_stream.next().await; + assert!(none_bytes.is_none()); + + let mut clone_origin_stream = origin_stream.clone(); + + drop(origin_stream); + + let cached_bytes = clone_origin_stream.next().await; + assert!(cached_bytes.is_some()); + + let cached_bytes = cached_bytes.unwrap(); + + assert!(cached_bytes.is_ok()); + + assert_eq!( + bytes_data_to_str(cached_bytes.unwrap()), + "hello world, end test!" + ); + + let none_bytes = clone_origin_stream.next().await; + assert!(none_bytes.is_none()); + } + + #[tokio::test] + async fn test_replay_stream_continue_poll_next() { + let bytes1 = Ok(Bytes::from("hello".as_bytes())); + let bytes2 = Ok(Bytes::from(" world".as_bytes())); + let bytes3 = Ok(Bytes::from(", end test!".as_bytes())); + + let stream = futures_util::stream::iter(vec![bytes1, bytes2, bytes3]); + + let mut origin_stream = ClonedBytesStream::new(1024 * 64, Box::pin(stream)); + origin_stream.next().await; + origin_stream.next().await; + + let mut clone_origin_stream = origin_stream.clone(); + + drop(origin_stream); + + let cached_bytes = clone_origin_stream.next().await; + assert!(cached_bytes.is_some()); + + let cached_bytes = cached_bytes.unwrap(); + + assert!(cached_bytes.is_ok()); + + assert_eq!(bytes_data_to_str(cached_bytes.unwrap()), "hello world"); + + let next_bytes = clone_origin_stream.next().await; + + assert!(next_bytes.is_some()); + + let next_bytes = next_bytes.unwrap(); + + assert!(next_bytes.is_ok()); + + assert_eq!(bytes_data_to_str(next_bytes.unwrap()), ", end test!"); + + let none_bytes = clone_origin_stream.next().await; + assert!(none_bytes.is_none()); + } + + #[tokio::test] + async fn test_cloned_bytes_stream_reached_max_capacity() { + let bytes1 = Ok(Bytes::from("hello".as_bytes())); + let bytes2 = Ok(Bytes::from(" world".as_bytes())); + let bytes3 = Ok(Bytes::from(", end test!".as_bytes())); + + let stream = futures_util::stream::iter(vec![bytes1, bytes2, bytes3]); + + let mut origin_stream = ClonedBytesStream::new(5, Box::pin(stream)); + let hello_bytes = origin_stream.next().await; + + assert!(hello_bytes.is_some()); + + let hello_bytes = hello_bytes.unwrap(); + + assert!(hello_bytes.is_ok()); + + assert_eq!(bytes_data_to_str(hello_bytes.unwrap()), "hello"); + + let world_bytes = origin_stream.next().await; + + assert!(world_bytes.is_some()); + + let world_bytes = world_bytes.unwrap(); + + assert!(world_bytes.is_ok()); + + assert_eq!(bytes_data_to_str(world_bytes.unwrap()), " world"); + + let mut cloned_origin_stream = origin_stream.clone(); + + drop(origin_stream); + + let end_bytes = cloned_origin_stream.next().await; + + assert!(end_bytes.is_some()); + + let end_bytes = end_bytes.unwrap(); + + assert!(end_bytes.is_ok()); + + assert_eq!(bytes_data_to_str(end_bytes.unwrap()), ", end test!"); + + let none_bytes = cloned_origin_stream.next().await; + assert!(none_bytes.is_none()); + } + + fn bytes_data_to_str(mut bytes_data: BytesData) -> String { + let len = bytes_data.remaining(); + let bytes = bytes_data.copy_to_bytes(len); + String::from_utf8(bytes.to_vec()).unwrap() + } +} diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs index 124cfcfd..22273415 100644 --- a/dubbo/src/triple/client/triple.rs +++ b/dubbo/src/triple/client/triple.rs @@ -19,37 +19,37 @@ use std::str::FromStr; use futures_util::{future, stream, StreamExt, TryStreamExt}; -use aws_smithy_http::body::SdkBody; use http::HeaderValue; use super::builder::ClientBuilder; +use super::replay::ClonedBody; use crate::codegen::RpcInvocation; use crate::{ invocation::{IntoStreamingRequest, Metadata, Request, Response}, - protocol::BoxInvoker, triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode}, }; -#[derive(Debug, Clone, Default)] +#[derive(Debug, Default, Clone)] pub struct TripleClient { pub(crate) send_compression_encoding: Option, pub(crate) builder: Option, - pub invoker: Option, } impl TripleClient { pub fn connect(host: String) -> Self { let builder = ClientBuilder::from_static(&host).with_direct(true); - builder.direct_build() + TripleClient { + send_compression_encoding: Some(CompressionEncoding::Gzip), + builder: Some(builder), + } } pub fn new(builder: ClientBuilder) -> Self { TripleClient { send_compression_encoding: Some(CompressionEncoding::Gzip), builder: Some(builder), - invoker: None, } } @@ -57,8 +57,9 @@ impl TripleClient { &self, uri: http::Uri, path: http::uri::PathAndQuery, - body: SdkBody, - ) -> http::Request { + body: ClonedBody, + invocation: RpcInvocation, + ) -> http::Request { let mut parts = uri.into_parts(); parts.path_and_query = Some(path); @@ -110,6 +111,8 @@ impl TripleClient { http::HeaderValue::from_static("gzip"), ); + req.extensions_mut().insert(invocation); + // const ( // TripleContentType = "application/grpc+proto" // TripleUserAgent = "grpc-go/1.35.0-dev" @@ -138,28 +141,23 @@ impl TripleClient { M2: Send + Sync + 'static, { let req = req.map(|m| stream::once(future::ready(m))); - let body_stream = encode( + let en = encode( codec.encoder(), req.into_inner().map(Ok), self.send_compression_encoding, ) .into_stream(); - let body = hyper::Body::wrap_stream(body_stream); - let bytes = hyper::body::to_bytes(body).await.unwrap(); - let sdk_body = SdkBody::from(bytes); - - let mut conn = match self.invoker.clone() { - Some(v) => v, - None => self - .builder - .clone() - .unwrap() - .build(invocation.into()) - .unwrap(), - }; + let body = ClonedBody::new(en); + + let mut conn = self + .builder + .clone() + .unwrap() + .build() + .unwrap(); let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); - let req = self.map_request(http_uri.clone(), path, sdk_body); + let req = self.map_request(http_uri.clone(), path, body, invocation); let response = conn .call(req) @@ -213,21 +211,19 @@ impl TripleClient { self.send_compression_encoding, ) .into_stream(); - let body = hyper::Body::wrap_stream(en); - let sdk_body = SdkBody::from(body); - - let mut conn = match self.invoker.clone() { - Some(v) => v, - None => self - .builder - .clone() - .unwrap() - .build(invocation.into()) - .unwrap(), - }; + + + let body = ClonedBody::new(en); + + let mut conn = self + .builder + .clone() + .unwrap() + .build() + .unwrap(); let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); - let req = self.map_request(http_uri.clone(), path, sdk_body); + let req = self.map_request(http_uri.clone(), path, body, invocation); let response = conn .call(req) @@ -265,21 +261,17 @@ impl TripleClient { self.send_compression_encoding, ) .into_stream(); - let body = hyper::Body::wrap_stream(en); - let sdk_body = SdkBody::from(body); - - let mut conn = match self.invoker.clone() { - Some(v) => v, - None => self - .builder - .clone() - .unwrap() - .build(invocation.into()) - .unwrap(), - }; + let body = ClonedBody::new(en); + + let mut conn = self + .builder + .clone() + .unwrap() + .build() + .unwrap(); let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); - let req = self.map_request(http_uri.clone(), path, sdk_body); + let req = self.map_request(http_uri.clone(), path, body, invocation); // let mut conn = Connection::new().with_host(http_uri); let response = conn @@ -334,20 +326,18 @@ impl TripleClient { self.send_compression_encoding, ) .into_stream(); - let body = hyper::Body::wrap_stream(en); - let sdk_body = SdkBody::from(body); - - let mut conn = match self.invoker.clone() { - Some(v) => v, - None => self - .builder - .clone() - .unwrap() - .build(invocation.into()) - .unwrap(), - }; + + let body = ClonedBody::new(en); + + let mut conn = self + .builder + .clone() + .unwrap() + .build() + .unwrap(); + let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap(); - let req = self.map_request(http_uri.clone(), path, sdk_body); + let req = self.map_request(http_uri.clone(), path, body, invocation); let response = conn .call(req) diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs index 16fb1638..e756d5c7 100644 --- a/examples/echo/src/generated/grpc.examples.echo.rs +++ b/examples/echo/src/generated/grpc.examples.echo.rs @@ -1,12 +1,12 @@ -// @generated by apache/dubbo-rust. - /// EchoRequest is the request for echo. +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct EchoRequest { #[prost(string, tag = "1")] pub message: ::prost::alloc::string::String, } /// EchoResponse is the response for echo. +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct EchoResponse { #[prost(string, tag = "1")] @@ -31,21 +31,21 @@ pub mod echo_client { inner: TripleClient::new(builder), } } - pub fn with_cluster(mut self, invoker: ClusterInvoker) -> Self { - self.inner = self.inner.with_cluster(invoker); - self - } /// UnaryEcho is unary echo. pub async fn unary_echo( &mut self, request: Request, ) -> Result, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("UnaryEcho")); - let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho"); + let path = http::uri::PathAndQuery::from_static( + "/grpc.examples.echo.Echo/UnaryEcho", + ); self.inner.unary(request, codec, path, invocation).await } /// ServerStreamingEcho is server side streaming. @@ -53,51 +53,51 @@ pub mod echo_client { &mut self, request: Request, ) -> Result>, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("ServerStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ServerStreamingEcho", ); - self.inner - .server_streaming(request, codec, path, invocation) - .await + self.inner.server_streaming(request, codec, path, invocation).await } /// ClientStreamingEcho is client side streaming. pub async fn client_streaming_echo( &mut self, request: impl IntoStreamingRequest, ) -> Result, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("ClientStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/ClientStreamingEcho", ); - self.inner - .client_streaming(request, codec, path, invocation) - .await + self.inner.client_streaming(request, codec, path, invocation).await } /// BidirectionalStreamingEcho is bidi streaming. pub async fn bidirectional_streaming_echo( &mut self, request: impl IntoStreamingRequest, ) -> Result>, dubbo::status::Status> { - let codec = - dubbo::codegen::ProstCodec::::default(); + let codec = dubbo::codegen::ProstCodec::< + super::EchoRequest, + super::EchoResponse, + >::default(); let invocation = RpcInvocation::default() .with_service_unique_name(String::from("grpc.examples.echo.Echo")) .with_method_name(String::from("BidirectionalStreamingEcho")); let path = http::uri::PathAndQuery::from_static( "/grpc.examples.echo.Echo/BidirectionalStreamingEcho", ); - self.inner - .bidi_streaming(request, codec, path, invocation) - .await + self.inner.bidi_streaming(request, codec, path, invocation).await } } } @@ -114,7 +114,9 @@ pub mod echo_server { request: Request, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the ServerStreamingEcho method. - type ServerStreamingEchoStream: futures_util::Stream> + type ServerStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// ServerStreamingEcho is server side streaming. @@ -128,14 +130,19 @@ pub mod echo_server { request: Request>, ) -> Result, dubbo::status::Status>; ///Server streaming response type for the BidirectionalStreamingEcho method. - type BidirectionalStreamingEchoStream: futures_util::Stream> + type BidirectionalStreamingEchoStream: futures_util::Stream< + Item = Result, + > + Send + 'static; /// BidirectionalStreamingEcho is bidi streaming. async fn bidirectional_streaming_echo( &self, request: Request>, - ) -> Result, dubbo::status::Status>; + ) -> Result< + Response, + dubbo::status::Status, + >; } /// Echo is the echo service. #[derive(Debug)] @@ -165,7 +172,10 @@ pub mod echo_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -178,18 +188,26 @@ pub mod echo_server { } impl UnarySvc for UnaryEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); let fut = async move { inner.unary_echo(request).await }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server.unary(UnaryEchoServer { inner }, req).await; Ok(res) }; @@ -200,22 +218,32 @@ pub mod echo_server { struct ServerStreamingEchoServer { inner: _Inner, } - impl ServerStreamingSvc for ServerStreamingEchoServer { + impl ServerStreamingSvc + for ServerStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::ServerStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; - fn call(&mut self, request: Request) -> Self::Future { + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; + fn call( + &mut self, + request: Request, + ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.server_streaming_echo(request).await }; + let fut = async move { + inner.server_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server .server_streaming(ServerStreamingEchoServer { inner }, req) .await; @@ -228,23 +256,31 @@ pub mod echo_server { struct ClientStreamingEchoServer { inner: _Inner, } - impl ClientStreamingSvc for ClientStreamingEchoServer { + impl ClientStreamingSvc + for ClientStreamingEchoServer { type Response = super::EchoResponse; - type Future = BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = async move { inner.client_streaming_echo(request).await }; + let fut = async move { + inner.client_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server .client_streaming(ClientStreamingEchoServer { inner }, req) .await; @@ -257,41 +293,56 @@ pub mod echo_server { struct BidirectionalStreamingEchoServer { inner: _Inner, } - impl StreamingSvc for BidirectionalStreamingEchoServer { + impl StreamingSvc + for BidirectionalStreamingEchoServer { type Response = super::EchoResponse; type ResponseStream = T::BidirectionalStreamingEchoStream; - type Future = - BoxFuture, dubbo::status::Status>; + type Future = BoxFuture< + Response, + dubbo::status::Status, + >; fn call( &mut self, request: Request>, ) -> Self::Future { let inner = self.inner.0.clone(); - let fut = - async move { inner.bidirectional_streaming_echo(request).await }; + let fut = async move { + inner.bidirectional_streaming_echo(request).await + }; Box::pin(fut) } } let fut = async move { - let mut server = TripleServer::new(dubbo::codegen::ProstCodec::< - super::EchoResponse, - super::EchoRequest, - >::default()); + let mut server = TripleServer::new( + dubbo::codegen::ProstCodec::< + super::EchoResponse, + super::EchoRequest, + >::default(), + ); let res = server - .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req) + .bidi_streaming( + BidirectionalStreamingEchoServer { + inner, + }, + req, + ) .await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } }