Skip to content

Commit

Permalink
Remove GGRS-specific builders and ggrs impl for WebRtcSocket
Browse files Browse the repository at this point in the history
To create GGRS sockets, users can still create new sockets by specifying
ChannelConfig::unreliable() and using WebRtcSocket::take_channel() to
detach the channel so it can be owned by GGRS.

This makes it clearer that there's nothing special about GGRS sockets.
  • Loading branch information
caspark committed Dec 14, 2024
1 parent 4c875f4 commit 469e694
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 99 deletions.
6 changes: 4 additions & 2 deletions bevy_matchbox/src/signaling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ impl MatchboxServer {

#[cfg(test)]
mod tests {
use crate::matchbox_signaling::topologies::client_server::{ClientServer, ClientServerState};
use crate::prelude::*;
use crate::{
matchbox_signaling::topologies::client_server::{ClientServer, ClientServerState},
prelude::*,
};
use bevy::prelude::*;
use std::net::Ipv4Addr;

Expand Down
17 changes: 0 additions & 17 deletions bevy_matchbox/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,21 +173,4 @@ impl MatchboxSocket {
pub fn new_reliable(room_url: impl Into<String>) -> MatchboxSocket {
Self::from(WebRtcSocket::new_reliable(room_url))
}

/// Create a new socket with a single ggrs-compatible channel
///
/// ```rust
/// use bevy_matchbox::prelude::*;
/// use bevy::prelude::*;
///
/// fn open_channel_system(mut commands: Commands) {
/// let room_url = "wss://matchbox.example.com";
/// let socket = MatchboxSocket::new_ggrs(room_url);
/// commands.spawn(socket);
/// }
/// ```
#[cfg(feature = "ggrs")]
pub fn new_ggrs(room_url: impl Into<String>) -> MatchboxSocket {
Self::from(WebRtcSocket::new_ggrs(room_url))
}
}
15 changes: 9 additions & 6 deletions examples/bevy_ggrs/src/box_game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ const FRICTION: f32 = 0.0018;
const PLANE_SIZE: f32 = 5.0;
const CUBE_SIZE: f32 = 0.2;

// You need to define a config struct to bundle all the generics of GGRS. bevy_ggrs provides a sensible default in `GgrsConfig`.
// (optional) You can define a type here for brevity.
// You need to define a config struct to bundle all the generics of GGRS. bevy_ggrs provides a
// sensible default in `GgrsConfig`. (optional) You can define a type here for brevity.
pub type BoxConfig = GgrsConfig<BoxInput, PeerId>;

#[repr(C)]
Expand Down Expand Up @@ -58,7 +58,8 @@ pub struct FrameCount {
pub frame: u32,
}

/// Collects player inputs during [`ReadInputs`](`bevy_ggrs::ReadInputs`) and creates a [`LocalInputs`] resource.
/// Collects player inputs during [`ReadInputs`](`bevy_ggrs::ReadInputs`) and creates a
/// [`LocalInputs`] resource.
pub fn read_local_inputs(
mut commands: Commands,
keyboard_input: Res<ButtonInput<KeyCode>>,
Expand Down Expand Up @@ -154,8 +155,9 @@ pub fn setup_scene(
}

// Example system, manipulating a resource, will be added to the rollback schedule.
// Increases the frame count by 1 every update step. If loading and saving resources works correctly,
// you should see this resource rolling back, counting back up and finally increasing by 1 every update step
// Increases the frame count by 1 every update step. If loading and saving resources works
// correctly, you should see this resource rolling back, counting back up and finally increasing by
// 1 every update step
#[allow(dead_code)]
pub fn increase_frame_system(mut frame_count: ResMut<FrameCount>) {
frame_count.frame += 1;
Expand All @@ -167,7 +169,8 @@ pub fn increase_frame_system(mut frame_count: ResMut<FrameCount>) {
#[allow(dead_code)]
pub fn move_cube_system(
mut query: Query<(&mut Transform, &mut Velocity, &Player), With<Rollback>>,
// ^------^ Added by `add_rollback` earlier
// ^------^ Added by
// `add_rollback` earlier
inputs: Res<PlayerInputs<BoxConfig>>,
// Thanks to RollbackTimePlugin, this is rollback safe
time: Res<Time>,
Expand Down
8 changes: 5 additions & 3 deletions examples/bevy_ggrs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ fn main() {
.set_rollback_schedule_fps(FPS)
.add_systems(ReadInputs, read_local_inputs)
// Rollback behavior can be customized using a variety of extension methods and plugins:
// The FrameCount resource implements Copy, we can use that to have minimal overhead rollback
// The FrameCount resource implements Copy, we can use that to have minimal overhead
// rollback
.rollback_resource_with_copy::<FrameCount>()
// Transform and Velocity components only implement Clone, so instead we'll use that to snapshot and rollback with
// Transform and Velocity components only implement Clone, so instead we'll use that to
// snapshot and rollback with
.rollback_component_with_clone::<Transform>()
.rollback_component_with_clone::<Velocity>()
.insert_resource(ClearColor(SKY_COLOR))
Expand Down Expand Up @@ -66,7 +68,7 @@ fn start_matchbox_socket(mut commands: Commands, args: Res<Args>) {
let room_url = format!("{}/{}", &args.matchbox, room_id);
info!("connecting to matchbox server: {room_url:?}");

commands.insert_resource(MatchboxSocket::new_ggrs(room_url));
commands.insert_resource(MatchboxSocket::new_unreliable(room_url));
}

// Marker components for UI
Expand Down
71 changes: 15 additions & 56 deletions matchbox_socket/src/ggrs_socket.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,7 @@
use ggrs::{Message, PlayerType};
use matchbox_protocol::PeerId;

use crate::{
ChannelConfig, MessageLoopFuture, Packet, WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder,
};

pub const GGRS_CHANNEL_ID: usize = 0;

impl ChannelConfig {
/// Creates a [`ChannelConfig`] suitable for use with GGRS.
pub fn ggrs() -> Self {
Self::unreliable()
}
}

impl WebRtcSocketBuilder {
/// Adds a new channel suitable for use with GGRS to the [`WebRtcSocket`] configuration.
///
/// This must be called as the first channel.
pub fn add_ggrs_channel(mut self) -> WebRtcSocketBuilder {
assert_eq!(
self.config.channels.len(),
GGRS_CHANNEL_ID,
"ggrs channel is expected to be the first channel added"
);
self.config.channels.push(ChannelConfig::ggrs());
self
}
}

impl WebRtcSocket {
/// Creates a [`WebRtcSocket`] and the corresponding [`MessageLoopFuture`] for a
/// socket with a single channel configured correctly for usage with GGRS.
///
/// The returned [`MessageLoopFuture`] should be awaited in order for messages to
/// be sent and received.
///
/// Please use the [`WebRtcSocketBuilder`] to create non-trivial sockets.
pub fn new_ggrs(room_url: impl Into<String>) -> (WebRtcSocket, MessageLoopFuture) {
WebRtcSocketBuilder::new(room_url)
.add_channel(ChannelConfig::ggrs())
.build()
}
}
use crate::{Packet, WebRtcChannel, WebRtcSocket};

impl WebRtcSocket {
/// Returns a Vec of connected peers as [`ggrs::PlayerType`]
Expand Down Expand Up @@ -80,22 +39,22 @@ fn deserialize_packet(message: (PeerId, Packet)) -> (PeerId, Message) {
(message.0, bincode::deserialize(&message.1).unwrap())
}

impl ggrs::NonBlockingSocket<PeerId> for WebRtcSocket {
fn send_to(&mut self, msg: &Message, addr: &PeerId) {
self.channel_mut(GGRS_CHANNEL_ID)
.send(build_packet(msg), *addr);
}
fn receive_all_messages(&mut self) -> Vec<(PeerId, Message)> {
self.channel_mut(GGRS_CHANNEL_ID)
.receive()
.into_iter()
.map(deserialize_packet)
.collect()
}
}

impl ggrs::NonBlockingSocket<PeerId> for WebRtcChannel {
fn send_to(&mut self, msg: &Message, addr: &PeerId) {
if self.config().max_retransmits != Some(0) || self.config().ordered {
// Using a reliable or ordered channel with ggrs is wasteful in that ggrs implements its
// own reliability layer (including unconfirmed inputs in frames) and can
// handle out of order messages just fine on its own.
// It's likely that in poor network conditions this will cause GGRS to unnecessarily
// delay confirming or rolling back simulation frames, which will impact performance
// (or, worst case, cause GGRS to temporarily stop advancing frames).
// So we better warn the user about this.
log::warn!(
"Sending GGRS traffic over reliable or ordered channel ({:?}), which may reduce performance.\
You should use an unreliable and unordered channel instead.",
self.config()
);
}
self.send(build_packet(msg), *addr);
}

Expand Down
2 changes: 0 additions & 2 deletions matchbox_socket/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,3 @@ pub use webrtc_socket::{
error::ChannelError, ChannelConfig, MessageLoopFuture, Packet, PeerState, RtcIceServerConfig,
WebRtcChannel, WebRtcSocket, WebRtcSocketBuilder,
};
#[cfg(feature = "ggrs")]
pub use ggrs_socket::GGRS_CHANNEL_ID;
39 changes: 26 additions & 13 deletions matchbox_socket/src/webrtc_socket/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct RtcIceServerConfig {

/// Configuration options for a data channel
/// See also: <https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel>
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct ChannelConfig {
/// Whether messages sent on the channel are guaranteed to arrive in order
/// See also: <https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/ordered>
Expand All @@ -42,16 +42,16 @@ pub struct ChannelConfig {

impl ChannelConfig {
/// Messages sent via an unreliable channel may arrive in any order or not at all, but arrive as
/// quickly as possible
/// quickly as possible.
pub fn unreliable() -> Self {
ChannelConfig {
ordered: false,
max_retransmits: Some(0),
}
}

/// Messages sent via a reliable channel are guaranteed to arrive in order and will be resent
/// until they arrive
/// Messages sent via a reliable channel are guaranteed to arrive (and guaranteed to arrive in
/// order the order they were sent) and will be retransmitted until they arrive.
pub fn reliable() -> Self {
ChannelConfig {
ordered: true,
Expand Down Expand Up @@ -184,15 +184,22 @@ impl WebRtcSocketBuilder {

let (peer_state_tx, peer_state_rx) = futures_channel::mpsc::unbounded();

let (messages_from_peers_tx, messages_from_peers_rx) =
new_senders_and_receivers(&self.config.channels);
let (peer_messages_out_tx, peer_messages_out_rx) =
new_senders_and_receivers(&self.config.channels);
let channels = messages_from_peers_rx
.into_iter()
.zip(peer_messages_out_tx)
.map(|(rx, tx)| Some(WebRtcChannel { rx, tx }))
.collect();
let mut peer_messages_out_rx = Vec::with_capacity(self.config.channels.len());
let mut messages_from_peers_tx = Vec::with_capacity(self.config.channels.len());
let mut channels = Vec::with_capacity(self.config.channels.len());
for channel_config in self.config.channels.iter() {
let (messages_from_peers_tx_curr, messages_from_peers_rx_curr) =
futures_channel::mpsc::unbounded();
let (peer_messages_out_tx_curr, peer_messages_out_rx_curr) =
futures_channel::mpsc::unbounded();
peer_messages_out_rx.push(peer_messages_out_rx_curr);
messages_from_peers_tx.push(messages_from_peers_tx_curr);
channels.push(Some(WebRtcChannel {
config: *channel_config,
rx: messages_from_peers_rx_curr,
tx: peer_messages_out_tx_curr,
}));
}

let (id_tx, id_rx) = futures_channel::oneshot::channel();

Expand Down Expand Up @@ -250,11 +257,17 @@ pub enum PeerState {
/// [`WebRtcSocket`].
#[derive(Debug)]
pub struct WebRtcChannel {
config: ChannelConfig,
tx: UnboundedSender<(PeerId, Packet)>,
rx: UnboundedReceiver<(PeerId, Packet)>,
}

impl WebRtcChannel {
/// Returns the [`ChannelConfig`] used to create this channel.
pub fn config(&self) -> &ChannelConfig {
&self.config
}

/// Returns whether it's still possible to send messages.
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
Expand Down

0 comments on commit 469e694

Please sign in to comment.