diff --git a/crates/cdk-axum/src/ws/subscribe.rs b/crates/cdk-axum/src/ws/subscribe.rs index a1cc6ecd..cfc021f4 100644 --- a/crates/cdk-axum/src/ws/subscribe.rs +++ b/crates/cdk-axum/src/ws/subscribe.rs @@ -15,7 +15,18 @@ pub(crate) async fn handle( return Err(WsError::InvalidParams); } - let mut subscription = context.state.mint.pubsub_manager.subscribe(params).await; + let mut subscription = if let Ok(subscription) = context + .state + .mint + .pubsub_manager + .try_subscribe(params) + .await + { + subscription + } else { + return Err(WsError::ParseError); + }; + let publisher = context.publisher.clone(); context.subscriptions.insert( sub_id.clone(), diff --git a/crates/cdk-integration-tests/tests/mint.rs b/crates/cdk-integration-tests/tests/mint.rs index 3c9f1258..2bcd281a 100644 --- a/crates/cdk-integration-tests/tests/mint.rs +++ b/crates/cdk-integration-tests/tests/mint.rs @@ -230,12 +230,13 @@ pub async fn test_p2pk_swap() -> Result<()> { let mut listener = mint .pubsub_manager - .subscribe(Params { + .try_subscribe(Params { kind: cdk::nuts::nut17::Kind::ProofState, filters: public_keys_to_listen.clone(), id: "test".into(), }) - .await; + .await + .expect("valid subscription"); match mint.process_swap_request(swap_request).await { Ok(_) => bail!("Proofs spent without sig"), diff --git a/crates/cdk/src/nuts/nut17/manager.rs b/crates/cdk/src/nuts/nut17/manager.rs index 59d05b39..21103d6e 100644 --- a/crates/cdk/src/nuts/nut17/manager.rs +++ b/crates/cdk/src/nuts/nut17/manager.rs @@ -101,8 +101,14 @@ mod test { // responsibility of the implementor to make sure that SubId are unique // either globally or per client let subscriptions = vec![ - manager.subscribe(params.clone()).await, - manager.subscribe(params).await, + manager + .try_subscribe(params.clone()) + .await + .expect("valid subscription"), + manager + .try_subscribe(params) + .await + .expect("valid subscription"), ]; assert_eq!(2, manager.active_subscriptions()); drop(subscriptions); @@ -117,7 +123,7 @@ mod test { let manager = PubSubManager::default(); let mut subscriptions = [ manager - .subscribe(Params { + .try_subscribe(Params { kind: Kind::ProofState, filters: vec![ "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104" @@ -125,9 +131,10 @@ mod test { ], id: "uno".into(), }) - .await, + .await + .expect("valid subscription"), manager - .subscribe(Params { + .try_subscribe(Params { kind: Kind::ProofState, filters: vec![ "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104" @@ -135,7 +142,8 @@ mod test { ], id: "dos".into(), }) - .await, + .await + .expect("valid subscription"), ]; let event = ProofState { @@ -174,11 +182,11 @@ mod test { async fn json_test() { let manager = PubSubManager::default(); let mut subscription = manager - .subscribe::( + .try_subscribe::( serde_json::from_str(r#"{"kind":"proof_state","filters":["02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"],"subId":"uno"}"#) .expect("valid json"), ) - .await; + .await.expect("valid subscription"); manager.broadcast( ProofState { diff --git a/crates/cdk/src/nuts/nut17/mod.rs b/crates/cdk/src/nuts/nut17/mod.rs index 437b0c4f..a7b0698c 100644 --- a/crates/cdk/src/nuts/nut17/mod.rs +++ b/crates/cdk/src/nuts/nut17/mod.rs @@ -168,8 +168,22 @@ impl AsRef for Params { } } -impl From for Vec> { - fn from(val: Params) -> Self { +/// Parsing error +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Uuid Error: {0}")] + /// Uuid Error + Uuid(#[from] uuid::Error), + + #[error("PublicKey Error: {0}")] + /// PublicKey Error + PublicKey(#[from] crate::nuts::nut01::Error), +} + +impl TryFrom for Vec> { + type Error = Error; + + fn try_from(val: Params) -> Result { let sub_id: SubscriptionGlobalId = Default::default(); val.filters .into_iter() @@ -186,8 +200,6 @@ impl From for Vec> { Ok(Index::from((idx, val.id.clone(), sub_id))) }) - .collect::>() - .unwrap() - // TODO don't unwrap, move to try from + .collect::>() } } diff --git a/crates/cdk/src/pub_sub/mod.rs b/crates/cdk/src/pub_sub/mod.rs index a8269290..cff76970 100644 --- a/crates/cdk/src/pub_sub/mod.rs +++ b/crates/cdk/src/pub_sub/mod.rs @@ -157,16 +157,14 @@ where Self::broadcast_impl(&self.indexes, event).await; } - /// Subscribe to a specific event - pub async fn subscribe + Into>>>( + /// Specific of the subscription, this is the abstraction between `subscribe` and `try_subscribe` + #[inline(always)] + async fn subscribe_inner( &self, - params: P, + sub_id: SubId, + indexes: Vec>, ) -> ActiveSubscription { let (sender, receiver) = mpsc::channel(10); - let sub_id: SubId = params.as_ref().clone(); - - let indexes: Vec> = params.into(); - if let Some(on_new_subscription) = self.on_new_subscription.as_ref() { match on_new_subscription .on_new_subscription(&indexes.iter().map(|x| x.deref()).collect::>()) @@ -204,6 +202,25 @@ where } } + /// Try to subscribe to a specific event + pub async fn try_subscribe + TryInto>>>( + &self, + params: P, + ) -> Result, P::Error> { + Ok(self + .subscribe_inner(params.as_ref().clone(), params.try_into()?) + .await) + } + + /// Subscribe to a specific event + pub async fn subscribe + Into>>>( + &self, + params: P, + ) -> ActiveSubscription { + self.subscribe_inner(params.as_ref().clone(), params.into()) + .await + } + /// Return number of active subscriptions pub fn active_subscriptions(&self) -> usize { self.active_subscriptions.load(atomic::Ordering::SeqCst)