diff --git a/Cargo.lock b/Cargo.lock index 5ae003da..64490c0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,30 +76,30 @@ checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +checksum = "a3a318f1f38d2418400f8209655bfd825785afd25aa30bb7ba6cc792e4596748" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.1" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" dependencies = [ "anstyle", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -274,9 +274,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.10" +version = "4.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41fffed7514f420abec6d183b1d3acfd9099c79c3a10a06ade4f8203f1411272" +checksum = "bfaff671f6b22ca62406885ece523383b9b64022e341e53e009a62ebc47a45f2" dependencies = [ "clap_builder", "clap_derive", @@ -284,9 +284,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.9" +version = "4.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63361bae7eef3771745f02d8d892bec2fee5f6e34af316ba556e7f97a7069ff1" +checksum = "a216b506622bb1d316cd51328dce24e07bdff4a6128a47c7e7fad11878d5adbb" dependencies = [ "anstream", "anstyle", @@ -1448,9 +1448,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "openssl" -version = "0.10.60" +version = "0.10.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800" +checksum = "6b8419dc8cc6d866deb801274bba2e6f8f6108c1bb7fcc10ee5ab864931dbb45" dependencies = [ "bitflags 2.4.1", "cfg-if", @@ -1480,9 +1480,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.96" +version = "0.9.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f" +checksum = "c3eaad34cdd97d81de97964fc7f29e2d104f483840d906ef56daa1912338460b" dependencies = [ "cc", "libc", @@ -2961,7 +2961,7 @@ version = "0.8.0" [[package]] name = "volo-thrift" -version = "0.8.3" +version = "0.8.4" dependencies = [ "anyhow", "bytes", @@ -3286,9 +3286,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.19" +version = "0.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "829846f3e3db426d4cee4510841b71a8e58aa2a76b1132579487ae430ccd9c7b" +checksum = "0383266b19108dfc6314a56047aa545a1b4d1be60e799b4dbdd407b56402704b" dependencies = [ "memchr", ] diff --git a/volo-thrift/Cargo.toml b/volo-thrift/Cargo.toml index 3950b685..6ab02706 100644 --- a/volo-thrift/Cargo.toml +++ b/volo-thrift/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-thrift" -version = "0.8.3" +version = "0.8.4" edition.workspace = true homepage.workspace = true repository.workspace = true diff --git a/volo-thrift/src/transport/multiplex/thrift_transport.rs b/volo-thrift/src/transport/multiplex/thrift_transport.rs index 22fdefdd..0564dc03 100644 --- a/volo-thrift/src/transport/multiplex/thrift_transport.rs +++ b/volo-thrift/src/transport/multiplex/thrift_transport.rs @@ -32,6 +32,7 @@ lazy_static::lazy_static! { #[pin_project] pub struct ThriftTransport { write_half: Arc>>, + dirty: Arc, #[allow(clippy::type_complexity)] tx_map: Arc< Mutex< @@ -54,6 +55,7 @@ impl Clone for ThriftTransport { fn clone(&self) -> Self { Self { write_half: self.write_half.clone(), + dirty: self.dirty.clone(), tx_map: self.tx_map.clone(), write_error: self.write_error.clone(), read_error: self.read_error.clone(), @@ -176,6 +178,7 @@ where }); Self { write_half: Arc::new(Mutex::new(write_half)), + dirty: Arc::new(AtomicBool::new(false)), tx_map, write_error, read_error, @@ -195,8 +198,6 @@ where msg: ThriftMessage, oneway: bool, ) -> Result>, Error> { - let (tx, rx) = oneshot::channel(); - let mut tx_map = self.tx_map.lock().await; // check error and closed if self.read_error.load(std::sync::atomic::Ordering::Relaxed) { return Err(Error::Application(ApplicationError::new( @@ -210,12 +211,31 @@ where "multiplex connection closed".to_string(), ))); } + let (tx, rx) = oneshot::channel(); + let mut tx_map = self.tx_map.lock().await; let seq_id = msg.meta.seq_id; if !oneway { tx_map.insert(seq_id, tx); } drop(tx_map); - if let Err(e) = self.write_half.lock().await.send(cx, msg).await { + let mut wh = self.write_half.lock().await; + // check connection dirty + if self.dirty.load(std::sync::atomic::Ordering::Relaxed) { + // connection is dirty, we should also set write error to indicate the connection should + // not be reused + self.write_error + .store(true, std::sync::atomic::Ordering::Relaxed); + return Err(Error::Application(ApplicationError::new( + ApplicationErrorKind::UNKNOWN, + "multiplex connection is dirty".to_string(), + ))); + } + self.dirty.store(true, std::sync::atomic::Ordering::Relaxed); + let res = wh.send(cx, msg).await; + self.dirty + .store(false, std::sync::atomic::Ordering::Relaxed); + drop(wh); + if let Err(e) = res { self.write_error .store(true, std::sync::atomic::Ordering::Relaxed); if !oneway {