Skip to content

Commit

Permalink
feat: Add Router Module(#144) (#153)
Browse files Browse the repository at this point in the history
add condition router,
add tag router,
use nacos as router config center
  • Loading branch information
AdachiAndShimamura authored Sep 1, 2023
1 parent df0e0e5 commit c07b96d
Show file tree
Hide file tree
Showing 22 changed files with 1,084 additions and 28 deletions.
35 changes: 34 additions & 1 deletion application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,37 @@ dubbo:
references:
GreeterClientImpl:
url: tri://localhost:20000
protocol: tri
protocol: tri
routers:
consumer:
- service: "org.apache.dubbo.sample.tri.Greeter"
url: triple://localhost:20000
protocol: triple
nacos:
addr: "127.0.0.1:8848"
namespace: ""
app: ""
conditions:
- scope: "service"
force: false
runtime: true
enabled: true
key: "org.apache.dubbo.sample.tri.Greeter"
conditions:
- method=greet => port=8889
- scope: "service"
force: true
runtime: true
enabled: true
key: "user.UserService"
conditions:
- method=get_s => port=2003
tags:
force: true
enabled: true
key: shop-detail
tags:
- name: gray
matches:
- key: env
value: gray
6 changes: 5 additions & 1 deletion config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::{collections::HashMap, env, path::PathBuf};

use crate::{protocol::Protocol, registry::RegistryConfig};
use crate::{protocol::Protocol, registry::RegistryConfig, router::RouterConfig};
use dubbo_logger::tracing;
use dubbo_utils::yaml_util::yaml_file_parser;
use once_cell::sync::OnceCell;
Expand All @@ -44,6 +44,9 @@ pub struct RootConfig {
#[serde(default)]
pub registries: HashMap<String, RegistryConfig>,

#[serde(default)]
pub routers: RouterConfig,

#[serde(default)]
pub data: HashMap<String, String>,
}
Expand All @@ -63,6 +66,7 @@ impl RootConfig {
protocols: HashMap::new(),
registries: HashMap::new(),
provider: ProviderConfig::new(),
routers: RouterConfig::default(),
data: HashMap::new(),
}
}
Expand Down
1 change: 1 addition & 0 deletions config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ pub mod config;
pub mod protocol;
pub mod provider;
pub mod registry;
pub mod router;
pub mod service;
67 changes: 67 additions & 0 deletions config/src/router.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct ConditionRouterConfig {
pub scope: String,
pub force: bool,
pub runtime: bool,
pub enabled: bool,
pub key: String,
pub conditions: Vec<String>,
}

#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
pub struct TagRouterConfig {
pub force: bool,
pub enabled: bool,
pub key: String,
pub tags: Vec<Tag>,
}

#[derive(Serialize, Deserialize, Clone, PartialEq, Default, Debug)]
pub struct ConsumerConfig {
pub service: String,
pub url: String,
pub protocol: String,
}

#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
pub struct Tag {
pub name: String,
pub matches: Vec<TagMatchRule>,
}

#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
pub struct TagMatchRule {
pub key: String,
pub value: String,
}

impl ConditionRouterConfig {
pub fn new(config: &String) -> Self {
serde_yaml::from_str(config).expect("parse error")
}
}

#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)]
pub struct EnableAuth {
pub auth_username: String,
pub auth_password: String,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq)]
pub struct NacosConfig {
pub addr: String,
pub namespace: String,
pub app: String,
pub enable_auth: Option<EnableAuth>,
pub enable_auth_plugin_http: Option<bool>,
}

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct RouterConfig {
pub consumer: Option<Vec<ConsumerConfig>>,
pub nacos: Option<NacosConfig>,
pub conditions: Option<Vec<ConditionRouterConfig>>,
pub tags: Option<TagRouterConfig>,
}
5 changes: 5 additions & 0 deletions dubbo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ urlencoding.workspace = true
lazy_static.workspace = true
dubbo-base.workspace = true
dubbo-logger.workspace = true
once_cell.workspace = true

dubbo-config = { path = "../config", version = "0.3.0" }

#对象存储
state = { version = "0.5", features = ["tls"] }

