Skip to content

Commit

Permalink
Introduce subscription support in the Wallet crate.
Browse files Browse the repository at this point in the history
The main goal is to add a subscription to CDK Mint updates into the wallet.
This feature will be particularly useful for improving the code whenever loops
hit the mint server to check status changes.

The goal is to add an easy-to-use interface that will hide the fact that we're
connecting to WebSocket and subscribing to events. This will also hide the fact
that the CDK-mint server may not support WebSocket updates.

To be fully backward compatible, the HttpClientMethods traits have a new
method, `subscribe,` which will return an object that implements
`ActiveSubscription.`

In the primary implementation, there is a `SubscriptionClient` that will
attempt to connect through WebSocket and will fall to the HTTP-status pull and
sleep approach (the current approach), but upper stream code will receive
updates as if they come from a stream of updates through WebSocket. This
`SubscriptionClient` struct will also manage reconnections to WebSockets (with
automatic resubscriptions) and all the low-level stuff, providing an
easy-to-use interface and leaving the upper-level code with a nice interface
that is hard to misuse. When `ActiveSubscription` is dropped, it will
automatically unsubscribe.
  • Loading branch information
crodas committed Nov 30, 2024
1 parent 7d15587 commit cc155a9
Show file tree
Hide file tree
Showing 26 changed files with 1,320 additions and 517 deletions.
15 changes: 15 additions & 0 deletions crates/cdk-axum/src/ws/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use cdk::nuts::nut17::ws::WsErrorBody;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -17,3 +18,17 @@ pub enum WsError {
/// Custom error
ServerError(i32, String),
}

impl From<WsError> for WsErrorBody {
fn from(val: WsError) -> Self {
let (id, message) = match val {
WsError::ParseError => (-32700, "Parse error".to_string()),
WsError::InvalidRequest => (-32600, "Invalid Request".to_string()),
WsError::MethodNotFound => (-32601, "Method not found".to_string()),
WsError::InvalidParams => (-32602, "Invalid params".to_string()),
WsError::InternalError => (-32603, "Internal error".to_string()),
WsError::ServerError(code, message) => (code, message),
};
WsErrorBody { code: id, message }
}
}
71 changes: 0 additions & 71 deletions crates/cdk-axum/src/ws/handler.rs

This file was deleted.

53 changes: 19 additions & 34 deletions crates/cdk-axum/src/ws/mod.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,32 @@
use std::collections::HashMap;

use axum::extract::ws::{Message, WebSocket};
use cdk::nuts::nut17::ws::{
NotificationInner, WsErrorBody, WsMessageOrResponse, WsMethodRequest, WsRequest,
};
use cdk::nuts::nut17::{NotificationPayload, SubId};
use futures::StreamExt;
use handler::{WsHandle, WsNotification};
use serde::{Deserialize, Serialize};
use subscribe::Notification;
use tokio::sync::mpsc;

use crate::MintState;

mod error;
mod handler;
mod subscribe;
mod unsubscribe;

/// JSON RPC version
pub const JSON_RPC_VERSION: &str = "2.0";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsRequest {
jsonrpc: String,
#[serde(flatten)]
method: WsMethod,
id: usize,
}
async fn process(
context: &mut WsContext,
body: WsRequest,
) -> Result<serde_json::Value, serde_json::Error> {
let response = match body.method {
WsMethodRequest::Subscribe(sub) => subscribe::handle(context, sub).await,
WsMethodRequest::Unsubscribe(unsub) => unsubscribe::handle(context, unsub).await,
}
.map_err(WsErrorBody::from);

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "method", content = "params")]
pub enum WsMethod {
Subscribe(subscribe::Method),
Unsubscribe(unsubscribe::Method),
}
let response: WsMessageOrResponse = (body.id, response).into();

impl WsMethod {
pub async fn process(
self,
req_id: usize,
context: &mut WsContext,
) -> Result<serde_json::Value, serde_json::Error> {
match self {
WsMethod::Subscribe(sub) => sub.process(req_id, context),
WsMethod::Unsubscribe(unsub) => unsub.process(req_id, context),
}
.await
}
serde_json::to_value(response)
}

