Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): async adapter #419

Merged
merged 15 commits into from
Jan 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions .github/workflows/adapter-ci/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
name: adapter-setup
services:
valkey:
image: valkey/valkey
network_mode: host
healthcheck:
test: "valkey-cli ping"
interval: 2s
timeout: 5s
redis-node-0:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT_NUMBER=7000
- REDIS_CLUSTER_ANNOUNCE_PORT=7000
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1 # host ip address
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17000
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

redis-node-1:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT_NUMBER=7001
- REDIS_CLUSTER_ANNOUNCE_PORT=7001
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17001
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

redis-node-2:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT_NUMBER=7002
- REDIS_CLUSTER_ANNOUNCE_PORT=7002
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17002
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

redis-node-3:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT_NUMBER=7003
- REDIS_CLUSTER_ANNOUNCE_PORT=7003
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17003
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

redis-node-4:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_PORT_NUMBER=7004
- REDIS_CLUSTER_ANNOUNCE_PORT=7004
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17004
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

redis-node-5:
image: docker.io/bitnami/redis-cluster:7.0
network_mode: host
healthcheck:
test: "redis-cli ping"
interval: 2s
timeout: 5s
depends_on:
- redis-node-0
- redis-node-1
- redis-node-2
- redis-node-3
- redis-node-4
environment:
- ALLOW_EMPTY_PASSWORD=yes
- REDIS_CLUSTER_REPLICAS=1
- REDIS_PORT_NUMBER=7005
- REDIS_CLUSTER_ANNOUNCE_PORT=7005
- REDIS_CLUSTER_ANNOUNCE_BUS_PORT=17005
- REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1
- REDIS_CLUSTER_DYNAMIC_IPS=no
- REDIS_NODES=127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
- REDIS_CLUSTER_CREATOR=yes
4 changes: 2 additions & 2 deletions .github/workflows/fuzzing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- run: cargo install cargo-fuzz
- name: download corpus data
run: |
cd crates/${{ matrix.crate }}
cd crates/${{ matrix.crate }}/fuzz
wget https://github.com/Totodore/socketioxide-fuzzing-corpus/archive/refs/tags/v$TEST_DATA_VERSION.zip
unzip v$TEST_DATA_VERSION.zip
rm v$TEST_DATA_VERSION.zip
Expand Down Expand Up @@ -69,5 +69,5 @@ jobs:
- run: cargo install cargo-fuzz
- name: cargo fuzz run decode_packet
run: |
cd crates/${{ matrix.crate }}
cd crates/${{ matrix.crate }}/fuzz
cargo fuzz run ${{ matrix.target }} -- -timeout=5 -max_len=2048 -runs=2000000 -only_ascii=1
64 changes: 58 additions & 6 deletions .github/workflows/github-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ on:
branches:
- main
pull_request:
branches:
- main

jobs:
format:
Expand Down Expand Up @@ -58,10 +56,10 @@ jobs:
target/
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}-nightly
- name: Check unused dependencies on default features
run: cargo udeps --workspace
run: RUSTFLAGS="--cfg fuzzing" cargo udeps

- name: Check unused dependencies on all features
run: cargo udeps --all-features --workspace
run: RUSTFLAGS="--cfg fuzzing" cargo udeps --all-features

msrv:
runs-on: ubuntu-latest
Expand All @@ -85,7 +83,7 @@ jobs:
components: rustfmt, clippy

- name: check crates
run: cargo check -p socketioxide -p engineioxide --all-features
run: cargo check --all-features

feature_set:
runs-on: ubuntu-latest
Expand All @@ -110,7 +108,7 @@ jobs:
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

- name: check --feature-powerset
run: cargo hack check --feature-powerset --no-dev-deps -p socketioxide -p engineioxide
run: cargo hack check --feature-powerset --no-dev-deps -p socketioxide -p engineioxide -p socketioxide-redis

