diff --git a/platforms/allwinner-d1/boards/src/bin/mq-pro.rs b/platforms/allwinner-d1/boards/src/bin/mq-pro.rs index b6ff8eb0..802c2962 100644 --- a/platforms/allwinner-d1/boards/src/bin/mq-pro.rs +++ b/platforms/allwinner-d1/boards/src/bin/mq-pro.rs @@ -88,7 +88,9 @@ fn main() -> ! { .initialize(async move { // i2c_puppet demo: print each keypress to the console. i2c_puppet_up.await.unwrap(); - let mut i2c_puppet = I2cPuppetClient::from_registry(d1.kernel).await; + let mut i2c_puppet = I2cPuppetClient::from_registry(d1.kernel) + .await + .expect("no i2c_puppet driver"); tracing::info!("got i2c puppet client"); let mut keys = i2c_puppet @@ -108,7 +110,9 @@ fn main() -> ! { // LED to display useful information, but this is fun for now. let mut hue = 0; - let mut i2c_puppet = I2cPuppetClient::from_registry(d1.kernel).await; + let mut i2c_puppet = I2cPuppetClient::from_registry(d1.kernel) + .await + .expect("no i2c_puppet driver"); i2c_puppet .toggle_led(true) diff --git a/platforms/allwinner-d1/core/src/drivers/sharp_display.rs b/platforms/allwinner-d1/core/src/drivers/sharp_display.rs index f52e7f8f..e00cd3f0 100644 --- a/platforms/allwinner-d1/core/src/drivers/sharp_display.rs +++ b/platforms/allwinner-d1/core/src/drivers/sharp_display.rs @@ -30,7 +30,7 @@ use embedded_graphics::{pixelcolor::Gray8, prelude::*, primitives::Rectangle}; use kernel::{ maitake::sync::{Mutex, WaitQueue}, mnemos_alloc::containers::{Arc, FixedVec}, - registry::listener, + registry::{self, listener}, services::emb_display::{ DisplayMetadata, EmbDisplayService, FrameChunk, FrameError, FrameKind, MonoChunk, Request, Response, @@ -38,7 +38,7 @@ use kernel::{ Kernel, }; -use crate::spim::SpiSenderClient; +use crate::spim::{SpiSender, SpiSenderClient}; const WIDTH: usize = 400; const HEIGHT: usize = 240; @@ -70,6 +70,15 @@ mod commands { /// Implements the [`EmbDisplayService`] service interface pub struct SharpDisplay; +#[derive(Debug)] +pub enum RegistrationError { + /// Failed to register a display: either the kernel reported that there is + /// already an existing EmbDisplay, or the registry is full. + Registration(registry::RegistrationError), + /// No SPI sender service exists. + NoSpiSender(registry::ConnectError), +} + impl SharpDisplay { pub const WIDTH: usize = WIDTH; pub const HEIGHT: usize = HEIGHT; @@ -78,7 +87,22 @@ impl SharpDisplay { /// /// Registration will also start the simulated display, meaning that the display /// window will appear. - pub async fn register(kernel: &'static Kernel) -> Result<(), FrameError> { + pub async fn register(kernel: &'static Kernel) -> Result<(), RegistrationError> { + // acquire a SPI client first, so that we don't register the display + // service unless we can get a SPI client. + let spim = SpiSenderClient::from_registry(kernel) + .await + .map_err(RegistrationError::NoSpiSender)?; + + // bind a listener + let cmd = kernel + .registry() + .bind_konly(2) + .await + .map_err(RegistrationError::Registration)? + .into_request_stream(2) + .await; + let linebuf = FixedVec::new(FRAME_BYTES).await; let ctxt = Arc::new(Mutex::new(Context { @@ -87,12 +111,6 @@ impl SharpDisplay { })) .await; - let cmd = kernel - .bind_konly_service(2) - .await - .map_err(|_| FrameError::DisplayAlreadyExists)? - .into_request_stream(2) - .await; let commander = CommanderTask { cmd, ctxt: ctxt.clone(), @@ -107,7 +125,7 @@ impl SharpDisplay { let draw = Draw { kernel, buf: linebuf, - spim: SpiSenderClient::from_registry(kernel).await.unwrap(), + spim, ctxt, }; diff --git a/platforms/allwinner-d1/core/src/drivers/spim.rs b/platforms/allwinner-d1/core/src/drivers/spim.rs index 40e0de89..0669adef 100644 --- a/platforms/allwinner-d1/core/src/drivers/spim.rs +++ b/platforms/allwinner-d1/core/src/drivers/spim.rs @@ -110,7 +110,8 @@ impl SpiSenderServer { queued: usize, ) -> Result<(), registry::RegistrationError> { let reqs = kernel - .bind_konly_service::(queued) + .registry() + .bind_konly::(queued) .await? .into_request_stream(queued) .await; @@ -225,7 +226,18 @@ impl SpiSenderClient { pub async fn from_registry( kernel: &'static Kernel, ) -> Result> { - let hdl = kernel.registry().await.connect().await?; + let hdl = kernel.registry().connect(()).await?; + + Ok(SpiSenderClient { + hdl, + osc: Reusable::new_async().await, + }) + } + + pub async fn from_registry_no_retry( + kernel: &'static Kernel, + ) -> Result> { + let hdl = kernel.registry().try_connect(()).await?; Ok(SpiSenderClient { hdl, diff --git a/platforms/allwinner-d1/core/src/drivers/twi.rs b/platforms/allwinner-d1/core/src/drivers/twi.rs index 13fbc968..4c655142 100644 --- a/platforms/allwinner-d1/core/src/drivers/twi.rs +++ b/platforms/allwinner-d1/core/src/drivers/twi.rs @@ -305,7 +305,8 @@ impl I2c0 { queued: usize, ) -> Result<(), registry::RegistrationError> { let rx = kernel - .bind_konly_service::(queued) + .registry() + .bind_konly::(queued) .await? .into_request_stream(queued) .await; diff --git a/platforms/allwinner-d1/core/src/drivers/uart.rs b/platforms/allwinner-d1/core/src/drivers/uart.rs index f2a61e32..b5f6a471 100644 --- a/platforms/allwinner-d1/core/src/drivers/uart.rs +++ b/platforms/allwinner-d1/core/src/drivers/uart.rs @@ -184,7 +184,8 @@ impl D1Uart { let (fifo_a, fifo_b) = new_bidi_channel(cap_in, cap_out).await; let reqs = k - .bind_konly_service::(4) + .registry() + .bind_konly::(4) .await? .into_request_stream(4) .await; diff --git a/platforms/beepy/src/i2c_puppet.rs b/platforms/beepy/src/i2c_puppet.rs index 786e5154..e5e5864e 100644 --- a/platforms/beepy/src/i2c_puppet.rs +++ b/platforms/beepy/src/i2c_puppet.rs @@ -15,10 +15,10 @@ use kernel::{ registry::{self, Envelope, KernelHandle, RegisteredDriver}, retry::{AlwaysRetry, ExpBackoff, Retry, WithMaxRetries}, services::{ - i2c::{I2cClient, I2cError}, + i2c::{I2cClient, I2cError, I2cService}, keyboard::{ key_event::{self, KeyEvent, Modifiers}, - mux::KeyboardMuxClient, + mux::{KeyboardMuxClient, KeyboardMuxService}, }, }, tracing::{self, instrument, Instrument, Level}, @@ -108,19 +108,15 @@ impl I2cPuppetClient { /// /// If the [`I2cPuppetService`] hasn't been registered yet, we will retry until it /// has been registered. - pub async fn from_registry(kernel: &'static Kernel) -> Self { - loop { - match I2cPuppetClient::from_registry_no_retry(kernel).await { - Ok(port) => return port, - Err(registry::ConnectError::Rejected(_)) => { - unreachable!("the KeyboardMuxService does not return connect errors!") - } - Err(_) => { - // I2C probably isn't registered yet. Try again in a bit - kernel.sleep(Duration::from_millis(10)).await; - } - } - } + pub async fn from_registry( + kernel: &'static Kernel, + ) -> Result> { + let handle = kernel.registry().connect(()).await?; + + Ok(I2cPuppetClient { + handle, + reply: Reusable::new_async().await, + }) } /// Obtain an `I2cPuppetClient` @@ -132,11 +128,7 @@ impl I2cPuppetClient { pub async fn from_registry_no_retry( kernel: &'static Kernel, ) -> Result> { - let handle = kernel - .registry() - .await - .connect::() - .await?; + let handle = kernel.registry().try_connect(()).await?; Ok(I2cPuppetClient { handle, @@ -243,6 +235,8 @@ pub struct I2cPuppetServer { #[derive(Debug)] pub enum RegistrationError { Registry(registry::RegistrationError), + NoI2c(registry::ConnectError), + NoKeymux(registry::ConnectError), NoI2cPuppet(I2cError), InvalidSettings(&'static str), } @@ -351,7 +345,9 @@ impl I2cPuppetServer { irq_waker: impl Into>, ) -> Result<(), RegistrationError> { let keymux = if settings.keymux { - let keymux = KeyboardMuxClient::from_registry(kernel).await; + let keymux = KeyboardMuxClient::from_registry(kernel) + .await + .map_err(RegistrationError::NoKeymux)?; tracing::debug!("acquired keyboard mux client"); Some(keymux) } else { @@ -362,7 +358,10 @@ impl I2cPuppetServer { // The longest read or write operation we will perform is two bytes // long. Thus, we can reuse a single 2-byte buffer forever. let buf = FixedVec::new(2).await; - I2cClient::from_registry(kernel).await.with_cached_buf(buf) + I2cClient::from_registry(kernel) + .await + .map_err(RegistrationError::NoI2c)? + .with_cached_buf(buf) }; let subscriptions = FixedVec::new(settings.max_subscriptions).await; @@ -390,7 +389,8 @@ impl I2cPuppetServer { .map_err(RegistrationError::NoI2cPuppet)?; let rx = kernel - .bind_konly_service(settings.channel_capacity) + .registry() + .bind_konly(settings.channel_capacity) .await .map_err(RegistrationError::Registry)? .into_request_stream(settings.channel_capacity) diff --git a/platforms/esp32c3-buddy/src/drivers/uart.rs b/platforms/esp32c3-buddy/src/drivers/uart.rs index 9e397397..5259d93b 100644 --- a/platforms/esp32c3-buddy/src/drivers/uart.rs +++ b/platforms/esp32c3-buddy/src/drivers/uart.rs @@ -122,7 +122,8 @@ impl C3Uart { let old = UART_RX.swap(leaked_prod, Ordering::AcqRel); assert_eq!(old, null_mut()); - k.register_konly::(registration) + k.registry() + .register_konly::(registration) .await?; Ok(()) diff --git a/platforms/esp32c3-buddy/src/drivers/usb_serial.rs b/platforms/esp32c3-buddy/src/drivers/usb_serial.rs index 63480125..1ab64f0b 100644 --- a/platforms/esp32c3-buddy/src/drivers/usb_serial.rs +++ b/platforms/esp32c3-buddy/src/drivers/usb_serial.rs @@ -181,7 +181,8 @@ impl UsbSerialServer { k.spawn(self.worker(fifo_a)).await; - k.register_konly::(registration) + k.registry() + .register_konly::(registration) .await?; Ok(()) diff --git a/platforms/melpomene/src/sim_drivers/emb_display.rs b/platforms/melpomene/src/sim_drivers/emb_display.rs index 4d2d6197..abb3cf94 100644 --- a/platforms/melpomene/src/sim_drivers/emb_display.rs +++ b/platforms/melpomene/src/sim_drivers/emb_display.rs @@ -36,8 +36,7 @@ use mnemos_kernel::{ registry, services::{ emb_display::{ - DisplayMetadata, EmbDisplayService, FrameChunk, FrameError, FrameKind, MonoChunk, - Request, Response, + DisplayMetadata, EmbDisplayService, FrameChunk, FrameKind, MonoChunk, Request, Response, }, keyboard::{ key_event::{self, KeyCode, Modifiers}, @@ -63,12 +62,12 @@ impl SimDisplay { settings: DisplayConfig, width: u32, height: u32, - ) -> Result<(), FrameError> { + ) -> Result<(), registry::RegistrationError> { tracing::debug!("initializing SimDisplay server ({width}x{height})..."); let cmd = kernel - .bind_konly_service(settings.kchannel_depth) - .await - .map_err(|_| FrameError::DisplayAlreadyExists)? + .registry() + .bind_konly(settings.kchannel_depth) + .await? .into_request_stream(settings.kchannel_depth) .await; @@ -245,7 +244,9 @@ async fn render_loop( frames_per_second: usize, ) { let mut idle_ticks = 0; - let mut keymux = KeyboardMuxClient::from_registry(kernel).await; + let mut keymux = KeyboardMuxClient::from_registry(kernel) + .await + .expect("no keyboard mux service!"); let mut first_done = false; let sleep_time = Duration::from_micros(1_000_000 / (frames_per_second as u64)); loop { diff --git a/platforms/melpomene/src/sim_drivers/tcp_serial.rs b/platforms/melpomene/src/sim_drivers/tcp_serial.rs index 9dab2ff6..1ed9f17d 100644 --- a/platforms/melpomene/src/sim_drivers/tcp_serial.rs +++ b/platforms/melpomene/src/sim_drivers/tcp_serial.rs @@ -26,7 +26,8 @@ impl TcpSerial { let (a_ring, b_ring) = new_bidi_channel(settings.incoming_size, settings.outgoing_size).await; let reqs = kernel - .bind_konly_service::(settings.kchannel_depth) + .registry() + .bind_konly::(settings.kchannel_depth) .await? .into_request_stream(settings.kchannel_depth) .await; diff --git a/platforms/pomelo/src/sim_drivers/emb_display.rs b/platforms/pomelo/src/sim_drivers/emb_display.rs index 460d9777..0caf957a 100644 --- a/platforms/pomelo/src/sim_drivers/emb_display.rs +++ b/platforms/pomelo/src/sim_drivers/emb_display.rs @@ -39,7 +39,7 @@ use mnemos_kernel::{ }, keyboard::{ key_event::{self, KeyCode, Modifiers}, - mux::KeyboardMuxClient, + mux::{KeyboardMuxClient, KeyboardMuxService}, KeyEvent, }, }, @@ -77,6 +77,13 @@ async fn draw_complete_handler(rx: KConsumer) { } } } + +#[derive(Debug)] +pub enum RegistrationError { + Register(registry::RegistrationError), + NoKeymux(registry::ConnectError), +} + impl SimDisplay { /// Register the driver instance /// @@ -87,25 +94,29 @@ impl SimDisplay { kernel: &'static Kernel, width: u32, height: u32, - ) -> Result<(), FrameError> { + ) -> Result<(), RegistrationError> { tracing::debug!("initializing SimDisplay server ({width}x{height})..."); + let keymux = KeyboardMuxClient::from_registry(kernel) + .await + .map_err(RegistrationError::NoKeymux)?; + + let cmd = kernel + .registry() + .bind_konly(2) + .await + .map_err(RegistrationError::Register)? + .into_request_stream(2) + .await; + // TODO settings.kchannel_depth - let (listener, registration) = registry::Listener::new(2).await; - let cmd = listener.into_request_stream(2).await; let commander = CommanderTask { cmd, width, height }; kernel.spawn(commander.run(kernel, width, height)).await; let (key_tx, key_rx) = KChannel::new_async(32).await.split(); - let keymux = KeyboardMuxClient::from_registry(kernel).await; kernel.spawn(key_event_handler(key_rx, keymux)).await; - kernel - .register_konly::(registration) - .await - .map_err(|_| FrameError::DisplayAlreadyExists)?; - // listen for key events let on_keydown = Closure::::new(move |event: web_sys::KeyboardEvent| { event.prevent_default(); diff --git a/platforms/pomelo/src/sim_drivers/serial.rs b/platforms/pomelo/src/sim_drivers/serial.rs index fed35348..17542a2b 100644 --- a/platforms/pomelo/src/sim_drivers/serial.rs +++ b/platforms/pomelo/src/sim_drivers/serial.rs @@ -27,9 +27,13 @@ impl Serial { recv: mpsc::Receiver, recv_callback: fn(String), ) -> Result<(), registry::RegistrationError> { + let cons = kernel + .registry() + .bind_konly::(2) + .await? + .into_request_stream(2) + .await; let (a_ring, b_ring) = new_bidi_channel(incoming_size, outgoing_size).await; - let (listener, registration) = registry::Listener::new(2).await; - let cons = listener.into_request_stream(2).await; kernel .spawn(async move { @@ -66,9 +70,7 @@ impl Serial { .instrument(info_span!("Serial", ?port)), ) .await; - kernel - .register_konly::(registration) - .await + Ok(()) } } diff --git a/source/kernel/src/daemons/shells.rs b/source/kernel/src/daemons/shells.rs index ffbbd0a2..b1e02663 100644 --- a/source/kernel/src/daemons/shells.rs +++ b/source/kernel/src/daemons/shells.rs @@ -155,8 +155,12 @@ pub async fn graphical_shell_mono(k: &'static Kernel, settings: GraphicalShellSe font, } = settings; - let mut keyboard = KeyClient::from_registry(k, Default::default()).await; - let mut disp_hdl = EmbDisplayClient::from_registry(k).await; + let mut keyboard = KeyClient::from_registry(k, Default::default()) + .await + .expect("failed to get keyboard service"); + let mut disp_hdl = EmbDisplayClient::from_registry(k) + .await + .expect("failed to get EmbDisplayClient"); let char_y = font.character_size.height; let char_x = font.character_size.width + font.character_spacing; diff --git a/source/kernel/src/forth/mod.rs b/source/kernel/src/forth/mod.rs index 1bd387fc..319c192d 100644 --- a/source/kernel/src/forth/mod.rs +++ b/source/kernel/src/forth/mod.rs @@ -322,7 +322,8 @@ impl MnemosContext { SpawnulatorClient::from_registry(kernel), ) .await - .expect("Spawnulator client timed out - is the spawnulator running?"), + .expect("Spawnulator client timed out - is the spawnulator running?") + .expect("failed to get spawnulator"), } } } @@ -414,7 +415,9 @@ async fn sermux_open_port(forth: &mut forth3::Forth) -> Result<() // We could codify that zero is an invalid BOH_TOKEN, and put zero on the // stack instead, to allow userspace to handle errors if wanted. // - let mut mux_hdl = SerialMuxClient::from_registry(forth.host_ctxt.kernel).await; + let mut mux_hdl = SerialMuxClient::from_registry(forth.host_ctxt.kernel) + .await + .map_err(|_| forth3::Error::InternalError)?; let port = mux_hdl .open_port(port, sz) diff --git a/source/kernel/src/lib.rs b/source/kernel/src/lib.rs index feb8dd0f..3cd7d90d 100644 --- a/source/kernel/src/lib.rs +++ b/source/kernel/src/lib.rs @@ -98,14 +98,13 @@ pub use embedded_hal_async; pub use maitake; use maitake::{ scheduler::LocalScheduler, - sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, task::{BoxStorage, JoinHandle, Storage}, time::{Duration, Sleep, Timeout, Timer}, }; pub use mnemos_alloc; use mnemos_alloc::containers::Box; -use registry::{Listener, RegisteredDriver, Registry}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use registry::Registry; +use serde::{Deserialize, Serialize}; use services::{ forth_spawnulator::{SpawnulatorServer, SpawnulatorSettings}, keyboard::mux::{KeyboardMuxServer, KeyboardMuxSettings}, @@ -133,8 +132,8 @@ pub struct Kernel { /// Items that do not require a lock to access, and must only /// be accessed with shared refs inner: KernelInner, - /// The run-time driver registry, accessed via an async [`RwLock`]. - registry: RwLock, + /// The run-time driver registry. + registry: Registry, } unsafe impl Sync for Kernel {} @@ -175,11 +174,8 @@ impl Kernel { timer: Timer::new(settings.timer_granularity), }; - let new_kernel = Box::try_new(Kernel { - inner, - registry: RwLock::new(registry), - }) - .map_err(|_| "Kernel allocation failed.")?; + let new_kernel = + Box::try_new(Kernel { inner, registry }).map_err(|_| "Kernel allocation failed.")?; Ok(new_kernel) } @@ -227,129 +223,11 @@ impl Kernel { self.spawn_allocated(bx) } - /// Registers a new kernel-only [`RegisteredDriver`] with the kernel's - /// service [`Registry`]. - /// - /// This is equivalent to calling - /// ```rust - /// # use serde::{Serialize, de::DeserializeOwned}; - /// # use kernel::{Kernel, registry::{RegisteredDriver, Registration}}; - /// # async fn example() -> Result<(), kernel::registry::RegistrationError> - /// # where RD: RegisteredDriver { - /// # let kernel: Kernel = unimplemented!("this test never actually creates a kernel"); - /// # let registration: Registration = unimplemented!(); - /// kernel.with_registry(|registry| { registry.register_konly::(registration) }).await - /// # } - /// ``` - pub async fn register_konly( - &'static self, - registration: registry::Registration, - ) -> Result<(), registry::RegistrationError> - where - RD: RegisteredDriver, - { - self.registry_mut().await.register_konly(registration) - } - - /// Registers a new [`RegisteredDriver`] with the kernel's service [`Registry`]. - /// - /// This is equivalent to calling - /// ```rust - /// # use serde::{Serialize, de::DeserializeOwned}; - /// # use kernel::{Kernel, registry::{RegisteredDriver, Registration}}; - /// # async fn example() -> Result<(), kernel::registry::RegistrationError> - /// # where - /// # RD: RegisteredDriver + 'static, - /// # RD::Hello: Serialize + DeserializeOwned, - /// # RD::ConnectError: Serialize + DeserializeOwned, - /// # RD::Request: Serialize + DeserializeOwned, - /// # RD::Response: Serialize + DeserializeOwned, - /// # { - /// # let kernel: Kernel = unimplemented!("this test never actually creates a kernel"); - /// # let registration: Registration = unimplemented!(); - /// kernel.with_registry(|registry| { registry.register::(registration) }).await - /// # } - /// ``` - pub async fn register( - &'static self, - registration: registry::Registration, - ) -> Result<(), registry::RegistrationError> - where - RD: RegisteredDriver + 'static, - RD::Hello: Serialize + DeserializeOwned, - RD::ConnectError: Serialize + DeserializeOwned, - RD::Request: Serialize + DeserializeOwned, - RD::Response: Serialize + DeserializeOwned, - { - self.registry_mut().await.register(registration) - } - - /// Bind a kernel-only [`Listener`] for a driver service of type `RD`. - /// - /// This is a helper method which creates a [`Listener`] using - /// [`Listener::new`] and then registers that [`Listener`]'s - /// [`listener::Registration`] with the kernel's [`Registry`] using - /// [`Registry::register_konly`]. - /// - /// Driver services registered with [`Kernel::bind_konly_service`] can NOT - /// be queried or interfaced with from Userspace. If a registered service - /// has request and response types that are serializable, it can instead be - /// registered with [`Kernel::bind_service`], which allows for userspace - /// access. - /// - /// [`listener::Registration`]: registry::listener::Registration - pub async fn bind_konly_service( - &self, - capacity: usize, - ) -> Result, registry::RegistrationError> - where - RD: RegisteredDriver, - { - let (listener, registration) = Listener::new(capacity).await; - self.registry_mut().await.register_konly(registration)?; - Ok(listener) - } - - /// Bind a [`Listener`] for a driver service of type `RD`. - /// - /// This is a helper method which creates a [`Listener`] using - /// [`Listener::new`] and then registers that [`Listener`]'s - /// [`listener::Registration`] with the registry using - /// [`Registry::register`]. - /// - /// Driver services registered with [`Registry::bind`] can be accessed both - /// by the kernel and by userspace. This requires that the - /// [`RegisteredDriver`]'s message types implement [`Serialize`] and - /// [`DeserializeOwned`]. Driver services whose message types are *not* - /// serializable may still bind listeners using - /// [`Kernel::bind_konly_service`], but these listeners will not be - /// accessible from userspace. - /// - /// [`listener::Registration`]: registry::listener::Registration - pub async fn bind_service( - &mut self, - capacity: usize, - ) -> Result, registry::RegistrationError> - where - RD: RegisteredDriver + 'static, - RD::Hello: Serialize + DeserializeOwned, - RD::ConnectError: Serialize + DeserializeOwned, - RD::Request: Serialize + DeserializeOwned, - RD::Response: Serialize + DeserializeOwned, - { - let (listener, registration) = Listener::new(capacity).await; - self.registry_mut().await.register(registration)?; - Ok(listener) - } - /// Immutably borrow the kernel's [`Registry`]. - pub async fn registry(&self) -> RwLockReadGuard<'_, Registry> { - self.registry.read().await - } - - /// Mutably borrow the kernel's [`Registry`]. - pub async fn registry_mut(&self) -> RwLockWriteGuard<'_, Registry> { - self.registry.write().await + #[inline] + #[must_use] + pub fn registry(&self) -> &Registry { + &self.registry } #[track_caller] diff --git a/source/kernel/src/registry/mod.rs b/source/kernel/src/registry/mod.rs index 640831dd..ceaf6b67 100644 --- a/source/kernel/src/registry/mod.rs +++ b/source/kernel/src/registry/mod.rs @@ -6,6 +6,7 @@ use core::{ }; use crate::comms::{kchannel, oneshot::Reusable}; +use maitake::sync::{RwLock, WaitQueue}; use mnemos_alloc::containers::FixedVec; use portable_atomic::{AtomicU32, Ordering}; use postcard::experimental::max_size::MaxSize; @@ -63,9 +64,9 @@ pub mod known_uuids { /// /// Typically used with [`Registry::register`] or [`Registry::register_konly`]. /// A connection to the service can be established using [`Registry::connect`], -/// [`Registry::connect_with_hello`], or -/// [`Registry::connect_userspace_with_hello`] (depending on the service), after -/// the service has been registered.. +/// [`Registry::try_connect`], [`Registry::connect_userspace`], or +/// [`Registry::try_connect_userspace] (depending on the service), after +/// the service has been registered. pub trait RegisteredDriver { /// This is the type of the request sent TO the driver service type Request: 'static; @@ -119,8 +120,9 @@ pub struct RegistryType { /// The driver registry used by the kernel. pub struct Registry { - items: FixedVec, + items: RwLock>, counter: AtomicU32, + service_added: WaitQueue, } // TODO: This probably goes into the ABI crate, here is fine for now @@ -268,11 +270,13 @@ pub enum RegistrationError { RegistryFull, } -/// Errors returned by [`Registry::connect`] and -/// [`Registry::connect_with_hello`]. +/// Errors returned by [`Registry::connect`] and [`Registry::try_connect`]. pub enum ConnectError { /// No [`RegisteredDriver`] of this type was found! - NotFound, + /// + /// The [`RegisteredDriver::Hello`] message is returned, so that it can be + /// used again. + NotFound(D::Hello), /// The remote [`RegisteredDriver`] rejected the connection. Rejected(D::ConnectError), /// The remote [`RegisteredDriver`] has been registered, but the service @@ -280,11 +284,16 @@ pub enum ConnectError { DriverDead, } -/// Errors returned by [`Registry::connect_userspace_with_hello`] +/// Errors returned by [`Registry::connect_userspace`] and +/// [`Registry::try_connect_userspace`]. pub enum UserConnectError { - /// A connection error occurred: either the driver was not found in the - /// registry, it was no longer running, or it rejected the connection. - Connect(ConnectError), + /// No [`RegisteredDriver`] of this type was found! + NotFound, + /// The remote [`RegisteredDriver`] rejected the connection. + Rejected(D::ConnectError), + /// The remote [`RegisteredDriver`] has been registered, but the service + /// task has terminated. + DriverDead, /// Deserializing the userspace `Hello` message failed. DeserializationFailed(postcard::Error), /// The requested driver is not exposed. @@ -377,6 +386,7 @@ struct RegistryValue { /// userspace requests are serialized and deserialized. /// /// [vtable]: https://en.wikipedia.org/wiki/Virtual_method_table +#[derive(Copy, Clone)] struct UserVtable { /// Deserializes userspace requests. req_deser: ErasedReqDeser, @@ -404,9 +414,11 @@ impl RegistryType { impl Registry { /// Create a new registry with room for up to `max_items` registered drivers. pub fn new(max_items: usize) -> Self { + let items = FixedVec::try_new(max_items).unwrap(); Self { - items: FixedVec::try_new(max_items).unwrap(), + items: RwLock::new(items), counter: AtomicU32::new(0), + service_added: WaitQueue::new(), } } @@ -421,15 +433,12 @@ impl Registry { /// or interfaced with from Userspace. If a registered service has request /// and response types that are serializable, it can instead be registered /// with [`Registry::bind`] which allows for userspace access. - pub async fn bind_konly( - &mut self, - capacity: usize, - ) -> Result, RegistrationError> + pub async fn bind_konly(&self, capacity: usize) -> Result, RegistrationError> where RD: RegisteredDriver, { let (listener, registration) = Listener::new(capacity).await; - self.register_konly(registration)?; + self.register_konly(registration).await?; Ok(listener) } @@ -446,7 +455,7 @@ impl Registry { /// [`DeserializeOwned`]. Driver services whose message types are *not* /// serializable may still bind listeners using [`Registry::bind_konly`], /// but these listeners will not be accessible from userspace. - pub async fn bind(&mut self, capacity: usize) -> Result, RegistrationError> + pub async fn bind(&self, capacity: usize) -> Result, RegistrationError> where RD: RegisteredDriver + 'static, RD::Hello: Serialize + DeserializeOwned, @@ -455,7 +464,7 @@ impl Registry { RD::Response: Serialize + DeserializeOwned, { let (listener, registration) = Listener::new(capacity).await; - self.register(registration)?; + self.register(registration).await?; Ok(listener) } @@ -471,28 +480,25 @@ impl Registry { skip(self, registration), fields(svc = %any::type_name::()), )] - pub fn register_konly( - &mut self, + pub async fn register_konly( + &self, registration: listener::Registration, ) -> Result<(), RegistrationError> { - if self.items.as_slice().iter().any(|i| i.key == RD::UUID) { - return Err(RegistrationError::UuidAlreadyRegistered); - } let conn_prod = registration.tx.type_erase(); - let service_id = self.counter.fetch_add(1, Ordering::Relaxed); - self.items - .try_push(RegistryItem { - key: RD::UUID, - value: RegistryValue { - req_resp_tuple_id: RD::type_id().type_of(), - conn_prod, - user_vtable: None, - service_id: ServiceId(service_id), - }, - }) - .map_err(|_| RegistrationError::RegistryFull)?; + self.insert_item(RegistryItem { + key: RD::UUID, + value: RegistryValue { + req_resp_tuple_id: RD::type_id().type_of(), + conn_prod, + user_vtable: None, + service_id: ServiceId(service_id), + }, + }) + .await?; + info!(uuid = ?RD::UUID, service_id, "Registered KOnly"); + Ok(()) } @@ -507,8 +513,8 @@ impl Registry { skip(self, registration), fields(svc = %any::type_name::()), )] - pub fn register( - &mut self, + pub async fn register( + &self, registration: listener::Registration, ) -> Result<(), RegistrationError> where @@ -518,30 +524,25 @@ impl Registry { RD::Request: Serialize + DeserializeOwned, RD::Response: Serialize + DeserializeOwned, { - if self.items.as_slice().iter().any(|i| i.key == RD::UUID) { - return Err(RegistrationError::UuidAlreadyRegistered); - } - let service_id = self.counter.fetch_add(1, Ordering::Relaxed); let conn_prod = registration.tx.type_erase(); - self.items - .try_push(RegistryItem { - key: RD::UUID, - value: RegistryValue { - req_resp_tuple_id: RD::type_id().type_of(), - conn_prod, - user_vtable: Some(UserVtable::new::()), - service_id: ServiceId(service_id), - }, - }) - .map_err(|_| RegistrationError::RegistryFull)?; + self.insert_item(RegistryItem { + key: RD::UUID, + value: RegistryValue { + req_resp_tuple_id: RD::type_id().type_of(), + conn_prod, + user_vtable: Some(UserVtable::new::()), + service_id: ServiceId(service_id), + }, + }) + .await?; info!(svc = %any::type_name::(), uuid = ?RD::UUID, service_id, "Registered"); Ok(()) } - /// Get a kernelspace (including drivers) handle of a given driver service, + /// Attempt to get a kernelspace (including drivers) handle of a given driver service, /// which does not require sending a [`RegisteredDriver::Hello`] message. /// /// This can be used by drivers and tasks to interface with a registered driver @@ -549,36 +550,60 @@ impl Registry { /// /// The driver service MUST have already been registered using [Registry::register] or /// [Registry::register_konly] prior to making this call, otherwise no handle will - /// be returned. + /// be returned. To wait until a driver is registered, use + /// [`Registry::connect`] instead. /// /// # Returns /// /// - [`Ok`]`(`[KernelHandle`]`)` if the requested service was found and /// a connection was successfully established. /// - /// - [`Err`]`(`[`ConnectError`]`)` if the requested service was not - /// found in the registry, or if the service [rejected] the incoming - /// connection. + /// - [`Ok`]`(`[KernelHandle`]`)` if the requested service was found and + /// a connection was successfully established. + /// + /// - [`Err`]`(`[`ConnectError::Rejected`]`)` if the service [rejected] the + /// incoming connection. + /// + /// - [`Err`]`(`[`ConnectError::DriverDead`]`)` if the service has been + /// registered but is no longer running. + /// + /// - [`Err`]`(`[`ConnectError::NotFound`]`)` if no service matching the + /// requested [`RegisteredDriver`] type exists in the registry. /// /// [rejected]: listener::Handshake::reject #[tracing::instrument( - name = "Registry::connect_with_hello", + name = "Registry::try_connect", level = "debug", skip(self, hello), fields(svc = %any::type_name::()), )] - pub async fn connect_with_hello( + pub async fn try_connect( &self, hello: RD::Hello, ) -> Result, ConnectError> { - let item = self.get::()?; - - // cast the erased connection sender back to a typed sender. - let tx = unsafe { - // Safety: we just checked that the type IDs match above. - item.value - .conn_prod - .clone_typed::>() + let (tx, service_id) = { + // /!\ WARNING: Load-bearing scope /!\ + // + // We need to ensure that we only hold the lock on `self.items` + // while we're accessing the item; *not* while we're `await`ing a + // bunch of other stuff to connect to the service. This is + // important, because if we held the lock, no other task would be + // able to connect while we're waiting for the handshake, + // potentially causing a deadlock... + let items = self.items.read().await; + let item = match Self::get::(&items) { + Some(item) => item, + None => return Err(ConnectError::NotFound(hello)), + }; + + // cast the erased connection sender back to a typed sender. + let tx = unsafe { + // Safety: we just checked that the type IDs match above. + item.value + .conn_prod + .clone_typed::>() + }; + (tx, item.value.service_id) }; // TODO(eliza): it would be nice if we could reuse the oneshot receiver @@ -610,28 +635,73 @@ impl Registry { let client_id = self.counter.fetch_add(1, Ordering::Relaxed); let res = Ok(KernelHandle { prod, - service_id: item.value.service_id, + service_id, client_id: ClientId(client_id), request_ctr: 0, }); - info!(svc = %any::type_name::(), uuid = ?RD::UUID, service_id = item.value.service_id.0, client_id, "Got KernelHandle from Registry"); + info!(svc = %any::type_name::(), uuid = ?RD::UUID, service_id = service_id.0, client_id, "Got KernelHandle from Registry"); res } /// Get a kernelspace (including drivers) handle of a given driver service, - /// which does not require sending a [`RegisteredDriver::Hello`] message. + /// waiting until the service is registered if it does not already exist. /// - /// This method is equivalent to [`Registry::connect_with_hello`] when the - /// [`RegisteredDriver::Hello`] type is [`()`]. + /// This can be used by drivers and tasks to interface with a registered + /// driver service. + /// + /// If no service matching the requested [`RegisteredDriver`] type has been + /// registered, this method will wait until that service is added to the + /// registry, unless the registry becomes full. + /// + /// # Returns + /// + /// - [`Ok`]`(`[KernelHandle`]`)` if the requested service was found and + /// a connection was successfully established. + /// + /// - [`Err`]`(`[`ConnectError::Rejected`]`)` if the service [rejected] the + /// incoming connection. + /// + /// - [`Err`]`(`[`ConnectError::DriverDead`]`)` if the service has been + /// registered but is no longer running. + /// + /// - [`Err`]`(`[`ConnectError::NotFound`]`)` if no service matching the + /// requested [`RegisteredDriver`] type exists *and* the registry was + /// full. + /// + /// [rejected]: listener::Handshake::reject + #[tracing::instrument( + name = "Registry::connect", + level = "debug", + skip(self, hello), + fields(svc = %any::type_name::()), + )] + pub async fn connect(&self, hello: RD::Hello) -> Result, ConnectError> + where + RD: RegisteredDriver, + { + let mut hello = Some(hello); + let mut is_full = false; + loop { + match self.try_connect(hello.take().unwrap()).await { + Ok(handle) => return Ok(handle), + Err(ConnectError::NotFound(h)) if !is_full => { + hello = Some(h); + tracing::debug!("no service found; waiting for one to be added..."); + // wait for a service to be added to the registry + is_full = self.service_added.wait().await.is_err(); + } + Err(err) => return Err(err), + } + } + } + + /// Get a kernelspace (including drivers) handle of a given driver service, + /// waiting until the service is registered if it does not already exist. /// /// This can be used by drivers and tasks to interface with a registered driver /// service. /// - /// The driver service MUST have already been registered using [Registry::register] or - /// [Registry::register_konly] prior to making this call, otherwise no handle will - /// be returned. - /// /// # Returns /// /// - [`Ok`]`(`[KernelHandle`]`)` if the requested service was found and @@ -639,33 +709,58 @@ impl Registry { /// /// - [`Err`]`(`[`ConnectError`]`)` if the requested service was not /// found in the registry, or if the service [rejected] the incoming - /// connection. + /// connection. Note that [`ConnectError::NotFound`] is not returned + /// _unless_ the registry is full and no more services will be added. /// /// [rejected]: listener::Handshake::reject - - pub async fn connect(&self) -> Result, ConnectError> + #[tracing::instrument( + name = "Registry::connect_userspace", + level = "debug", + skip(self), + fields(svc = %any::type_name::()), + )] + pub async fn connect_userspace( + &self, + scheduler: &maitake::scheduler::LocalScheduler, + user_hello: &[u8], + ) -> Result> where - RD: RegisteredDriver, + RD: RegisteredDriver, + RD::Hello: Serialize + DeserializeOwned, + RD::ConnectError: Serialize + DeserializeOwned, + RD::Request: Serialize + DeserializeOwned, + RD::Response: Serialize + DeserializeOwned, { - self.connect_with_hello(()).await + let mut is_full = false; + loop { + match self.try_connect_userspace(scheduler, user_hello).await { + Ok(handle) => return Ok(handle), + Err(UserConnectError::NotFound) if !is_full => { + tracing::debug!("no service found; waiting for one to be added..."); + // wait for a service to be added to the registry + is_full = self.service_added.wait().await.is_err(); + } + Err(err) => return Err(err), + } + } } - /// Get a handle capable of processing serialized userspace messages to a + /// Try to get a handle capable of processing serialized userspace messages to a /// registered driver service, given a byte buffer for the userspace /// [`RegisteredDriver::Hello`] message. /// /// The driver service MUST have already been registered using [Registry::register] or /// prior to making this call, otherwise no handle will be returned. /// - /// Driver services registered with [Registry::register_konly] cannot be retrieved via - /// a call to [Registry::connect_userspace_with_hello]. + /// Driver services registered with [`Registry::register_konly`] cannot be + /// retrieved via a call to [`Registry::try_connect_userspace`]. #[tracing::instrument( - name = "Registry::connect_userspace_with_hello", + name = "Registry::try_connect_userspace", level = "debug", skip(self, scheduler), fields(svc = %any::type_name::()), )] - pub async fn connect_userspace_with_hello( + pub async fn try_connect_userspace( &self, scheduler: &maitake::scheduler::LocalScheduler, user_hello: &[u8], @@ -677,24 +772,36 @@ impl Registry { RD::Request: Serialize + DeserializeOwned, RD::Response: Serialize + DeserializeOwned, { - let item = self.get::().map_err(UserConnectError::Connect)?; - let vtable = item - .value - .user_vtable - .as_ref() - // if the registry item has no userspace vtable, it's not exposed to - // userspace. - // this is *weird*, since this method requires that `RD`'s message - // types be serializable/deserializable, but it's possible that the - // driver was (accidentally?) registered with `register_konly` even - // though it didn't *need* to be due to serializability... - .ok_or(UserConnectError::NotUserspace)?; + let (vtable, conn_prod, service_id) = { + // /!\ WARNING: Load-bearing scope /!\ + // + // We need to ensure that we only hold the lock on `self.items` + // while we're accessing the item; *not* while we're `await`ing a + // bunch of other stuff to connect to the service. This is + // important, because if we held the lock, no other task would be + // able to connect while we're waiting for the handshake, + // potentially causing a deadlock... + let items = self.items.read().await; + let item = Self::get::(&items).ok_or_else(|| UserConnectError::NotFound)?; + let vtable = item + .value + .user_vtable + // if the registry item has no userspace vtable, it's not exposed to + // userspace. + // this is *weird*, since this method requires that `RD`'s message + // types be serializable/deserializable, but it's possible that the + // driver was (accidentally?) registered with `register_konly` even + // though it didn't *need* to be due to serializability... + .ok_or(UserConnectError::NotUserspace)?; + let conn_prod = item.value.conn_prod.clone(); + let service_id = item.value.service_id; + (vtable, conn_prod, service_id) + }; let mut handshake_result = mem::MaybeUninit::>::uninit(); let outptr = ptr::NonNull::from(&mut handshake_result).cast::<()>(); - let handshake = - unsafe { (vtable.handshake)(scheduler, user_hello, &item.value.conn_prod, outptr) }; + let handshake = unsafe { (vtable.handshake)(scheduler, user_hello, &conn_prod, outptr) }; let req_producer_leaked = match handshake.await { // Outer `Result` is the `JoinError` from `maitake` --- it should // always succeed, because we own the task's joinhandle, and we @@ -709,10 +816,7 @@ impl Registry { // Safety: `handshake_result` is guaranteed to be initialized by // `erased_handshake` if and only if its future completes with // an `Ok(())`. and it did! - handshake_result - .assume_init() - .map_err(UserConnectError::Connect)? - .type_erase() + handshake_result.assume_init()?.type_erase() }, }; @@ -720,7 +824,7 @@ impl Registry { info!( svc = %any::type_name::(), uuid = ?RD::UUID, - service_id = item.value.service_id.0, + service_id = service_id.0, client_id, "Got KernelHandle from Registry", ); @@ -728,19 +832,40 @@ impl Registry { Ok(UserspaceHandle { req_producer_leaked, req_deser: vtable.req_deser, - service_id: item.value.service_id, + service_id, client_id: ClientId(client_id), }) } - fn get(&self) -> Result<&RegistryItem, ConnectError> { - let Some(item) = self.items.as_slice().iter().find(|i| i.key == RD::UUID) else { + async fn insert_item(&self, item: RegistryItem) -> Result<(), RegistrationError> { + { + let mut items = self.items.write().await; + if items.as_slice().iter().any(|i| i.key == item.key) { + return Err(RegistrationError::UuidAlreadyRegistered); + } + + items.try_push(item).map_err(|_| { + tracing::warn!("failed to insert new registry item; the registry is full!"); + // close the "service added" waitcell, because no new services will + // ever be added. + self.service_added.close(); + RegistrationError::RegistryFull + })?; + } + + self.service_added.wake_all(); + + Ok(()) + } + + fn get(items: &FixedVec) -> Option<&RegistryItem> { + let Some(item) = items.as_slice().iter().find(|i| i.key == RD::UUID) else { tracing::debug!( svc = %any::type_name::(), uuid = ?RD::UUID, "No service for this UUID exists in the registry!" ); - return Err(ConnectError::NotFound); + return None; }; let expected_type_id = RD::type_id().type_of(); @@ -753,10 +878,10 @@ impl Registry { type_id.actual = ?actual_type_id, "Registry entry's type ID did not match driver's type ID. This is (probably?) a bug!" ); - return Err(ConnectError::NotFound); + return None; } - Ok(item) + Some(item) } } @@ -1046,7 +1171,7 @@ where .map_err(|_| UserHandlerError::QueueFull) } -type UserHandshakeResult = Result>, ConnectError>; +type UserHandshakeResult = Result>, UserConnectError>; /// Perform a type-erased userspace handshake, deserializing the /// [`RegisteredDriver::Hello`] message from `hello_bytes` and returning a @@ -1108,9 +1233,9 @@ where .await // this is a `Reusable>>`, so // the outer `Result` is the error returned by `receive()`... - .map_err(|_| ConnectError::DriverDead) + .map_err(|_| UserConnectError::DriverDead) // ...and the inner result is the connect error returned by the service. - .and_then(|res| res.map_err(ConnectError::Rejected)); + .and_then(|res| res.map_err(UserConnectError::Rejected)); outptr // Safety: the caller is responsible for ensuring the out pointer is @@ -1144,7 +1269,7 @@ where fn eq(&self, other: &Self) -> bool { match (self, other) { (Self::DriverDead, Self::DriverDead) => true, - (Self::NotFound, Self::NotFound) => true, + (Self::NotFound(_), Self::NotFound(_)) => true, (Self::Rejected(this), Self::Rejected(that)) => this == that, _ => false, } @@ -1166,7 +1291,7 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut dbs = match self { Self::DriverDead => f.debug_struct("DriverDead"), - Self::NotFound => f.debug_struct("NotFound"), + Self::NotFound(_) => f.debug_struct("NotFound"), Self::Rejected(error) => { let mut d = f.debug_struct("Rejected"); d.field("error", error); @@ -1187,7 +1312,7 @@ where let name = any::type_name::(); match self { Self::DriverDead => write!(f, "the {name} service has terminated"), - Self::NotFound => write!(f, "no {name} service found in the registry",), + Self::NotFound(_) => write!(f, "no {name} service found in the registry",), Self::Rejected(err) => write!(f, "the {name} service rejected the connection: {err}",), } } @@ -1203,7 +1328,9 @@ where fn eq(&self, other: &Self) -> bool { match (self, other) { (Self::DeserializationFailed(this), Self::DeserializationFailed(that)) => this == that, - (Self::Connect(this), Self::Connect(that)) => this == that, + (Self::Rejected(this), Self::Rejected(that)) => this == that, + (Self::NotFound, Self::NotFound) => true, + (Self::DriverDead, Self::DriverDead) => true, (Self::NotUserspace, Self::NotUserspace) => true, _ => false, } @@ -1224,17 +1351,23 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::DeserializationFailed(error) => f - .debug_struct("DeserializationFailed") - .field("error", error) - .field("svc", &mycelium_util::fmt::display(any::type_name::())) - .finish(), - Self::Connect(err) => f.debug_tuple("Connect").field(err).finish(), - Self::NotUserspace => f - .debug_tuple("NotUserspace") - .field(&mycelium_util::fmt::display(any::type_name::())) - .finish(), + Self::DeserializationFailed(error) => { + let mut d = f.debug_struct("DeserializationFailed"); + + d.field("error", error); + d + } + Self::NotFound => f.debug_struct("NotFound"), + Self::DriverDead => f.debug_struct("NotFound"), + Self::Rejected(error) => { + let mut d = f.debug_struct("Rejected"); + d.field("error", &error); + d + } + Self::NotUserspace => f.debug_struct("NotUserspace"), } + .field("svc", &mycelium_util::fmt::display(any::type_name::())) + .finish() } } @@ -1244,8 +1377,11 @@ where D::ConnectError: fmt::Display, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let name = any::type_name::(); match self { - Self::Connect(err) => write!(f, "failed to connect from userspace: {err}"), + Self::DriverDead => write!(f, "the {name} service has terminated"), + Self::NotFound => write!(f, "no {name} service found in the registry"), + Self::Rejected(err) => write!(f, "the {name} service rejected the connection: {err}",), Self::DeserializationFailed(err) => write!( f, "failed to deserialize userspace Hello for the {} service: {err}", diff --git a/source/kernel/src/registry/tests.rs b/source/kernel/src/registry/tests.rs index e815b222..74a22e12 100644 --- a/source/kernel/src/registry/tests.rs +++ b/source/kernel/src/registry/tests.rs @@ -48,13 +48,12 @@ fn konly_connect() { }) .await; - k.register_konly(registration).await.unwrap(); + k.registry().register_konly(registration).await.unwrap(); let reply = comms::oneshot::Reusable::new_async().await; let mut client1 = k .registry() - .await - .connect_with_hello::(TestMessage(1)) + .connect::(TestMessage(1)) .await .expect("connect should succeed"); @@ -65,11 +64,7 @@ fn konly_connect() { assert_eq!(rsp.body, Ok(TestMessage(2))); // should be rejected - let res = k - .registry() - .await - .connect_with_hello::(TestMessage(2)) - .await; + let res = k.registry().connect::(TestMessage(2)).await; match res { Ok(_) => panic!("rejected connect should fail"), Err(ConnectError::Rejected(TestMessage(666))) => {} @@ -81,8 +76,7 @@ fn konly_connect() { let mut client2 = k .registry() - .await - .connect_with_hello::(TestMessage(1)) + .connect::(TestMessage(1)) .await .expect("connect with accepted Hello should succeed"); @@ -143,7 +137,7 @@ fn user_connect() { }) .await; - k.register(registration).await.unwrap(); + k.registry().register(registration).await.unwrap(); #[tracing::instrument(skip(k), err(Debug))] async fn user_connect( @@ -153,8 +147,7 @@ fn user_connect() { let bytes = postcard::to_stdvec(&hello).expect("must serialize!"); k.registry() - .await - .connect_userspace_with_hello::(&k.inner().scheduler, &bytes[..]) + .connect_userspace::(&k.inner().scheduler, &bytes[..]) .await } @@ -200,10 +193,7 @@ fn user_connect() { let res = user_connect(k, TestMessage(2)).await; match res { Ok(_) => panic!("request with rejected hello should fail"), - Err(e) => assert_eq!( - e, - UserConnectError::Connect(ConnectError::Rejected(TestMessage(666))) - ), + Err(e) => assert_eq!(e, UserConnectError::Rejected(TestMessage(666))), }; let client2 = user_connect(k, TestMessage(1)) diff --git a/source/kernel/src/services/emb_display.rs b/source/kernel/src/services/emb_display.rs index a716a9e7..88391732 100644 --- a/source/kernel/src/services/emb_display.rs +++ b/source/kernel/src/services/emb_display.rs @@ -8,8 +8,6 @@ //! //! See the docs of [FrameChunk] and [EmbDisplayClient] for additional details //! of use. -use core::time::Duration; - use embedded_graphics::{ pixelcolor::{BinaryColor, Gray8}, prelude::*, @@ -62,9 +60,6 @@ pub enum Response { #[derive(Debug, Eq, PartialEq)] pub enum FrameError { - /// Failed to register a display, the kernel reported that there is already - /// an existing EmbDisplay - DisplayAlreadyExists, /// We are still waiting for a response from the last request Busy, /// Internal Error @@ -86,18 +81,15 @@ impl EmbDisplayClient { /// [`EmbDisplayService`]. /// /// Will retry until success - pub async fn from_registry(kernel: &'static Kernel) -> Self { - loop { - match Self::from_registry_no_retry(kernel).await { - Ok(me) => return me, - Err(registry::ConnectError::Rejected(_)) => { - unreachable!("the EmbDisplayService does not return connect errors!") - } - Err(_) => { - kernel.sleep(Duration::from_millis(10)).await; - } - } - } + pub async fn from_registry( + kernel: &'static Kernel, + ) -> Result> { + let prod = kernel.registry().connect::(()).await?; + + Ok(EmbDisplayClient { + prod, + reply: Reusable::new_async().await, + }) } /// Obtain a new client handle by querying the registry for a registered @@ -109,8 +101,7 @@ impl EmbDisplayClient { ) -> Result> { let prod = kernel .registry() - .await - .connect::() + .try_connect::(()) .await?; Ok(EmbDisplayClient { diff --git a/source/kernel/src/services/forth_spawnulator.rs b/source/kernel/src/services/forth_spawnulator.rs index 3b5e6e6d..b604aeec 100644 --- a/source/kernel/src/services/forth_spawnulator.rs +++ b/source/kernel/src/services/forth_spawnulator.rs @@ -38,7 +38,7 @@ //! different queue (the scheduler's run queue), but I couldn't easily come up //! with another solution... -use core::{convert::Infallible, time::Duration}; +use core::convert::Infallible; use crate::{ comms::oneshot::Reusable, @@ -83,18 +83,15 @@ pub struct SpawnulatorClient { } impl SpawnulatorClient { - pub async fn from_registry(kernel: &'static Kernel) -> Self { - loop { - match Self::from_registry_no_retry(kernel).await { - Ok(me) => return me, - Err(registry::ConnectError::Rejected(_)) => { - unreachable!("the SpawnulatorService does not return connect errors!") - } - Err(_) => { - kernel.sleep(Duration::from_millis(10)).await; - } - } - } + pub async fn from_registry( + kernel: &'static Kernel, + ) -> Result> { + let prod = kernel.registry().connect::(()).await?; + + Ok(SpawnulatorClient { + hdl: prod, + reply: Reusable::new_async().await, + }) } pub async fn from_registry_no_retry( @@ -102,8 +99,7 @@ impl SpawnulatorClient { ) -> Result> { let prod = kernel .registry() - .await - .connect::() + .try_connect::(()) .await?; Ok(SpawnulatorClient { @@ -156,7 +152,8 @@ impl SpawnulatorServer { settings: SpawnulatorSettings, ) -> Result<(), registry::RegistrationError> { let vms = kernel - .bind_konly_service::(settings.capacity) + .registry() + .bind_konly::(settings.capacity) .await? .into_request_stream(settings.capacity) .await; diff --git a/source/kernel/src/services/i2c.rs b/source/kernel/src/services/i2c.rs index ce7b09ff..da7b8f46 100644 --- a/source/kernel/src/services/i2c.rs +++ b/source/kernel/src/services/i2c.rs @@ -100,7 +100,7 @@ use crate::{ registry::{self, known_uuids, Envelope, KernelHandle, RegisteredDriver}, Kernel, }; -use core::{convert::Infallible, fmt, time::Duration}; +use core::{convert::Infallible, fmt}; use embedded_hal_async::i2c::{self, AddressMode}; use uuid::Uuid; @@ -304,18 +304,16 @@ impl I2cClient { /// /// If the [`I2cService`] hasn't been registered yet, we will retry until it /// has been registered. - pub async fn from_registry(kernel: &'static Kernel) -> Self { - loop { - match Self::from_registry_no_retry(kernel).await { - Ok(me) => return me, - Err(registry::ConnectError::Rejected(_)) => { - unreachable!("the I2cService does not return connect errors!") - } - Err(_) => { - kernel.sleep(Duration::from_millis(10)).await; - } - } - } + pub async fn from_registry( + kernel: &'static Kernel, + ) -> Result> { + let handle = kernel.registry().connect::(()).await?; + + Ok(I2cClient { + handle, + reply: Reusable::new_async().await, + cached_buf: None, + }) } /// Obtain an `I2cClient` @@ -327,7 +325,7 @@ impl I2cClient { pub async fn from_registry_no_retry( kernel: &'static Kernel, ) -> Result> { - let handle = kernel.registry().await.connect::().await?; + let handle = kernel.registry().try_connect::(()).await?; Ok(I2cClient { handle, diff --git a/source/kernel/src/services/keyboard/mod.rs b/source/kernel/src/services/keyboard/mod.rs index 1aaeab99..044097b5 100644 --- a/source/kernel/src/services/keyboard/mod.rs +++ b/source/kernel/src/services/keyboard/mod.rs @@ -19,10 +19,9 @@ use crate::{ kchannel::{self, KChannel}, oneshot, }, - registry::{known_uuids, RegisteredDriver}, + registry::{self, known_uuids, RegisteredDriver}, Kernel, }; -use core::time::Duration; pub mod key_event; pub mod mux; @@ -100,22 +99,28 @@ pub struct KeyClient { rx: kchannel::KConsumer, } +#[derive(Debug)] +pub enum FromRegistryError { + Connect(registry::ConnectError), + Service(KeyboardError), + Request(registry::OneshotRequestError), +} + impl KeyClient { /// Obtain a `KeyClient` /// /// If the [`KeyboardService`] hasn't been registered yet, we will retry until it /// has been registered. - #[must_use] - pub async fn from_registry(kernel: &'static Kernel, subscribe: Subscribe) -> Self { - loop { - match Self::from_registry_no_retry(kernel, subscribe).await { - Some(port) => return port, - None => { - // I2C probably isn't registered yet. Try again in a bit - kernel.sleep(Duration::from_millis(10)).await; - } - } - } + pub async fn from_registry( + kernel: &'static Kernel, + subscribe: Subscribe, + ) -> Result { + let handle = kernel + .registry() + .connect::(()) + .await + .map_err(FromRegistryError::Connect)?; + Self::from_handle(subscribe, handle).await } /// Obtain an `KeyClient` @@ -124,25 +129,30 @@ impl KeyClient { /// /// Prefer [`KeyClient::from_registry`] unless you will not be spawning one /// around the same time as obtaining a client. - #[must_use] pub async fn from_registry_no_retry( kernel: &'static Kernel, subscribe: Subscribe, - ) -> Option { - let mut handle = kernel + ) -> Result { + let handle = kernel .registry() + .try_connect::(()) .await - .connect::() - .await - .ok()?; + .map_err(FromRegistryError::Connect)?; + Self::from_handle(subscribe, handle).await + } + + async fn from_handle( + subscribe: Subscribe, + mut handle: registry::KernelHandle, + ) -> Result { let reply = oneshot::Reusable::new_async().await; let Subscribed { rx } = handle .request_oneshot(subscribe, &reply) .await - .ok()? + .map_err(FromRegistryError::Request)? .body - .ok()?; - Some(Self { rx }) + .map_err(FromRegistryError::Service)?; + Ok(Self { rx }) } /// Returns the next [`KeyEvent`] received from the [`KeyboardService`]. diff --git a/source/kernel/src/services/keyboard/mux.rs b/source/kernel/src/services/keyboard/mux.rs index 6dd9b3e2..0e0b1657 100644 --- a/source/kernel/src/services/keyboard/mux.rs +++ b/source/kernel/src/services/keyboard/mux.rs @@ -17,12 +17,11 @@ use crate::{ mnemos_alloc::containers::FixedVec, registry::{ self, known_uuids, listener, Envelope, KernelHandle, OneshotRequestError, RegisteredDriver, - RegistrationError, }, services::serial_mux, Kernel, }; -use core::{convert::Infallible, time::Duration}; +use core::convert::Infallible; use futures::{future, FutureExt}; use serde::{Deserialize, Serialize}; use tracing::Level; @@ -75,18 +74,14 @@ impl KeyboardMuxClient { /// /// If the [`KeyboardMuxService`] hasn't been registered yet, we will retry until it /// has been registered. - pub async fn from_registry(kernel: &'static Kernel) -> Self { - loop { - match Self::from_registry_no_retry(kernel).await { - Ok(me) => return me, - Err(registry::ConnectError::Rejected(_)) => { - unreachable!("the KeyboardMuxService does not return connect errors!") - } - Err(_) => { - kernel.sleep(Duration::from_millis(10)).await; - } - } - } + pub async fn from_registry( + kernel: &'static Kernel, + ) -> Result> { + let handle = kernel.registry().connect::(()).await?; + Ok(Self { + handle, + reply: Reusable::new_async().await, + }) } /// Obtain an `KeyboardMuxClient` @@ -100,8 +95,7 @@ impl KeyboardMuxClient { ) -> Result> { let handle = kernel .registry() - .await - .connect::() + .try_connect::(()) .await?; Ok(Self { handle, @@ -153,6 +147,14 @@ pub struct KeyboardMuxSettings { pub sermux_port: Option, } +#[derive(Debug)] +pub enum RegistrationError { + RegisterMux(registry::RegistrationError), + RegisterKeyboard(registry::RegistrationError), + NoSermux(registry::ConnectError), + NoSermuxPort, +} + impl KeyboardMuxServer { /// Register the `KeyboardMuxServer`. /// @@ -170,19 +172,25 @@ impl KeyboardMuxServer { settings: KeyboardMuxSettings, ) -> Result<(), RegistrationError> { let key_rx = kernel - .bind_konly_service::(settings.buffer_capacity) - .await? + .registry() + .bind_konly::(settings.buffer_capacity) + .await + .map_err(RegistrationError::RegisterMux)? .into_request_stream(settings.buffer_capacity) .await; let sub_rx = kernel - .bind_konly_service::(8) - .await? + .registry() + .bind_konly::(8) + .await + .map_err(RegistrationError::RegisterKeyboard)? .into_request_stream(8) .await; let subscriptions = FixedVec::new(settings.max_keyboards).await; let sermux_port = if let Some(port) = settings.sermux_port { - let mut client = serial_mux::SerialMuxClient::from_registry(kernel).await; + let mut client = serial_mux::SerialMuxClient::from_registry(kernel) + .await + .map_err(RegistrationError::NoSermux)?; tracing::info!("opening Serial Mux port {port}"); Some( client diff --git a/source/kernel/src/services/serial_mux.rs b/source/kernel/src/services/serial_mux.rs index 5281d88e..c0788490 100644 --- a/source/kernel/src/services/serial_mux.rs +++ b/source/kernel/src/services/serial_mux.rs @@ -5,9 +5,6 @@ //! This module includes the service definition, client definition, as well //! as a server definition that relies on the [`SimpleSerial`][crate::services::simple_serial] //! service to provide the service implementation. - -use core::time::Duration; - use crate::comms::bbq::GrantR; use crate::{ comms::{bbq, oneshot::Reusable}, @@ -88,18 +85,15 @@ impl SerialMuxClient { /// Obtain a `SerialMuxClient` /// /// If the [`SerialMuxServer`] hasn't been registered yet, we will retry until it has been - pub async fn from_registry(kernel: &'static Kernel) -> Self { - loop { - match Self::from_registry_no_retry(kernel).await { - Ok(me) => return me, - Err(registry::ConnectError::Rejected(_)) => { - unreachable!("the SerialMuxService does not return connect errors!") - } - Err(_) => { - kernel.sleep(Duration::from_millis(10)).await; - } - } - } + pub async fn from_registry( + kernel: &'static Kernel, + ) -> Result> { + let prod = kernel.registry().connect::(()).await?; + + Ok(SerialMuxClient { + prod, + reply: Reusable::new_async().await, + }) } /// Obtain a `SerialMuxClient` @@ -113,8 +107,7 @@ impl SerialMuxClient { ) -> Result> { let prod = kernel .registry() - .await - .connect::() + .try_connect::(()) .await?; Ok(SerialMuxClient { @@ -145,7 +138,7 @@ impl PortHandle { /// If you need to open multiple ports at once, probably get a [SerialMuxClient] instead /// to reuse it for both ports pub async fn open(kernel: &'static Kernel, port_id: u16, capacity: usize) -> Option { - let mut client = SerialMuxClient::from_registry(kernel).await; + let mut client = SerialMuxClient::from_registry(kernel).await.ok()?; client.open_port(port_id, capacity).await } @@ -208,19 +201,10 @@ impl SerialMuxServer { kernel: &'static Kernel, settings: SerialMuxSettings, ) -> Result<(), RegistrationError> { - loop { - match SerialMuxServer::register_no_retry(kernel, settings).await { - Ok(_) => break, - Err(RegistrationError::Connect(registry::ConnectError::NotFound)) => { - // Uart probably isn't registered yet. Try again in a bit - kernel.sleep(Duration::from_millis(10)).await; - } - Err(e) => { - panic!("uhhhh {e:?}"); - } - } - } - Ok(()) + let serial_handle = SimpleSerialClient::from_registry(kernel) + .await + .map_err(RegistrationError::Connect)?; + Self::register_inner(kernel, settings, serial_handle).await } /// Register the SerialMuxServer. @@ -232,22 +216,29 @@ impl SerialMuxServer { /// than once. Prefer [`SerialMuxServer::register`] unless you will not be /// spawning one around the same time as registering this server. pub async fn register_no_retry( + kernel: &'static Kernel, + settings: SerialMuxSettings, + ) -> Result<(), RegistrationError> { + let serial_handle = SimpleSerialClient::from_registry_no_retry(kernel) + .await + .map_err(RegistrationError::Connect)?; + Self::register_inner(kernel, settings, serial_handle).await + } + + async fn register_inner( kernel: &'static Kernel, SerialMuxSettings { max_ports, max_frame, .. }: SerialMuxSettings, + mut serial_handle: SimpleSerialClient, ) -> Result<(), RegistrationError> { let max_ports = max_ports as usize; - let mut serial_handle = SimpleSerialClient::from_registry(kernel) - .await - .map_err(RegistrationError::Connect)?; let serial_port = serial_handle .get_port() .await .ok_or(RegistrationError::NoSerialPortAvailable)?; - let (sprod, scons) = serial_port.split(); let sprod = sprod.into_mpmc_producer().await; @@ -255,7 +246,8 @@ impl SerialMuxServer { let imutex = Arc::new(Mutex::new(MuxingInfo { ports, max_frame })).await; let listener = kernel - .bind_konly_service::(max_ports) + .registry() + .bind_konly::(max_ports) .await .map_err(|_| RegistrationError::MuxAlreadyRegistered)?; diff --git a/source/kernel/src/services/simple_serial.rs b/source/kernel/src/services/simple_serial.rs index a1a636e4..3a25f5f9 100644 --- a/source/kernel/src/services/simple_serial.rs +++ b/source/kernel/src/services/simple_serial.rs @@ -63,11 +63,21 @@ pub struct SimpleSerialClient { impl SimpleSerialClient { pub async fn from_registry( kernel: &'static Kernel, + ) -> Result> { + let kprod = kernel.registry().connect::(()).await?; + + Ok(SimpleSerialClient { + kprod, + rosc: Reusable::new_async().await, + }) + } + + pub async fn from_registry_no_retry( + kernel: &'static Kernel, ) -> Result> { let kprod = kernel .registry() - .await - .connect::() + .try_connect::(()) .await?; Ok(SimpleSerialClient {