Skip to content

Commit

Permalink
feat(socketio): disconnect return SocketError/BroadcastError
Browse files Browse the repository at this point in the history
  • Loading branch information
Totodore committed Dec 24, 2024
1 parent f4f2c54 commit 063c50d
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 85 deletions.
13 changes: 3 additions & 10 deletions crates/socketioxide/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,20 +302,13 @@ impl<A: Adapter> EngineIoHandler for Client<A> {
.filter_map(|ns| ns.get_socket(socket.id).ok())
.collect();

let _res: Result<Vec<_>, _> = socks
let _cnt = socks
.into_iter()
.map(|s| s.close(reason.clone().into()))
.collect();
.count();

#[cfg(feature = "tracing")]
match _res {
Ok(vec) => {
tracing::debug!("disconnect handle spawned for {} namespaces", vec.len())
}
Err(_e) => {
tracing::debug!("error while disconnecting socket: {}", _e)
}
}
tracing::debug!("disconnect handle spawned for {_cnt} namespaces");
}

fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<EIoSocket<SocketData<A>>>) {
Expand Down
41 changes: 1 addition & 40 deletions crates/socketioxide/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use engineioxide::{sid::Sid, socket::DisconnectReason as EIoDisconnectReason};
use serde::{Deserialize, Serialize};
use socketioxide_core::errors::{AdapterError, SocketError};
use std::fmt::Debug;
use tokio::time::error::Elapsed;

pub use matchit::InsertError as NsInsertError;

pub use crate::parser::ParserError;
pub use socketioxide_core::errors::{AdapterError, BroadcastError, SocketError};

/// Error type for socketio
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -45,21 +45,6 @@ pub enum AckError {
Socket(#[from] SocketError),
}

/// Error type for broadcast operations.
#[derive(thiserror::Error, Debug)]
pub enum BroadcastError {
/// An error occurred while sending packets.
#[error("Error sending data through the engine.io socket: {0:?}")]
Socket(Vec<SocketError>),

/// An error occurred while serializing the packet.
#[error("Error serializing packet: {0:?}")]
Serialize(#[from] ParserError),

/// An error occured while broadcasting to other nodes.
#[error("Adapter error: {0}")]
Adapter(#[from] AdapterError),
}
/// Error type for sending operations.
#[derive(thiserror::Error, Debug)]
pub enum SendError {
Expand All @@ -83,30 +68,6 @@ pub enum EmitWithAckError {
Adapter(#[from] Box<dyn std::error::Error + Send>),
}

// impl<T> From<TrySendError<T>> for SocketError {
// fn from(value: TrySendError<T>) -> Self {
// match value {
// TrySendError::Full(_) => Self::InternalChannelFull,
// TrySendError::Closed(_) => Self::Closed,
// }
// }
// }

impl From<Vec<SocketError>> for BroadcastError {
/// Converts a vector of `SendError` into a `BroadcastError`.
///
/// # Arguments
///
/// * `value` - A vector of `SendError` representing the sending errors.
///
/// # Returns
///
/// A `BroadcastError` containing the sending errors.
fn from(value: Vec<SocketError>) -> Self {
Self::Socket(value)
}
}

impl From<Elapsed> for AckError {
fn from(_: Elapsed) -> Self {
Self::Timeout
Expand Down
6 changes: 3 additions & 3 deletions crates/socketioxide/src/extract/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use crate::{
handler::{FromConnectParts, FromDisconnectParts, FromMessageParts},
packet::Packet,
socket::{DisconnectReason, Socket},
DisconnectError, SendError, SocketIo,
SendError, SocketIo,
};
use serde::Serialize;
use socketioxide_core::{parser::Parse, Value};
use socketioxide_core::{errors::SocketError, parser::Parse, Value};

/// An Extractor that returns a reference to a [`Socket`].
#[derive(Debug)]
Expand Down Expand Up @@ -69,7 +69,7 @@ impl<A: Adapter> SocketRef<A> {
///
/// It will also call the disconnect handler if it is set.
#[inline(always)]
pub fn disconnect(self) -> Result<(), DisconnectError> {
pub fn disconnect(self) -> Result<(), SocketError> {
self.0.disconnect()
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/socketioxide/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
operators::BroadcastOperators,
parser::Parser,
service::SocketIoService,
BroadcastError, DisconnectError, EmitWithAckError,
BroadcastError, EmitWithAckError,
};

/// The parser to use to encode and decode socket.io packets
Expand Down Expand Up @@ -572,7 +572,7 @@ impl<A: Adapter> SocketIo<A> {
/// _Alias for `io.of("/").unwrap().disconnect()`_. If the **default namespace "/" is not found** this fn will panic!
#[doc = include_str!("../docs/operators/disconnect.md")]
#[inline]
pub async fn disconnect(&self) -> Result<(), Vec<DisconnectError>> {
pub async fn disconnect(&self) -> Result<(), BroadcastError> {
self.get_default_op().disconnect().await
}

Expand Down
4 changes: 2 additions & 2 deletions crates/socketioxide/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ pub mod socket;

pub use engineioxide::TransportType;
pub use errors::{
AckError, BroadcastError, EmitWithAckError, NsInsertError, ParserError, SendError,
AckError, AdapterError, BroadcastError, EmitWithAckError, NsInsertError, ParserError,
SendError, SocketError,
};
pub use io::{ParserConfig, SocketIo, SocketIoBuilder, SocketIoConfig};
pub use socketioxide_core::errors::{AdapterError, DisconnectError, SocketError};
//TODO: remove packet export
pub use socketioxide_core::packet;

Expand Down
18 changes: 7 additions & 11 deletions crates/socketioxide/src/ns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
use engineioxide::{sid::Sid, Str};
use socketioxide_core::{
adapter::{CoreLocalAdapter, SocketEmitter},
errors::{DisconnectError, SocketError},
errors::SocketError,
parser::Parse,
Value,
};
Expand Down Expand Up @@ -197,11 +197,7 @@ impl<A: Adapter> Namespace<A> {
} else {
for s in sockets.into_values() {
let _sid = s.id;
let _err = s.close(reason);
#[cfg(feature = "tracing")]
if let Err(err) = _err {
tracing::debug!(?_sid, ?err, "error closing socket");
}
s.close(reason);
}
}
#[cfg(feature = "tracing")]
Expand Down Expand Up @@ -229,7 +225,7 @@ trait InnerEmitter: Send + Sync + 'static {
timeout: Option<Duration>,
) -> AckInnerStream;
/// Disconnect all the sockets in the list.
fn disconnect_many(&self, sid: Vec<Sid>) -> Result<(), Vec<DisconnectError>>;
fn disconnect_many(&self, sid: Vec<Sid>) -> Result<(), Vec<SocketError>>;
}

impl<A: Adapter> InnerEmitter for Namespace<A> {
Expand Down Expand Up @@ -268,7 +264,7 @@ impl<A: Adapter> InnerEmitter for Namespace<A> {
AckInnerStream::broadcast(packet, sockets, timeout)
}

fn disconnect_many(&self, sids: Vec<Sid>) -> Result<(), Vec<DisconnectError>> {
fn disconnect_many(&self, sids: Vec<Sid>) -> Result<(), Vec<SocketError>> {
let sockets: Vec<Arc<Socket<A>>> = self
.sockets
.read()
Expand All @@ -277,10 +273,10 @@ impl<A: Adapter> InnerEmitter for Namespace<A> {
.filter(|s| sids.contains(&s.id))
.cloned()
.collect();
let errs: Vec<crate::DisconnectError> = sockets
let errs = sockets
.into_iter()
.filter_map(|socket| socket.disconnect().err())
.collect();
.collect::<Vec<_>>();
if errs.is_empty() {
Ok(())
} else {
Expand Down Expand Up @@ -331,7 +327,7 @@ impl SocketEmitter for Emitter {
}
}

fn disconnect_many(&self, sids: Vec<Sid>) -> Result<(), Vec<DisconnectError>> {
fn disconnect_many(&self, sids: Vec<Sid>) -> Result<(), Vec<SocketError>> {
match self.ns.upgrade() {
Some(ns) => ns.disconnect_many(sids),
None => Ok(()),
Expand Down
18 changes: 11 additions & 7 deletions crates/socketioxide/src/operators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
packet::Packet,
parser::Parser,
socket::Socket,
BroadcastError, DisconnectError, EmitWithAckError, SendError,
BroadcastError, EmitWithAckError, SendError,
};

use socketioxide_core::{
Expand Down Expand Up @@ -238,11 +238,15 @@ impl<A: Adapter> BroadcastOperators<A> {
) -> impl Future<Output = Result<(), BroadcastError>> + Send {
let packet = self.get_packet(event, data);
async move {
if let Err(e) = self.ns.adapter.broadcast(packet?, self.opts).await {
#[cfg(feature = "tracing")]
tracing::debug!("broadcast error: {e:?}");
return Err(BroadcastError::Socket(e));
}
self.ns
.adapter
.broadcast(packet?, self.opts)
.await
.map_err(|e| {
#[cfg(feature = "tracing")]
tracing::debug!("broadcast error: {e}");
e
})?;
Ok(())
}
}
Expand Down Expand Up @@ -276,7 +280,7 @@ impl<A: Adapter> BroadcastOperators<A> {
}

#[doc = include_str!("../docs/operators/disconnect.md")]
pub async fn disconnect(self) -> Result<(), Vec<DisconnectError>> {
pub async fn disconnect(self) -> Result<(), BroadcastError> {
self.ns.adapter.disconnect_socket(self.opts).await
}

Expand Down
16 changes: 6 additions & 10 deletions crates/socketioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
ns::Namespace,
operators::{BroadcastOperators, ConfOperators},
parser::Parser,
AckError, AdapterError, DisconnectError, SendError, SocketError, SocketIo,
AckError, SendError, SocketError, SocketIo,
};
use socketioxide_core::{
adapter::{Room, RoomParam},
Expand Down Expand Up @@ -453,13 +453,12 @@ impl<A: Adapter> Socket<A> {
/// # Disconnect the socket from the current namespace,
///
/// It will also call the disconnect handler if it is set with a [`DisconnectReason::ServerNSDisconnect`].
pub fn disconnect(self: Arc<Self>) -> Result<(), DisconnectError> {
pub fn disconnect(self: Arc<Self>) -> Result<(), SocketError> {
let res = self.send(Packet::disconnect(self.ns.path.clone()));
if let Err(SocketError::InternalChannelFull) = res {
return Err(DisconnectError::InternalChannelFull);
return Err(SocketError::InternalChannelFull);
}

self.close(DisconnectReason::ServerNSDisconnect)?;
self.close(DisconnectReason::ServerNSDisconnect);
Ok(())
}

Expand Down Expand Up @@ -574,7 +573,7 @@ impl<A: Adapter> Socket<A> {
/// Called when the socket is gracefully disconnected from the server or the client
///
/// It maybe also close when the underlying transport is closed or failed.
pub(crate) fn close(self: Arc<Self>, reason: DisconnectReason) -> Result<(), AdapterError> {
pub(crate) fn close(self: Arc<Self>, reason: DisconnectReason) {
self.set_connected(false);

let handler = { self.disconnect_handler.lock().unwrap().take() };
Expand All @@ -586,17 +585,14 @@ impl<A: Adapter> Socket<A> {
}

self.ns.remove_socket(self.id);
Ok(())
}

/// Receive data from client
pub(crate) fn recv(self: Arc<Self>, packet: PacketData) -> Result<(), Error> {
match packet {
PacketData::Event(d, ack) | PacketData::BinaryEvent(d, ack) => self.recv_event(d, ack),
PacketData::EventAck(d, ack) | PacketData::BinaryAck(d, ack) => self.recv_ack(d, ack),
PacketData::Disconnect => self
.close(DisconnectReason::ClientNSDisconnect)
.map_err(Error::from),
PacketData::Disconnect => Ok(self.close(DisconnectReason::ClientNSDisconnect)),
_ => unreachable!(),
}
}
Expand Down

0 comments on commit 063c50d

Please sign in to comment.