regex = "1.9.1"
nacos-sdk = { version = "0.3.0", features = ["default"] }
serde_yaml = "0.9.22"
28 changes: 3 additions & 25 deletions dubbo/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{

pub mod directory;
pub mod loadbalance;
pub mod router;

pub trait Directory: Debug {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
Expand Down Expand Up @@ -116,8 +117,9 @@ pub struct MockDirectory {

impl MockDirectory {
pub fn new() -> MockDirectory {
// let router_chain = get_global_router_manager().read().unwrap().get_router_chain(invocation);
Self {
// router_chain: RouterChain::default(),
// router_chain
}
}
}
Expand All @@ -130,27 +132,3 @@ impl Directory for MockDirectory {
// self.router_chain.route(u, invo);
}
}

// #[derive(Debug, Default)]
// pub struct RouterChain {
// router: HashMap<String, BoxRouter>,
// invokers: Arc<Vec<BoxInvoker>>,
// }

// impl RouterChain {
// pub fn route(&mut self, url: Url, invo: Arc<RpcInvocation>) -> Arc<Vec<BoxInvoker>> {
// let r = self.router.get("mock").unwrap();
// r.route(self.invokers.clone(), url, invo)
// }
// }

// pub trait Router: Debug {
// fn route(
// &self,
// invokers: Arc<Vec<BoxInvoker>>,
// url: Url,
// invo: Arc<RpcInvocation>,
// ) -> Arc<Vec<BoxInvoker>>;
// }

// pub type BoxRouter = Box<dyn Router + Sync + Send>;
60 changes: 60 additions & 0 deletions dubbo/src/cluster/router/condition/condition_router.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use crate::{
cluster::router::{condition::single_router::ConditionSingleRouter, Router},
codegen::RpcInvocation,
};
use dubbo_base::Url;
use std::{
fmt::Debug,
sync::{Arc, RwLock},
};

#[derive(Default, Debug, Clone)]
pub struct ConditionRouter {
//condition router for service scope
pub service_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
//condition router for application scope
pub application_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
}

impl Router for ConditionRouter {
fn route(&self, invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url> {
let mut invokers_result = invokers.clone();
if let Some(routers) = self.application_routers.clone() {
for router in &routers.read().unwrap().routers {
invokers_result = router.route(invokers_result, url.clone(), invo.clone())
}
}
if let Some(routers) = self.service_routers.clone() {
for router in &routers.read().unwrap().routers {
invokers_result = router.route(invokers_result, url.clone(), invo.clone())
}
}
invokers_result
}
}

impl ConditionRouter {
pub fn new(
service_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
application_routers: Option<Arc<RwLock<ConditionSingleRouters>>>,
) -> Self {
Self {
service_routers,
application_routers,
}
}
}

#[derive(Debug, Clone, Default)]
pub struct ConditionSingleRouters {
pub routers: Vec<ConditionSingleRouter>,
}

impl ConditionSingleRouters {
pub fn new(routers: Vec<ConditionSingleRouter>) -> Self {
Self { routers }
}
pub fn is_null(&self) -> bool {
self.routers.is_empty()
}
}
94 changes: 94 additions & 0 deletions dubbo/src/cluster/router/condition/matcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use crate::codegen::RpcInvocation;
use regex::Regex;
use std::{collections::HashSet, error::Error, option::Option, sync::Arc};

#[derive(Clone, Debug, Default)]
pub struct ConditionMatcher {
_key: String,
matches: HashSet<String>,
mismatches: HashSet<String>,
}

impl ConditionMatcher {
pub fn new(_key: String) -> Self {
ConditionMatcher {
_key,
matches: HashSet::new(),
mismatches: HashSet::new(),
}
}

pub fn is_match(
&self,
value: Option<String>,
invocation: Arc<RpcInvocation>,
_is_when: bool,
) -> Result<bool, Box<dyn Error>> {
match value {
None => {
// if key does not present in whichever of url, invocation or attachment based on the matcher type, then return false.
Ok(false)
}
Some(val) => {
if !self.matches.is_empty() && self.mismatches.is_empty() {
for match_ in self.matches.iter() {
if self.do_pattern_match(match_, &val, invocation.clone())? {
return Ok(true);
}
}
Ok(false)
} else if !self.mismatches.is_empty() && self.matches.is_empty() {
for mismatch in self.mismatches.iter() {
if self.do_pattern_match(mismatch, &val, invocation.clone())? {
return Ok(false);
}
}
Ok(true)
} else if !self.matches.is_empty() && !self.mismatches.is_empty() {
for mismatch in self.mismatches.iter() {
if self.do_pattern_match(mismatch, &val, invocation.clone())? {
return Ok(false);
}
}
for match_ in self.matches.iter() {
if self.do_pattern_match(match_, &val, invocation.clone())? {
return Ok(true);
}
}
Ok(false)
} else {
Ok(false)
}
}
}
}
pub fn get_matches(&mut self) -> &mut HashSet<String> {
&mut self.matches
}
pub fn get_mismatches(&mut self) -> &mut HashSet<String> {
&mut self.mismatches
}

fn do_pattern_match(
&self,
pattern: &String,
value: &String,
_invocation: Arc<RpcInvocation>,
) -> Result<bool, Box<dyn Error>> {
if pattern.contains("*") {
return Ok(star_matcher(pattern, value));
}
Ok(pattern.eq(value))
}
}

pub fn star_matcher(pattern: &str, input: &str) -> bool {
// 将*替换为任意字符的正则表达式
let pattern = pattern.replace("*", ".*");
let regex = Regex::new(&pattern).unwrap();
regex.is_match(input)
}

pub fn _range_matcher(val: i32, min: i32, max: i32) -> bool {
min <= val && val <= max
}
3 changes: 3 additions & 0 deletions dubbo/src/cluster/router/condition/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod condition_router;
pub mod matcher;
pub mod single_router;
Loading

0 comments on commit c07b96d

Please sign in to comment.