Skip to content

Commit

Permalink
fix auto conn bug
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislearn committed Sep 16, 2023
1 parent 334209d commit eca0477
Showing 1 changed file with 69 additions and 26 deletions.
95 changes: 69 additions & 26 deletions src/server/conn/auto.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//! Http1 or Http2 connection.
use crate::{common::rewind::Rewind, rt::TokioIo};
use std::future::Future;
use std::io::{Error as IoError, ErrorKind, Result as IoResult};
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use std::{error::Error as StdError, marker::Unpin, time::Duration};

use bytes::Bytes;
use http::{Request, Response};
use http_body::Body;
Expand All @@ -10,8 +16,10 @@ use hyper::{
server::conn::{http1, http2},
service::Service,
};
use std::{error::Error as StdError, marker::Unpin, time::Duration};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use crate::{common::rewind::Rewind, rt::TokioIo};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

Expand Down Expand Up @@ -57,7 +65,7 @@ impl<E> Builder<E> {
}

/// Bind a connection together with a [`Service`].
pub async fn serve_connection<I, S, B>(&self, mut io: I, service: S) -> Result<()>
pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
where
S: Service<Request<Incoming>, Response = Response<B>> + Send,
S::Future: Send + 'static,
Expand All @@ -68,35 +76,70 @@ impl<E> Builder<E> {
I: AsyncRead + AsyncWrite + Unpin + 'static,
E: Http2ConnExec<S::Future, B>,
{
enum Protocol {
H1,
H2,
let (version, io) = read_version(io).await?;
let io = TokioIo::new(io);
match version {
Version::H1 => self.http1.serve_connection(io, service).await?,
Version::H2 => self.http2.serve_connection(io, service).await?,
}

let mut buf = Vec::new();
Ok(())
}
}
#[derive(Copy, Clone)]
enum Version {
H1,
H2,
}
async fn read_version<'a, A>(mut reader: A) -> IoResult<(Version, Rewind<A>)>
where
A: AsyncRead + Unpin,
{
let mut buf = [0; 24];
let (version, buf) = ReadVersion {
reader: &mut reader,
buf: ReadBuf::new(&mut buf),
version: Version::H1,
_pin: PhantomPinned,
}
.await?;
Ok((version, Rewind::new_buffered(reader, Bytes::from(buf))))
}
pin_project! {
struct ReadVersion<'a, A: ?Sized> {
reader: &'a mut A,
buf: ReadBuf<'a>,
version: Version,
// Make this future `!Unpin` for compatibility with async trait methods.
#[pin]
_pin: PhantomPinned,
}
}

let protocol = loop {
if buf.len() < 24 {
io.read_buf(&mut buf).await?;
impl<A> Future for ReadVersion<'_, A>
where
A: AsyncRead + Unpin + ?Sized,
{
type Output = IoResult<(Version, Vec<u8>)>;

let len = buf.len().min(H2_PREFACE.len());
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<(Version, Vec<u8>)>> {
let this = self.project();

if buf[0..len] != H2_PREFACE[0..len] {
break Protocol::H1;
}
} else {
break Protocol::H2;
while this.buf.remaining() != 0 {
if this.buf.filled() != &H2_PREFACE[0..this.buf.filled().len()] {
return Poll::Ready(Ok((*this.version, this.buf.filled().to_vec())));
}
// if our buffer is empty, then we need to read some data to continue.
let rem = this.buf.remaining();
ready!(Pin::new(&mut *this.reader).poll_read(cx, this.buf))?;
if this.buf.remaining() == rem {
return Err(IoError::new(ErrorKind::UnexpectedEof, "early eof")).into();
}
};

let io = TokioIo::new(Rewind::new_buffered(io, Bytes::from(buf)));

match protocol {
Protocol::H1 => self.http1.serve_connection(io, service).await?,
Protocol::H2 => self.http2.serve_connection(io, service).await?,
}

Ok(())
if this.buf.filled() == H2_PREFACE {
*this.version = Version::H2;
}
return Poll::Ready(Ok((*this.version, this.buf.filled().to_vec())));
}
}

Expand Down

0 comments on commit eca0477

Please sign in to comment.