Skip to content

Commit

Permalink
refactor: drop typestate pattern for building sockets
Browse files Browse the repository at this point in the history
The typestate pattern caused a lot of boilerplate w.r.t. both the
implementation and the usage of matchbox socket.

There appeared to be only three actual uses of the typestate pattern:

* Preventing client code from calling build() without adding a channel.
* Some convenience methods to read/write from the first channel in the
  case where where was only one channel.
* Making sure that ggrs::NonBlockingSocket is only implemented for
  WebRtcSocket in the case where it is a single channel socket, to avoid
  the case where a reliable socket is accidentally used as a GGRS
  socket (or a unreliable socket is used for GGRS + some other data).

The first and (mostly) the last points can be accomplished at runtime
via asserts, and owing to the "set it up and use it from one place"
nature of this library I think it's highly unlikely that having a
runtime assertion is problematic.

This is a breaking change owing to the removal of the
SingleChannel-variant convenience methods, but it should be trivially
easy to migrate by looking at modified example code.
  • Loading branch information
caspark committed Dec 9, 2024
1 parent 545f609 commit ee70adc
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 224 deletions.
10 changes: 6 additions & 4 deletions bevy_matchbox/examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use bevy::{prelude::*, time::common_conditions::on_timer, utils::Duration};
use bevy_matchbox::prelude::*;

const CHANNEL_ID: usize = 0;

