diff --git a/src/event.rs b/src/event.rs index 192e0b8c2..86a9f8176 100644 --- a/src/event.rs +++ b/src/event.rs @@ -26,6 +26,8 @@ use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; use bitcoin::OutPoint; +use core::future::Future; +use core::task::Poll; use rand::{thread_rng, Rng}; use std::collections::VecDeque; use std::ops::Deref; @@ -157,6 +159,11 @@ where locked_queue.front().map(|e| e.clone()) } + pub(crate) async fn next_event_async(&self) -> Event { + let ev = EventFuture { event_queue: &self }; + ev.await + } + pub(crate) fn wait_next_event(&self) -> Event { let locked_queue = self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap(); @@ -240,6 +247,30 @@ impl Writeable for EventQueueSerWrapper<'_> { } } +struct EventFuture<'a, K: KVStore + Sync + Send, L: Deref> +where + L::Target: Logger, +{ + event_queue: &'a EventQueue, +} + +impl<'a, K: KVStore + Sync + Send, L: Deref> Future for EventFuture<'a, K, L> +where + L::Target: Logger, +{ + type Output = Event; + + fn poll( + self: core::pin::Pin<&mut Self>, _cx: &mut core::task::Context<'_>, + ) -> core::task::Poll { + if let Some(event) = self.event_queue.next_event() { + Poll::Ready(event) + } else { + Poll::Pending + } + } +} + pub(crate) struct EventHandler where L::Target: Logger, diff --git a/src/lib.rs b/src/lib.rs index 3d51ef984..bd507e7e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -800,6 +800,15 @@ impl Node { self.event_queue.next_event() } + /// Returns the next event in the event queue. + /// + /// Will asynchronously poll the event queue until the next event is ready. + /// + /// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`]. + pub async fn next_event_async(&self) -> Event { + self.event_queue.next_event_async().await + } + /// Returns the next event in the event queue. /// /// Will block the current thread until the next event is available.