diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 083b5bb9..130e5c50 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -4,7 +4,7 @@ on: workflow_dispatch: inputs: heaptrack: - description: 'Run heaptrack memory benchmark' + description: "Run heaptrack memory benchmark" required: true default: false type: boolean @@ -18,6 +18,9 @@ jobs: - uses: dtolnay/rust-toolchain@master with: toolchain: stable + - uses: actions/setup-node@v4 + with: + node-version: 22 - uses: actions/cache@v4 with: path: | @@ -30,17 +33,22 @@ jobs: - name: Install heaptrack run: sudo apt-get install -y heaptrack - name: Build server && client - run: cargo build -r -p heaptrack && cargo build -r -p heaptrack --bin heaptrack-client + run: cargo build -r -p heaptrack && npm i --prefix e2e/heaptrack - name: Run memory benchmark - run: heaptrack target/release/heaptrack > server.txt & ./target/release/heaptrack-client > client.txt - - name: Server output - if: always() - run: cat server.txt - - name: Client output - if: always() - run: cat client.txt + run: | + ulimit -n 20000 + RUST_LOG="socketioxide=trace,engineioxide=trace,info" heaptrack target/release/heaptrack > server.txt & + cd e2e/heaptrack + node --experimental-strip-types client.ts > ../../client.txt & + sleep 60 + kill $! + - name: Publish logs + uses: actions/upload-artifact@v4 + with: + name: logs-${{ github.head_ref }}.${{ github.sha }} + path: *.txt - name: Publish memory benchmark uses: actions/upload-artifact@v4 with: name: heaptrack-${{ github.head_ref }}.${{ github.sha }} - path: heaptrack.heaptrack.* \ No newline at end of file + path: heaptrack.heaptrack.* diff --git a/crates/engineioxide/src/config.rs b/crates/engineioxide/src/config.rs index 8a9eaeea..3eeb1fbc 100644 --- a/crates/engineioxide/src/config.rs +++ b/crates/engineioxide/src/config.rs @@ -57,9 +57,16 @@ pub struct EngineIoConfig { pub max_buffer_size: usize, /// The maximum number of bytes that can be received per http request. - /// Defaults to 100kb. + /// Defaults to 100KB. pub max_payload: u64, + /// The size of the read buffer for the websocket transport. + /// You can tweak this value depending on your use case. By default it is set to 4KiB. + /// + /// Setting it to a higher value will improve performance on heavy read scenarios + /// but will consume more memory. + pub ws_read_buffer_size: usize, + /// Allowed transports on this server /// It is represented as a bitfield to allow to combine any number of transports easily pub transports: u8, @@ -73,6 +80,7 @@ impl Default for EngineIoConfig { ping_timeout: Duration::from_millis(20000), max_buffer_size: 128, max_payload: 1e5 as u64, // 100kb + ws_read_buffer_size: 4096, transports: TransportType::Polling as u8 | TransportType::Websocket as u8, } } @@ -173,6 +181,16 @@ impl EngineIoConfigBuilder { self } + /// The size of the read buffer for the websocket transport. + /// You can tweak this value depending on your use case. Defaults to 4KiB. + /// + /// Setting it to a higher value will improve performance on heavy read scenarios + /// but will consume more memory. + pub fn ws_read_buffer_size(mut self, ws_read_buffer_size: usize) -> Self { + self.config.ws_read_buffer_size = ws_read_buffer_size; + self + } + /// Allowed transports on this server /// /// The `transports` array should have a size of 1 or 2 diff --git a/crates/engineioxide/src/transport/ws.rs b/crates/engineioxide/src/transport/ws.rs index a7acdcf7..0a86df6e 100644 --- a/crates/engineioxide/src/transport/ws.rs +++ b/crates/engineioxide/src/transport/ws.rs @@ -16,7 +16,11 @@ use tokio::{ task::JoinHandle, }; use tokio_tungstenite::{ - tungstenite::{handshake::derive_accept_key, protocol::Role, Message}, + tungstenite::{ + handshake::derive_accept_key, + protocol::{Role, WebSocketConfig}, + Message, + }, WebSocketStream, }; @@ -110,7 +114,8 @@ pub async fn on_init( where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - let ws_init = move || WebSocketStream::from_raw_socket(conn, Role::Server, None); + let ws_config = WebSocketConfig::default().read_buffer_size(engine.config.ws_read_buffer_size); + let ws_init = move || WebSocketStream::from_raw_socket(conn, Role::Server, Some(ws_config)); let (socket, ws) = if let Some(sid) = sid { match engine.get_socket(sid) { None => return Err(Error::UnknownSessionID(sid)), diff --git a/crates/parser-common/src/lib.rs b/crates/parser-common/src/lib.rs index 7fd281df..4331eecd 100644 --- a/crates/parser-common/src/lib.rs +++ b/crates/parser-common/src/lib.rs @@ -91,7 +91,9 @@ impl Parse for CommonParser { .. }) => { let binaries = binaries.get_or_insert(VecDeque::new()); - binaries.push_back(data); + // We copy the data to avoid holding a ref to the engine.io + // websocket buffer too long. + binaries.push_back(Bytes::copy_from_slice(&data)); if state.incoming_binary_cnt.load(Ordering::Relaxed) > binaries.len() { Err(ParseError::NeedsMoreBinaryData) } else { diff --git a/crates/socketioxide/src/client.rs b/crates/socketioxide/src/client.rs index 504328aa..b389ac7e 100644 --- a/crates/socketioxide/src/client.rs +++ b/crates/socketioxide/src/client.rs @@ -61,7 +61,7 @@ impl Client { fn sock_connect( self: &Arc, auth: Option, - ns_path: Str, + ns_path: &str, esocket: &Arc>>, ) { #[cfg(feature = "tracing")] @@ -77,12 +77,10 @@ impl Client { } }; - if let Some(ns) = self.get_ns(&ns_path) { + if let Some(ns) = self.get_ns(ns_path) { tokio::spawn(connect(ns, esocket.clone())); - } else if let Ok(Match { value: ns_ctr, .. }) = self.router.read().unwrap().at(&ns_path) { - // We have to create a new `Str` otherwise, we would keep a ref to the original connect packet - // for the entire lifetime of the Namespace. - let path = Str::copy_from_slice(&ns_path); + } else if let Ok(Match { value: ns_ctr, .. }) = self.router.read().unwrap().at(ns_path) { + let path = Str::copy_from_slice(ns_path); let ns = ns_ctr.get_new_ns(path.clone(), &self.adapter_state, &self.config); let this = self.clone(); let esocket = esocket.clone(); @@ -100,9 +98,10 @@ impl Client { ); esocket.close(EIoDisconnectReason::TransportClose); } else { + let path = Str::copy_from_slice(ns_path); let packet = self .parser() - .encode(Packet::connect_error(ns_path, "Invalid namespace")); + .encode(Packet::connect_error(path, "Invalid namespace")); let _ = match packet { Value::Str(p, _) => esocket.emit(p).map_err(|_e| { #[cfg(feature = "tracing")] @@ -257,7 +256,7 @@ impl EngineIoHandler for Client { ProtocolVersion::V4 => { #[cfg(feature = "tracing")] tracing::debug!("connecting to default namespace for v4"); - self.sock_connect(None, Str::from("/"), &socket); + self.sock_connect(None, "/", &socket); } ProtocolVersion::V5 => self.spawn_connect_timeout_task(socket), } @@ -302,7 +301,7 @@ impl EngineIoHandler for Client { let res: Result<(), Error> = match packet.inner { PacketData::Connect(auth) => { - self.sock_connect(auth, packet.ns, &socket); + self.sock_connect(auth, &packet.ns, &socket); Ok(()) } _ => self.sock_propagate_packet(packet, socket.id), @@ -339,7 +338,7 @@ impl EngineIoHandler for Client { let res: Result<(), Error> = match packet.inner { PacketData::Connect(auth) => { - self.sock_connect(auth, packet.ns, &socket); + self.sock_connect(auth, &packet.ns, &socket); Ok(()) } _ => self.sock_propagate_packet(packet, socket.id), diff --git a/crates/socketioxide/src/io.rs b/crates/socketioxide/src/io.rs index ae243072..e6cd4a8f 100644 --- a/crates/socketioxide/src/io.rs +++ b/crates/socketioxide/src/io.rs @@ -158,6 +158,19 @@ impl SocketIoBuilder { self } + /// The size of the read buffer for the websocket transport. + /// You can tweak this value depending on your use case. Defaults to 4KiB. + /// + /// Setting it to a higher value will improve performance on heavy read scenarios + /// but will consume more memory. + #[inline] + pub fn ws_read_buffer_size(mut self, ws_read_buffer_size: usize) -> Self { + self.engine_config_builder = self + .engine_config_builder + .ws_read_buffer_size(ws_read_buffer_size); + self + } + /// Allowed transports on this server /// /// The `transports` array should have a size of 1 or 2 diff --git a/e2e/heaptrack/.gitignore b/e2e/heaptrack/.gitignore index 2bbb807b..4fc1be9b 100644 --- a/e2e/heaptrack/.gitignore +++ b/e2e/heaptrack/.gitignore @@ -1,3 +1,3 @@ *.gz -client/node_modules -memory_usage.svg \ No newline at end of file +node_modules +memory_usage.svg diff --git a/e2e/heaptrack/Cargo.toml b/e2e/heaptrack/Cargo.toml index 1011ee1e..692988c4 100644 --- a/e2e/heaptrack/Cargo.toml +++ b/e2e/heaptrack/Cargo.toml @@ -6,14 +6,10 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -socketioxide = { path = "../../crates/socketioxide" } +socketioxide = { path = "../../crates/socketioxide", features = ["tracing"] } hyper = { workspace = true, features = ["server", "http1"] } hyper-util = { workspace = true, features = ["tokio"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] } -rust_socketio = { version = "0.6.0", features = ["async"] } -serde_json = "1.0.68" +tracing-subscriber.workspace = true +serde_json.workspace = true rand = "0.8.4" - -[[bin]] -name = "heaptrack-client" -path = "src/client.rs" diff --git a/e2e/heaptrack/client.ts b/e2e/heaptrack/client.ts new file mode 100644 index 00000000..3c1517c7 --- /dev/null +++ b/e2e/heaptrack/client.ts @@ -0,0 +1,56 @@ +import { io } from "socket.io-client"; + +const URL = process.env.URL || "http://localhost:3000"; +const MAX_CLIENTS = 5000; +const POLLING_PERCENTAGE = 0.05; +const CLIENT_CREATION_INTERVAL_IN_MS = 1; +const EMIT_INTERVAL_IN_MS = 1000; + +let clientCount = 0; +let lastReport = new Date().getTime(); +let packetsSinceLastReport = 0; + +const createClient = () => { + // for demonstration purposes, some clients stay stuck in HTTP long-polling + const transports = + Math.random() < POLLING_PERCENTAGE ? ["polling"] : ["polling", "websocket"]; + + const socket = io(URL, { + transports, + }); + + setInterval(() => { + socket.emit("ping"); + }, EMIT_INTERVAL_IN_MS); + + socket.on("pong", () => { + packetsSinceLastReport++; + }); + + socket.on("disconnect", (reason) => { + console.log(`disconnect due to ${reason}`); + }); + + if (++clientCount < MAX_CLIENTS) { + setTimeout(createClient, CLIENT_CREATION_INTERVAL_IN_MS); + } +}; + +createClient(); + +const printReport = () => { + const now = new Date().getTime(); + const durationSinceLastReport = (now - lastReport) / 1000; + const packetsPerSeconds = ( + packetsSinceLastReport / durationSinceLastReport + ).toFixed(2); + + console.log( + `client count: ${clientCount} ; average packets received per second: ${packetsPerSeconds}`, + ); + + packetsSinceLastReport = 0; + lastReport = now; +}; + +setInterval(printReport, 5000); diff --git a/e2e/heaptrack/package-lock.json b/e2e/heaptrack/package-lock.json new file mode 100644 index 00000000..95a19ec4 --- /dev/null +++ b/e2e/heaptrack/package-lock.json @@ -0,0 +1,141 @@ +{ + "name": "heaptrack-bench", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "heaptrack-bench", + "dependencies": { + "socket.io-client": "^4.8.1" + }, + "devDependencies": { + "@types/node": "^22.10.10" + } + }, + "node_modules/@socket.io/component-emitter": { + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz", + "integrity": "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==", + "license": "MIT" + }, + "node_modules/@types/node": { + "version": "22.10.10", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.10.10.tgz", + "integrity": "sha512-X47y/mPNzxviAGY5TcYPtYL8JsY3kAq2n8fMmKoRCxq/c4v4pyGNCzM2R6+M5/umG4ZfHuT+sgqDYqWc9rJ6ww==", + "dev": true, + "license": "MIT", + "dependencies": { + "undici-types": "~6.20.0" + } + }, + "node_modules/debug": { + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz", + "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/engine.io-client": { + "version": "6.6.3", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.6.3.tgz", + "integrity": "sha512-T0iLjnyNWahNyv/lcjS2y4oE358tVS/SYQNxYXGAJ9/GLgH4VCvOQ/mhTjqU88mLZCQgiG8RIegFHYCdVC+j5w==", + "license": "MIT", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1", + "xmlhttprequest-ssl": "~2.1.1" + } + }, + "node_modules/engine.io-parser": { + "version": "5.2.3", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.3.tgz", + "integrity": "sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", + "license": "MIT" + }, + "node_modules/socket.io-client": { + "version": "4.8.1", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.8.1.tgz", + "integrity": "sha512-hJVXfu3E28NmzGk8o1sHhN3om52tRvwYeidbj7xKy2eIIse5IoKX3USlS6Tqt3BHAtflLIkCQBkzVrEEfWUyYQ==", + "license": "MIT", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.6.1", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "license": "MIT", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/undici-types": { + "version": "6.20.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.20.0.tgz", + "integrity": "sha512-Ny6QZ2Nju20vw1SRHe3d9jVu6gJ+4e3+MMpqu7pqE5HT6WsTSlce++GQmK5UXS8mzV8DSYHrQH+Xrf2jVcuKNg==", + "dev": true, + "license": "MIT" + }, + "node_modules/ws": { + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, + "node_modules/xmlhttprequest-ssl": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.1.2.tgz", + "integrity": "sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ==", + "engines": { + "node": ">=0.4.0" + } + } + } +} diff --git a/e2e/heaptrack/package.json b/e2e/heaptrack/package.json new file mode 100644 index 00000000..cb2e7a50 --- /dev/null +++ b/e2e/heaptrack/package.json @@ -0,0 +1,10 @@ +{ + "name": "heaptrack-bench", + "type": "module", + "dependencies": { + "socket.io-client": "^4.8.1" + }, + "devDependencies": { + "@types/node": "^22.10.10" + } +} diff --git a/e2e/heaptrack/src/client.rs b/e2e/heaptrack/src/client.rs deleted file mode 100644 index 126d17c4..00000000 --- a/e2e/heaptrack/src/client.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::{pin::Pin, time::Duration}; - -use rust_socketio::{ - asynchronous::{Client, ClientBuilder}, - Payload, -}; - -const PING_INTERVAL: Duration = Duration::from_millis(1000); -const POLLING_PERCENTAGE: f32 = 0.05; -const MAX_CLIENT: usize = 200; - -fn cb(_: Payload, socket: Client) -> Pin + Send>> { - Box::pin(async move { - tokio::spawn(async move { - let mut inter = tokio::time::interval(PING_INTERVAL); - loop { - inter.tick().await; - let _ = socket.emit("ping", serde_json::Value::Null).await; - let _ = socket.emit("ping", (0..u8::MAX).collect::>()).await; - } - }); - }) -} -#[tokio::main] -async fn main() -> Result<(), Box> { - tokio::spawn(async move { - for _ in 0..MAX_CLIENT { - let random: f32 = rand::random(); - let transport_type = if POLLING_PERCENTAGE > random { - rust_socketio::TransportType::Polling - } else { - rust_socketio::TransportType::WebsocketUpgrade - }; - // get a socket that is connected to the admin namespace - ClientBuilder::new("http://localhost:3000/") - .transport_type(transport_type) - .namespace("/") - .on("open", cb) - .on("error", |err, _| { - Box::pin(async move { eprintln!("Error: {:#?}", err) }) - }) - .connect() - .await - .expect("Connection failed"); - } - }); - tokio::time::sleep(Duration::from_secs(60)).await; - - Ok(()) -} diff --git a/e2e/heaptrack/src/main.rs b/e2e/heaptrack/src/main.rs index 2fcf2d08..679313d1 100644 --- a/e2e/heaptrack/src/main.rs +++ b/e2e/heaptrack/src/main.rs @@ -12,6 +12,7 @@ fn on_connect(socket: SocketRef) { #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); let (svc, io) = SocketIo::new_svc(); io.ns("/", on_connect);