Skip to content

Commit

Permalink
fix: AddrStream::pool_shutdown was not called pooled until completion
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Sep 27, 2023
1 parent b4f5913 commit 8e9d322
Showing 1 changed file with 32 additions and 48 deletions.
80 changes: 32 additions & 48 deletions dozer-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,38 @@ use tower::{Layer, Service};
use crate::shutdown::ShutdownReceiver;

#[derive(Debug)]
enum ShutdownAddrStream<F> {
Alive { inner: AddrStream, shutdown: F },
Shutdown { inner: AddrStream },
Temp,
struct ShutdownAddrStream<F> {
inner: AddrStream,
state: ShutdownState<F>,
}

#[derive(Debug)]
enum ShutdownState<F> {
SignalPending(F),
ShutdownPending,
Done,
}

impl<F: Future<Output = ()> + Unpin> ShutdownAddrStream<F> {
fn check_shutdown(&mut self, cx: &mut Context<'_>) -> Result<(), io::Error> {
loop {
match self {
Self::Alive { shutdown, .. } => {
if let Poll::Ready(()) = Pin::new(shutdown).poll(cx) {
let mut temp = Self::Temp;
std::mem::swap(self, &mut temp);
let Self::Alive { inner, .. } = temp else {
unreachable!()
};
*self = Self::Shutdown { inner };
continue;
} else {
return Ok(());
}
match &mut self.state {
ShutdownState::SignalPending(signal) => {
if let Poll::Ready(()) = Pin::new(signal).poll(cx) {
self.state = ShutdownState::ShutdownPending;
self.check_shutdown(cx)
} else {
Ok(())
}
Self::Shutdown { inner } => {
if let Poll::Ready(Err(e)) = Pin::new(inner).poll_shutdown(cx) {
return Err(e);
} else {
return Ok(());
}
}
Self::Temp => unreachable!(),
}
ShutdownState::ShutdownPending => match Pin::new(&mut self.inner).poll_shutdown(cx) {
Poll::Ready(Ok(())) => {
self.state = ShutdownState::Done;
Ok(())
}
Poll::Ready(Err(e)) => Err(e),
Poll::Pending => Ok(()),
},
ShutdownState::Done => Ok(()),
}
}

Expand All @@ -78,11 +78,7 @@ impl<F: Future<Output = ()> + Unpin> ShutdownAddrStream<F> {
return Poll::Ready(Err(e));
}

match this {
Self::Alive { inner, .. } => func(Pin::new(inner), cx),
Self::Shutdown { inner } => func(Pin::new(inner), cx),
Self::Temp => unreachable!(),
}
func(Pin::new(&mut this.inner), cx)
}
}

Expand All @@ -97,11 +93,7 @@ impl<F: Future<Output = ()> + Unpin> AsyncRead for ShutdownAddrStream<F> {
return Poll::Ready(Err(e));
}

match this {
Self::Alive { inner, .. } => Pin::new(inner).poll_read(cx, buf),
Self::Shutdown { inner } => Pin::new(inner).poll_read(cx, buf),
Self::Temp => unreachable!(),
}
Pin::new(&mut this.inner).poll_read(cx, buf)
}
}

Expand All @@ -116,11 +108,7 @@ impl<F: Future<Output = ()> + Unpin> AsyncWrite for ShutdownAddrStream<F> {
return Poll::Ready(Err(e));
}

match this {
Self::Alive { inner, .. } => Pin::new(inner).poll_write(cx, buf),
Self::Shutdown { inner } => Pin::new(inner).poll_write(cx, buf),
Self::Temp => unreachable!(),
}
Pin::new(&mut this.inner).poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Expand All @@ -136,11 +124,7 @@ impl<F> Connected for ShutdownAddrStream<F> {
type ConnectInfo = TcpConnectInfo;

fn connect_info(&self) -> Self::ConnectInfo {
match self {
Self::Alive { inner, .. } => inner.connect_info(),
Self::Shutdown { inner } => inner.connect_info(),
Self::Temp => unreachable!(),
}
self.inner.connect_info()
}
}

Expand All @@ -160,9 +144,9 @@ where
let incoming = incoming.map(|stream| {
stream.map(|stream| {
let shutdown = shutdown.create_shutdown_future();
ShutdownAddrStream::Alive {
ShutdownAddrStream {
inner: stream,
shutdown: Box::pin(shutdown),
state: ShutdownState::SignalPending(Box::pin(shutdown)),
}
})
});
Expand Down

0 comments on commit 8e9d322

Please sign in to comment.