Skip to content

Commit

Permalink
init work
Browse files Browse the repository at this point in the history
  • Loading branch information
mox692 committed Feb 26, 2024
1 parent 94db07b commit 5781b2f
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 8 deletions.
14 changes: 12 additions & 2 deletions tokio/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,23 @@ impl TcpListener {
/// }
/// ```
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
self.accept_with_interest(Interest::READABLE | Interest::WRITABLE)
.await
}

/// comment
pub async fn accept_with_interest(
&self,
interest: Interest,
) -> io::Result<(TcpStream, SocketAddr)> {
let (mio, addr) = self
.io
.registration()
.async_io(Interest::READABLE, || self.io.accept())
.async_io(interest, || self.io.accept())
.await?;

let stream = TcpStream::new(mio)?;
// TODO: clear here
let stream = TcpStream::new_with_interest(mio, interest)?;
Ok((stream, addr))
}

Expand Down
44 changes: 38 additions & 6 deletions tokio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,28 @@ impl TcpStream {
let mut last_err = None;

for addr in addrs {
match TcpStream::connect_addr(addr).await {
match TcpStream::connect_addr_with_interest(addr, Interest::READABLE | Interest::WRITABLE).await {
Ok(stream) => return Ok(stream),
Err(e) => last_err = Some(e),
}
}

Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any address",
)
}))
}

/// foooo
pub async fn connect_with_interest<A: ToSocketAddrs>(addr: A, interest: Interest) -> io::Result<TcpStream> {
let addrs = to_socket_addrs(addr).await?;

let mut last_err = None;

for addr in addrs {
match TcpStream::connect_addr_with_interest(addr, interest).await {
Ok(stream) => return Ok(stream),
Err(e) => last_err = Some(e),
}
Expand All @@ -132,13 +153,17 @@ impl TcpStream {
}

/// Establishes a connection to the specified `addr`.
async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
async fn connect_addr_with_interest(addr: SocketAddr, interest: Interest) -> io::Result<TcpStream> {
let sys = mio::net::TcpStream::connect(addr)?;
TcpStream::connect_mio(sys).await
TcpStream::connect_mio_with_interest(sys, interest).await
}

pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
let stream = TcpStream::new(sys)?;
Self::connect_mio_with_interest(sys, Interest::READABLE | Interest::WRITABLE).await
}

pub(crate) async fn connect_mio_with_interest(sys: mio::net::TcpStream, interest: Interest) -> io::Result<TcpStream> {
let stream = TcpStream::new_with_interest(sys, interest)?;

// Once we've connected, wait for the stream to be writable as
// that's when the actual connection has been initiated. Once we're
Expand All @@ -157,8 +182,7 @@ impl TcpStream {
}

pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
let io = PollEvented::new(connected)?;
Ok(TcpStream { io })
Self::new_with_interest(connected, Interest::READABLE | Interest::WRITABLE)
}

/// Creates new `TcpStream` from a `std::net::TcpStream`.
Expand Down Expand Up @@ -205,6 +229,14 @@ impl TcpStream {
Ok(TcpStream { io })
}

pub(crate) fn new_with_interest(
connected: mio::net::TcpStream,
interest: Interest,
) -> io::Result<TcpStream> {
let io = PollEvented::new_with_interest(connected, interest)?;
Ok(TcpStream { io })
}

/// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
///
/// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
Expand Down
101 changes: 101 additions & 0 deletions tokio/tests/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,104 @@ async fn write_closed() {

assert!(!ready_event.is_write_closed());
}

#[cfg(any(target_os = "linux", target_os = "android"))]
#[tokio::test]
async fn priority_interest() {
use std::os::fd::AsRawFd;

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let stream = TcpStream::connect(listener.local_addr().unwrap())
.await
.unwrap();

tokio::spawn(async move {
let (socket, _) = listener
.accept_with_interest(Interest::PRIORITY)
.await
.unwrap();
let ready = socket.ready(Interest::PRIORITY).await.unwrap();
assert!(ready.is_priority());
});

let ready = stream
.ready(Interest::READABLE | Interest::WRITABLE)
.await
.unwrap();
if ready.is_writable() {
fn send_oob_data(stream: &TcpStream, data: &[u8]) -> io::Result<usize> {
unsafe {
let res = libc::send(
stream.as_raw_fd(),
data.as_ptr().cast(),
data.len(),
libc::MSG_OOB,
);
if res == -1 {
Err(io::Error::last_os_error())
} else {
Ok(res as usize)
}
}
}
send_oob_data(&stream, b"hello").unwrap();
}
}

#[cfg(any(target_os = "linux", target_os = "android"))]
#[tokio::test]
async fn connect_with_interest() {
use std::os::fd::AsRawFd;
// TODO: should be minimized.

let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let stream = TcpStream::connect_with_interest(
listener.local_addr().unwrap(),
Interest::PRIORITY | Interest::READABLE | Interest::WRITABLE,
)
.await
.unwrap();

tokio::spawn(async move {
let (socket, _) = listener
.accept_with_interest(Interest::READABLE | Interest::WRITABLE)
.await
.unwrap();

loop {
let ready = socket
.ready(Interest::READABLE | Interest::WRITABLE)
.await
.unwrap();
if ready.is_writable() {
fn send_oob_data(stream: &TcpStream, data: &[u8]) -> io::Result<usize> {
unsafe {
let res = libc::send(
stream.as_raw_fd(),
data.as_ptr().cast(),
data.len(),
libc::MSG_OOB,
);
if res == -1 {
Err(io::Error::last_os_error())
} else {
Ok(res as usize)
}
}
}
send_oob_data(&socket, b"hello").unwrap();
break;
}
if ready.is_readable() {
continue;
}
}
});

let ready = stream.ready(Interest::WRITABLE).await.unwrap();
if ready.is_writable() {
stream.try_write(&[1, 2, 3]).unwrap();
}
let ready = stream.ready(Interest::PRIORITY).await.unwrap();
assert!(ready.is_priority());
}

0 comments on commit 5781b2f

Please sign in to comment.