Skip to content

Commit

Permalink
Merge pull request #1052 from stlankes/eventfd
Browse files Browse the repository at this point in the history
add basic support for a Linux-like syscall `eventfd`
  • Loading branch information
stlankes authored Feb 13, 2024
2 parents 971aaf1 + 84f985e commit b24a802
Show file tree
Hide file tree
Showing 11 changed files with 510 additions and 90 deletions.
1 change: 1 addition & 0 deletions src/arch/x86_64/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ pub fn boot_processor_init() {

apic::init();
scheduler::install_timer_handler();
serial::install_serial_interrupt();
finish_processor_init();
interrupts::enable();
}
Expand Down
32 changes: 32 additions & 0 deletions src/arch/x86_64/kernel/serial.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use x86_64::instructions::port::Port;

use crate::arch::x86_64::kernel::interrupts::{self, IDT};
use crate::arch::x86_64::kernel::{apic, COM1};

enum Inner {
Uart(uart_16550::SerialPort),
Uhyve(Port<u8>),
Expand All @@ -19,6 +22,14 @@ impl SerialPort {
}
}

/// Reads one byte from the underlying device.
///
/// Only the `Uart` backend is asked for data; every other backend
/// yields `0`.
pub fn receive(&mut self) -> u8 {
    match &mut self.0 {
        Inner::Uart(port) => port.receive(),
        _ => 0,
    }
}

pub fn send(&mut self, buf: &[u8]) {
match &mut self.0 {
Inner::Uhyve(s) => {
Expand All @@ -36,3 +47,24 @@ impl SerialPort {
}
}
}

/// Interrupt handler for the COM1 serial port.
///
/// Fetches one byte from `COM1`, echoes it via `print!` unless it is an
/// ASCII control character, and finally signals end-of-interrupt to the
/// APIC.
///
/// # Panics
///
/// Panics if `COM1` has not been initialized (the `Option` is `None`).
extern "x86-interrupt" fn serial_interrupt(_stack_frame: crate::interrupts::ExceptionStackFrame) {
    // The lock guard is a temporary of this statement, so `COM1` is already
    // released again before `print!` runs.
    let byte = COM1.lock().as_mut().unwrap().receive();

    // Every `u8` maps to a valid Unicode scalar value, hence the safe
    // conversion is sufficient and no `unsafe` is required.
    let c = char::from(byte);
    if !c.is_ascii_control() {
        print!("{}", c);
    }

    apic::eoi();
}

/// Registers `serial_interrupt` in the IDT and names the interrupt "COM1".
///
/// The handler is installed at vector 36 and runs on stack index 0. The
/// name is registered under the vector number minus 32.
pub(crate) fn install_serial_interrupt() {
    const SERIAL_IRQ: usize = 36;

    unsafe {
        // The IDT lock is only held for the duration of this statement.
        IDT.lock()[SERIAL_IRQ]
            .set_handler_fn(serial_interrupt)
            .set_stack_index(0);
    }

    interrupts::add_irq_name((SERIAL_IRQ - 32).try_into().unwrap(), "COM1");
}
1 change: 1 addition & 0 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ where
backoff.snooze();
}
}

