Skip to content

Commit

Permalink
feat(core): async adapter & redis adapter (#419)
Browse files Browse the repository at this point in the history
* feat(socketio): async adapter (#395)

* feat: improve `Adapter` error types for `broadcast` and `disconnect` methods (#408)

* feat(socketio/ns): improve `SocketEmitter` trait (#410)

* feat(core/adapter): pass filter to get_sids

* feat: add custom `Iterator` return for apply_opts fn

* chore(docs): add doc on async adapters (#411)

* feat(adapter): add a remote socket API (#412)

* feat(core): switch to a new type for server uid (#413)

* chore(deps): rm `__test_harness` && `tracing` feature flags.

* chore(deps): add fuzzing as cfg flag

* chore(ci): msrv

* chore(clippy): fix redundant import (#414)

* feat(core): improve local adapter perf (#415)

* test(e2e): provide e2e testing system for adapters (#422)

* test(e2e): provide e2e testing system for adapters

* fix(e2e/adapter): remove redis deps

* feat(socketio): check namespaces at insertion

* feat(redis): redis adapter crate (#402)
  • Loading branch information
Totodore authored Jan 12, 2025
1 parent 2e775b6 commit 4ce3966
Show file tree
Hide file tree
Showing 126 changed files with 7,380 additions and 1,579 deletions.
112 changes: 112 additions & 0 deletions .github/workflows/adapter-ci/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
name: adapter-setup
services:
valkey:
image: valkey/valkey
network_mode: host
healthcheck:
test: "valkey-cli ping"
interval: 2s
timeout: 5s
redis-node-0:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT_NUMBER=7000
- REDIS_CLUSTER_ANNOUNCE_PORT=7000
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1 # host ip address
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17000
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

redis-node-1:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT_NUMBER=7001
- REDIS_CLUSTER_ANNOUNCE_PORT=7001
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17001
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

redis-node-2:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT_NUMBER=7002
- REDIS_CLUSTER_ANNOUNCE_PORT=7002
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17002
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

redis-node-3:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT_NUMBER=7003
- REDIS_CLUSTER_ANNOUNCE_PORT=7003
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17003
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

redis-node-4:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT_NUMBER=7004
- REDIS_CLUSTER_ANNOUNCE_PORT=7004
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17004
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

redis-node-5:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
depends_on:
- redis-node-0
- redis-node-1
- redis-node-2
- redis-node-3
- redis-node-4
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_CLUSTER_REPLICAS=1
- REDIS_PORT_NUMBER=7005
- REDIS_CLUSTER_ANNOUNCE_PORT=7005
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17005
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
- REDIS_CLUSTER_CREATOR=yes
4 changes: 2 additions & 2 deletions .github/workflows/fuzzing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- run: cargo install cargo-fuzz
- name: download corpus data
run: |
cd crates/${{ matrix.crate }}
cd crates/${{ matrix.crate }}/fuzz
wget https://github.com/Totodore/socketioxide-fuzzing-corpus/archive/refs/tags/v$TEST_DATA_VERSION.zip
unzip v$TEST_DATA_VERSION.zip
rm v$TEST_DATA_VERSION.zip
Expand Down Expand Up @@ -69,5 +69,5 @@ jobs:
- run: cargo install cargo-fuzz
- name: cargo fuzz run decode_packet
run: |
cd crates/${{ matrix.crate }}
cd crates/${{ matrix.crate }}/fuzz
cargo fuzz run ${{ matrix.target }} -- -timeout=5 -max_len=2048 -runs=2000000 -only_ascii=1
64 changes: 58 additions & 6 deletions .github/workflows/github-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ on:
branches:
- main
pull_request:
branches:
- main

jobs:
format:
Expand Down Expand Up @@ -58,10 +56,10 @@ jobs:
target/
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}-nightly
- name: Check unused dependencies on default features
run: cargo udeps --workspace
run: RUSTFLAGS="--cfg fuzzing" cargo udeps

- name: Check unused dependencies on all features
run: cargo udeps --all-features --workspace
run: RUSTFLAGS="--cfg fuzzing" cargo udeps --all-features

msrv:
runs-on: ubuntu-latest
Expand All @@ -85,7 +83,7 @@ jobs:
components: rustfmt, clippy

- name: check crates
run: cargo check -p socketioxide -p engineioxide --all-features
run: cargo check --all-features

feature_set:
runs-on: ubuntu-latest
Expand All @@ -110,7 +108,7 @@ jobs:
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

- name: check --feature-powerset
run: cargo hack check --feature-powerset --no-dev-deps -p socketioxide -p engineioxide
run: cargo hack check --feature-powerset --no-dev-deps -p socketioxide -p engineioxide -p socketioxide-redis

examples:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -271,3 +269,57 @@ jobs:
- name: Client output
if: always()
run: cat client.txt
adapter:
runs-on: ubuntu-latest
needs: [socket_io, engine_io]
strategy:
matrix:
socketio-version: [v4, v4-msgpack, v5, v5-msgpack]
adapter: [fred-e2e, redis-e2e, redis-cluster-e2e, fred-cluster-e2e]
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@master
with:
toolchain: stable
- uses: actions/cache@v4
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-adapter
- uses: actions/setup-node@v4
with:
node-version: 22
- name: install adapter infra
uses: hoverkraft-tech/[email protected]
with:
compose-file: ./.github/workflows/adapter-ci/docker-compose.yml
- run: cd e2e/adapter && npm install && npm install ts-node --location=global
- name: Install deps & run tests
run: |
PARSER=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f2 -s)
VERSION=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f1)
cargo build -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER
cd e2e/adapter && CMD="cargo run -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER" ts-node client.ts
- name: Server output
if: always()
run: cat e2e/adapter/*.log
all_passed:
runs-on: ubuntu-latest
needs:
[
adapter,
feature_set,
format,
udeps,
msrv,
examples,
doctest,
rust-clippy-analyze,
]
steps:
- name: All passed
run: echo "All tests passed"
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ hyper-util.version = "0.1"
hyper = "1.5"
pin-project-lite = "0.2"
matchit = "0.8"
rmp-serde = "1.3"
rmp = "0.8"
rustversion = "1"

# Dev deps
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "engineioxide"
description = "Engine IO server implementation in rust as a Tower Service."
version.workspace = true
version = "0.15.1"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ impl EngineIoHandler for MyHandler {
let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
socket.emit(cnt.to_string()).ok();
}
fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
*socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
}
fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
}

// Create a new engineio layer
Expand Down
8 changes: 4 additions & 4 deletions crates/engineioxide/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
//! type Data = ();
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! let config = EngineIoConfig::builder()
Expand Down Expand Up @@ -150,12 +150,12 @@ impl EngineIoConfigBuilder {
/// println!("socket disconnect {}", socket.id);
/// }
///
/// fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) {
/// fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) {
/// println!("Ping pong message {:?}", msg);
/// socket.emit(msg).unwrap();
/// }
///
/// fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) {
/// fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) {
/// println!("Ping pong binary message {:?}", data);
/// socket.emit_binary(data).unwrap();
/// }
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ mod tests {
println!("socket disconnect {} {:?}", socket.id, reason);
}

fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>) {
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong message {:?}", msg);
socket.emit(msg).ok();
}

fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>) {
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong binary message {:?}", data);
socket.emit_binary(data).ok();
}
Expand Down
8 changes: 4 additions & 4 deletions crates/engineioxide/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
//! let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
//! socket.emit(cnt.to_string()).ok();
//! }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! // Create an engine io service with the given handler
Expand All @@ -60,8 +60,8 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + 'static {
fn on_disconnect(&self, socket: Arc<Socket<Self::Data>>, reason: DisconnectReason);

/// Called when a message is received from the client.
fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>);
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<Self::Data>>);

/// Called when a binary message is received from the client.
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>);
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<Self::Data>>);
}
4 changes: 2 additions & 2 deletions crates/engineioxide/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
//! type Data = ();
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//! // Create a new engineio layer
//! let layer = EngineIoLayer::new(Arc::new(MyHandler));
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
//! type Data = ();
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! // Create a new engine.io service that will return a 404 not found response for other requests
Expand Down
6 changes: 3 additions & 3 deletions crates/engineioxide/src/sid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ pub struct Sid([u8; 16]);
impl Sid {
/// A zeroed session id
pub const ZERO: Self = Self([0u8; 16]);
/// Generate a new random session id (base64 10 chars)
/// Generate a new random session id (base64 16 chars)
pub fn new() -> Self {
Self::default()
}

/// Get the session id as a base64 10 chars string
pub fn as_str(&self) -> &str {
/// Get the session id as a base64 16 chars string
pub const fn as_str(&self) -> &str {
// SAFETY: SID is always a base64 chars string
unsafe { std::str::from_utf8_unchecked(&self.0) }
}
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
//! fn on_disconnect(&self, socket: Arc<Socket<SocketState>>, reason: DisconnectReason) {
//! let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
//! }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! let svc = EngineIoService::new(Arc::new(MyHandler::default()));
Expand Down
5 changes: 5 additions & 0 deletions crates/engineioxide/src/str.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ impl From<Str> for String {
unsafe { String::from_utf8_unchecked(vec) }
}
}
impl From<Str> for Vec<u8> {
fn from(value: Str) -> Self {
Vec::from(value.0)
}
}
impl Serialize for Str {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down
Loading

0 comments on commit 4ce3966

Please sign in to comment.