Skip to content

Commit

Permalink
example: make async and sync don't conflict with each other
Browse files Browse the repository at this point in the history
Modify build.rs to generate async codes to asynchronous folder
and generate sync codes to sync folder.

Signed-off-by: Tim Zhang <[email protected]>
  • Loading branch information
Tim-Zhang committed May 21, 2020
1 parent 4779829 commit a3ea1a4
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 65 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ Cargo.lock
.vscode
.idea
*.o
example/protocols/*.rs
!example/protocols/mod.rs
example/protocols/**/*.rs
!example/protocols/**/mod.rs
13 changes: 7 additions & 6 deletions example/async-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
mod protocols;

use nix::sys::socket::*;
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc};
use tokio;
use ttrpc::r#async::Client;

Expand All @@ -26,16 +27,16 @@ async fn main() {
connect(fd, &sockaddr).unwrap();

let c = Client::new(fd);
let mut hc = protocols::health_ttrpc::HealthClient::new(c.clone());
let mut ac = protocols::agent_ttrpc::AgentServiceClient::new(c);
let mut hc = health_ttrpc::HealthClient::new(c.clone());
let mut ac = agent_ttrpc::AgentServiceClient::new(c);

let mut thc = hc.clone();
let mut tac = ac.clone();

let now = std::time::Instant::now();

let t1 = tokio::spawn(async move {
let req = protocols::health::CheckRequest::new();
let req = health::CheckRequest::new();
println!(
"Green Thread 1 - {} started: {:?}",
"health.check()",
Expand All @@ -57,7 +58,7 @@ async fn main() {
);

let show = match tac
.list_interfaces(&protocols::agent::ListInterfacesRequest::new(), 0)
.list_interfaces(&agent::ListInterfacesRequest::new(), 0)
.await
{
Err(e) => format!("{:?}", e),
Expand All @@ -80,7 +81,7 @@ async fn main() {
);

let show = match ac
.online_cpu_mem(&protocols::agent::OnlineCPUMemRequest::new(), 0)
.online_cpu_mem(&agent::OnlineCPUMemRequest::new(), 0)
.await
{
Err(e) => format!("{:?}", e),
Expand All @@ -101,7 +102,7 @@ async fn main() {
println!(
"Green Thread 3 - {} -> {:?} ended: {:?}",
"health.version()",
hc.version(&protocols::health::CheckRequest::new(), 0).await,
hc.version(&health::CheckRequest::new(), 0).await,
now.elapsed()
);
});
Expand Down
34 changes: 17 additions & 17 deletions example/async-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::Arc;

use log::LevelFilter;

use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc, types};
use ttrpc::asynchronous::server::*;
use ttrpc::error::{Error, Result};
use ttrpc::ttrpc::{Code, Status};
Expand All @@ -23,12 +24,12 @@ use tokio::signal::unix::{signal, SignalKind};
struct HealthService;

#[async_trait]
impl protocols::health_ttrpc::Health for HealthService {
impl health_ttrpc::Health for HealthService {
async fn check(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
_req: protocols::health::CheckRequest,
) -> Result<protocols::health::HealthCheckResponse> {
_req: health::CheckRequest,
) -> Result<health::HealthCheckResponse> {
let mut status = Status::new();

status.set_code(Code::NOT_FOUND);
Expand All @@ -42,10 +43,10 @@ impl protocols::health_ttrpc::Health for HealthService {
async fn version(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
req: protocols::health::CheckRequest,
) -> Result<protocols::health::VersionCheckResponse> {
req: health::CheckRequest,
) -> Result<health::VersionCheckResponse> {
info!("version {:?}", req);
let mut rep = protocols::health::VersionCheckResponse::new();
let mut rep = health::VersionCheckResponse::new();
rep.agent_version = "mock.0.1".to_string();
rep.grpc_version = "0.0.1".to_string();
let mut status = Status::new();
Expand All @@ -57,22 +58,22 @@ impl protocols::health_ttrpc::Health for HealthService {
struct AgentService;

#[async_trait]
impl protocols::agent_ttrpc::AgentService for AgentService {
impl agent_ttrpc::AgentService for AgentService {
async fn list_interfaces(
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
_req: protocols::agent::ListInterfacesRequest,
) -> ::ttrpc::Result<protocols::agent::Interfaces> {
_req: agent::ListInterfacesRequest,
) -> ::ttrpc::Result<agent::Interfaces> {
let mut rp = protobuf::RepeatedField::new();

let mut i = protocols::types::Interface::new();
let mut i = types::Interface::new();
i.set_name("first".to_string());
rp.push(i);
let mut i = protocols::types::Interface::new();
let mut i = types::Interface::new();
i.set_name("second".to_string());
rp.push(i);

let mut i = protocols::agent::Interfaces::new();
let mut i = agent::Interfaces::new();
i.set_Interfaces(rp);

Ok(i)
Expand All @@ -83,14 +84,13 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Trace);

let h = Box::new(HealthService {}) as Box<dyn protocols::health_ttrpc::Health + Send + Sync>;
let h = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
let h = Arc::new(h);
let hservice = protocols::health_ttrpc::create_health(h);
let hservice = health_ttrpc::create_health(h);

let a =
Box::new(AgentService {}) as Box<dyn protocols::agent_ttrpc::AgentService + Send + Sync>;
let a = Box::new(AgentService {}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
let a = Arc::new(a);
let aservice = protocols::agent_ttrpc::create_agent_service(a);
let aservice = agent_ttrpc::create_agent_service(a);

let server = Server::new()
.bind("unix:///tmp/1")
Expand Down
24 changes: 21 additions & 3 deletions example/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,18 @@ fn main() {
.for_each(|p| println!("cargo:rerun-if-changed={}", &p));

protoc_rust_ttrpc::Codegen::new()
.out_dir("protocols")
.out_dir("protocols/sync")
.inputs(&protos)
.include("protocols/protos")
.rust_protobuf()
.customize(Customize {
..Default::default()
})
.run()
.expect("Gen sync codes failed.");

protoc_rust_ttrpc::Codegen::new()
.out_dir("protocols/asynchronous")
.inputs(&protos)
.include("protocols/protos")
.rust_protobuf()
Expand All @@ -31,13 +42,20 @@ fn main() {
..Default::default()
})
.run()
.expect("Codegen failed.");
.expect("Gen async codes failed.");

// There is a message named 'Box' in oci.proto
// so there is a struct named 'Box', we should replace Box<Self> to ::std::boxed::Box<Self>
// to avoid the conflict.
replace_text_in_file(
"protocols/oci.rs",
"protocols/sync/oci.rs",
"self: Box<Self>",
"self: ::std::boxed::Box<Self>",
)
.unwrap();

replace_text_in_file(
"protocols/asynchronous/oci.rs",
"self: Box<Self>",
"self: ::std::boxed::Box<Self>",
)
Expand Down
13 changes: 7 additions & 6 deletions example/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod protocols;

use nix::sys::socket::*;
use protocols::sync::{agent, agent_ttrpc, health, health_ttrpc};
use std::thread;
use ttrpc::client::Client;

Expand All @@ -34,16 +35,16 @@ fn main() {
connect(fd, &sockaddr).unwrap();

let c = Client::new(fd);
let hc = protocols::health_ttrpc::HealthClient::new(c.clone());
let ac = protocols::agent_ttrpc::AgentServiceClient::new(c);
let hc = health_ttrpc::HealthClient::new(c.clone());
let ac = agent_ttrpc::AgentServiceClient::new(c);

let thc = hc.clone();
let tac = ac.clone();

let now = std::time::Instant::now();

let t = thread::spawn(move || {
let req = protocols::health::CheckRequest::new();
let req = health::CheckRequest::new();
println!(
"OS Thread {:?} - {} started: {:?}",
std::thread::current().id(),
Expand All @@ -67,7 +68,7 @@ fn main() {
now.elapsed(),
);

let show = match tac.list_interfaces(&protocols::agent::ListInterfacesRequest::new(), 0) {
let show = match tac.list_interfaces(&agent::ListInterfacesRequest::new(), 0) {
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
};
Expand All @@ -86,7 +87,7 @@ fn main() {
"agent.online_cpu_mem()",
now.elapsed()
);
let show = match ac.online_cpu_mem(&protocols::agent::OnlineCPUMemRequest::new(), 0) {
let show = match ac.online_cpu_mem(&agent::OnlineCPUMemRequest::new(), 0) {
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
};
Expand All @@ -107,7 +108,7 @@ fn main() {
println!(
"Main OS Thread - {} -> {:?} ended: {:?}",
"health.version()",
hc.version(&protocols::health::CheckRequest::new(), 0),
hc.version(&health::CheckRequest::new(), 0),
now.elapsed()
);

Expand Down
12 changes: 12 additions & 0 deletions example/protocols/asynchronous/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//

pub mod agent;
pub mod agent_ttrpc;
pub mod empty;
pub mod health;
pub mod health_ttrpc;
mod oci;
pub mod types;
21 changes: 7 additions & 14 deletions example/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
pub mod agent;
pub mod agent_ttrpc;
pub mod health;
pub mod health_ttrpc;
mod oci;
pub mod types;
pub mod empty;
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//

#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}
pub mod asynchronous;
pub mod sync;
pub use asynchronous as r#async;
12 changes: 12 additions & 0 deletions example/protocols/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//

pub mod agent;
pub mod agent_ttrpc;
pub mod empty;
pub mod health;
pub mod health_ttrpc;
mod oci;
pub mod types;
34 changes: 17 additions & 17 deletions example/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ use log::LevelFilter;
use std::sync::Arc;
use std::thread;

use protocols::sync::{agent, agent_ttrpc, health, health_ttrpc, types};
use ttrpc::error::{Error, Result};
use ttrpc::server::*;
use ttrpc::ttrpc::{Code, Status};

struct HealthService;
impl protocols::health_ttrpc::Health for HealthService {
impl health_ttrpc::Health for HealthService {
fn check(
&self,
_ctx: &::ttrpc::TtrpcContext,
_req: protocols::health::CheckRequest,
) -> Result<protocols::health::HealthCheckResponse> {
_req: health::CheckRequest,
) -> Result<health::HealthCheckResponse> {
let mut status = Status::new();
status.set_code(Code::NOT_FOUND);
status.set_message("Just for fun".to_string());
Expand All @@ -40,10 +41,10 @@ impl protocols::health_ttrpc::Health for HealthService {
fn version(
&self,
_ctx: &::ttrpc::TtrpcContext,
req: protocols::health::CheckRequest,
) -> Result<protocols::health::VersionCheckResponse> {
req: health::CheckRequest,
) -> Result<health::VersionCheckResponse> {
info!("version {:?}", req);
let mut rep = protocols::health::VersionCheckResponse::new();
let mut rep = health::VersionCheckResponse::new();
rep.agent_version = "mock.0.1".to_string();
rep.grpc_version = "0.0.1".to_string();
let mut status = Status::new();
Expand All @@ -53,22 +54,22 @@ impl protocols::health_ttrpc::Health for HealthService {
}

struct AgentService;
impl protocols::agent_ttrpc::AgentService for AgentService {
impl agent_ttrpc::AgentService for AgentService {
fn list_interfaces(
&self,
_ctx: &::ttrpc::TtrpcContext,
_req: protocols::agent::ListInterfacesRequest,
) -> ::ttrpc::Result<protocols::agent::Interfaces> {
_req: agent::ListInterfacesRequest,
) -> ::ttrpc::Result<agent::Interfaces> {
let mut rp = protobuf::RepeatedField::new();

let mut i = protocols::types::Interface::new();
let mut i = types::Interface::new();
i.set_name("first".to_string());
rp.push(i);
let mut i = protocols::types::Interface::new();
let mut i = types::Interface::new();
i.set_name("second".to_string());
rp.push(i);

let mut i = protocols::agent::Interfaces::new();
let mut i = agent::Interfaces::new();
i.set_Interfaces(rp);

Ok(i)
Expand All @@ -79,14 +80,13 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
fn main() {
simple_logging::log_to_stderr(LevelFilter::Trace);

let h = Box::new(HealthService {}) as Box<dyn protocols::health_ttrpc::Health + Send + Sync>;
let h = Box::new(HealthService {}) as Box<dyn health_ttrpc::Health + Send + Sync>;
let h = Arc::new(h);
let hservice = protocols::health_ttrpc::create_health(h);
let hservice = health_ttrpc::create_health(h);

let a =
Box::new(AgentService {}) as Box<dyn protocols::agent_ttrpc::AgentService + Send + Sync>;
let a = Box::new(AgentService {}) as Box<dyn agent_ttrpc::AgentService + Send + Sync>;
let a = Arc::new(a);
let aservice = protocols::agent_ttrpc::create_agent_service(a);
let aservice = agent_ttrpc::create_agent_service(a);

let mut server = Server::new()
.bind("unix:///tmp/1")
Expand Down

0 comments on commit a3ea1a4

Please sign in to comment.