Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: auto conn infinite loop if conn closed before send full bytes #36

Merged
merged 1 commit into from
Sep 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 69 additions & 26 deletions src/server/conn/auto.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//! Http1 or Http2 connection.

use crate::{common::rewind::Rewind, rt::TokioIo};
use std::future::Future;
use std::io::{Error as IoError, ErrorKind, Result as IoResult};
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use std::{error::Error as StdError, marker::Unpin, time::Duration};

use bytes::Bytes;
use http::{Request, Response};
use http_body::Body;
Expand All @@ -10,8 +16,10 @@ use hyper::{
server::conn::{http1, http2},
service::Service,
};
use std::{error::Error as StdError, marker::Unpin, time::Duration};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use crate::{common::rewind::Rewind, rt::TokioIo};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

Expand Down Expand Up @@ -57,7 +65,7 @@ impl<E> Builder<E> {
}

/// Bind a connection together with a [`Service`].
pub async fn serve_connection<I, S, B>(&self, mut io: I, service: S) -> Result<()>
pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
where
S: Service<Request<Incoming>, Response = Response<B>> + Send,
S::Future: Send + 'static,
Expand All @@ -68,35 +76,70 @@ impl<E> Builder<E> {
I: AsyncRead + AsyncWrite + Unpin + 'static,
E: Http2ConnExec<S::Future, B>,
{
enum Protocol {
H1,
H2,
let (version, io) = read_version(io).await?;
let io = TokioIo::new(io);
match version {
Version::H1 => self.http1.serve_connection(io, service).await?,
Version::H2 => self.http2.serve_connection(io, service).await?,
}

let mut buf = Vec::new();
Ok(())
}
}
#[derive(Copy, Clone)]
enum Version {
H1,
H2,
}
async fn read_version<'a, A>(mut reader: A) -> IoResult<(Version, Rewind<A>)>
where
A: AsyncRead + Unpin,
{
let mut buf = [0; 24];
let (version, buf) = ReadVersion {
reader: &mut reader,
buf: ReadBuf::new(&mut buf),
version: Version::H1,
_pin: PhantomPinned,
}
.await?;
Ok((version, Rewind::new_buffered(reader, Bytes::from(buf))))
}
pin_project! {
struct ReadVersion<'a, A: ?Sized> {
reader: &'a mut A,
buf: ReadBuf<'a>,
version: Version,
// Make this future `!Unpin` for compatibility with async trait methods.
#[pin]
_pin: PhantomPinned,
}
}

let protocol = loop {
if buf.len() < 24 {
io.read_buf(&mut buf).await?;
impl<A> Future for ReadVersion<'_, A>
where
A: AsyncRead + Unpin + ?Sized,
{
type Output = IoResult<(Version, Vec<u8>)>;

let len = buf.len().min(H2_PREFACE.len());
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<(Version, Vec<u8>)>> {
let this = self.project();

if buf[0..len] != H2_PREFACE[0..len] {
break Protocol::H1;
}
} else {
break Protocol::H2;
while this.buf.remaining() != 0 {
if this.buf.filled() != &H2_PREFACE[0..this.buf.filled().len()] {
return Poll::Ready(Ok((*this.version, this.buf.filled().to_vec())));
}
// if our buffer is empty, then we need to read some data to continue.
let rem = this.buf.remaining();
ready!(Pin::new(&mut *this.reader).poll_read(cx, this.buf))?;
if this.buf.remaining() == rem {
return Err(IoError::new(ErrorKind::UnexpectedEof, "early eof")).into();
}
};

let io = TokioIo::new(Rewind::new_buffered(io, Bytes::from(buf)));

match protocol {
Protocol::H1 => self.http1.serve_connection(io, service).await?,
Protocol::H2 => self.http2.serve_connection(io, service).await?,
}

Ok(())
if this.buf.filled() == H2_PREFACE {
*this.version = Version::H2;
}
return Poll::Ready(Ok((*this.version, this.buf.filled().to_vec())));
}
}

Expand Down