Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature http transport #296

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
- name: Run tests
run: |
cargo check --features multiplex
cargo check --features rustls,native-tls,vendored
cargo test

test-linux-aarch64:
Expand All @@ -48,6 +49,7 @@ jobs:
- name: Run tests
run: |
cargo check --features multiplex
cargo check --features rustls,native-tls,vendored
cargo test

test-macos:
Expand All @@ -62,6 +64,7 @@ jobs:
- name: Run tests
run: |
cargo check --features multiplex
cargo check --features rustls,native-tls,vendored
cargo test

test-windows:
Expand All @@ -76,6 +79,7 @@ jobs:
- name: Run tests
run: |
cargo check --features multiplex
cargo check --features rustls,native-tls,vendored
cargo test

lint:
Expand Down
49 changes: 47 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pilota-thrift-parser = "0.10"
motore = "0.4"
# motore = { git = "https://github.com/cloudwego/motore", branch = "main" }

metainfo = "0.7"
metainfo = "0.7.7"

anyhow = "1"
async-broadcast = "0.6"
Expand Down Expand Up @@ -62,6 +62,7 @@ http-body-util = "0.1"
hyper = "1"
hyper-timeout = "0.5"
hyper-util = "0.1.2"
reqwest = { version = "0.11", features = ["json", "stream"] }
itertools = "0"
lazy_static = "1"
libc = "0.2"
Expand Down
4 changes: 4 additions & 0 deletions volo-grpc/src/transport/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl<U> ClientTransport<U> {
rpc_config.connect_timeout,
rpc_config.read_timeout,
rpc_config.write_timeout,
None,
);
let mut http_client = http2::Builder::new(TokioExecutor::new());
http_client
Expand Down Expand Up @@ -79,6 +80,7 @@ impl<U> ClientTransport<U> {
rpc_config.connect_timeout,
rpc_config.read_timeout,
rpc_config.write_timeout,
None,
);
let mut http_client = http2::Builder::new(TokioExecutor::new());
http_client
Expand Down Expand Up @@ -231,6 +233,8 @@ fn build_uri(addr: Address, path: &str) -> hyper::Uri {
.path_and_query(path)
.build()
.expect("fail to build unix uri"),
Address::Http(url) => hyper::Uri::try_from(url.to_string())
.expect("fail to build http uri"),
}
}

