Skip to content

Commit

Permalink
fix(engineio/ws): add read_buffer_size option and set default to 4KiB (
Browse files Browse the repository at this point in the history
…#450)

* fix(engineio/ws): add read_buffer_size option and set default to 4KiB

* fix(socketio/client): make sure that the ns packet slice is not kept at ns connection

* fix(parser/common): copy buffer for partial bin packets

* test(e2e/heaptrack) set max client to 10K
  • Loading branch information
Totodore authored Jan 28, 2025
1 parent 1dc49e5 commit fe40809
Show file tree
Hide file tree
Showing 13 changed files with 282 additions and 83 deletions.
28 changes: 18 additions & 10 deletions .github/workflows/bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
workflow_dispatch:
inputs:
heaptrack:
description: 'Run heaptrack memory benchmark'
description: "Run heaptrack memory benchmark"
required: true
default: false
type: boolean
Expand All @@ -18,6 +18,9 @@ jobs:
- uses: dtolnay/rust-toolchain@master
with:
toolchain: stable
- uses: actions/setup-node@v4
with:
node-version: 22
- uses: actions/cache@v4
with:
path: |
Expand All @@ -30,17 +33,22 @@ jobs:
- name: Install heaptrack
run: sudo apt-get install -y heaptrack
- name: Build server && client
run: cargo build -r -p heaptrack && cargo build -r -p heaptrack --bin heaptrack-client
run: cargo build -r -p heaptrack && npm i --prefix e2e/heaptrack
- name: Run memory benchmark
run: heaptrack target/release/heaptrack > server.txt & ./target/release/heaptrack-client > client.txt
- name: Server output
if: always()
run: cat server.txt
- name: Client output
if: always()
run: cat client.txt
run: |
ulimit -n 20000
RUST_LOG="socketioxide=trace,engineioxide=trace,info" heaptrack target/release/heaptrack > server.txt &
cd e2e/heaptrack
node --experimental-strip-types client.ts > ../../client.txt &
sleep 60
kill $!
- name: Publish logs
uses: actions/upload-artifact@v4
with:
name: logs-${{ github.head_ref }}.${{ github.sha }}
path: *.txt
- name: Publish memory benchmark
uses: actions/upload-artifact@v4
with:
name: heaptrack-${{ github.head_ref }}.${{ github.sha }}
path: heaptrack.heaptrack.*
path: heaptrack.heaptrack.*
20 changes: 19 additions & 1 deletion crates/engineioxide/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,16 @@ pub struct EngineIoConfig {
pub max_buffer_size: usize,

/// The maximum number of bytes that can be received per http request.
/// Defaults to 100kb.
/// Defaults to 100KB.
pub max_payload: u64,

/// The size of the read buffer for the websocket transport.
/// You can tweak this value depending on your use case. By default it is set to 4KiB.
///
/// Setting it to a higher value will improve performance on heavy read scenarios
/// but will consume more memory.
pub ws_read_buffer_size: usize,

/// Allowed transports on this server
/// It is represented as a bitfield to allow to combine any number of transports easily
pub transports: u8,
Expand All @@ -73,6 +80,7 @@ impl Default for EngineIoConfig {
ping_timeout: Duration::from_millis(20000),
max_buffer_size: 128,
max_payload: 1e5 as u64, // 100kb
ws_read_buffer_size: 4096,
transports: TransportType::Polling as u8 | TransportType::Websocket as u8,
}
}
Expand Down Expand Up @@ -173,6 +181,16 @@ impl EngineIoConfigBuilder {
self
}

/// The size of the read buffer for the websocket transport.
/// You can tweak this value depending on your use case. Defaults to 4KiB.
///
/// Setting it to a higher value will improve performance on heavy read scenarios
/// but will consume more memory.
pub fn ws_read_buffer_size(mut self, ws_read_buffer_size: usize) -> Self {
self.config.ws_read_buffer_size = ws_read_buffer_size;
self
}

/// Allowed transports on this server
///
/// The `transports` array should have a size of 1 or 2
Expand Down
9 changes: 7 additions & 2 deletions crates/engineioxide/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ use tokio::{
task::JoinHandle,
};
use tokio_tungstenite::{
tungstenite::{handshake::derive_accept_key, protocol::Role, Message},
tungstenite::{
handshake::derive_accept_key,
protocol::{Role, WebSocketConfig},
Message,
},
WebSocketStream,
};

Expand Down Expand Up @@ -110,7 +114,8 @@ pub async fn on_init<H: EngineIoHandler, S>(
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let ws_init = move || WebSocketStream::from_raw_socket(conn, Role::Server, None);
let ws_config = WebSocketConfig::default().read_buffer_size(engine.config.ws_read_buffer_size);
let ws_init = move || WebSocketStream::from_raw_socket(conn, Role::Server, Some(ws_config));
let (socket, ws) = if let Some(sid) = sid {
match engine.get_socket(sid) {
None => return Err(Error::UnknownSessionID(sid)),
Expand Down
4 changes: 3 additions & 1 deletion crates/parser-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ impl Parse for CommonParser {
..
}) => {
let binaries = binaries.get_or_insert(VecDeque::new());
binaries.push_back(data);
// We copy the data to avoid holding a ref to the engine.io
// websocket buffer too long.
binaries.push_back(Bytes::copy_from_slice(&data));
if state.incoming_binary_cnt.load(Ordering::Relaxed) > binaries.len() {
Err(ParseError::NeedsMoreBinaryData)
} else {
Expand Down
19 changes: 9 additions & 10 deletions crates/socketioxide/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<A: Adapter> Client<A> {
fn sock_connect(
self: &Arc<Self>,
auth: Option<Value>,
ns_path: Str,
ns_path: &str,
esocket: &Arc<engineioxide::Socket<SocketData<A>>>,
) {
#[cfg(feature = "tracing")]
Expand All @@ -77,12 +77,10 @@ impl<A: Adapter> Client<A> {
}
};

if let Some(ns) = self.get_ns(&ns_path) {
if let Some(ns) = self.get_ns(ns_path) {
tokio::spawn(connect(ns, esocket.clone()));
} else if let Ok(Match { value: ns_ctr, .. }) = self.router.read().unwrap().at(&ns_path) {
// We have to create a new `Str` otherwise, we would keep a ref to the original connect packet
// for the entire lifetime of the Namespace.
let path = Str::copy_from_slice(&ns_path);
} else if let Ok(Match { value: ns_ctr, .. }) = self.router.read().unwrap().at(ns_path) {
let path = Str::copy_from_slice(ns_path);
let ns = ns_ctr.get_new_ns(path.clone(), &self.adapter_state, &self.config);
let this = self.clone();
let esocket = esocket.clone();
Expand All @@ -100,9 +98,10 @@ impl<A: Adapter> Client<A> {
);
esocket.close(EIoDisconnectReason::TransportClose);
} else {
let path = Str::copy_from_slice(ns_path);
let packet = self
.parser()
.encode(Packet::connect_error(ns_path, "Invalid namespace"));
.encode(Packet::connect_error(path, "Invalid namespace"));
let _ = match packet {
Value::Str(p, _) => esocket.emit(p).map_err(|_e| {
#[cfg(feature = "tracing")]
Expand Down Expand Up @@ -257,7 +256,7 @@ impl<A: Adapter> EngineIoHandler for Client<A> {
ProtocolVersion::V4 => {
#[cfg(feature = "tracing")]
tracing::debug!("connecting to default namespace for v4");
self.sock_connect(None, Str::from("/"), &socket);
self.sock_connect(None, "/", &socket);
}
ProtocolVersion::V5 => self.spawn_connect_timeout_task(socket),
}
Expand Down Expand Up @@ -302,7 +301,7 @@ impl<A: Adapter> EngineIoHandler for Client<A> {

let res: Result<(), Error> = match packet.inner {
PacketData::Connect(auth) => {
self.sock_connect(auth, packet.ns, &socket);
self.sock_connect(auth, &packet.ns, &socket);
Ok(())
}
_ => self.sock_propagate_packet(packet, socket.id),
Expand Down Expand Up @@ -339,7 +338,7 @@ impl<A: Adapter> EngineIoHandler for Client<A> {

let res: Result<(), Error> = match packet.inner {
PacketData::Connect(auth) => {
self.sock_connect(auth, packet.ns, &socket);
self.sock_connect(auth, &packet.ns, &socket);
Ok(())
}
_ => self.sock_propagate_packet(packet, socket.id),
Expand Down
13 changes: 13 additions & 0 deletions crates/socketioxide/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,19 @@ impl<A: Adapter> SocketIoBuilder<A> {
self
}

/// The size of the read buffer for the websocket transport.
/// You can tweak this value depending on your use case. Defaults to 4KiB.
///
/// Setting it to a higher value will improve performance on heavy read scenarios
/// but will consume more memory.
#[inline]
pub fn ws_read_buffer_size(mut self, ws_read_buffer_size: usize) -> Self {
self.engine_config_builder = self
.engine_config_builder
.ws_read_buffer_size(ws_read_buffer_size);
self
}

/// Allowed transports on this server
///
/// The `transports` array should have a size of 1 or 2
Expand Down
4 changes: 2 additions & 2 deletions e2e/heaptrack/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
*.gz
client/node_modules
memory_usage.svg
node_modules
memory_usage.svg
10 changes: 3 additions & 7 deletions e2e/heaptrack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
socketioxide = { path = "../../crates/socketioxide" }
socketioxide = { path = "../../crates/socketioxide", features = ["tracing"] }
hyper = { workspace = true, features = ["server", "http1"] }
hyper-util = { workspace = true, features = ["tokio"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
rust_socketio = { version = "0.6.0", features = ["async"] }
serde_json = "1.0.68"
tracing-subscriber.workspace = true
serde_json.workspace = true
rand = "0.8.4"

[[bin]]
name = "heaptrack-client"
path = "src/client.rs"
56 changes: 56 additions & 0 deletions e2e/heaptrack/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { io } from "socket.io-client";

const URL = process.env.URL || "http://localhost:3000";
const MAX_CLIENTS = 5000;
const POLLING_PERCENTAGE = 0.05;
const CLIENT_CREATION_INTERVAL_IN_MS = 1;
const EMIT_INTERVAL_IN_MS = 1000;

let clientCount = 0;
let lastReport = new Date().getTime();
let packetsSinceLastReport = 0;

const createClient = () => {
// for demonstration purposes, some clients stay stuck in HTTP long-polling
const transports =
Math.random() < POLLING_PERCENTAGE ? ["polling"] : ["polling", "websocket"];

const socket = io(URL, {
transports,
});

setInterval(() => {
socket.emit("ping");
}, EMIT_INTERVAL_IN_MS);

socket.on("pong", () => {
packetsSinceLastReport++;
});

socket.on("disconnect", (reason) => {
console.log(`disconnect due to ${reason}`);
});

if (++clientCount < MAX_CLIENTS) {
setTimeout(createClient, CLIENT_CREATION_INTERVAL_IN_MS);
}
};

createClient();

const printReport = () => {
const now = new Date().getTime();
const durationSinceLastReport = (now - lastReport) / 1000;
const packetsPerSeconds = (
packetsSinceLastReport / durationSinceLastReport
).toFixed(2);

console.log(
`client count: ${clientCount} ; average packets received per second: ${packetsPerSeconds}`,
);

packetsSinceLastReport = 0;
lastReport = now;
};

setInterval(printReport, 5000);
Loading

0 comments on commit fe40809

Please sign in to comment.