Skip to content

Commit

Permalink
breaking: remove socket room logic, fix #101
Browse files Browse the repository at this point in the history
  • Loading branch information
b-ma committed Oct 2, 2024
1 parent 334b028 commit 3849732
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 110 deletions.
27 changes: 2 additions & 25 deletions src/server/ServerSocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
import {
kSocketsLatencyStatsWorker,
kSocketsDebugPreventHeartBeat,
kSocketsRemoveFromAllRooms,
kSocketsDeleteSocket,
} from './ServerSockets.js';

export const kSocketClientId = Symbol('soundworks:socket-client-id');
Expand Down Expand Up @@ -144,7 +144,7 @@ class ServerSocket {
// clear ping / pong interval
clearInterval(this.#heartbeatId);
// remove socket from all rooms
this.#sockets[kSocketsRemoveFromAllRooms](this);
this.#sockets[kSocketsDeleteSocket](this);
// clear references to sockets
this.#sockets = null;
// clear all listeners
Expand All @@ -166,12 +166,6 @@ class ServerSocket {

/**
* Reference to the @link{ServerSockets} instance.
*
* Allows for broadcasting from a given socket instance.
*
* @type {ServerSockets}
* @example
* socket.sockets.broadcast('my-room', this, 'update-value', 1);
*/
get sockets() {
return this.#sockets;
Expand Down Expand Up @@ -257,23 +251,6 @@ class ServerSocket {
this.#listeners.delete(channel);
}
}

/**
* Add the socket to a room
* @param {string} roomId - Id of the room.
*/
addToRoom(roomId) {
this.sockets.addToRoom(this, roomId);
}

/**
* Remove the socket from a room
* @param {string} roomId - Id of the room.
*/
removeFromRoom(roomId) {
this.sockets.removeFromRoom(this, roomId);
}

}

export default ServerSocket;
94 changes: 16 additions & 78 deletions src/server/ServerSockets.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import networkLatencyWorker from './audit-network-latency.worker.js';
export const kSocketsStart = Symbol('soundworks:sockets-start');
export const kSocketsStop = Symbol('soundworks:sockets-stop');

export const kSocketsRemoveFromAllRooms = Symbol('soundworks:sockets-remove-from-all-rooms');
export const kSocketsDeleteSocket = Symbol('soundworks:sockets-delete-socket');
export const kSocketsLatencyStatsWorker = Symbol('soundworks:sockets-latency-stats-worker');
export const kSocketsDebugPreventHeartBeat = Symbol('soundworks:sockets-debug-prevent-heartbeat');

Expand All @@ -36,13 +36,11 @@ class ServerSockets {
#server = null;
#config = null;
#wsServer = null;
#rooms = new Map();
#sockets = new Set();

constructor(server, config) {
this.#server = server;
this.#config = config;
// Init special `'*'` room which stores all current connections.
this.#rooms.set('*', new Set());

this[kSocketsLatencyStatsWorker] = null;
this[kSocketsDebugPreventHeartBeat] = false;
Expand Down Expand Up @@ -89,9 +87,7 @@ class ServerSockets {
const { role, token } = querystring.parse(req.url.split('?')[1]);
const socket = new ServerSocket(ws, this);

socket.addToRoom('*');
socket.addToRoom(role);

this.#sockets.add(socket);
this.#server[kServerOnSocketConnection](role, socket, token);
});

Expand Down Expand Up @@ -120,90 +116,32 @@ class ServerSockets {
[kSocketsStop]() {
// terminate stat worker thread
this[kSocketsLatencyStatsWorker].terminate();
// clean sockets
const sockets = this.#rooms.get('*');
sockets.forEach(socket => socket[kSocketTerminate]());
// terminate sockets
this.#sockets.forEach(socket => socket[kSocketTerminate]());
}

/**
* Remove given socket from all rooms.
* @private
*/
[kSocketsRemoveFromAllRooms](socket) {
for (let [_, room] of this.#rooms) {
room.delete(socket);
}
[kSocketsDeleteSocket](socket) {
this.#sockets.delete(socket);
}

/**
* Add a socket to a room.
*
* _Note that in most cases, you should use a {@link SharedState} instead_
*
* @param {ServerSocket} socket - Socket to add to the room.
* @param {String} roomId - Id of the room.
*/
addToRoom(socket, roomId) {
if (!this.#rooms.has(roomId)) {
this.#rooms.set(roomId, new Set());
}

const room = this.#rooms.get(roomId);
room.add(socket);
entries() {
return this.#sockets.entries();
}

/**
* Remove a socket from a room.
*
* _Note that in most cases, you should use a {@link SharedState} instead_
*
* @param {ServerSocket} socket - Socket to remove from the room.
* @param {String} roomId - Id of the room.
*/
removeFromRoom(socket, roomId) {
if (this.#rooms.has(roomId)) {
const room = this.#rooms.get(roomId);
room.delete(socket);
}
keys() {
return this.#sockets.keys();
}

/**
* Send a message to all clients os given room(s). If no room is specified,
* the message is sent to all clients.
*
* _Note that in most cases, you should use a {@link SharedState} instead_
*
* @param {String|Array} roomsIds - Ids of the rooms that must receive
* the message. If `null` the message is sent to all clients.
* @param {ServerSocket} excludeSocket - Optionnal socket to ignore when
* broadcasting the message, typically the client at the origin of the message.
* @param {String} channel - Channel name.
* @param {...*} args - Payload of the message. As many arguments as needed, of
* JSON compatible data types (i.e. string, number, boolean, object, array and null).
*/
broadcast(roomIds, excludeSocket, channel, ...args) {
let targets = new Set();

if (typeof roomIds === 'string' || Array.isArray(roomIds)) {
if (typeof roomIds === 'string') {
roomIds = [roomIds];
}

roomIds.forEach(roomId => {
if (this.#rooms.has(roomId)) {
const room = this.#rooms.get(roomId);
room.forEach(socket => targets.add(socket));
}
});
} else {
targets = this.#rooms.get('*');
}
values() {
return this.#sockets.values();
}

targets.forEach(socket => {
if (socket.readyState === WebSocket.OPEN && socket !== excludeSocket) {
socket.send(channel, ...args);
}
});
forEach(func) {
return this.#sockets.forEach(func);
}
}

Expand Down
17 changes: 10 additions & 7 deletions tests/essentials/Client.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { assert } from 'chai';
import merge from 'lodash.merge';
import { delay } from '@ircam/sc-utils';

import { Server, ServerContext } from '../../src/server/index.js';
import { Client, ClientContext } from '../../src/client/index.js';
Expand Down Expand Up @@ -67,8 +68,11 @@ describe('# client::Client', () => {
socketMessageReceived = true;
});

server.sockets.broadcast('*', null, 'hello');
await new Promise(resolve => setTimeout(resolve, 200));
for (let socket of server.sockets.values()) {
socket.send('hello');
}

await delay(10)

assert.equal(socketMessageReceived, true);

Expand Down Expand Up @@ -159,9 +163,8 @@ describe('# client::Client', () => {
socketMessageReceived = true;
});

server.sockets.broadcast('*', null, 'hello');
await new Promise(resolve => setTimeout(resolve, 200));

server.sockets.forEach(socket => socket.send('hello'));
await delay(10)
assert.equal(socketMessageReceived, true);

await server.stop();
Expand Down Expand Up @@ -248,8 +251,8 @@ describe('# client::Client', () => {

await client.stop();

server.sockets.broadcast('*', null, 'hello');
await new Promise(resolve => setTimeout(resolve, 200));
server.sockets.forEach(socket => socket.send('hello'));
await delay(10)

assert.equal(socketMessageReceived, false);

Expand Down

0 comments on commit 3849732

Please sign in to comment.