-
Notifications
You must be signed in to change notification settings - Fork 76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Add unix socket support? #172
Draft
javaarchive
wants to merge
8
commits into
lemunozm:master
Choose a base branch
from
javaarchive:master
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 2 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
a0bc09c
it works I guess?
javaarchive fd26ed6
e
javaarchive 1b416ec
Store bind path and delete on drop
javaarchive 1779a4c
Don't enable unix socket by default (some platforms may not support).
javaarchive d9a6af8
config funcs to construct
javaarchive ed219da
quick barely tested impl of datagram
javaarchive e109950
fix feature gate
javaarchive 2d263be
mark datagram socket as broken
javaarchive File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
#![allow(unused_variables)] | ||
|
||
use crate::network::adapter::{ | ||
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, | ||
ListeningInfo, PendingStatus, | ||
}; | ||
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen}; | ||
|
||
use mio::event::{Source}; | ||
use mio::net::{UnixListener, UnixStream}; | ||
|
||
use std::mem::MaybeUninit; | ||
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | ||
use std::io::{self, ErrorKind, Read, Write}; | ||
use std::ops::Deref; | ||
use std::path::{Path, PathBuf}; | ||
|
||
// Note: net.core.rmem_max = 212992 by default on linux systems | ||
// not used because w euse unixstream I think? | ||
// TODO: delete this if I PR | ||
pub const MAX_PAYLOAD_LEN: usize = 212992; | ||
|
||
/// From tcp.rs | ||
/// Size of the internal reading buffer. | ||
/// It implies that at most the generated [`crate::network::NetEvent::Message`] | ||
/// will contains a chunk of data of this value. | ||
pub const INPUT_BUFFER_SIZE: usize = u16::MAX as usize; // 2^16 - 1 | ||
|
||
// We don't use the SocketAddr, we just striaght up get the path from config. | ||
pub fn create_null_socketaddr() -> SocketAddr { | ||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), 0) | ||
} | ||
|
||
#[derive(Clone, PartialEq, Eq, Hash, Debug)] | ||
pub struct UnixSocketListenConfig { | ||
path: PathBuf, | ||
} | ||
|
||
impl Default for UnixSocketListenConfig { | ||
fn default() -> Self { | ||
// TODO: better idea? I could make this into an option later and complain if empty. | ||
Self { path: "/tmp/mio.sock".into() } | ||
} | ||
} | ||
|
||
#[derive(Clone, PartialEq, Eq, Hash, Debug)] | ||
pub struct UnixSocketConnectConfig { | ||
path: PathBuf, | ||
} | ||
|
||
impl Default for UnixSocketConnectConfig { | ||
fn default() -> Self { | ||
// TODO: better idea? I could make this into an option later and complain if empty. | ||
Self { path: "/tmp/mio.sock".into() } | ||
} | ||
} | ||
|
||
pub(crate) struct UnixSocketAdapter; | ||
impl Adapter for UnixSocketAdapter { | ||
type Remote = RemoteResource; | ||
type Local = LocalResource; | ||
} | ||
|
||
pub(crate) struct RemoteResource { | ||
stream: UnixStream | ||
} | ||
|
||
impl Resource for RemoteResource { | ||
fn source(&mut self) -> &mut dyn Source { | ||
&mut self.stream | ||
} | ||
} | ||
|
||
// taken from tcp impl | ||
pub fn check_stream_ready(stream: &UnixStream) -> PendingStatus{ | ||
if let Ok(Some(_)) = stream.take_error() { | ||
return PendingStatus::Disconnected; | ||
} | ||
|
||
return PendingStatus::Ready; | ||
} | ||
|
||
impl Remote for RemoteResource { | ||
fn connect_with( | ||
config: TransportConnect, | ||
remote_addr: RemoteAddr, | ||
) -> io::Result<ConnectionInfo<Self>> { | ||
|
||
let stream_config = match config { | ||
TransportConnect::UnixSocket(config) => config, | ||
_ => panic!("Internal error: Got wrong config"), | ||
}; | ||
|
||
match UnixStream::connect(stream_config.path) { | ||
Ok(stream) => { | ||
Ok(ConnectionInfo { | ||
remote: Self { | ||
stream | ||
}, | ||
// the unixstream uses SocketAddr from mio that can't be converted | ||
local_addr: create_null_socketaddr(), // stream.local_addr()?, | ||
peer_addr: create_null_socketaddr() // stream.peer_addr()?.into(), | ||
}) | ||
}, | ||
Err(err) => { | ||
return Err(err); | ||
}, | ||
} | ||
|
||
|
||
} | ||
|
||
fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus { | ||
// Most of this is reused from tcp.rs | ||
let buffer: MaybeUninit<[u8; INPUT_BUFFER_SIZE]> = MaybeUninit::uninit(); | ||
let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array | ||
|
||
loop { | ||
let stream = &self.stream; | ||
match stream.deref().read(&mut input_buffer) { | ||
Ok(0) => break ReadStatus::Disconnected, | ||
Ok(size) => process_data(&input_buffer[..size]), | ||
Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, | ||
Err(ref err) if err.kind() == ErrorKind::WouldBlock => { | ||
break ReadStatus::WaitNextEvent | ||
} | ||
Err(ref err) if err.kind() == ErrorKind::ConnectionReset => { | ||
break ReadStatus::Disconnected | ||
} | ||
Err(err) => { | ||
log::error!("Unix socket receive error: {}", err); | ||
break ReadStatus::Disconnected // should not happen | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn send(&self, data: &[u8]) -> SendStatus { | ||
// Most of this is reused from tcp.rs | ||
let mut total_bytes_sent = 0; | ||
loop { | ||
let stream = &self.stream; | ||
match stream.deref().write(&data[total_bytes_sent..]) { | ||
Ok(bytes_sent) => { | ||
total_bytes_sent += bytes_sent; | ||
if total_bytes_sent == data.len() { | ||
break SendStatus::Sent | ||
} | ||
} | ||
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue, | ||
|
||
// Others errors are considered fatal for the connection. | ||
// a Event::Disconnection will be generated later. | ||
Err(err) => { | ||
log::error!("unix socket receive error: {}", err); | ||
break SendStatus::ResourceNotFound // should not happen | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn pending(&self, _readiness: Readiness) -> PendingStatus { | ||
check_stream_ready(&self.stream) | ||
} | ||
} | ||
|
||
pub(crate) struct LocalResource { | ||
listener: UnixListener | ||
} | ||
|
||
impl Resource for LocalResource { | ||
fn source(&mut self) -> &mut dyn Source { | ||
&mut self.listener | ||
} | ||
} | ||
|
||
impl Local for LocalResource { | ||
type Remote = RemoteResource; | ||
|
||
fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> { | ||
let config = match config { | ||
TransportListen::UnixSocket(config) => config, | ||
_ => panic!("Internal error: Got wrong config"), | ||
}; | ||
|
||
// TODO: fallback to ip when we are able to set path to none | ||
let listener = UnixListener::bind(config.path)?; | ||
let local_addr = listener.local_addr()?; | ||
Ok(ListeningInfo { | ||
local: Self { | ||
listener | ||
}, | ||
// same issue as above my change in https://github.com/tokio-rs/mio/pull/1749 | ||
// relevant issue https://github.com/tokio-rs/mio/issues/1527 | ||
local_addr: create_null_socketaddr(), | ||
}) | ||
} | ||
|
||
fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) { | ||
loop { | ||
match self.listener.accept() { | ||
Ok((stream, addr)) => accept_remote(AcceptedType::Remote( | ||
create_null_socketaddr(), // TODO: provide correct address | ||
RemoteResource { stream }, | ||
)), | ||
Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, | ||
Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, | ||
Err(err) => break log::error!("unix socket accept error: {}", err), // Should not happen | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, what then identifies an unix socket?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
local_addr and peer_addr are currently a placeholder value, the unix sockets seem to have a totally different local and peer addr format.