Skip to content

Commit

Permalink
Add async client support
Browse files Browse the repository at this point in the history
Add ttrpc::r#async::Client
Update compiler to support async/await on client side
Add example async-client
Move client.rs/server.rs to sync folder

Signed-off-by: Tim Zhang <[email protected]>
  • Loading branch information
Tim-Zhang committed May 21, 2020
1 parent 480500b commit 4779829
Show file tree
Hide file tree
Showing 13 changed files with 452 additions and 57 deletions.
78 changes: 52 additions & 26 deletions compiler/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use std::io::{self, Write};
use std::path::Path;

use super::util::{
self, async_on, def_async_fn, fq_grpc, to_camel_case, to_snake_case, MethodType,
self, async_on, def_async_fn, fq_grpc, pub_async_fn, to_camel_case, to_snake_case, MethodType,
};

struct MethodGen<'a> {
Expand Down Expand Up @@ -220,36 +220,12 @@ impl<'a> MethodGen<'a> {
)
}

fn unary_opt(&self, method_name: &str) -> String {
format!(
"{}_opt(&self, req: &{}, opt: {}) -> {}<{}>",
method_name,
self.input(),
fq_grpc("CallOption"),
fq_grpc("Result"),
self.output()
)
}

fn unary_async(&self, method_name: &str) -> String {
format!(
"{}_async(&self, req: &{}) -> {}<{}<{}>>",
method_name,
self.input(),
fq_grpc("Result"),
fq_grpc("ClientUnaryReceiver"),
self.output()
)
}

fn unary_async_opt(&self, method_name: &str) -> String {
format!(
"{}_async_opt(&self, req: &{}, opt: {}) -> {}<{}<{}>>",
"{}(&mut self, req: &{}, timeout_nano: i64) -> {}<{}>",
method_name,
self.input(),
fq_grpc("CallOption"),
fq_grpc("Result"),
fq_grpc("ClientUnaryReceiver"),
self.output()
)
}
Expand Down Expand Up @@ -351,6 +327,26 @@ impl<'a> MethodGen<'a> {
};
}

fn write_async_client(&self, w: &mut CodeWriter) {
let method_name = self.name();
match self.method_type().0 {
// Unary
MethodType::Unary => {
pub_async_fn(w, &self.unary_async(&method_name), |w| {
w.write_line(&format!("let mut cres = {}::new();", self.output()));
w.write_line(&format!(
"::ttrpc::async_client_request!(self, req, timeout_nano, \"{}.{}\", \"{}\", cres);",
self.package_name,
self.service_name,
&self.proto.get_name(),
));
});
}

_ => {}
};
}