pub use error::WsError;
Expand Down Expand Up @@ -78,7 +60,10 @@ pub async fn main_websocket(mut socket: WebSocket, state: MintState) {
// unsubscribed from the subscription manager, just ignore it.
continue;
}
let notification: WsNotification<Notification> = (sub_id, payload).into();
let notification: WsMessageOrResponse= NotificationInner {
sub_id,
payload,
}.into();
let message = match serde_json::to_string(&notification) {
Ok(message) => message,
Err(err) => {
Expand All @@ -101,7 +86,7 @@ pub async fn main_websocket(mut socket: WebSocket, state: MintState) {
}
};

match request.method.process(request.id, &mut context).await {
match process(&mut context, request).await {
Ok(result) => {
if let Err(err) = socket
.send(Message::Text(result.to_string()))
Expand Down
97 changes: 29 additions & 68 deletions crates/cdk-axum/src/ws/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,33 @@
use cdk::nuts::nut17::{NotificationPayload, Params};
use cdk::pub_sub::SubId;

use super::handler::{WsHandle, WsNotification};
use super::{WsContext, WsError, JSON_RPC_VERSION};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Method(Params);

#[derive(Debug, Clone, serde::Serialize)]
/// The response to a subscription request
pub struct Response {
/// Status
status: String,
/// Subscription ID
#[serde(rename = "subId")]
sub_id: SubId,
}

#[derive(Debug, Clone, serde::Serialize)]
/// The notification
///
/// This is the notification that is sent to the client when an event matches a
/// subscription
pub struct Notification {
/// The subscription ID
#[serde(rename = "subId")]
pub sub_id: SubId,

/// The notification payload
pub payload: NotificationPayload,
}

impl From<(SubId, NotificationPayload)> for WsNotification<Notification> {
fn from((sub_id, payload): (SubId, NotificationPayload)) -> Self {
WsNotification {
jsonrpc: JSON_RPC_VERSION.to_owned(),
method: "subscribe".to_string(),
params: Notification { sub_id, payload },
}
use cdk::nuts::nut17::ws::{WsResponseResult, WsSubscribeResponse};
use cdk::nuts::nut17::Params;

use super::{WsContext, WsError};

/// The `handle` method is called when a client sends a subscription request
pub(crate) async fn handle(
context: &mut WsContext,
params: Params,
) -> Result<WsResponseResult, WsError> {
let sub_id = params.id.clone();
if context.subscriptions.contains_key(&sub_id) {
// Subscription ID already exits. Returns an error instead of
// replacing the other subscription or avoiding it.
return Err(WsError::InvalidParams);
}
}

#[async_trait::async_trait]
impl WsHandle for Method {
type Response = Response;

/// The `handle` method is called when a client sends a subscription request
async fn handle(self, context: &mut WsContext) -> Result<Self::Response, WsError> {
let sub_id = self.0.id.clone();
if context.subscriptions.contains_key(&sub_id) {
// Subscription ID already exits. Returns an error instead of
// replacing the other subscription or avoiding it.
return Err(WsError::InvalidParams);
}

let mut subscription = context.state.mint.pubsub_manager.subscribe(self.0).await;
let publisher = context.publisher.clone();
context.subscriptions.insert(
sub_id.clone(),
tokio::spawn(async move {
while let Some(response) = subscription.recv().await {
let _ = publisher.send(response).await;
}
}),
);
Ok(Response {
status: "OK".to_string(),
sub_id,
})
let mut subscription = context.state.mint.pubsub_manager.subscribe(params).await;
let publisher = context.publisher.clone();
context.subscriptions.insert(
sub_id.clone(),
tokio::spawn(async move {
while let Some(response) = subscription.recv().await {
let _ = publisher.send(response).await;
}
}),
);
Ok(WsSubscribeResponse {
status: "OK".to_string(),
sub_id,
}
.into())
}
38 changes: 12 additions & 26 deletions crates/cdk-axum/src/ws/unsubscribe.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,18 @@
use cdk::pub_sub::SubId;
use cdk::nuts::nut17::ws::{WsResponseResult, WsUnsubscribeRequest, WsUnsubscribeResponse};

use super::handler::WsHandle;
use super::{WsContext, WsError};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Method {
#[serde(rename = "subId")]
pub sub_id: SubId,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct Response {
status: String,
sub_id: SubId,
}

#[async_trait::async_trait]
impl WsHandle for Method {
type Response = Response;

async fn handle(self, context: &mut WsContext) -> Result<Self::Response, WsError> {
if context.subscriptions.remove(&self.sub_id).is_some() {
Ok(Response {
status: "OK".to_string(),
sub_id: self.sub_id,
})
} else {
Err(WsError::InvalidParams)
pub(crate) async fn handle(
context: &mut WsContext,
req: WsUnsubscribeRequest,
) -> Result<WsResponseResult, WsError> {
if context.subscriptions.remove(&req.sub_id).is_some() {
Ok(WsUnsubscribeResponse {
status: "OK".to_string(),
sub_id: req.sub_id,
}
.into())
} else {
Err(WsError::InvalidParams)
}
}
22 changes: 12 additions & 10 deletions crates/cdk-cli/src/sub_commands/mint.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use cdk::amount::SplitTarget;
use cdk::cdk_database::{Error, WalletDatabase};
use cdk::mint_url::MintUrl;
use cdk::nuts::{CurrencyUnit, MintQuoteState};
use cdk::nuts::{CurrencyUnit, MintQuoteState, NotificationPayload};
use cdk::wallet::multi_mint_wallet::WalletKey;
use cdk::wallet::{MultiMintWallet, Wallet};
use cdk::wallet::{MultiMintWallet, Wallet, WalletSubscription};
use cdk::Amount;
use clap::Args;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;

#[derive(Args, Serialize, Deserialize)]
pub struct MintSubCommand {
Expand Down Expand Up @@ -59,14 +57,18 @@ pub async fn mint(

println!("Please pay: {}", quote.request);

loop {
let status = wallet.mint_quote_state(&quote.id).await?;
let mut subscription = wallet
.subscribe(WalletSubscription::Bolt11MintQuoteState(vec![quote
.id
.clone()]))
.await;

if status.state == MintQuoteState::Paid {
break;
while let Some(msg) = subscription.recv().await {
if let NotificationPayload::MintQuoteBolt11Response(response) = msg {
if response.state == MintQuoteState::Paid {
break;
}
}

sleep(Duration::from_secs(2)).await;
}

let receive_amount = wallet.mint(&quote.id, SplitTarget::default(), None).await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/cdk-integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ rust-version = "1.63.0"


[features]

http_subscription = ["cdk/http_subscription"]

[dependencies]
axum = "0.6.20"
Expand Down
Loading

0 comments on commit cc155a9

Please sign in to comment.