Skip to content

Commit

Permalink
Use tokio::select instead of future::select
Browse files Browse the repository at this point in the history
  • Loading branch information
crodas committed Oct 23, 2024
1 parent faee37b commit e30f732
Showing 1 changed file with 7 additions and 19 deletions.
26 changes: 7 additions & 19 deletions crates/cdk-axum/src/ws/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use crate::MintState;
use axum::extract::ws::{Message, WebSocket};
use cdk::nuts::nut17::{NotificationPayload, SubId};
use futures::{
future::{self, Either},
StreamExt,
};
use futures::StreamExt;
use handler::{WsHandle, WsNotification};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand Down Expand Up @@ -62,6 +59,7 @@ pub struct WsContext {
///
/// For simplicity sake this function will spawn tasks for each subscription and
/// keep them in a hashmap, and will have a single subscriber for all of them.
#[allow(clippy::incompatible_msrv)]
pub async fn main_websocket(mut socket: WebSocket, state: MintState) {
let (publisher, mut subscriber) = mpsc::channel(100);
let mut context = WsContext {
Expand All @@ -71,11 +69,8 @@ pub async fn main_websocket(mut socket: WebSocket, state: MintState) {
};

loop {
let recv_future = Box::pin(subscriber.recv());
let websocket_future = Box::pin(socket.next());

match future::select(recv_future, websocket_future).await {
Either::Left((Some((sub_id, payload)), _)) => {
tokio::select! {
Some((sub_id, payload)) = subscriber.recv() => {
if !context.subscriptions.contains_key(&sub_id) {
// It may be possible an incoming message has come from a dropped Subscriptions that has not yet been
// unsubscribed from the subscription manager, just ignore it.
Expand All @@ -93,8 +88,7 @@ pub async fn main_websocket(mut socket: WebSocket, state: MintState) {
break;
}
}
// WebSocket stream resolved first
Either::Right((Some(Ok(Message::Text(text))), _)) => {
Some(Ok(Message::Text(text))) = socket.next() => {
let request = match serde_json::from_str::<WsRequest>(&text) {
Ok(request) => request,
Err(err) => {
Expand All @@ -119,15 +113,9 @@ pub async fn main_websocket(mut socket: WebSocket, state: MintState) {
}
}
}
Either::Right((Some(Ok(Message::Close(_))), _)) => {
tracing::info!("WebSocket closed");
break;
}
Either::Left((None, _)) => {
tracing::info!("WebSocket closed");
break;
else => {

}
_ => {}
}
}
}

0 comments on commit e30f732

Please sign in to comment.