fn write_service(&self, w: &mut CodeWriter) {
let req_stream_type = format!("{}<{}>", fq_grpc("RequestStream"), self.input());
let (req, req_type, _resp_type) = match self.method_type().0 {
Expand Down Expand Up @@ -454,6 +450,14 @@ impl<'a> ServiceGen<'a> {
}

fn write_client(&self, w: &mut CodeWriter) {
if async_on(self.customize, "client") {
self.write_async_client(w)
} else {
self.write_sync_client(w)
}
}

fn write_sync_client(&self, w: &mut CodeWriter) {
w.write_line("#[derive(Clone)]");
w.pub_struct(&self.client_name(), |w| {
w.field_decl("client", "::ttrpc::Client");
Expand All @@ -475,6 +479,28 @@ impl<'a> ServiceGen<'a> {
});
}

fn write_async_client(&self, w: &mut CodeWriter) {
w.write_line("#[derive(Clone)]");
w.pub_struct(&self.client_name(), |w| {
w.field_decl("client", "::ttrpc::r#async::Client");
});

w.write_line("");

w.impl_self_block(&self.client_name(), |w| {
w.pub_fn("new(client: ::ttrpc::r#async::Client) -> Self", |w| {
w.expr_block(&self.client_name(), |w| {
w.field_entry("client", "client");
});
});

for method in &self.methods {
w.write_line("");
method.write_async_client(w);
}
});
}

fn write_server(&self, w: &mut CodeWriter) {
let mut trait_name = self.service_name();
let mut method_handler_name = "::ttrpc::MethodHandler";
Expand Down
7 changes: 7 additions & 0 deletions compiler/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ where
}
}

pub fn pub_async_fn<F>(w: &mut CodeWriter, sig: &str, cb: F)
where
F: Fn(&mut CodeWriter),
{
async_fn_block(w, true, sig, cb);
}

pub fn def_async_fn<F>(w: &mut CodeWriter, sig: &str, cb: F)
where
F: Fn(&mut CodeWriter),
Expand Down
4 changes: 4 additions & 0 deletions example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ path = "./server.rs"
name = "async-server"
path = "./async-server.rs"

[[example]]
name = "async-client"
path = "./async-client.rs"

[build-dependencies]
protoc-rust = "2.8.0"
cmd_lib = "0.7.8"
Expand Down
110 changes: 110 additions & 0 deletions example/async-client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//

mod protocols;

use nix::sys::socket::*;
use tokio;
use ttrpc::r#async::Client;

#[tokio::main(core_threads = 1)]
async fn main() {
let path = "/tmp/1";

let fd = socket(
AddressFamily::Unix,
SockType::Stream,
SockFlag::empty(),
None,
)
.unwrap();
let sockaddr = path.to_owned() + &"\x00".to_string();
let sockaddr = UnixAddr::new_abstract(sockaddr.as_bytes()).unwrap();
let sockaddr = SockAddr::Unix(sockaddr);
connect(fd, &sockaddr).unwrap();

let c = Client::new(fd);
let mut hc = protocols::health_ttrpc::HealthClient::new(c.clone());
let mut ac = protocols::agent_ttrpc::AgentServiceClient::new(c);

let mut thc = hc.clone();
let mut tac = ac.clone();

let now = std::time::Instant::now();

let t1 = tokio::spawn(async move {
let req = protocols::health::CheckRequest::new();
println!(
"Green Thread 1 - {} started: {:?}",
"health.check()",
now.elapsed(),
);
println!(
"Green Thread 1 - {} -> {:?} ended: {:?}",
"health.check()",
thc.check(&req, 0).await,
now.elapsed(),
);
});

let t2 = tokio::spawn(async move {
println!(
"Green Thread 2 - {} started: {:?}",
"agent.list_interfaces()",
now.elapsed(),
);

let show = match tac
.list_interfaces(&protocols::agent::ListInterfacesRequest::new(), 0)
.await
{
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
};

println!(
"Green Thread 2 - {} -> {} ended: {:?}",
"agent.list_interfaces()",
show,
now.elapsed(),
);
});

let t3 = tokio::spawn(async move {
println!(
"Green Thread 3 - {} started: {:?}",
"agent.online_cpu_mem()",
now.elapsed()
);

let show = match ac
.online_cpu_mem(&protocols::agent::OnlineCPUMemRequest::new(), 0)
.await
{
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
};
println!(
"Green Thread 3 - {} -> {} ended: {:?}",
"agent.online_cpu_mem()",
show,
now.elapsed()
);

println!(
"Green Thread 3 - {} started: {:?}",
"health.version()",
now.elapsed()
);
println!(
"Green Thread 3 - {} -> {:?} ended: {:?}",
"health.version()",
hc.version(&protocols::health::CheckRequest::new(), 0).await,
now.elapsed()
);
});

let _ = tokio::join!(t1, t2, t3);
}
2 changes: 1 addition & 1 deletion example/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn main() {
.include("protocols/protos")
.rust_protobuf()
.customize(Customize {
async_server: true,
async_all: true,
..Default::default()
})
.run()
Expand Down
4 changes: 2 additions & 2 deletions example/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ fn main() {
thread::sleep(std::time::Duration::from_secs(2));
println!(
"Main OS Thread - {} started: {:?}",
"agent.online_cpu_mem()",
"health.version()",
now.elapsed()
);
println!(
"Main OS Thread - {} -> {:?} ended: {:?}",
"agent.online_cpu_mem()",
"health.version()",
hc.version(&protocols::health::CheckRequest::new(), 0),
now.elapsed()
);
Expand Down
Loading

0 comments on commit 4779829

Please sign in to comment.