fn main() {
App::new()
.add_plugins(DefaultPlugins)
Expand All @@ -21,22 +23,22 @@ fn start_socket(mut commands: Commands) {
commands.insert_resource(socket);
}

fn send_message(mut socket: ResMut<MatchboxSocket<SingleChannel>>) {
fn send_message(mut socket: ResMut<MatchboxSocket>) {
let peers: Vec<_> = socket.connected_peers().collect();

for peer in peers {
let message = "Hello";
info!("Sending message: {message:?} to {peer}");
socket.send(message.as_bytes().into(), peer);
socket.channel_mut(CHANNEL_ID).send(message.as_bytes().into(), peer);
}
}

fn receive_messages(mut socket: ResMut<MatchboxSocket<SingleChannel>>) {
fn receive_messages(mut socket: ResMut<MatchboxSocket>) {
for (peer, state) in socket.update_peers() {
info!("{peer}: {state:?}");
}

for (_id, message) in socket.receive() {
for (_id, message) in socket.channel_mut(CHANNEL_ID).receive() {
match std::str::from_utf8(&message) {
Ok(message) => info!("Received message: {message:?}"),
Err(e) => error!("Failed to convert message to string: {e}"),
Expand Down
2 changes: 1 addition & 1 deletion bevy_matchbox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod prelude {
pub use crate::{CloseSocketExt, MatchboxSocket, OpenSocketExt};
use cfg_if::cfg_if;
pub use matchbox_socket::{
BuildablePlurality, ChannelConfig, MultipleChannels, PeerId, PeerState, SingleChannel,
ChannelConfig, PeerId, PeerState,
WebRtcSocketBuilder,
};

Expand Down
59 changes: 29 additions & 30 deletions bevy_matchbox/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use bevy::{
};
pub use matchbox_socket;
use matchbox_socket::{
BuildablePlurality, MessageLoopFuture, SingleChannel, WebRtcSocket, WebRtcSocketBuilder,
MessageLoopFuture, WebRtcSocket, WebRtcSocketBuilder,
};
use std::{
fmt::Debug,
marker::PhantomData,
ops::{Deref, DerefMut},
};

Expand All @@ -28,7 +27,7 @@ use std::{
///
/// fn close_socket_system(
/// mut commands: Commands,
/// socket: Query<Entity, With<MatchboxSocket<SingleChannel>>>
/// socket: Query<Entity, With<MatchboxSocket>>
/// ) {
/// let socket = socket.single();
/// commands.entity(socket).despawn();
Expand All @@ -46,7 +45,7 @@ use std::{
/// }
///
/// fn close_socket_system(mut commands: Commands) {
/// commands.close_socket::<SingleChannel>();
/// commands.close_socket();
/// }
/// ```
///
Expand All @@ -58,93 +57,93 @@ use std::{
/// fn open_socket_system(mut commands: Commands) {
/// let room_url = "wss://matchbox.example.com";
///
/// let socket: MatchboxSocket<SingleChannel> = WebRtcSocketBuilder::new(room_url)
/// let socket: MatchboxSocket = WebRtcSocketBuilder::new(room_url)
/// .add_channel(ChannelConfig::reliable())
/// .into();
///
/// commands.insert_resource(socket);
/// }
///
/// fn close_socket_system(mut commands: Commands) {
/// commands.remove_resource::<MatchboxSocket<SingleChannel>>();
/// commands.remove_resource::<MatchboxSocket>();
/// }
/// ```
#[derive(Resource, Component, Debug)]
#[allow(dead_code)] // keep the task alive so it doesn't drop before the socket
pub struct MatchboxSocket<C: BuildablePlurality>(WebRtcSocket<C>, Box<dyn Debug + Send + Sync>);
pub struct MatchboxSocket(WebRtcSocket, Box<dyn Debug + Send + Sync>);

impl<C: BuildablePlurality> Deref for MatchboxSocket<C> {
type Target = WebRtcSocket<C>;
impl Deref for MatchboxSocket {
type Target = WebRtcSocket;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<C: BuildablePlurality> DerefMut for MatchboxSocket<C> {
impl DerefMut for MatchboxSocket {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl<C: BuildablePlurality> From<WebRtcSocketBuilder<C>> for MatchboxSocket<C> {
fn from(builder: WebRtcSocketBuilder<C>) -> Self {
impl From<WebRtcSocketBuilder> for MatchboxSocket {
fn from(builder: WebRtcSocketBuilder) -> Self {
Self::from(builder.build())
}
}

impl<C: BuildablePlurality> From<(WebRtcSocket<C>, MessageLoopFuture)> for MatchboxSocket<C> {
fn from((socket, message_loop_fut): (WebRtcSocket<C>, MessageLoopFuture)) -> Self {
impl From<(WebRtcSocket, MessageLoopFuture)> for MatchboxSocket {
fn from((socket, message_loop_fut): (WebRtcSocket, MessageLoopFuture)) -> Self {
let task_pool = IoTaskPool::get();
let task = task_pool.spawn(message_loop_fut);
MatchboxSocket(socket, Box::new(task))
}
}

/// A [`Command`] used to open a [`MatchboxSocket`] and allocate it as a resource.
struct OpenSocket<C: BuildablePlurality>(WebRtcSocketBuilder<C>);
struct OpenSocket(WebRtcSocketBuilder);

impl<C: BuildablePlurality + 'static> Command for OpenSocket<C> {
impl Command for OpenSocket {
fn apply(self, world: &mut World) {
world.insert_resource(MatchboxSocket::from(self.0));
}
}

/// A [`Commands`] extension used to open a [`MatchboxSocket`] and allocate it as a resource.
pub trait OpenSocketExt<C: BuildablePlurality> {
pub trait OpenSocketExt {
/// Opens a [`MatchboxSocket`] and allocates it as a resource.
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder<C>);
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder);
}

impl<'w, 's, C: BuildablePlurality + 'static> OpenSocketExt<C> for Commands<'w, 's> {
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder<C>) {
impl<'w, 's> OpenSocketExt for Commands<'w, 's> {
fn open_socket(&mut self, socket_builder: WebRtcSocketBuilder) {
self.add(OpenSocket(socket_builder))
}
}

/// A [`Command`] used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`] resource.
struct CloseSocket<C: BuildablePlurality>(PhantomData<C>);
struct CloseSocket;

impl<C: BuildablePlurality + 'static> Command for CloseSocket<C> {
impl Command for CloseSocket {
fn apply(self, world: &mut World) {
world.remove_resource::<MatchboxSocket<C>>();
world.remove_resource::<MatchboxSocket>();
}
}

/// A [`Commands`] extension used to close a [`WebRtcSocket`], deleting the [`MatchboxSocket`]
/// resource.
pub trait CloseSocketExt {
/// Delete the [`MatchboxSocket`] resource.
fn close_socket<C: BuildablePlurality + 'static>(&mut self);
fn close_socket(&mut self);
}

impl<'w, 's> CloseSocketExt for Commands<'w, 's> {
fn close_socket<C: BuildablePlurality + 'static>(&mut self) {
self.add(CloseSocket::<C>(PhantomData))
fn close_socket(&mut self) {
self.add(CloseSocket)
}
}

impl MatchboxSocket<SingleChannel> {
impl MatchboxSocket {
/// Create a new socket with a single unreliable channel
///
/// ```rust
Expand All @@ -157,7 +156,7 @@ impl MatchboxSocket<SingleChannel> {
/// commands.spawn(socket);
/// }
/// ```
pub fn new_unreliable(room_url: impl Into<String>) -> MatchboxSocket<SingleChannel> {
pub fn new_unreliable(room_url: impl Into<String>) -> MatchboxSocket {
Self::from(WebRtcSocket::new_unreliable(room_url))
}

Expand All @@ -173,7 +172,7 @@ impl MatchboxSocket<SingleChannel> {
/// commands.spawn(socket);
/// }
/// ```
pub fn new_reliable(room_url: impl Into<String>) -> MatchboxSocket<SingleChannel> {
pub fn new_reliable(room_url: impl Into<String>) -> MatchboxSocket {
Self::from(WebRtcSocket::new_reliable(room_url))
}

Expand All @@ -190,7 +189,7 @@ impl MatchboxSocket<SingleChannel> {
/// }
/// ```
#[cfg(feature = "ggrs")]
pub fn new_ggrs(room_url: impl Into<String>) -> MatchboxSocket<SingleChannel> {
pub fn new_ggrs(room_url: impl Into<String>) -> MatchboxSocket {
Self::from(WebRtcSocket::new_ggrs(room_url))
}
}
2 changes: 1 addition & 1 deletion examples/bevy_ggrs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ fn lobby_cleanup(query: Query<Entity, With<LobbyUI>>, mut commands: Commands) {
fn lobby_system(
mut app_state: ResMut<NextState<AppState>>,
args: Res<Args>,
mut socket: ResMut<MatchboxSocket<SingleChannel>>,
mut socket: ResMut<MatchboxSocket>,
mut commands: Commands,
mut query: Query<&mut Text, With<LobbyText>>,
) {
Expand Down
8 changes: 5 additions & 3 deletions examples/error_handling/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use log::{info, warn};
use matchbox_socket::{Error as SocketError, PeerId, PeerState, WebRtcSocket};
use std::time::Duration;

const CHANNEL_ID: usize = 0;

#[cfg(target_arch = "wasm32")]
fn main() {
// Setup logging
Expand Down Expand Up @@ -65,7 +67,7 @@ async fn async_main() {
PeerState::Connected => {
info!("Peer joined: {peer}");
let packet = "hello friend!".as_bytes().to_vec().into_boxed_slice();
socket.send(packet, peer);
socket.channel_mut(CHANNEL_ID).send(packet, peer);
}
PeerState::Disconnected => {
info!("Peer left: {peer}");
Expand All @@ -74,7 +76,7 @@ async fn async_main() {
}

// Accept any messages incoming
for (peer, packet) in socket.receive() {
for (peer, packet) in socket.channel_mut(CHANNEL_ID).receive() {
let message = String::from_utf8_lossy(&packet);
info!("Message from {peer}: {message:?}");
}
Expand All @@ -85,7 +87,7 @@ async fn async_main() {
let peers: Vec<PeerId> = socket.connected_peers().collect();
for peer in peers {
let packet = "ping!".as_bytes().to_vec().into_boxed_slice();
socket.send(packet, peer);
socket.channel_mut(CHANNEL_ID).send(packet, peer);
}
timeout.reset(Duration::from_millis(10));
}
Expand Down
6 changes: 4 additions & 2 deletions examples/simple/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use log::info;
use matchbox_socket::{PeerState, WebRtcSocket};
use std::time::Duration;

const CHANNEL_ID: usize = 0;

#[cfg(target_arch = "wasm32")]
fn main() {
// Setup logging
Expand Down Expand Up @@ -46,7 +48,7 @@ async fn async_main() {
PeerState::Connected => {
info!("Peer joined: {peer}");
let packet = "hello friend!".as_bytes().to_vec().into_boxed_slice();
socket.send(packet, peer);
socket.channel_mut(CHANNEL_ID).send(packet, peer);
}
PeerState::Disconnected => {
info!("Peer left: {peer}");
Expand All @@ -55,7 +57,7 @@ async fn async_main() {
}

// Accept any messages incoming
for (peer, packet) in socket.receive() {
for (peer, packet) in socket.channel_mut(CHANNEL_ID).receive() {
let message = String::from_utf8_lossy(&packet);
info!("Message from {peer}: {message:?}");
}
Expand Down
51 changes: 15 additions & 36 deletions matchbox_socket/src/ggrs_socket.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,28 @@
use std::marker::PhantomData;

use ggrs::{Message, PlayerType};
use matchbox_protocol::PeerId;

use crate::{
ChannelConfig, ChannelPlurality, MessageLoopFuture, MultipleChannels, NoChannels, Packet,
SingleChannel, WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder,
ChannelConfig, MessageLoopFuture, Packet,
WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder,
};

pub const GGRS_CHANNEL_ID: usize = 0;

impl ChannelConfig {
/// Creates a [`ChannelConfig`] suitable for use with GGRS.
pub fn ggrs() -> Self {
Self::unreliable()
}
}

impl WebRtcSocketBuilder<NoChannels> {
/// Adds a new channel suitable for use with GGRS to the [`WebRtcSocket`] configuration.
pub fn add_ggrs_channel(mut self) -> WebRtcSocketBuilder<SingleChannel> {
self.config.channels.push(ChannelConfig::ggrs());
WebRtcSocketBuilder {
config: self.config,
channel_plurality: PhantomData,
}
}
}

impl WebRtcSocketBuilder<SingleChannel> {
impl WebRtcSocketBuilder {
/// Adds a new channel suitable for use with GGRS to the [`WebRtcSocket`] configuration.
pub fn add_ggrs_channel(mut self) -> WebRtcSocketBuilder<MultipleChannels> {
self.config.channels.push(ChannelConfig::ggrs());
WebRtcSocketBuilder {
config: self.config,
channel_plurality: PhantomData,
}
}
}
impl WebRtcSocketBuilder<MultipleChannels> {
/// Adds a new channel suitable for use with GGRS to the [`WebRtcSocket`] configuration.
pub fn add_ggrs_channel(mut self) -> WebRtcSocketBuilder<MultipleChannels> {
///
/// This must be called as the first channel.
pub fn add_ggrs_channel(mut self) -> WebRtcSocketBuilder {
assert_eq!(self.config.channels.len(), GGRS_CHANNEL_ID, "ggrs channel is expected to be the first channel added");
self.config.channels.push(ChannelConfig::ggrs());
WebRtcSocketBuilder {
config: self.config,
channel_plurality: PhantomData,
}
self
}
}

Expand All @@ -57,14 +36,14 @@ impl WebRtcSocket {
/// Please use the [`WebRtcSocketBuilder`] to create non-trivial sockets.
pub fn new_ggrs(
room_url: impl Into<String>,
) -> (WebRtcSocket<SingleChannel>, MessageLoopFuture) {
) -> (WebRtcSocket, MessageLoopFuture) {
WebRtcSocketBuilder::new(room_url)
.add_channel(ChannelConfig::ggrs())
.build()
}
}

impl<C: ChannelPlurality> WebRtcSocket<C> {
impl WebRtcSocket {
/// Returns a Vec of connected peers as [`ggrs::PlayerType`]
pub fn players(&mut self) -> Vec<PlayerType<PeerId>> {
let Some(our_id) = self.id() else {
Expand Down Expand Up @@ -100,12 +79,12 @@ fn deserialize_packet(message: (PeerId, Packet)) -> (PeerId, Message) {
(message.0, bincode::deserialize(&message.1).unwrap())
}

impl ggrs::NonBlockingSocket<PeerId> for WebRtcSocket<SingleChannel> {
impl ggrs::NonBlockingSocket<PeerId> for WebRtcSocket {
fn send_to(&mut self, msg: &Message, addr: &PeerId) {
self.send(build_packet(msg), *addr);
self.channel_mut(GGRS_CHANNEL_ID).send(build_packet(msg), *addr);
}
fn receive_all_messages(&mut self) -> Vec<(PeerId, Message)> {
self.receive().into_iter().map(deserialize_packet).collect()
self.channel_mut(GGRS_CHANNEL_ID).receive().into_iter().map(deserialize_packet).collect()
}
}

Expand Down
4 changes: 2 additions & 2 deletions matchbox_socket/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod webrtc_socket;
pub use error::Error;
pub use matchbox_protocol::PeerId;
pub use webrtc_socket::{
error::ChannelError, BuildablePlurality, ChannelConfig, ChannelPlurality, MessageLoopFuture,
MultipleChannels, NoChannels, Packet, PeerState, RtcIceServerConfig, SingleChannel,
error::ChannelError, ChannelConfig, MessageLoopFuture,
Packet, PeerState, RtcIceServerConfig,
WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder,
};
Loading

0 comments on commit ee70adc

Please sign in to comment.