diff --git a/volo-thrift/src/transport/pingpong/client.rs b/volo-thrift/src/transport/pingpong/client.rs index a5f7cb55..78758fb7 100644 --- a/volo-thrift/src/transport/pingpong/client.rs +++ b/volo-thrift/src/transport/pingpong/client.rs @@ -2,7 +2,10 @@ use std::{io, marker::PhantomData}; use motore::service::{Service, UnaryService}; use pilota::thrift::TransportException; -use volo::net::{conn::ConnExt, dial::MakeTransport, Address}; +use volo::{ + net::{conn::ConnExt, dial::MakeTransport, shm::TransportEndpoint, Address}, + FastStr, +}; use crate::{ codec::MakeCodec, @@ -10,7 +13,7 @@ use crate::{ protocol::TMessageType, transport::{ pingpong::thrift_transport::ThriftTransport, - pool::{Config, PooledMakeTransport, Ver}, + pool::{Config, PooledMakeTransport, Transport, Ver}, }, EntryMessage, ThriftMessage, }; @@ -128,6 +131,11 @@ where .call((target, shmipc_target.clone(), Ver::PingPong)) .await?; cx.stats.record_make_transport_end_at(); + if let Transport::UnPooled(_) = transport { + cx.rpc_info + .caller_mut() + .set_transport(volo::net::shm::Transport(FastStr::new("shmipc"))) + } let resp = transport.send(cx, req, oneway).await; if let Ok(None) = resp { if !oneway { diff --git a/volo-thrift/src/transport/pingpong/server.rs b/volo-thrift/src/transport/pingpong/server.rs index a99e15d4..ad8e8fc1 100644 --- a/volo-thrift/src/transport/pingpong/server.rs +++ b/volo-thrift/src/transport/pingpong/server.rs @@ -9,8 +9,11 @@ use pilota::thrift::ThriftException; use tokio::sync::futures::Notified; use tracing::*; use volo::{ - net::{shm::ShmExt, Address}, - volo_unreachable, + net::{ + shm::{ShmExt, TransportEndpoint}, + Address, + }, + volo_unreachable, FastStr, }; use crate::{ @@ -53,7 +56,16 @@ pub async fn serve( cache.pop().unwrap_or_default() }); if let Some(peer_addr) = &peer_addr { - cx.rpc_info.caller_mut().set_address(peer_addr.clone()); + if stream.is_none() { + cx.rpc_info.caller_mut().set_address(peer_addr.clone()); + } else { + cx.rpc_info + .caller_mut() + .set_transport(volo::net::shm::Transport(FastStr::new("shmipc"))); + cx.rpc_info + .caller_mut() + .set_shmipc_address(peer_addr.clone()); + } } let msg = tokio::select! { diff --git a/volo/src/net/shm.rs b/volo/src/net/shm.rs index 91e4ffe8..61c7b65f 100644 --- a/volo/src/net/shm.rs +++ b/volo/src/net/shm.rs @@ -1,3 +1,35 @@ +use crate::context::Endpoint; + +crate::new_type! { + #[derive(Debug, Hash, PartialEq, Eq, Clone)] + pub struct Transport(pub faststr::FastStr); +} + +pub trait TransportEndpoint { + fn get_transport(&self) -> Option; + fn has_transport(&self) -> bool; + fn set_transport(&mut self, transport: Transport); +} + +impl TransportEndpoint for Endpoint { + #[inline] + fn get_transport(&self) -> Option { + self.get_faststr::() + .cloned() + .map(Transport::from) + } + + #[inline] + fn has_transport(&self) -> bool { + self.contains_faststr::() + } + + #[inline] + fn set_transport(&mut self, transport: Transport) { + self.insert_faststr::(transport.0); + } +} + #[async_trait::async_trait] pub trait ShmExt: Send + Sync { fn release_read_and_reuse(&self) {}