From bc544dd22bf578e2e03bf235f70f762f9003fa7d Mon Sep 17 00:00:00 2001 From: Stefan Lankes Date: Wed, 18 Dec 2024 08:20:01 +0100 Subject: [PATCH] revise handling of the serial device - add Wakers for async handling of the input stream --- src/arch/aarch64/kernel/mod.rs | 11 ++++++++ src/arch/riscv64/kernel/mod.rs | 11 ++++++++ src/arch/x86_64/kernel/mod.rs | 10 +++++++- src/arch/x86_64/kernel/serial.rs | 31 +++++++++++++++-------- src/arch/x86_64/mm/virtualmem.rs | 2 +- src/console.rs | 18 +++++++++---- src/executor/mod.rs | 34 +++++++++++++++++++++++++ src/executor/vsock.rs | 38 ++-------------------------- src/fd/stdio.rs | 43 +++++++++++++++++++++++++++++++- 9 files changed, 143 insertions(+), 55 deletions(-) diff --git a/src/arch/aarch64/kernel/mod.rs b/src/arch/aarch64/kernel/mod.rs index fb0f5685c3..c29bedc0c7 100644 --- a/src/arch/aarch64/kernel/mod.rs +++ b/src/arch/aarch64/kernel/mod.rs @@ -15,6 +15,7 @@ pub mod systemtime; use core::arch::global_asm; use core::str; use core::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use core::task::Waker; use memory_addresses::arch::aarch64::{PhysAddr, VirtAddr}; @@ -52,6 +53,16 @@ impl Console { self.serial_port.write_byte(*byte); } } + + pub fn read(&mut self) -> Option { + None + } + + pub fn is_empty(&self) -> bool { + true + } + + pub fn register_waker(&mut self, _waker: &Waker) {} } impl Default for Console { diff --git a/src/arch/riscv64/kernel/mod.rs b/src/arch/riscv64/kernel/mod.rs index c975de6adc..4e67665d62 100644 --- a/src/arch/riscv64/kernel/mod.rs +++ b/src/arch/riscv64/kernel/mod.rs @@ -14,6 +14,7 @@ pub mod systemtime; use alloc::vec::Vec; use core::ptr; use core::sync::atomic::{AtomicPtr, AtomicU32, AtomicU64, Ordering}; +use core::task::Waker; use fdt::Fdt; use memory_addresses::{PhysAddr, VirtAddr}; @@ -41,6 +42,16 @@ impl Console { sbi_rt::console_write_byte(*byte); } } + + pub fn read(&mut self) -> Option { + None + } + + pub fn is_empty(&self) -> bool { + true + } + + pub fn register_waker(&mut self, _waker: &Waker) {} } impl Default for Console { diff --git a/src/arch/x86_64/kernel/mod.rs b/src/arch/x86_64/kernel/mod.rs index 85219bfa1b..45e313ca7a 100644 --- a/src/arch/x86_64/kernel/mod.rs +++ b/src/arch/x86_64/kernel/mod.rs @@ -3,6 +3,7 @@ use core::arch::asm; use core::num::NonZeroU64; use core::ptr; use core::sync::atomic::{AtomicPtr, AtomicU32, Ordering}; +use core::task::Waker; use hermit_entry::boot_info::{PlatformInfo, RawBootInfo}; use memory_addresses::{PhysAddr, VirtAddr}; @@ -68,10 +69,17 @@ impl Console { self.serial_port.buffer_input(); } - #[cfg(feature = "shell")] pub fn read(&mut self) -> Option { self.serial_port.read() } + + pub fn is_empty(&self) -> bool { + self.serial_port.is_empty() + } + + pub fn register_waker(&mut self, waker: &Waker) { + self.serial_port.register_waker(waker); + } } impl Default for Console { diff --git a/src/arch/x86_64/kernel/serial.rs b/src/arch/x86_64/kernel/serial.rs index aef8eb107c..1d40795286 100644 --- a/src/arch/x86_64/kernel/serial.rs +++ b/src/arch/x86_64/kernel/serial.rs @@ -1,11 +1,12 @@ -#[cfg(feature = "shell")] use alloc::collections::VecDeque; +use core::task::Waker; use x86_64::instructions::port::Port; use crate::arch::x86_64::kernel::apic; use crate::arch::x86_64::kernel::core_local::increment_irq_counter; use crate::arch::x86_64::kernel::interrupts::{self, IDT}; +use crate::executor::WakerRegistration; const SERIAL_IRQ: u8 = 36; @@ -16,8 +17,8 @@ enum SerialInner { pub struct SerialPort { inner: SerialInner, - #[cfg(feature = "shell")] buffer: VecDeque, + waker: WakerRegistration, } impl SerialPort { @@ -26,37 +27,44 @@ impl SerialPort { let serial = Port::new(base); Self { inner: SerialInner::Uhyve(serial), - #[cfg(feature = "shell")] buffer: VecDeque::new(), + waker: WakerRegistration::new(), } } else { let mut serial = unsafe { uart_16550::SerialPort::new(base) }; serial.init(); Self { inner: SerialInner::Uart(serial), - #[cfg(feature = "shell")] buffer: VecDeque::new(), + waker: WakerRegistration::new(), } } } pub fn buffer_input(&mut self) { if let SerialInner::Uart(s) = &mut self.inner { - let c = unsafe { char::from_u32_unchecked(s.receive().into()) }; - #[cfg(not(feature = "shell"))] - if !c.is_ascii_control() { - print!("{}", c); + let c = s.receive(); + if c == b'\r' { + self.buffer.push_back(b'\n'); + } else { + self.buffer.push_back(c); } - #[cfg(feature = "shell")] - self.buffer.push_back(c.try_into().unwrap()); + self.waker.wake(); } } - #[cfg(feature = "shell")] + pub fn register_waker(&mut self, waker: &Waker) { + self.waker.register(waker); + } + pub fn read(&mut self) -> Option { self.buffer.pop_front() } + pub fn is_empty(&self) -> bool { + self.buffer.is_empty() + } + pub fn send(&mut self, buf: &[u8]) { match &mut self.inner { SerialInner::Uhyve(s) => { @@ -78,6 +86,7 @@ impl SerialPort { extern "x86-interrupt" fn serial_interrupt(_stack_frame: crate::interrupts::ExceptionStackFrame) { crate::console::CONSOLE.lock().0.buffer_input(); increment_irq_counter(SERIAL_IRQ); + crate::executor::run(); apic::eoi(); } diff --git a/src/arch/x86_64/mm/virtualmem.rs b/src/arch/x86_64/mm/virtualmem.rs index daaee94014..5a8654adac 100644 --- a/src/arch/x86_64/mm/virtualmem.rs +++ b/src/arch/x86_64/mm/virtualmem.rs @@ -27,7 +27,7 @@ pub fn init() { } else { PageRange::new( mm::kernel_end_address().as_usize(), - kernel_heap_end().as_usize() + 1 + kernel_heap_end().as_usize() + 1, ) .unwrap() }; diff --git a/src/console.rs b/src/console.rs index 9818ebdaaa..64b1396b37 100644 --- a/src/console.rs +++ b/src/console.rs @@ -1,10 +1,11 @@ +use core::task::Waker; use core::{fmt, mem}; use hermit_sync::{InterruptTicketMutex, Lazy}; use crate::arch; -pub struct Console(pub arch::kernel::Console); +pub(crate) struct Console(pub arch::kernel::Console); impl Console { fn new() -> Self { @@ -15,10 +16,17 @@ impl Console { self.0.write(buf); } - #[cfg(feature = "shell")] pub fn read(&mut self) -> Option { self.0.read() } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn register_waker(&mut self, waker: &Waker) { + self.0.register_waker(waker); + } } /// A collection of methods that are required to format @@ -35,17 +43,17 @@ impl fmt::Write for Console { } } -pub static CONSOLE: Lazy> = +pub(crate) static CONSOLE: Lazy> = Lazy::new(|| InterruptTicketMutex::new(Console::new())); #[doc(hidden)] -pub fn _print(args: fmt::Arguments<'_>) { +pub(crate) fn _print(args: fmt::Arguments<'_>) { use fmt::Write; CONSOLE.lock().write_fmt(args).unwrap(); } #[doc(hidden)] -pub fn _panic_print(args: fmt::Arguments<'_>) { +pub(crate) fn _panic_print(args: fmt::Arguments<'_>) { use fmt::Write; let mut console = unsafe { CONSOLE.make_guard_unchecked() }; console.write_fmt(args).ok(); diff --git a/src/executor/mod.rs b/src/executor/mod.rs index aa87210d70..5c5e0e8d1e 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -34,6 +34,40 @@ use crate::io; use crate::scheduler::PerCoreSchedulerExt; use crate::synch::futex::*; +/// WakerRegistration is derived from smoltcp's +/// implementation. +#[derive(Debug)] +pub(crate) struct WakerRegistration { + waker: Option, +} + +impl WakerRegistration { + pub const fn new() -> Self { + Self { waker: None } + } + + /// Register a waker. Overwrites the previous waker, if any. + pub fn register(&mut self, w: &Waker) { + match self.waker { + // Optimization: If both the old and new Wakers wake the same task, we can simply + // keep the old waker, skipping the clone. + Some(ref w2) if (w2.will_wake(w)) => {} + // In all other cases + // - we have no waker registered + // - we have a waker registered but it's for a different task. + // then clone the new waker and store it + _ => self.waker = Some(w.clone()), + } + } + + /// Wake the registered waker, if any. + pub fn wake(&mut self) { + if let Some(w) = self.waker.take() { + w.wake(); + } + } +} + struct TaskNotify { /// Futex to wakeup a single task futex: AtomicU32, diff --git a/src/executor/vsock.rs b/src/executor/vsock.rs index fe7708f29e..a2bed4154d 100644 --- a/src/executor/vsock.rs +++ b/src/executor/vsock.rs @@ -1,7 +1,7 @@ use alloc::collections::BTreeMap; use alloc::vec::Vec; use core::future; -use core::task::{Poll, Waker}; +use core::task::Poll; use hermit_sync::InterruptTicketMutex; use virtio::vsock::{Hdr, Op, Type}; @@ -11,7 +11,7 @@ use virtio::{le16, le32}; use crate::arch::kernel::mmio as hardware; #[cfg(feature = "pci")] use crate::drivers::pci as hardware; -use crate::executor::spawn; +use crate::executor::{spawn, WakerRegistration}; use crate::io; use crate::io::Error::EADDRINUSE; @@ -27,40 +27,6 @@ pub(crate) enum VsockState { Shutdown, } -/// WakerRegistration is derived from smoltcp's -/// implementation. -#[derive(Debug)] -pub(crate) struct WakerRegistration { - waker: Option, -} - -impl WakerRegistration { - pub const fn new() -> Self { - Self { waker: None } - } - - /// Register a waker. Overwrites the previous waker, if any. - pub fn register(&mut self, w: &Waker) { - match self.waker { - // Optimization: If both the old and new Wakers wake the same task, we can simply - // keep the old waker, skipping the clone. - Some(ref w2) if (w2.will_wake(w)) => {} - // In all other cases - // - we have no waker registered - // - we have a waker registered but it's for a different task. - // then clone the new waker and store it - _ => self.waker = Some(w.clone()), - } - } - - /// Wake the registered waker, if any. - pub fn wake(&mut self) { - if let Some(w) = self.waker.take() { - w.wake(); - } - } -} - pub(crate) const RAW_SOCKET_BUFFER_SIZE: usize = 256 * 1024; #[derive(Debug)] diff --git a/src/fd/stdio.rs b/src/fd/stdio.rs index 203793e6bb..c474f6c157 100644 --- a/src/fd/stdio.rs +++ b/src/fd/stdio.rs @@ -1,12 +1,15 @@ use alloc::boxed::Box; +use core::future; #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] use core::ptr; +use core::task::Poll; use async_trait::async_trait; #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] use memory_addresses::VirtAddr; #[cfg(target_arch = "x86_64")] use x86::io::*; +use zerocopy::IntoBytes; #[cfg(any(target_arch = "x86_64", target_arch = "aarch64"))] use crate::arch::mm::paging; @@ -70,7 +73,45 @@ fn uhyve_send(_port: u16, _data: &mut T) { #[derive(Debug)] pub struct GenericStdin; -impl ObjectInterface for GenericStdin {} +#[async_trait] +impl ObjectInterface for GenericStdin { + async fn poll(&self, event: PollEvent) -> io::Result { + let available = if CONSOLE.lock().is_empty() { + PollEvent::empty() + } else { + PollEvent::POLLIN | PollEvent::POLLRDNORM | PollEvent::POLLRDBAND + }; + + Ok(event & available) + } + + async fn read(&self, buf: &mut [u8]) -> io::Result { + future::poll_fn(|cx| { + let mut read_bytes = 0; + let mut guard = CONSOLE.lock(); + + while let Some(byte) = guard.read() { + let c = unsafe { char::from_u32_unchecked(byte.into()) }; + guard.write(c.as_bytes()); + + buf[read_bytes] = byte; + read_bytes += 1; + + if read_bytes >= buf.len() { + return Poll::Ready(Ok(read_bytes)); + } + } + + if read_bytes > 0 { + Poll::Ready(Ok(read_bytes)) + } else { + guard.register_waker(cx.waker()); + Poll::Pending + } + }) + .await + } +} impl GenericStdin { pub const fn new() -> Self {