Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor rpcinfo to eliminate useless unwrap #284

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 77 additions & 88 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions volo-build/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-build"
version = "0.8.6"
version = "0.9.0"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand All @@ -17,7 +17,7 @@ keywords = ["thrift", "grpc", "protobuf", "volo", "build"]
maintenance = { status = "actively-developed" }

[dependencies]
volo = { version = "0.8", path = "../volo" }
volo = { version = "0.9", path = "../volo" }

pilota-build.workspace = true

Expand Down
2 changes: 1 addition & 1 deletion volo-build/src/grpc_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ impl CodegenBackend for VoloGrpcBackend {

async fn call<'s, 'cx>(&'s self, cx: &'cx mut ::volo_grpc::context::ServerContext, req: ::volo_grpc::Request<{req_enum_name_recv}>) -> ::std::result::Result<Self::Response, Self::Error> {{
let inner = self.inner.clone();
match cx.rpc_info.method().unwrap().as_str() {{
match cx.rpc_info.method().as_str() {{
{req_matches}
path => {{
let path = path.to_string();
Expand Down
4 changes: 2 additions & 2 deletions volo-cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-cli"
version = "0.8.3"
version = "0.9.0"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand All @@ -22,7 +22,7 @@ keywords = ["thrift", "grpc", "protobuf", "volo", "cli"]
maintenance = { status = "actively-developed" }

[dependencies]
volo-build = { version = "0.8", path = "../volo-build" }
volo-build = { version = "0.9", path = "../volo-build" }
pilota-thrift-parser.workspace = true

anyhow.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions volo-grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-grpc"
version = "0.8.0"
version = "0.9.0"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand All @@ -22,7 +22,7 @@ maintenance = { status = "actively-developed" }

[dependencies]
pilota.workspace = true
volo = { version = "0.8", path = "../volo" }
volo = { version = "0.9", path = "../volo" }
motore = { workspace = true, features = ["tower"] }
metainfo.workspace = true

Expand Down
6 changes: 3 additions & 3 deletions volo-grpc/src/client/callopt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ impl volo::client::Apply<crate::context::ClientContext> for CallOpt {
type Error = crate::Status;

fn apply(self, cx: &mut crate::context::ClientContext) -> Result<(), Self::Error> {
let caller = cx.rpc_info.caller_mut().unwrap();
let caller = cx.rpc_info.caller_mut();
if !self.caller_faststr_tags.is_empty() {
caller.faststr_tags.extend(self.caller_faststr_tags);
}
if !self.caller_tags.is_empty() {
caller.tags.extend(self.caller_tags);
}

let callee = cx.rpc_info.callee_mut().unwrap();
let callee = cx.rpc_info.callee_mut();
if !self.callee_faststr_tags.is_empty() {
callee.faststr_tags.extend(self.callee_faststr_tags);
}
Expand All @@ -84,7 +84,7 @@ impl volo::client::Apply<crate::context::ClientContext> for CallOpt {
if let Some(addr) = self.address {
callee.set_address(addr);
}
cx.rpc_info.config_mut().unwrap().merge(self.config);
cx.rpc_info.config_mut().merge(self.config);
Ok(())
}
}
21 changes: 10 additions & 11 deletions volo-grpc/src/client/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,14 @@ where
}

// caller
if let Some(caller) = cx.rpc_info.caller.as_ref() {
metadata.insert(SOURCE_SERVICE, caller.service_name().parse()?);
}
metadata.insert(SOURCE_SERVICE, cx.rpc_info.caller().service_name().parse()?);

// callee
if let Some(callee) = cx.rpc_info.callee.as_ref() {
metadata.insert(DESTINATION_SERVICE, callee.service_name().parse()?);
if let Some(method) = cx.rpc_info.method() {
metadata.insert(DESTINATION_METHOD, method.parse()?);
}
}
metadata.insert(
DESTINATION_SERVICE,
cx.rpc_info.callee().service_name().parse()?,
);
metadata.insert(DESTINATION_METHOD, cx.rpc_info.method().parse()?);

Ok::<(), Status>(())
});
Expand All @@ -85,8 +82,10 @@ where
// callee
if let Some(ad) = metadata.remove(HEADER_TRANS_REMOTE_ADDR) {
let maybe_addr = ad.to_str()?.parse::<SocketAddr>();
if let (Some(callee), Ok(addr)) = (cx.rpc_info_mut().callee.as_mut(), maybe_addr) {
callee.set_address(volo::net::Address::from(addr));
if let Ok(addr) = maybe_addr {
cx.rpc_info_mut()
.callee_mut()
.set_address(volo::net::Address::from(addr));
}
}

Expand Down
14 changes: 14 additions & 0 deletions volo-grpc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ pub struct Config {
pub(crate) send_compressions: Option<Vec<CompressionEncoding>>,
}

impl Reusable for Config {
fn clear(&mut self) {
self.connect_timeout = None;
self.read_timeout = None;
self.write_timeout = None;
if let Some(v) = self.accept_compressions.as_mut() {
v.clear();
}
if let Some(v) = self.send_compressions.as_mut() {
v.clear();
}
}
}

impl Config {
pub fn merge(&mut self, other: Self) {
if let Some(t) = other.connect_timeout {
Expand Down
12 changes: 3 additions & 9 deletions volo-grpc/src/layer/loadbalance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use volo::{
context::Context,
discovery::Discover,
loadbalance::{error::LoadBalanceError, LoadBalance, MkLbLayer},
Layer, Unwrap,
Layer,
};

use crate::Request;
Expand Down Expand Up @@ -92,11 +92,7 @@ where
cx: &'cx mut Cx,
req: Request<T>,
) -> Result<Self::Response, Self::Error> {
debug_assert!(
cx.rpc_info().callee.is_some(),
"must set callee endpoint before load balance service"
);
let callee = cx.rpc_info().callee().volo_unwrap();
let callee = cx.rpc_info().callee();

let mut picker = match &callee.address {
None => self
Expand All @@ -110,9 +106,7 @@ where
};

if let Some(addr) = picker.next() {
if let Some(callee) = cx.rpc_info_mut().callee_mut() {
callee.address = Some(addr.clone())
}
cx.rpc_info_mut().callee_mut().address = Some(addr.clone());

return match self.service.call(cx, req).await {
Ok(resp) => Ok(resp),
Expand Down
17 changes: 7 additions & 10 deletions volo-grpc/src/server/meta.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use std::{cell::RefCell, net::SocketAddr, str::FromStr, sync::Arc};

use metainfo::{Backward, Forward};
use volo::{
context::{Context, Endpoint},
net::Address,
FastStr, Service,
};
use volo::{context::Context, net::Address, FastStr, Service};

use crate::{
body::Body,
Expand Down Expand Up @@ -59,7 +55,7 @@ where

metainfo::METAINFO
.scope(RefCell::new(metainfo::MetaInfo::default()), async move {
cx.rpc_info.method = Some(FastStr::new(req.uri().path()));
cx.rpc_info.set_method(FastStr::new(req.uri().path()));

let mut volo_req = Request::from_http(req);

Expand All @@ -71,7 +67,8 @@ where
// caller
if let Some(source_service) = metadata.remove(SOURCE_SERVICE) {
let source_service = Arc::<str>::from(source_service.to_str()?);
let mut caller = Endpoint::new(source_service.into());
let caller = cx.rpc_info_mut().caller_mut();
caller.set_service_name(source_service.into());
if let Some(ad) = metadata.remove(HEADER_TRANS_REMOTE_ADDR) {
let addr = ad.to_str()?.parse::<SocketAddr>();
if let Ok(addr) = addr {
Expand All @@ -81,14 +78,14 @@ where
if caller.address.is_none() {
caller.address = peer_addr;
}
cx.rpc_info_mut().caller = Some(caller);
}

// callee
if let Some(destination_service) = metadata.remove(DESTINATION_SERVICE) {
let destination_service = Arc::<str>::from(destination_service.to_str()?);
let callee = Endpoint::new(destination_service.into());
cx.rpc_info_mut().callee = Some(callee);
cx.rpc_info_mut()
.callee_mut()
.set_service_name(destination_service.into());
}

// persistent and transient
Expand Down
2 changes: 1 addition & 1 deletion volo-grpc/src/server/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
cx: &'cx mut ServerContext,
req: Request<B>,
) -> Result<Self::Response, Self::Error> {
let path = cx.rpc_info.method.as_ref().unwrap();
let path = cx.rpc_info.method();
match self.node.at(path) {
Ok(match_) => {
let id = match_.value;
Expand Down
2 changes: 1 addition & 1 deletion volo-grpc/src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ where
)?;

let message = T::from_body(
cx.rpc_info.method.as_deref(),
Some(cx.rpc_info.method().as_str()),
body,
Kind::Request,
recv_compression,
Expand Down
17 changes: 6 additions & 11 deletions volo-grpc/src/transport/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use http::{
use hyper::Client as HyperClient;
use motore::Service;
use tower::{util::ServiceExt, Service as TowerService};
use volo::{net::Address, Unwrap};
use volo::net::Address;

use super::connect::Connector;
use crate::{
Expand Down Expand Up @@ -114,18 +114,13 @@ where
let mut http_client = self.http_client.clone();
// SAFETY: parameters controlled by volo-grpc are guaranteed to be valid.
// get the call address from the context
let target = cx
.rpc_info
.callee()
.volo_unwrap()
.address()
.ok_or_else(|| {
io::Error::new(std::io::ErrorKind::InvalidData, "address is required")
})?;
let target = cx.rpc_info.callee().address().ok_or_else(|| {
io::Error::new(std::io::ErrorKind::InvalidData, "address is required")
})?;

let (metadata, extensions, message) = volo_req.into_parts();
let path = cx.rpc_info.method().volo_unwrap();
let rpc_config = cx.rpc_info.config().volo_unwrap();
let path = cx.rpc_info.method();
let rpc_config = cx.rpc_info.config();
let accept_compressions = &rpc_config.accept_compressions;

// select the compression algorithm with the highest priority by user's config
Expand Down
2 changes: 1 addition & 1 deletion volo-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ keywords = ["async", "rpc", "http"]
maintenance = { status = "actively-developed" }

[dependencies]
volo = { version = "0.8", path = "../volo" }
volo = { version = "0.9", path = "../volo" }

http-body-util = "0.1"
hyper = { version = "1", features = ["server", "http1", "http2"] }
Expand Down
2 changes: 1 addition & 1 deletion volo-macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-macros"
version = "0.8.0"
version = "0.9.0"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions volo-thrift/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-thrift"
version = "0.8.4"
version = "0.9.0"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand All @@ -18,7 +18,7 @@ keywords = ["async", "rpc", "thrift"]
maintenance = { status = "actively-developed" }

[dependencies]
volo = { version = "0.8", path = "../volo" }
volo = { version = "0.9", path = "../volo" }
pilota.workspace = true
motore.workspace = true
metainfo.workspace = true
Expand Down
40 changes: 18 additions & 22 deletions volo-thrift/src/client/layer/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,27 @@ where
cx: &'cx mut ClientContext,
req: Req,
) -> Result<Self::Response, Self::Error> {
if let Some(config) = cx.rpc_info.config() {
match config.rpc_timeout() {
Some(duration) => {
let start = std::time::Instant::now();
match tokio::time::timeout(duration, self.inner.call(cx, req)).await {
Ok(r) => r.map_err(Into::into),
Err(_) => {
let msg = format!(
"[VOLO] thrift rpc call timeout, rpcinfo: {:?}, elpased: {:?}, \
timeout config: {:?}",
cx.rpc_info,
start.elapsed(),
duration
);
warn!(msg);
Err(crate::Error::Transport(
std::io::Error::new(std::io::ErrorKind::TimedOut, msg).into(),
))
}
match cx.rpc_info.config().rpc_timeout() {
Some(duration) => {
let start = std::time::Instant::now();
match tokio::time::timeout(duration, self.inner.call(cx, req)).await {
Ok(r) => r.map_err(Into::into),
Err(_) => {
let msg = format!(
"[VOLO] thrift rpc call timeout, rpcinfo: {:?}, elpased: {:?}, \
timeout config: {:?}",
cx.rpc_info,
start.elapsed(),
duration
);
warn!(msg);
Err(crate::Error::Transport(
std::io::Error::new(std::io::ErrorKind::TimedOut, msg).into(),
))
}
}
None => self.inner.call(cx, req).await.map_err(Into::into),
}
} else {
unreachable!("rpc_info.config should never be None")
None => self.inner.call(cx, req).await.map_err(Into::into),
}
}
}
Expand Down
12 changes: 4 additions & 8 deletions volo-thrift/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,20 +622,16 @@ impl<S> Client<S> {
// reset rpc_info
cx.rpc_info_mut()
.caller_mut()
.unwrap()
.set_service_name(self.inner.caller_name.clone());
cx.rpc_info_mut()
.callee_mut()
.unwrap()
.set_service_name(self.inner.callee_name.clone());
if let Some(target) = &self.inner.address {
cx.rpc_info_mut()
.callee_mut()
.unwrap()
.set_address(target.clone());
cx.rpc_info_mut().callee_mut().set_address(target.clone());
}
cx.rpc_info_mut().config = Some(self.inner.config);
cx.rpc_info_mut().method = Some(FastStr::from_static_str(method));
cx.rpc_info_mut().set_config(self.inner.config);
cx.rpc_info_mut()
.set_method(FastStr::from_static_str(method));
Some(cx)
})
.unwrap_or_else(|| {
Expand Down
Loading
Loading