From 4d4435c17200756cf2b3bf87e65a64e814a4b26b Mon Sep 17 00:00:00 2001 From: Eivind Dalholt Date: Sun, 1 Oct 2023 16:17:55 +0200 Subject: [PATCH] feat: add heartbeat to organizer and remove unused connections --- backend/utils/socketNotifier.ts | 61 +++++++++++++++++------- backend/wsServers/organizer.ts | 13 ++++- frontend/src/components/EditAssembly.tsx | 7 +-- frontend/src/pages/AssemblyPage.tsx | 7 +-- 4 files changed, 63 insertions(+), 25 deletions(-) diff --git a/backend/utils/socketNotifier.ts b/backend/utils/socketNotifier.ts index f53593c..7b5bf90 100644 --- a/backend/utils/socketNotifier.ts +++ b/backend/utils/socketNotifier.ts @@ -10,12 +10,9 @@ import { NTNUINoFromRequest } from "./wsCookieRetriever"; * https://learn.microsoft.com/en-us/azure/azure-resource-manager/management/azure-subscription-service-limits */ -type SocketList = { [key: number]: WebSocket }; -type OrganizersByGroupSlug = { [key: string]: SocketList }; - // Store all active organizer connections, the connections are stored by their respective groupSlug. // This makes it possible to send messages to all logged in organizers of a group. -export const organizerConnections: OrganizersByGroupSlug = {}; +export const organizerConnections = new Map>(); // Store all active participant connections, for access when sending messages about assembly. export const lobbyConnections = new Map(); @@ -30,6 +27,12 @@ export const startHeartbeatInterval = setInterval(() => { lobbyConnections.forEach((ws: WebSocket) => { sendPing(ws); }); + + organizerConnections.forEach((socketList) => { + socketList.forEach((ws: WebSocket) => { + sendPing(ws); + }); + }); }, 30000); // 30 seconds export const storeLobbyConnectionByCookie = ( @@ -39,8 +42,9 @@ export const storeLobbyConnectionByCookie = ( const ntnuiNo = NTNUINoFromRequest(req); if (ntnuiNo !== null) { // Notify about kicking out old connection if user already is connected. - if (typeof lobbyConnections.get(ntnuiNo) !== null) { + if (lobbyConnections.has(ntnuiNo)) { notifyOneParticipant(ntnuiNo, JSON.stringify({ status: "removed" })); + lobbyConnections.get(ntnuiNo)?.close(); } // Store socket connection on NTNUI ID. lobbyConnections.set(ntnuiNo, ws); @@ -54,13 +58,39 @@ export const removeLobbyConnectionByCookie = (req: IncomingMessage) => { } }; +export const removeOrganizerConnectionByCookie = (req: IncomingMessage) => { + const ntnuiNo = NTNUINoFromRequest(req); + if (ntnuiNo !== null) { + for (const groupSlug of organizerConnections.keys()) { + for (const ntnui_no of organizerConnections.get(groupSlug)?.keys() || + []) { + if (ntnui_no === ntnuiNo) { + try { + organizerConnections.get(groupSlug)?.delete(ntnui_no); + } catch (e) { + console.error( + "Tried to delete connection that did not exist. " + e + ); + } + console.log( + "Organizer " + ntnui_no + " unsubscribed from group " + groupSlug + ); + return; + } + } + } + } +}; + export const storeOrganizerConnectionByNTNUINo = ( ntnui_no: number, groupSlug: string, ws: WebSocket ) => { - if (!organizerConnections[groupSlug]) organizerConnections[groupSlug] = []; - organizerConnections[groupSlug][ntnui_no] = ws; + if (!organizerConnections.get(groupSlug)) { + organizerConnections.set(groupSlug, new Map()); + } + organizerConnections.get(groupSlug)?.set(ntnui_no, ws); }; export const notifyOneParticipant = (ntnui_no: number, message: string) => { @@ -76,16 +106,15 @@ export const notifyOneParticipant = (ntnui_no: number, message: string) => { }; export const notifyOrganizers = (groupSlug: string, message: string) => { - if (organizerConnections[groupSlug]) { - for (const ntnui_no in organizerConnections[groupSlug]) { - console.log("Sending message to organizer " + ntnui_no); + for (const ntnui_no of organizerConnections.get(groupSlug)?.keys() || []) { + console.log("Sending message to organizer " + ntnui_no); + const socket = organizerConnections.get(groupSlug)?.get(ntnui_no); + if (socket) { try { - organizerConnections[groupSlug][ntnui_no].send(message); - } catch (error) { - console.log( - "Could not notify organizer " + - ntnui_no + - ". Is there a problem with the socket URL? (Ignore if testing / dev has restarted)" + socket.send(message); + } catch (e) { + console.error( + "Error when sending message to organizer " + ntnui_no + ": " + e ); } } diff --git a/backend/wsServers/organizer.ts b/backend/wsServers/organizer.ts index 31d86f6..cba47e6 100644 --- a/backend/wsServers/organizer.ts +++ b/backend/wsServers/organizer.ts @@ -1,7 +1,10 @@ import { WebSocketServer } from "ws"; import { NTNUINoFromRequest } from "../utils/wsCookieRetriever"; import { User } from "../models/user"; -import { storeOrganizerConnectionByNTNUINo } from "../utils/socketNotifier"; +import { + removeOrganizerConnectionByCookie, + storeOrganizerConnectionByNTNUINo, +} from "../utils/socketNotifier"; export const organizerWss = new WebSocketServer({ noServer: true }); @@ -45,4 +48,12 @@ organizerWss.on("connection", function connection(ws, req) { ws.close(); } }); + + ws.on("pong", () => { + // The client responded to the ping, so the connection is still active. + }); + + ws.on("close", () => { + removeOrganizerConnectionByCookie(req); + }); }); diff --git a/frontend/src/components/EditAssembly.tsx b/frontend/src/components/EditAssembly.tsx index 86467ad..8d9199e 100644 --- a/frontend/src/components/EditAssembly.tsx +++ b/frontend/src/components/EditAssembly.tsx @@ -42,6 +42,8 @@ export function EditAssembly(state: { group: UserDataGroupType }) { const { lastMessage, sendJsonMessage } = useWebSocket( import.meta.env.VITE_SOCKET_URL + "/organizer", { + // Request access to live assembly data for the given group when the websocket is opened. + onOpen: () => sendJsonMessage({ groupSlug: group.groupSlug }), //Will attempt to reconnect on all close events, such as server shutting down shouldReconnect: () => true, // Try to reconnect 300 times before giving up. @@ -50,11 +52,6 @@ export function EditAssembly(state: { group: UserDataGroupType }) { } ); - // Request access to live assembly data for the given group when component is mounted. - useEffect(() => { - sendJsonMessage({ groupSlug: group.groupSlug }); - }, []); - useEffect(() => { // Update state every time the websocket receive a message. if (lastMessage) { diff --git a/frontend/src/pages/AssemblyPage.tsx b/frontend/src/pages/AssemblyPage.tsx index 6251a1a..047fff5 100644 --- a/frontend/src/pages/AssemblyPage.tsx +++ b/frontend/src/pages/AssemblyPage.tsx @@ -13,7 +13,7 @@ import { LimitedVoteType } from "../types/votes"; import { getUserData } from "../services/organizer"; import { NotFound } from "./NotFound"; -export function AssemblyLobxby() { +export function AssemblyLobby() { let navigate = useNavigate(); const { groupSlug } = useParams() as { groupSlug: string }; const [groupName, setGroupName] = useState(undefined); @@ -25,11 +25,11 @@ export function AssemblyLobxby() { LimitedVoteType | undefined >(undefined); const [voted, setVoted] = useState(false); - const { lastMessage } = useWebSocket( + const { lastMessage, getWebSocket } = useWebSocket( import.meta.env.VITE_SOCKET_URL + "/lobby", { //Will attempt to reconnect on all close events, such as server shutting down - shouldReconnect: () => true, + shouldReconnect: () => !kickedOut, // Try to reconnect 300 times before giving up. // Also possible to change interval (default is 5000ms) reconnectAttempts: 300, @@ -79,6 +79,7 @@ export function AssemblyLobxby() { // User is is removed from current lobby if logged in on another device. if (decodedMessage.status == "removed") { setKickedOut(true); + getWebSocket()?.close(); } // Redirect only if user is checked in on the right group. if (decodedMessage.group == groupSlug) {