examples:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -271,3 +269,57 @@ jobs:
- name: Client output
if: always()
run: cat client.txt
adapter:
runs-on: ubuntu-latest
needs: [socket_io, engine_io]
strategy:
matrix:
socketio-version: [v4, v4-msgpack, v5, v5-msgpack]
adapter: [fred-e2e, redis-e2e, redis-cluster-e2e, fred-cluster-e2e]
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@master
with:
toolchain: stable
- uses: actions/cache@v4
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-adapter
- uses: actions/setup-node@v4
with:
node-version: 22
- name: install adapter infra
uses: hoverkraft-tech/[email protected]
with:
compose-file: ./.github/workflows/adapter-ci/docker-compose.yml
- run: cd e2e/adapter && npm install && npm install ts-node --location=global
- name: Install deps & run tests
run: |
PARSER=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f2 -s)
VERSION=$(echo ${{ matrix.socketio-version }} | cut -d'-' -f1)
cargo build -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER
cd e2e/adapter && CMD="cargo run -p adapter-e2e --bin ${{ matrix.adapter }} --features $VERSION,$PARSER" ts-node client.ts
- name: Server output
if: always()
run: cat e2e/adapter/*.log
all_passed:
runs-on: ubuntu-latest
needs:
[
adapter,
feature_set,
format,
udeps,
msrv,
examples,
doctest,
rust-clippy-analyze,
]
steps:
- name: All passed
run: echo "All tests passed"
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ hyper-util.version = "0.1"
hyper = "1.5"
pin-project-lite = "0.2"
matchit = "0.8"
rmp-serde = "1.3"
rmp = "0.8"
rustversion = "1"

# Dev deps
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "engineioxide"
description = "Engine IO server implementation in rust as a Tower Service."
version.workspace = true
version = "0.15.1"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ impl EngineIoHandler for MyHandler {
let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
socket.emit(cnt.to_string()).ok();
}
fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
*socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
}
fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
}

// Create a new engineio layer
Expand Down
8 changes: 4 additions & 4 deletions crates/engineioxide/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
//! type Data = ();
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! let config = EngineIoConfig::builder()
Expand Down Expand Up @@ -150,12 +150,12 @@ impl EngineIoConfigBuilder {
/// println!("socket disconnect {}", socket.id);
/// }
///
/// fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) {
/// fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) {
/// println!("Ping pong message {:?}", msg);
/// socket.emit(msg).unwrap();
/// }
///
/// fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) {
/// fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) {
/// println!("Ping pong binary message {:?}", data);
/// socket.emit_binary(data).unwrap();
/// }
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ mod tests {
println!("socket disconnect {} {:?}", socket.id, reason);
}

fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>) {
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong message {:?}", msg);
socket.emit(msg).ok();
}

fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>) {
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong binary message {:?}", data);
socket.emit_binary(data).ok();
}
Expand Down
8 changes: 4 additions & 4 deletions crates/engineioxide/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
//! let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
//! socket.emit(cnt.to_string()).ok();
//! }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! // Create an engine io service with the given handler
Expand All @@ -60,8 +60,8 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + 'static {
fn on_disconnect(&self, socket: Arc<Socket<Self::Data>>, reason: DisconnectReason);

/// Called when a message is received from the client.
fn on_message(&self, msg: Str, socket: Arc<Socket<Self::Data>>);
fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<Self::Data>>);

/// Called when a binary message is received from the client.
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>);
fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<Self::Data>>);
}
4 changes: 2 additions & 2 deletions crates/engineioxide/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
//! type Data = ();
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//! // Create a new engineio layer
//! let layer = EngineIoLayer::new(Arc::new(MyHandler));
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
//! type Data = ();
//! fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! // Create a new engine.io service that will return a 404 not found response for other requests
Expand Down
6 changes: 3 additions & 3 deletions crates/engineioxide/src/sid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ pub struct Sid([u8; 16]);
impl Sid {
/// A zeroed session id
pub const ZERO: Self = Self([0u8; 16]);
/// Generate a new random session id (base64 10 chars)
/// Generate a new random session id (base64 16 chars)
pub fn new() -> Self {
Self::default()
}

/// Get the session id as a base64 10 chars string
pub fn as_str(&self) -> &str {
/// Get the session id as a base64 16 chars string
pub const fn as_str(&self) -> &str {
// SAFETY: SID is always a base64 chars string
unsafe { std::str::from_utf8_unchecked(&self.0) }
}
Expand Down
4 changes: 2 additions & 2 deletions crates/engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
//! fn on_disconnect(&self, socket: Arc<Socket<SocketState>>, reason: DisconnectReason) {
//! let cnt = self.user_cnt.fetch_sub(1, Ordering::Relaxed) - 1;
//! }
//! fn on_message(&self, msg: Str, socket: Arc<Socket<SocketState>>) {
//! fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg.into(); // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! let svc = EngineIoService::new(Arc::new(MyHandler::default()));
Expand Down
5 changes: 5 additions & 0 deletions crates/engineioxide/src/str.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ impl From<Str> for String {
unsafe { String::from_utf8_unchecked(vec) }
}
}
impl From<Str> for Vec<u8> {
fn from(value: Str) -> Self {
Vec::from(value.0)
}
}
impl Serialize for Str {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down
Loading
Loading