From c0348024665a615a30fd8fe2f02e8c93cf9c6332 Mon Sep 17 00:00:00 2001 From: Arnold Loubriat Date: Mon, 8 Jan 2024 21:44:16 +0100 Subject: [PATCH] fix: Make full use of tokio ecosystem if the tokio feature is enabled on Unix (#336) --- Cargo.lock | 24 ++++++++++ platforms/unix/Cargo.toml | 20 +++++--- platforms/unix/src/adapter.rs | 86 +++++++++++++++++------------------ platforms/unix/src/context.rs | 32 ++++++++++--- 4 files changed, 107 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e7f70fefa..bde8d4c5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,6 +83,7 @@ dependencies = [ "once_cell", "serde", "tokio", + "tokio-stream", "zbus", ] @@ -1990,10 +1991,33 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "socket2 0.5.4", + "tokio-macros", "tracing", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml_datetime" version = "0.6.1" diff --git a/platforms/unix/Cargo.toml b/platforms/unix/Cargo.toml index 6ae743925..c7edb80a7 100644 --- a/platforms/unix/Cargo.toml +++ b/platforms/unix/Cargo.toml @@ -12,19 +12,27 @@ edition = "2021" [features] default = ["async-io"] -async-io = ["atspi/async-std", "zbus/async-io"] -tokio = ["dep:tokio", "atspi/tokio", "zbus/tokio"] +async-io = ["dep:async-channel", "dep:async-lock", "dep:futures-util", "atspi/async-std", "zbus/async-io"] +tokio = ["dep:tokio", "dep:tokio-stream", "atspi/tokio", "zbus/tokio"] [dependencies] accesskit = { version = "0.12.2", path = "../../common" } accesskit_consumer = { version = "0.17.0", path = "../../consumer" } -async-channel = "2.1.1" -async-lock = "2.7.0" async-once-cell = "0.5.3" atspi = { version = "0.19", default-features = false } futures-lite = "1.13" -futures-util = "0.3.27" once_cell = "1.17.1" serde = "1.0" -tokio = { version = "1.32.0", optional = true, features = ["rt", "net", "time"] } zbus = { version = "3.14", default-features = false } + +# async-io support +async-channel = { version = "2.1.1", optional = true } +async-lock = { version = "2.7.0", optional = true } +futures-util = { version = "0.3.27", optional = true } + +# tokio support +tokio-stream = { version = "0.1.14", optional = true } +[dependencies.tokio] +version = "1.32.0" +optional = true +features = ["macros", "net", "rt", "sync", "time"] diff --git a/platforms/unix/src/adapter.rs b/platforms/unix/src/adapter.rs index 0102eac7f..3cbf68b7d 100644 --- a/platforms/unix/src/adapter.rs +++ b/platforms/unix/src/adapter.rs @@ -15,6 +15,7 @@ use crate::{ }; use accesskit::{ActionHandler, NodeId, Rect, Role, TreeUpdate}; use accesskit_consumer::{DetachedNode, FilterResult, Node, Tree, TreeChangeHandler, TreeState}; +#[cfg(not(feature = "tokio"))] use async_channel::Sender; use async_once_cell::Lazy; use atspi::{InterfaceSet, Live, State}; @@ -26,6 +27,8 @@ use std::{ Arc, Mutex, Weak, }, }; +#[cfg(feature = "tokio")] +use tokio::sync::mpsc::UnboundedSender as Sender; struct AdapterChangeHandler<'a> { adapter: &'a AdapterImpl, @@ -274,34 +277,35 @@ impl AdapterImpl { } } + pub(crate) async fn send_message(&self, message: Message) { + #[cfg(not(feature = "tokio"))] + self.messages.send(message).await.unwrap(); + #[cfg(feature = "tokio")] + self.messages.send(message).unwrap(); + } + async fn register_interfaces(&self, id: NodeId, new_interfaces: InterfaceSet) { - self.messages - .send(Message::RegisterInterfaces { - adapter_id: self.id, - context: Arc::downgrade(&self.context), - node_id: id, - interfaces: new_interfaces, - }) - .await - .unwrap(); + self.send_message(Message::RegisterInterfaces { + adapter_id: self.id, + context: Arc::downgrade(&self.context), + node_id: id, + interfaces: new_interfaces, + }) + .await; } async fn unregister_interfaces(&self, id: NodeId, old_interfaces: InterfaceSet) { - self.messages - .send(Message::UnregisterInterfaces { - adapter_id: self.id, - node_id: id, - interfaces: old_interfaces, - }) - .await - .unwrap(); + self.send_message(Message::UnregisterInterfaces { + adapter_id: self.id, + node_id: id, + interfaces: old_interfaces, + }) + .await; } pub(crate) async fn emit_object_event(&self, target: ObjectId, event: ObjectEvent) { - self.messages - .send(Message::EmitEvent(Event::Object { target, event })) - .await - .unwrap(); + self.send_message(Message::EmitEvent(Event::Object { target, event })) + .await; } fn set_root_window_bounds(&self, bounds: WindowBounds) { @@ -336,17 +340,15 @@ impl AdapterImpl { } async fn window_activated(&self, window: &NodeWrapper<'_>) { - self.messages - .send(Message::EmitEvent(Event::Window { - target: ObjectId::Node { - adapter: self.id, - node: window.id(), - }, - name: window.name().unwrap_or_default(), - event: WindowEvent::Activated, - })) - .await - .unwrap(); + self.send_message(Message::EmitEvent(Event::Window { + target: ObjectId::Node { + adapter: self.id, + node: window.id(), + }, + name: window.name().unwrap_or_default(), + event: WindowEvent::Activated, + })) + .await; self.emit_object_event( ObjectId::Node { adapter: self.id, @@ -366,17 +368,15 @@ impl AdapterImpl { } async fn window_deactivated(&self, window: &NodeWrapper<'_>) { - self.messages - .send(Message::EmitEvent(Event::Window { - target: ObjectId::Node { - adapter: self.id, - node: window.id(), - }, - name: window.name().unwrap_or_default(), - event: WindowEvent::Deactivated, - })) - .await - .unwrap(); + self.send_message(Message::EmitEvent(Event::Window { + target: ObjectId::Node { + adapter: self.id, + node: window.id(), + }, + name: window.name().unwrap_or_default(), + event: WindowEvent::Deactivated, + })) + .await; self.emit_object_event( ObjectId::Node { adapter: self.id, diff --git a/platforms/unix/src/context.rs b/platforms/unix/src/context.rs index 6f048b58e..cc1496f00 100644 --- a/platforms/unix/src/context.rs +++ b/platforms/unix/src/context.rs @@ -5,13 +5,23 @@ use accesskit::{ActionHandler, ActionRequest}; use accesskit_consumer::Tree; +#[cfg(not(feature = "tokio"))] use async_channel::Sender; +#[cfg(not(feature = "tokio"))] use async_lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; use async_once_cell::OnceCell as AsyncOnceCell; use atspi::proxy::bus::StatusProxy; -use futures_util::{pin_mut, select, StreamExt}; +#[cfg(not(feature = "tokio"))] +use futures_util::{pin_mut as pin, select, StreamExt}; use once_cell::sync::OnceCell; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak}; +#[cfg(feature = "tokio")] +use tokio::{ + pin, select, + sync::{mpsc::UnboundedSender as Sender, Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}, +}; +#[cfg(feature = "tokio")] +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; use zbus::{Connection, Task}; use crate::{ @@ -170,10 +180,21 @@ impl ActivationContext { async fn listen(session_bus: Connection) -> zbus::Result<()> { let status = StatusProxy::new(&session_bus).await?; let changes = status.receive_is_enabled_changed().await.fuse(); - pin_mut!(changes); - let (tx, rx) = async_channel::unbounded(); - let messages = rx.fuse(); - pin_mut!(messages); + pin!(changes); + + #[cfg(not(feature = "tokio"))] + let (tx, messages) = { + let (tx, rx) = async_channel::unbounded(); + let messages = rx.fuse(); + (tx, messages) + }; + #[cfg(feature = "tokio")] + let (tx, messages) = { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let messages = UnboundedReceiverStream::new(rx).fuse(); + (tx, messages) + }; + pin!(messages); let mut atspi_bus = None; loop { @@ -206,7 +227,6 @@ async fn listen(session_bus: Connection) -> zbus::Result<()> { process_adapter_message(atspi_bus, message).await?; } } - complete => return Ok(()), } } }