Expand Down
2 changes: 1 addition & 1 deletion volo-grpc/src/transport/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Connector {

#[cfg(any(feature = "rustls", feature = "native-tls"))]
pub fn new_with_tls(cfg: Option<Config>, tls_config: ClientTlsConfig) -> Self {
let mut mt = TlsMakeTransport::new(cfg.unwrap_or_default(), tls_config);
let mut mt = TlsMakeTransport::new(cfg.clone().unwrap_or_default(), tls_config);
if let Some(cfg) = cfg {
mt.set_connect_timeout(cfg.connect_timeout);
mt.set_read_timeout(cfg.read_timeout);
Expand Down
1 change: 1 addition & 0 deletions volo-thrift/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ bytes.workspace = true
chrono.workspace = true
futures.workspace = true
fxhash.workspace = true
http.workspace = true
lazy_static.workspace = true
linkedbytes.workspace = true
linked-hash-map.workspace = true
Expand Down
42 changes: 42 additions & 0 deletions volo-thrift/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
sync::{atomic::AtomicI32, Arc},
};

use http::{HeaderMap, header::IntoHeaderName, HeaderValue};
use motore::{
layer::{Identity, Layer, Stack},
service::{BoxCloneService, Service},
Expand Down Expand Up @@ -53,6 +54,7 @@ pub struct ClientBuilder<IL, OL, MkClient, Req, Resp, MkT, MkC, LB> {
callee_name: FastStr,
caller_name: FastStr,
address: Option<Address>, // maybe address use Arc avoid memory alloc
headers: Option<HeaderMap>,
inner_layer: IL,
outer_layer: OL,
make_transport: MkT,
Expand All @@ -75,6 +77,7 @@ impl<C, Req, Resp>
Req,
Resp,
DefaultMakeTransport,

DefaultMakeCodec<MakeTTHeaderCodec<MakeFramedCodec<MakeThriftCodec>>>,
LbConfig<WeightedRandomBalance<<DummyDiscover as Discover>::Key>, DummyDiscover>,
>
Expand All @@ -86,10 +89,12 @@ impl<C, Req, Resp>
caller_name: "".into(),
callee_name: FastStr::new(service_name),
address: None,
headers: None,
inner_layer: Identity::new(),
outer_layer: Identity::new(),
mk_client: service_client,
make_transport: DefaultMakeTransport::default(),

make_codec: DefaultMakeCodec::default(),
mk_lb: LbConfig::new(WeightedRandomBalance::new(), DummyDiscover {}),
_marker: PhantomData,
Expand All @@ -115,6 +120,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB, DISC>
caller_name: self.caller_name,
callee_name: self.callee_name,
address: self.address,
headers: self.headers,
inner_layer: self.inner_layer,
outer_layer: self.outer_layer,
mk_client: self.mk_client,
Expand All @@ -140,6 +146,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB, DISC>
caller_name: self.caller_name,
callee_name: self.callee_name,
address: self.address,
headers: self.headers,
inner_layer: self.inner_layer,
outer_layer: self.outer_layer,
mk_client: self.mk_client,
Expand Down Expand Up @@ -214,6 +221,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
caller_name: self.caller_name,
callee_name: self.callee_name,
address: self.address,
headers: self.headers,
inner_layer: self.inner_layer,
outer_layer: self.outer_layer,
mk_client: self.mk_client,
Expand Down Expand Up @@ -247,6 +255,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
caller_name: self.caller_name,
callee_name: self.callee_name,
address: self.address,
headers: self.headers,
inner_layer: self.inner_layer,
outer_layer: self.outer_layer,
mk_client: self.mk_client,
Expand Down Expand Up @@ -274,6 +283,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
caller_name: self.caller_name,
callee_name: self.callee_name,
address: self.address,
headers: self.headers,
inner_layer: self.inner_layer,
outer_layer: self.outer_layer,
mk_client: self.mk_client,
Expand All @@ -299,6 +309,30 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
self
}

/// Add transport header
///
pub fn header<K: IntoHeaderName>(mut self, key: K, value: HeaderValue) -> Self {
if let Some(existing) = &mut self.headers {
existing.append(key, value);
} else {
let mut headers = HeaderMap::new();
headers.append(key, value);
self.headers = Some(headers);
}
self
}

/// Add transport headers
///
pub fn headers(mut self, headers: HeaderMap) -> Self {
if let Some(existing) = &mut self.headers {
existing.extend(headers);
} else {
self.headers = Some(headers);
}
self
}

/// Adds a new inner layer to the client.
///
/// The layer's `Service` should be `Send + Sync + Clone + 'static`.
Expand All @@ -322,6 +356,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
caller_name: self.caller_name,
callee_name: self.callee_name,
address: self.address,
headers: self.headers,
inner_layer: Stack::new(layer, self.inner_layer),
outer_layer: self.outer_layer,
mk_client: self.mk_client,
Expand Down Expand Up @@ -360,6 +395,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
caller_name: self.caller_name,
callee_name: self.callee_name,
address: self.address,
headers: self.headers,
inner_layer: self.inner_layer,
outer_layer: Stack::new(layer, self.outer_layer),
mk_client: self.mk_client,
Expand Down Expand Up @@ -398,6 +434,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
caller_name: self.caller_name,
callee_name: self.callee_name,
address: self.address,
headers: self.headers,
inner_layer: self.inner_layer,
outer_layer: Stack::new(self.outer_layer, layer),
mk_client: self.mk_client,
Expand All @@ -423,6 +460,7 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
caller_name: self.caller_name,
callee_name: self.callee_name,
address: self.address,
headers: self.headers,
inner_layer: self.inner_layer,
outer_layer: self.outer_layer,
mk_client: self.mk_client,
Expand Down Expand Up @@ -529,6 +567,10 @@ where
if let Some(timeout) = self.config.read_write_timeout() {
self.make_transport.set_write_timeout(Some(timeout));
}
if let Some(headers) = self.headers {
self.make_transport.set_headers(Some(headers))
}

let msg_svc = MessageService {
#[cfg(not(feature = "multiplex"))]
inner: pingpong::Client::new(self.make_transport, self.pool, self.make_codec),
Expand Down
6 changes: 5 additions & 1 deletion volo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ async-broadcast.workspace = true
dashmap.workspace = true
faststr.workspace = true
futures.workspace = true
reqwest.workspace = true
# http.workspace = true
hyper.workspace = true
hyper-util.workspace = true
lazy_static.workspace = true
libc.workspace = true
metainfo.workspace = true
Expand Down Expand Up @@ -57,6 +61,6 @@ tokio-native-tls = { workspace = true, optional = true }

[features]
default = []

vendored = ["tokio-native-tls/vendored"]
rustls = ["tokio-rustls", "librustls"]
native-tls = ["tokio-native-tls"]
Loading
Loading