Skip to content

Commit

Permalink
feat: add transport tag
Browse files Browse the repository at this point in the history
  • Loading branch information
Millione committed Aug 8, 2024
1 parent 74ccd8f commit 28c5612
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
12 changes: 10 additions & 2 deletions volo-thrift/src/transport/pingpong/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ use std::{io, marker::PhantomData};

use motore::service::{Service, UnaryService};
use pilota::thrift::TransportException;
use volo::net::{conn::ConnExt, dial::MakeTransport, Address};
use volo::{
net::{conn::ConnExt, dial::MakeTransport, shm::TransportEndpoint, Address},
FastStr,
};

use crate::{
codec::MakeCodec,
context::ClientContext,
protocol::TMessageType,
transport::{
pingpong::thrift_transport::ThriftTransport,
pool::{Config, PooledMakeTransport, Ver},
pool::{Config, PooledMakeTransport, Transport, Ver},
},
EntryMessage, ThriftMessage,
};
Expand Down Expand Up @@ -128,6 +131,11 @@ where
.call((target, shmipc_target.clone(), Ver::PingPong))
.await?;
cx.stats.record_make_transport_end_at();
if let Transport::UnPooled(_) = transport {
cx.rpc_info
.caller_mut()
.set_transport(volo::net::shm::Transport(FastStr::new("shmipc")))
}
let resp = transport.send(cx, req, oneway).await;
if let Ok(None) = resp {
if !oneway {
Expand Down
18 changes: 15 additions & 3 deletions volo-thrift/src/transport/pingpong/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use pilota::thrift::ThriftException;
use tokio::sync::futures::Notified;
use tracing::*;
use volo::{
net::{shm::ShmExt, Address},
volo_unreachable,
net::{
shm::{ShmExt, TransportEndpoint},
Address,
},
volo_unreachable, FastStr,
};

use crate::{
Expand Down Expand Up @@ -53,7 +56,16 @@ pub async fn serve<Svc, Req, Resp, E, D, SP>(
cache.pop().unwrap_or_default()
});
if let Some(peer_addr) = &peer_addr {
cx.rpc_info.caller_mut().set_address(peer_addr.clone());
if stream.is_none() {
cx.rpc_info.caller_mut().set_address(peer_addr.clone());
} else {
cx.rpc_info
.caller_mut()
.set_transport(volo::net::shm::Transport(FastStr::new("shmipc")));
cx.rpc_info
.caller_mut()
.set_shmipc_address(peer_addr.clone());
}
}

let msg = tokio::select! {
Expand Down
32 changes: 32 additions & 0 deletions volo/src/net/shm.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,35 @@
use crate::context::Endpoint;

crate::new_type! {
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct Transport(pub faststr::FastStr);
}

pub trait TransportEndpoint {
fn get_transport(&self) -> Option<Transport>;
fn has_transport(&self) -> bool;
fn set_transport(&mut self, transport: Transport);
}

impl TransportEndpoint for Endpoint {
#[inline]
fn get_transport(&self) -> Option<Transport> {
self.get_faststr::<Transport>()
.cloned()
.map(Transport::from)
}

#[inline]
fn has_transport(&self) -> bool {
self.contains_faststr::<Transport>()
}

#[inline]
fn set_transport(&mut self, transport: Transport) {
self.insert_faststr::<Transport>(transport.0);
}
}

#[async_trait::async_trait]
pub trait ShmExt: Send + Sync {
fn release_read_and_reuse(&self) {}
Expand Down

0 comments on commit 28c5612

Please sign in to comment.