#[cfg(not(any(feature = "tcp", feature = "udp")))]
{
if backoff.is_completed() {
Expand Down
202 changes: 202 additions & 0 deletions src/fd/eventfd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use alloc::boxed::Box;
use alloc::collections::vec_deque::VecDeque;
use core::future::{self, Future};
use core::mem;
use core::task::{Poll, Waker};

use async_lock::Mutex;
use async_trait::async_trait;

use crate::fd::{block_on, EventFlags, IoError, ObjectInterface, PollEvent};

/// Mutable state of an eventfd object: the counter plus the wakers of
/// tasks that are waiting for the counter to change.
#[derive(Debug)]
struct EventState {
/// Current value of the 64-bit event counter.
pub counter: u64,
/// Wakers of tasks waiting for the counter to become non-zero.
pub read_queue: VecDeque<Waker>,
/// Wakers of tasks waiting for enough room to add to the counter.
pub write_queue: VecDeque<Waker>,
}

impl EventState {
    /// Builds a state whose counter starts at `counter` and whose wait
    /// queues are both empty.
    pub fn new(counter: u64) -> Self {
        let read_queue = VecDeque::new();
        let write_queue = VecDeque::new();

        Self {
            counter,
            read_queue,
            write_queue,
        }
    }
}

/// An eventfd object: a lock-protected counter together with the flags it
/// was created with.
#[derive(Debug)]
pub(crate) struct EventFd {
/// Counter and wait queues, guarded by an async mutex.
state: Mutex<EventState>,
/// Creation flags; they never change after construction.
flags: EventFlags,
}

impl Clone for EventFd {
    /// Produces an independent eventfd object.
    ///
    /// The clone starts with the current counter value and the same flags,
    /// but with fresh, empty wait queues; it does not share state with
    /// `self`. Blocks until the state lock has been acquired.
    fn clone(&self) -> Self {
        let counter = block_on(
            async {
                let guard = self.state.lock().await;
                Ok(guard.counter)
            },
            None,
        )
        .unwrap();

        let state = Mutex::new(EventState::new(counter));
        Self {
            state,
            flags: self.flags,
        }
    }
}

impl EventFd {
    /// Creates an eventfd object whose counter starts at `initval` and
    /// which behaves according to `flags`.
    pub fn new(initval: u64, flags: EventFlags) -> Self {
        debug!("Create EventFd {}, {:?}", initval, flags);

        let state = Mutex::new(EventState::new(initval));
        Self { state, flags }
    }
}

#[async_trait]
impl ObjectInterface for EventFd {
    /// Reads the counter into the first 8 bytes of `buf` (native endian).
    ///
    /// * With `EFD_SEMAPHORE`, the counter is decremented by one and the
    ///   value `1` is returned.
    /// * Otherwise the current counter value is returned and the counter is
    ///   reset to zero.
    ///
    /// While the counter is zero, the caller's waker is parked in the read
    /// queue and the future stays pending. After a successful read one
    /// waiting writer is woken, because the counter has room again.
    ///
    /// # Errors
    ///
    /// Returns `IoError::EINVAL` if `buf` is shorter than 8 bytes.
    async fn async_read(&self, buf: &mut [u8]) -> Result<usize, IoError> {
        let len = mem::size_of::<u64>();

        if buf.len() < len {
            return Err(IoError::EINVAL);
        }

        future::poll_fn(|cx| {
            let mut pinned = core::pin::pin!(self.state.lock());
            let Poll::Ready(mut guard) = pinned.as_mut().poll(cx) else {
                return Poll::Pending;
            };

            if guard.counter == 0 {
                // Nothing to read yet: wait until a writer bumps the counter.
                guard.read_queue.push_back(cx.waker().clone());
                return Poll::Pending;
            }

            let value = if self.flags.contains(EventFlags::EFD_SEMAPHORE) {
                1
            } else {
                guard.counter
            };
            guard.counter -= value;
            buf[..len].copy_from_slice(&u64::to_ne_bytes(value));

            // The counter shrank, so a writer that was blocked for lack of
            // room may be able to proceed now. This applies to both modes.
            if let Some(waker) = guard.write_queue.pop_front() {
                waker.wake();
            }

            Poll::Ready(Ok(len))
        })
        .await
    }

    /// Adds the native-endian `u64` stored in the first 8 bytes of `buf`
    /// to the counter.
    ///
    /// The counter may grow up to `u64::MAX - 1`. If the addition would
    /// exceed that limit, the caller's waker is parked in the write queue
    /// and the future stays pending. After a successful write, waiting
    /// readers are woken: up to as many as the value written in semaphore
    /// mode, otherwise at most one.
    ///
    /// # Errors
    ///
    /// Returns `IoError::EINVAL` if `buf` is shorter than 8 bytes or if the
    /// value to add is `u64::MAX`, which could never be stored.
    async fn async_write(&self, buf: &[u8]) -> Result<usize, IoError> {
        let len = mem::size_of::<u64>();

        if buf.len() < len {
            return Err(IoError::EINVAL);
        }

        let c = u64::from_ne_bytes(buf[..len].try_into().unwrap());

        // `u64::MAX` can never fit, not even into an empty counter. Fail
        // right away instead of waiting forever.
        if c == u64::MAX {
            return Err(IoError::EINVAL);
        }

        future::poll_fn(|cx| {
            let mut pinned = core::pin::pin!(self.state.lock());
            let Poll::Ready(mut guard) = pinned.as_mut().poll(cx) else {
                return Poll::Pending;
            };

            // Overflow-free form of `counter + c <= u64::MAX - 1`.
            if u64::MAX - guard.counter <= c {
                guard.write_queue.push_back(cx.waker().clone());
                return Poll::Pending;
            }

            guard.counter += c;

            let wake_limit = if self.flags.contains(EventFlags::EFD_SEMAPHORE) {
                c
            } else {
                1
            };
            for _ in 0..wake_limit {
                match guard.read_queue.pop_front() {
                    Some(waker) => waker.wake(),
                    None => break,
                }
            }

            Poll::Ready(Ok(len))
        })
        .await
    }

    /// Reports which of the requested `event`s are currently possible.
    ///
    /// Write events are reported while the counter is below
    /// `u64::MAX - 1`, read events while it is greater than zero. If none
    /// of the requested events is possible, the caller's waker is parked in
    /// the matching wait queue and the future stays pending.
    ///
    /// NOTE(review): readiness is evaluated once per call and is not
    /// recomputed after a wake-up; this presumably relies on the caller
    /// creating a fresh `poll` future for every attempt. Confirm against
    /// the callers.
    async fn poll(&self, event: PollEvent) -> Result<PollEvent, IoError> {
        let read_events = PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND;
        let write_events = PollEvent::POLLOUT | PollEvent::POLLWRNORM | PollEvent::POLLWRBAND;

        let mut result: PollEvent = PollEvent::empty();

        {
            let guard = self.state.lock().await;

            if guard.counter < u64::MAX - 1 {
                result.insert(event & write_events);
            }

            if guard.counter > 0 {
                result.insert(event & read_events);
            }
            // The guard must be released here. Holding it across the
            // `poll_fn` below would make the inner `lock()` fail forever and
            // block every reader and writer of this object.
        }

        future::poll_fn(|cx| {
            if !result.is_empty() {
                return Poll::Ready(Ok(result));
            }

            let mut pinned = core::pin::pin!(self.state.lock());
            let Poll::Ready(mut guard) = pinned.as_mut().poll(cx) else {
                return Poll::Pending;
            };

            if event.intersects(read_events) {
                guard.read_queue.push_back(cx.waker().clone());
                Poll::Pending
            } else if event.intersects(write_events) {
                guard.write_queue.push_back(cx.waker().clone());
                Poll::Pending
            } else {
                // No event this object can produce was requested.
                Poll::Ready(Ok(result))
            }
        })
        .await
    }

    /// Returns `true` if the object was created with `EFD_NONBLOCK`.
    fn is_nonblocking(&self) -> bool {
        self.flags.contains(EventFlags::EFD_NONBLOCK)
    }
}
78 changes: 51 additions & 27 deletions src/fd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::executor::{block_on, poll_on};
use crate::fd::stdio::*;
use crate::fs::{self, DirectoryEntry, FileAttr, SeekWhence};

mod eventfd;
#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))]
pub(crate) mod socket;
mod stdio;
Expand Down Expand Up @@ -90,7 +91,6 @@ bitflags! {
bitflags! {
#[derive(Debug, Copy, Clone, Default)]
pub struct PollEvent: i16 {
const EMPTY = 0;
const POLLIN = 0x1;
const POLLPRI = 0x2;
const POLLOUT = 0x4;
Expand All @@ -116,6 +116,15 @@ pub struct PollFd {
pub revents: PollEvent,
}

bitflags! {
/// Flags that can be passed to `eventfd` to change the behavior of the
/// created eventfd object.
#[derive(Debug, Default, Copy, Clone)]
pub struct EventFlags: i16 {
/// Provide semaphore-like semantics for reads from the file descriptor.
const EFD_SEMAPHORE = 0o1;
/// Set the file descriptor in non-blocking mode.
const EFD_NONBLOCK = 0o4000;
/// NOTE(review): presumably the close-on-exec flag known from Linux;
/// its handling is not visible in this part of the file.
const EFD_CLOEXEC = 0o40000;
}
}

bitflags! {
#[derive(Debug, Copy, Clone)]
pub struct AccessPermission: u32 {
Expand Down Expand Up @@ -152,7 +161,7 @@ impl Default for AccessPermission {
pub(crate) trait ObjectInterface: Sync + Send + core::fmt::Debug + DynClone {
/// check if an IO event is possible
async fn poll(&self, _event: PollEvent) -> Result<PollEvent, IoError> {
Ok(PollEvent::EMPTY)
Ok(PollEvent::empty())
}

/// `async_read` attempts to read `len` bytes from the object references
Expand Down Expand Up @@ -368,50 +377,65 @@ pub(crate) fn write(fd: FileDescriptor, buf: &[u8]) -> Result<usize, IoError> {
}
}

async fn poll_fds(fds: &mut [PollFd]) -> Result<(), IoError> {
async fn poll_fds(fds: &mut [PollFd]) -> Result<u64, IoError> {
future::poll_fn(|cx| {
let mut ready: bool = false;
let mut counter: u64 = 0;

for i in &mut *fds {
let fd = i.fd;
i.revents = PollEvent::empty();
let mut pinned_obj = core::pin::pin!(async_get_object(fd));
if let Ready(Ok(obj)) = pinned_obj.as_mut().poll(cx) {
let mut pinned = core::pin::pin!(obj.poll(i.events));
if let Ready(Ok(e)) = pinned.as_mut().poll(cx) {
ready = true;
i.revents = e;
if !e.is_empty() {
counter += 1;
i.revents = e;
}
}
}
}

if ready {
Ready(())
if counter > 0 {
Ready(Ok(counter))
} else {
Pending
}
})
.await;
.await
}

Ok(())
/// The unix-like `poll` waits for one of a set of file descriptors
/// to become ready to perform I/O. The set of file descriptors to be
/// monitored is specified in the `fds` argument, which is an array
/// of structs of `PollFd`.
///
/// `timeout` is forwarded unchanged to `block_on`; `None` presumably
/// means "wait without a time limit" (confirm against `block_on`).
///
/// The result of `poll_fds` is returned as is: on success, apparently the
/// number of entries in `fds` whose `revents` is non-empty.
pub fn poll(fds: &mut [PollFd], timeout: Option<Duration>) -> Result<u64, IoError> {
block_on(poll_fds(fds), timeout)
}

pub(crate) fn poll(fds: &mut [PollFd], timeout: i32) -> Result<(), IoError> {
if timeout >= 0 {
// for larger timeouts, we block on the async function
if timeout >= 5000 {
block_on(
poll_fds(fds),
Some(Duration::from_millis(timeout.try_into().unwrap())),
)
} else {
poll_on(
poll_fds(fds),
Some(Duration::from_millis(timeout.try_into().unwrap())),
)
}
} else {
block_on(poll_fds(fds), None)
}
/// `eventfd` creates a Linux-like "eventfd object" that can be used
/// as an event wait/notify mechanism by user-space applications, and by
/// the kernel to notify user-space applications of events. The
/// object contains an unsigned 64-bit integer counter that is
/// maintained by the kernel. This counter is initialized with the
/// value specified in the argument `initval`.
///
/// On success, `eventfd` returns a new file descriptor that refers to
/// the eventfd object.
///
/// The following values may be combined in `flags` to change the
/// behavior of `eventfd`:
///
/// * `EFD_NONBLOCK`: Set the file descriptor in non-blocking mode.
/// * `EFD_SEMAPHORE`: Provide semaphore-like semantics for reads
///   from the new file descriptor.
///
/// # Errors
///
/// Returns the error reported while inserting the object into the
/// file descriptor table.
pub fn eventfd(initval: u64, flags: EventFlags) -> Result<FileDescriptor, IoError> {
    let object = Arc::new(self::eventfd::EventFd::new(initval, flags));
    let fd = FD_COUNTER.fetch_add(1, Ordering::SeqCst);

    block_on(async_insert_object(fd, object), None).map(|_| fd)
}

#[inline]
Expand Down
Loading

0 comments on commit b24a802

Please sign in to comment.