Skip to content

Commit

Permalink
fix(volo-thrift): multiplex client erroneous data (#274)
Browse files Browse the repository at this point in the history
fix(volo-thrift): multiplex client errornous data
  • Loading branch information
PureWhiteWu committed Dec 5, 2023
1 parent 5b2ffdc commit d1c8b4c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 23 deletions.
38 changes: 19 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion volo-thrift/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-thrift"
version = "0.8.3"
version = "0.8.4"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand Down
26 changes: 23 additions & 3 deletions volo-thrift/src/transport/multiplex/thrift_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ lazy_static::lazy_static! {
#[pin_project]
pub struct ThriftTransport<E, Resp> {
write_half: Arc<Mutex<WriteHalf<E>>>,
dirty: Arc<AtomicBool>,
#[allow(clippy::type_complexity)]
tx_map: Arc<
Mutex<
Expand All @@ -54,6 +55,7 @@ impl<E, Resp> Clone for ThriftTransport<E, Resp> {
fn clone(&self) -> Self {
Self {
write_half: self.write_half.clone(),
dirty: self.dirty.clone(),
tx_map: self.tx_map.clone(),
write_error: self.write_error.clone(),
read_error: self.read_error.clone(),
Expand Down Expand Up @@ -176,6 +178,7 @@ where
});
Self {
write_half: Arc::new(Mutex::new(write_half)),
dirty: Arc::new(AtomicBool::new(false)),
tx_map,
write_error,
read_error,
Expand All @@ -195,8 +198,6 @@ where
msg: ThriftMessage<Req>,
oneway: bool,
) -> Result<Option<ThriftMessage<Resp>>, Error> {
let (tx, rx) = oneshot::channel();
let mut tx_map = self.tx_map.lock().await;
// check error and closed
if self.read_error.load(std::sync::atomic::Ordering::Relaxed) {
return Err(Error::Application(ApplicationError::new(
Expand All @@ -210,12 +211,31 @@ where
"multiplex connection closed".to_string(),
)));
}
let (tx, rx) = oneshot::channel();
let mut tx_map = self.tx_map.lock().await;
let seq_id = msg.meta.seq_id;
if !oneway {
tx_map.insert(seq_id, tx);
}
drop(tx_map);
if let Err(e) = self.write_half.lock().await.send(cx, msg).await {
let mut wh = self.write_half.lock().await;
// check connection dirty
if self.dirty.load(std::sync::atomic::Ordering::Relaxed) {
// connection is dirty, we should also set write error to indicate the connection should
// not be reused
self.write_error
.store(true, std::sync::atomic::Ordering::Relaxed);
return Err(Error::Application(ApplicationError::new(
ApplicationErrorKind::UNKNOWN,
"multiplex connection is dirty".to_string(),
)));
}
self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
let res = wh.send(cx, msg).await;
self.dirty
.store(false, std::sync::atomic::Ordering::Relaxed);
drop(wh);
if let Err(e) = res {
self.write_error
.store(true, std::sync::atomic::Ordering::Relaxed);
if !oneway {
Expand Down

0 comments on commit d1c8b4c

Please sign in to comment.