Skip to content

Commit

Permalink
Introduce subscription support in the Wallet crate.
Browse files Browse the repository at this point in the history
The main goal is to add a subscription to CDK Mint updates into the wallet.
This feature will be particularly useful for improving the code whenever loops
hit the mint server to check status changes.

The goal is to add an easy-to-use interface that will hide the fact that we're
connecting to WebSocket and subscribing to events. This will also hide the fact
that the CDK-mint server may not support WebSocket updates.

To be fully backward compatible, the HttpClientMethods traits have a new
method, `subscribe,` which will return an object that implements
`ActiveSubscription.`

In the primary implementation, there is a `SubscriptionClient` that will
attempt to connect through WebSocket and will fall to the HTTP-status pull and
sleep approach (the current approach), but upper stream code will receive
updates as if they come from a stream of updates through WebSocket. This
`SubscriptionClient` struct will also manage reconnections to WebSockets (with
automatic resubscriptions) and all the low-level stuff, providing an
easy-to-use interface and leaving the upper-level code with a nice interface
that is hard to misuse. When `ActiveSubscription` is dropped, it will
automatically unsubscribe.
  • Loading branch information
crodas committed Dec 5, 2024
1 parent a0dca22 commit f3f5d32
Show file tree
Hide file tree
Showing 30 changed files with 1,596 additions and 536 deletions.
86 changes: 81 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions crates/cdk-axum/src/ws/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use cdk::nuts::nut17::ws::WsErrorBody;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -17,3 +18,17 @@ pub enum WsError {
/// Custom error
ServerError(i32, String),
}

impl From<WsError> for WsErrorBody {
fn from(val: WsError) -> Self {
let (id, message) = match val {
WsError::ParseError => (-32700, "Parse error".to_string()),
WsError::InvalidRequest => (-32600, "Invalid Request".to_string()),
WsError::MethodNotFound => (-32601, "Method not found".to_string()),
WsError::InvalidParams => (-32602, "Invalid params".to_string()),
WsError::InternalError => (-32603, "Internal error".to_string()),
WsError::ServerError(code, message) => (code, message),
};
WsErrorBody { code: id, message }
}
}
71 changes: 0 additions & 71 deletions crates/cdk-axum/src/ws/handler.rs

This file was deleted.

56 changes: 21 additions & 35 deletions crates/cdk-axum/src/ws/mod.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,41 @@
use std::collections::HashMap;

use axum::extract::ws::{Message, WebSocket};
use cdk::nuts::nut17::ws::{
NotificationInner, WsErrorBody, WsMessageOrResponse, WsMethodRequest, WsRequest,
};
use cdk::nuts::nut17::{NotificationPayload, SubId};
use futures::StreamExt;
use handler::{WsHandle, WsNotification};
use serde::{Deserialize, Serialize};
use subscribe::Notification;
use tokio::sync::mpsc;
use uuid::Uuid;

use crate::MintState;

mod error;
mod handler;
mod subscribe;
mod unsubscribe;

/// JSON RPC version
pub const JSON_RPC_VERSION: &str = "2.0";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsRequest {
jsonrpc: String,
#[serde(flatten)]
method: WsMethod,
id: usize,
}
async fn process(
context: &mut WsContext,
body: WsRequest,
) -> Result<serde_json::Value, serde_json::Error> {
let response = match body.method {
WsMethodRequest::Subscribe(sub) => subscribe::handle(context, sub).await,
WsMethodRequest::Unsubscribe(unsub) => unsubscribe::handle(context, unsub).await,
}
.map_err(WsErrorBody::from);

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "method", content = "params")]
pub enum WsMethod {
Subscribe(subscribe::Method),
Unsubscribe(unsubscribe::Method),
}
let response: WsMessageOrResponse = (body.id, response).into();

impl WsMethod {
pub async fn process(
self,
req_id: usize,
context: &mut WsContext,
) -> Result<serde_json::Value, serde_json::Error> {
match self {
WsMethod::Subscribe(sub) => sub.process(req_id, context),
WsMethod::Unsubscribe(unsub) => unsub.process(req_id, context),
}
.await
}
serde_json::to_value(response)
}

pub use error::WsError;

pub struct WsContext {
state: MintState,
subscriptions: HashMap<SubId, tokio::task::JoinHandle<()>>,
publisher: mpsc::Sender<(SubId, NotificationPayload)>,
publisher: mpsc::Sender<(SubId, NotificationPayload<Uuid>)>,
}

/// Main function for websocket connections
Expand All @@ -78,7 +61,10 @@ pub async fn main_websocket(mut socket: WebSocket, state: MintState) {
// unsubscribed from the subscription manager, just ignore it.
continue;
}
let notification: WsNotification<Notification> = (sub_id, payload).into();
let notification: WsMessageOrResponse= NotificationInner {
sub_id,
payload,
}.into();
let message = match serde_json::to_string(&notification) {
Ok(message) => message,
Err(err) => {
Expand All @@ -101,7 +87,7 @@ pub async fn main_websocket(mut socket: WebSocket, state: MintState) {
}
};

match request.method.process(request.id, &mut context).await {
match process(&mut context, request).await {
Ok(result) => {
if let Err(err) = socket
.send(Message::Text(result.to_string()))
Expand Down
Loading

0 comments on commit f3f5d32

Please sign in to comment.