Skip to content

Commit

Permalink
Introduce next_event_async allowing to poll event queue
Browse files Browse the repository at this point in the history
We implement a way to asynchronously poll the queue for new events,
providing an async alternative to `wait_next_event`.
  • Loading branch information
tnull committed Jan 3, 2024
1 parent 5b20403 commit 4ddd053
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
31 changes: 31 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
use bitcoin::blockdata::locktime::absolute::LockTime;
use bitcoin::secp256k1::PublicKey;
use bitcoin::OutPoint;
use core::future::Future;
use core::task::Poll;
use rand::{thread_rng, Rng};
use std::collections::VecDeque;
use std::ops::Deref;
Expand Down Expand Up @@ -157,6 +159,11 @@ where
locked_queue.front().map(|e| e.clone())
}

pub(crate) async fn next_event_async(&self) -> Event {
let ev = EventFuture { event_queue: &self };
ev.await
}

pub(crate) fn wait_next_event(&self) -> Event {
let locked_queue =
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
Expand Down Expand Up @@ -240,6 +247,30 @@ impl Writeable for EventQueueSerWrapper<'_> {
}
}

struct EventFuture<'a, K: KVStore + Sync + Send, L: Deref>
where
L::Target: Logger,
{
event_queue: &'a EventQueue<K, L>,
}

impl<'a, K: KVStore + Sync + Send, L: Deref> Future for EventFuture<'a, K, L>
where
L::Target: Logger,
{
type Output = Event;

fn poll(
self: core::pin::Pin<&mut Self>, _cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
if let Some(event) = self.event_queue.next_event() {
Poll::Ready(event)
} else {
Poll::Pending
}
}
}

pub(crate) struct EventHandler<K: KVStore + Sync + Send, L: Deref>
where
L::Target: Logger,
Expand Down
9 changes: 9 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,15 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
self.event_queue.next_event()
}

/// Returns the next event in the event queue.
///
/// Will asynchronously poll the event queue until the next event is ready.
///
/// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`].
pub async fn next_event_async(&self) -> Event {
self.event_queue.next_event_async().await
}

/// Returns the next event in the event queue.
///
/// Will block the current thread until the next event is available.
Expand Down

0 comments on commit 4ddd053

Please sign in to comment.