Skip to content

Commit

Permalink
Introduce try_subscription
Browse files Browse the repository at this point in the history
Fix #493
  • Loading branch information
crodas committed Dec 5, 2024
1 parent 04d38ef commit 8b3ff74
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 23 deletions.
13 changes: 12 additions & 1 deletion crates/cdk-axum/src/ws/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,18 @@ pub(crate) async fn handle(
return Err(WsError::InvalidParams);
}

let mut subscription = context.state.mint.pubsub_manager.subscribe(params).await;
let mut subscription = if let Ok(subscription) = context
.state
.mint
.pubsub_manager
.try_subscribe(params)
.await
{
subscription
} else {
return Err(WsError::ParseError);
};

let publisher = context.publisher.clone();
context.subscriptions.insert(
sub_id.clone(),
Expand Down
5 changes: 3 additions & 2 deletions crates/cdk-integration-tests/tests/mint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,13 @@ pub async fn test_p2pk_swap() -> Result<()> {

let mut listener = mint
.pubsub_manager
.subscribe(Params {
.try_subscribe(Params {
kind: cdk::nuts::nut17::Kind::ProofState,
filters: public_keys_to_listen.clone(),
id: "test".into(),
})
.await;
.await
.expect("valid subscription");

match mint.process_swap_request(swap_request).await {
Ok(_) => bail!("Proofs spent without sig"),
Expand Down
24 changes: 16 additions & 8 deletions crates/cdk/src/nuts/nut17/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,14 @@ mod test {
// responsibility of the implementor to make sure that SubId are unique
// either globally or per client
let subscriptions = vec![
manager.subscribe(params.clone()).await,
manager.subscribe(params).await,
manager
.try_subscribe(params.clone())
.await
.expect("valid subscription"),
manager
.try_subscribe(params)
.await
.expect("valid subscription"),
];
assert_eq!(2, manager.active_subscriptions());
drop(subscriptions);
Expand All @@ -117,25 +123,27 @@ mod test {
let manager = PubSubManager::default();
let mut subscriptions = [
manager
.subscribe(Params {
.try_subscribe(Params {
kind: Kind::ProofState,
filters: vec![
"02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
.to_string(),
],
id: "uno".into(),
})
.await,
.await
.expect("valid subscription"),
manager
.subscribe(Params {
.try_subscribe(Params {
kind: Kind::ProofState,
filters: vec![
"02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
.to_string(),
],
id: "dos".into(),
})
.await,
.await
.expect("valid subscription"),
];

let event = ProofState {
Expand Down Expand Up @@ -174,11 +182,11 @@ mod test {
async fn json_test() {
let manager = PubSubManager::default();
let mut subscription = manager
.subscribe::<Params>(
.try_subscribe::<Params>(
serde_json::from_str(r#"{"kind":"proof_state","filters":["02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"],"subId":"uno"}"#)
.expect("valid json"),
)
.await;
.await.expect("valid subscription");

manager.broadcast(
ProofState {
Expand Down
22 changes: 17 additions & 5 deletions crates/cdk/src/nuts/nut17/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,22 @@ impl AsRef<SubId> for Params {
}
}

impl From<Params> for Vec<Index<Notification>> {
fn from(val: Params) -> Self {
/// Parsing error
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Uuid Error: {0}")]
/// Uuid Error
Uuid(#[from] uuid::Error),

#[error("PublicKey Error: {0}")]
/// PublicKey Error
PublicKey(#[from] crate::nuts::nut01::Error),
}

impl TryFrom<Params> for Vec<Index<Notification>> {
type Error = Error;

fn try_from(val: Params) -> Result<Self, Self::Error> {
let sub_id: SubscriptionGlobalId = Default::default();
val.filters
.into_iter()
Expand All @@ -186,8 +200,6 @@ impl From<Params> for Vec<Index<Notification>> {

Ok(Index::from((idx, val.id.clone(), sub_id)))
})
.collect::<Result<_, anyhow::Error>>()
.unwrap()
// TODO don't unwrap, move to try from
.collect::<Result<_, _>>()
}
}
31 changes: 24 additions & 7 deletions crates/cdk/src/pub_sub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,14 @@ where
Self::broadcast_impl(&self.indexes, event).await;
}

/// Subscribe to a specific event
pub async fn subscribe<P: AsRef<SubId> + Into<Vec<Index<I>>>>(
/// Specific of the subscription, this is the abstraction between `subscribe` and `try_subscribe`
#[inline(always)]
async fn subscribe_inner(
&self,
params: P,
sub_id: SubId,
indexes: Vec<Index<I>>,
) -> ActiveSubscription<T, I> {
let (sender, receiver) = mpsc::channel(10);
let sub_id: SubId = params.as_ref().clone();

let indexes: Vec<Index<I>> = params.into();

if let Some(on_new_subscription) = self.on_new_subscription.as_ref() {
match on_new_subscription
.on_new_subscription(&indexes.iter().map(|x| x.deref()).collect::<Vec<_>>())
Expand Down Expand Up @@ -204,6 +202,25 @@ where
}
}

/// Try to subscribe to a specific event
pub async fn try_subscribe<P: AsRef<SubId> + TryInto<Vec<Index<I>>>>(
&self,
params: P,
) -> Result<ActiveSubscription<T, I>, P::Error> {
Ok(self
.subscribe_inner(params.as_ref().clone(), params.try_into()?)
.await)
}

/// Subscribe to a specific event
pub async fn subscribe<P: AsRef<SubId> + Into<Vec<Index<I>>>>(
&self,
params: P,
) -> ActiveSubscription<T, I> {
self.subscribe_inner(params.as_ref().clone(), params.into())
.await
}

/// Return number of active subscriptions
pub fn active_subscriptions(&self) -> usize {
self.active_subscriptions.load(atomic::Ordering::SeqCst)
Expand Down

0 comments on commit 8b3ff74

Please sign in to comment.