Skip to content

Commit

Permalink
feat: add heartbeat to organizer and remove unused connections
Browse files Browse the repository at this point in the history
  • Loading branch information
edalholt committed Oct 1, 2023
1 parent 2d82a6e commit 4d4435c
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 25 deletions.
61 changes: 45 additions & 16 deletions backend/utils/socketNotifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ import { NTNUINoFromRequest } from "./wsCookieRetriever";
* https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/azure-subscription-service-limits
*/

type SocketList = { [key: number]: WebSocket };
type OrganizersByGroupSlug = { [key: string]: SocketList };

// Store all active organizer connections, the connections are stored by their respective groupSlug.
// This makes it possible to send messages to all logged in organizers of a group.
export const organizerConnections: OrganizersByGroupSlug = {};
export const organizerConnections = new Map<string, Map<number, WebSocket>>();
// Store all active participant connections, for access when sending messages about assembly.
export const lobbyConnections = new Map<number, WebSocket>();

Expand All @@ -30,6 +27,12 @@ export const startHeartbeatInterval = setInterval(() => {
lobbyConnections.forEach((ws: WebSocket) => {
sendPing(ws);
});

organizerConnections.forEach((socketList) => {
socketList.forEach((ws: WebSocket) => {
sendPing(ws);
});
});
}, 30000); // 30 seconds

export const storeLobbyConnectionByCookie = (
Expand All @@ -39,8 +42,9 @@ export const storeLobbyConnectionByCookie = (
const ntnuiNo = NTNUINoFromRequest(req);
if (ntnuiNo !== null) {
// Notify about kicking out old connection if user already is connected.
if (typeof lobbyConnections.get(ntnuiNo) !== null) {
if (lobbyConnections.has(ntnuiNo)) {
notifyOneParticipant(ntnuiNo, JSON.stringify({ status: "removed" }));
lobbyConnections.get(ntnuiNo)?.close();
}
// Store socket connection on NTNUI ID.
lobbyConnections.set(ntnuiNo, ws);
Expand All @@ -54,13 +58,39 @@ export const removeLobbyConnectionByCookie = (req: IncomingMessage) => {
}
};

export const removeOrganizerConnectionByCookie = (req: IncomingMessage) => {
const ntnuiNo = NTNUINoFromRequest(req);
if (ntnuiNo !== null) {
for (const groupSlug of organizerConnections.keys()) {
for (const ntnui_no of organizerConnections.get(groupSlug)?.keys() ||
[]) {
if (ntnui_no === ntnuiNo) {
try {
organizerConnections.get(groupSlug)?.delete(ntnui_no);
} catch (e) {
console.error(
"Tried to delete connection that did not exist. " + e
);
}
console.log(
"Organizer " + ntnui_no + " unsubscribed from group " + groupSlug
);
return;
}
}
}
}
};

export const storeOrganizerConnectionByNTNUINo = (
ntnui_no: number,
groupSlug: string,
ws: WebSocket
) => {
if (!organizerConnections[groupSlug]) organizerConnections[groupSlug] = [];
organizerConnections[groupSlug][ntnui_no] = ws;
if (!organizerConnections.get(groupSlug)) {
organizerConnections.set(groupSlug, new Map<number, WebSocket>());
}
organizerConnections.get(groupSlug)?.set(ntnui_no, ws);
};

export const notifyOneParticipant = (ntnui_no: number, message: string) => {
Expand All @@ -76,16 +106,15 @@ export const notifyOneParticipant = (ntnui_no: number, message: string) => {
};

export const notifyOrganizers = (groupSlug: string, message: string) => {
if (organizerConnections[groupSlug]) {
for (const ntnui_no in organizerConnections[groupSlug]) {
console.log("Sending message to organizer " + ntnui_no);
for (const ntnui_no of organizerConnections.get(groupSlug)?.keys() || []) {
console.log("Sending message to organizer " + ntnui_no);
const socket = organizerConnections.get(groupSlug)?.get(ntnui_no);
if (socket) {
try {
organizerConnections[groupSlug][ntnui_no].send(message);
} catch (error) {
console.log(
"Could not notify organizer " +
ntnui_no +
". Is there a problem with the socket URL? (Ignore if testing / dev has restarted)"
socket.send(message);
} catch (e) {
console.error(
"Error when sending message to organizer " + ntnui_no + ": " + e
);
}
}
Expand Down
13 changes: 12 additions & 1 deletion backend/wsServers/organizer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { WebSocketServer } from "ws";
import { NTNUINoFromRequest } from "../utils/wsCookieRetriever";
import { User } from "../models/user";
import { storeOrganizerConnectionByNTNUINo } from "../utils/socketNotifier";
import {
removeOrganizerConnectionByCookie,
storeOrganizerConnectionByNTNUINo,
} from "../utils/socketNotifier";

export const organizerWss = new WebSocketServer({ noServer: true });

Expand Down Expand Up @@ -45,4 +48,12 @@ organizerWss.on("connection", function connection(ws, req) {
ws.close();
}
});

ws.on("pong", () => {
// The client responded to the ping, so the connection is still active.
});

ws.on("close", () => {
removeOrganizerConnectionByCookie(req);
});
});
7 changes: 2 additions & 5 deletions frontend/src/components/EditAssembly.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ export function EditAssembly(state: { group: UserDataGroupType }) {
const { lastMessage, sendJsonMessage } = useWebSocket(
import.meta.env.VITE_SOCKET_URL + "/organizer",
{
// Request access to live assembly data for the given group when the websocket is opened.
onOpen: () => sendJsonMessage({ groupSlug: group.groupSlug }),
//Will attempt to reconnect on all close events, such as server shutting down
shouldReconnect: () => true,
// Try to reconnect 300 times before giving up.
Expand All @@ -50,11 +52,6 @@ export function EditAssembly(state: { group: UserDataGroupType }) {
}
);

// Request access to live assembly data for the given group when component is mounted.
useEffect(() => {
sendJsonMessage({ groupSlug: group.groupSlug });
}, []);

useEffect(() => {
// Update state every time the websocket receive a message.
if (lastMessage) {
Expand Down
7 changes: 4 additions & 3 deletions frontend/src/pages/AssemblyPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { LimitedVoteType } from "../types/votes";
import { getUserData } from "../services/organizer";
import { NotFound } from "./NotFound";

export function AssemblyLobxby() {
export function AssemblyLobby() {
let navigate = useNavigate();
const { groupSlug } = useParams() as { groupSlug: string };
const [groupName, setGroupName] = useState<string | undefined>(undefined);
Expand All @@ -25,11 +25,11 @@ export function AssemblyLobxby() {
LimitedVoteType | undefined
>(undefined);
const [voted, setVoted] = useState<boolean>(false);
const { lastMessage } = useWebSocket(
const { lastMessage, getWebSocket } = useWebSocket(
import.meta.env.VITE_SOCKET_URL + "/lobby",
{
//Will attempt to reconnect on all close events, such as server shutting down
shouldReconnect: () => true,
shouldReconnect: () => !kickedOut,
// Try to reconnect 300 times before giving up.
// Also possible to change interval (default is 5000ms)
reconnectAttempts: 300,
Expand Down Expand Up @@ -79,6 +79,7 @@ export function AssemblyLobxby() {
// User is is removed from current lobby if logged in on another device.
if (decodedMessage.status == "removed") {
setKickedOut(true);
getWebSocket()?.close();
}
// Redirect only if user is checked in on the right group.
if (decodedMessage.group == groupSlug) {
Expand Down

0 comments on commit 4d4435c

Please sign in to comment.