Skip to content

Commit

Permalink
pass test on linux
Browse files Browse the repository at this point in the history
  • Loading branch information
irvingoujAtDevolution committed Jan 22, 2024
1 parent 5075fd1 commit f324a80
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 10 deletions.
48 changes: 39 additions & 9 deletions crates/network-scanner-net/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,42 @@ impl<'a> Future for ConnectFuture<'a> {

fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
if self.is_first_poll {
tracing::trace!("first poll connect future");
// cannot call connect twice
self.is_first_poll = false;
return match self.socket.connect(self.addr) {
Ok(a) => std::task::Poll::Ready(Ok(a)),
Err(e) => resolve(e, &self.socket, &self.runtime, Event::all(self.id), cx.waker()),
let err = match self.socket.connect(self.addr) {
Ok(a) => {
return std::task::Poll::Ready(Ok(a));
}
Err(e) => e,
};

// code 115, EINPROGRESS, only for linux
// reference: https://linux.die.net/man/2/connect
// it is the same as WouldBlock but for connect(2) only
#[cfg(target_os = "linux")]
let in_progress = err.kind() == std::io::ErrorKind::WouldBlock || err.raw_os_error() == Some(115);

#[cfg(not(target_os = "linux"))]
let in_progress = err.kind() == std::io::ErrorKind::WouldBlock;

if in_progress {
tracing::trace!("connect should register");
if let Err(e) = self
.runtime
.register(&self.socket, Event::writable(self.id), cx.waker().clone())
{
tracing::warn!(?self.socket, ?self.addr, "failed to register socket to poller");
return std::task::Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("failed to register socket to poller: {}", e),
)));
}
return std::task::Poll::Pending;
}
}
std::task::Poll::Ready(Ok(())) // is second poll really ready?
tracing::trace!("second poll connect future");
std::task::Poll::Ready(Ok(()))
}
}

Expand Down Expand Up @@ -261,13 +289,13 @@ impl_drop!(SendFuture<'_>);
impl_drop!(RecvFuture<'_>);

fn resolve<T>(
e: std::io::Error,
err: std::io::Error,
socket: &Arc<Socket>,
runtime: &Arc<Socket2Runtime>,
event: Event,
waker: &std::task::Waker,
) -> std::task::Poll<std::io::Result<T>> {
if e.kind() == std::io::ErrorKind::WouldBlock {
if err.kind() == std::io::ErrorKind::WouldBlock {
tracing::trace!(?event, "operation would block");
if let Err(e) = runtime.register(socket, event, waker.clone()) {
tracing::warn!(?socket, ?event, "failed to register socket to poller");
Expand All @@ -276,8 +304,10 @@ fn resolve<T>(
format!("failed to register socket to poller: {}", e),
)));
}
std::task::Poll::Pending
} else {
std::task::Poll::Ready(Err(e))

return std::task::Poll::Pending;
}

tracing::warn!(?err, "operation failed");
std::task::Poll::Ready(Err(err))
}
2 changes: 1 addition & 1 deletion crates/network-scanner-net/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn handle_client(mut stream: std::net::TcpStream) -> std::io::Result<()> {
// Read data from the stream
let size = stream.read(&mut buffer)?;
println!("Received {} bytes: {:?}", size, &buffer[..size]);
std::thread::sleep(std::time::Duration::from_millis(500)); // simulate some work
std::thread::sleep(std::time::Duration::from_millis(200)); // simulate some work
stream.write_all(&buffer[..size])?; // Echo the data back to the client
println!("Echoed back {} bytes", size);
}
Expand Down

0 comments on commit f324a80

Please sign in to comment.