From 57436be8173505940f8ee9e07e548439f3142c28 Mon Sep 17 00:00:00 2001 From: Antoine Arlaud Date: Wed, 17 Jul 2024 13:53:10 -0400 Subject: [PATCH] fix: unibroker handle runtime rotating brkr identifier --- .../brokerClientPlugins/pluginManager.ts | 7 ---- .../connectionsManager/connectionHelpers.ts | 19 +++++++++++ lib/client/connectionsManager/mainWatcher.ts | 34 +++++++++++++------ lib/client/socket.ts | 12 +++++-- 4 files changed, 53 insertions(+), 19 deletions(-) create mode 100644 lib/client/connectionsManager/connectionHelpers.ts diff --git a/lib/client/brokerClientPlugins/pluginManager.ts b/lib/client/brokerClientPlugins/pluginManager.ts index dcb9b49fc..218876723 100644 --- a/lib/client/brokerClientPlugins/pluginManager.ts +++ b/lib/client/brokerClientPlugins/pluginManager.ts @@ -62,12 +62,6 @@ export const runStartupPlugins = async (clientOpts, connectionKey) => { string, BrokerPlugin[] >; - - // const connectionsKeys = clientOpts.config.connections - // ? Object.keys(clientOpts.config.connections) - // : []; - - // for (const connectionKey of connectionsKeys) { if ( loadedPlugins.has(`${clientOpts.config.connections[connectionKey].type}`) ) { @@ -81,7 +75,6 @@ export const runStartupPlugins = async (clientOpts, connectionKey) => { ); } } - // } }; export const runPreRequestPlugins = async ( diff --git a/lib/client/connectionsManager/connectionHelpers.ts b/lib/client/connectionsManager/connectionHelpers.ts new file mode 100644 index 000000000..d2122181e --- /dev/null +++ b/lib/client/connectionsManager/connectionHelpers.ts @@ -0,0 +1,19 @@ +import { WebSocketConnection } from '../types/client'; + +export const shutDownConnectionPair = ( + websocketConnections: WebSocketConnection[], + connectionIndex: number, +) => { + const friendlyName = websocketConnections[connectionIndex].friendlyName; + websocketConnections[connectionIndex].end(); + websocketConnections[connectionIndex].destroy(); + websocketConnections.splice(connectionIndex, 1); + const secondTunnelIndex = websocketConnections.findIndex( + (websocketConnection) => websocketConnection.friendlyName == friendlyName, + ); + websocketConnections[secondTunnelIndex].end(); + websocketConnections[secondTunnelIndex].destroy(); + websocketConnections.splice(secondTunnelIndex, 1); + + //TODO: Clean up plugins elements (intervals, etc) +}; diff --git a/lib/client/connectionsManager/mainWatcher.ts b/lib/client/connectionsManager/mainWatcher.ts index f39b9ff4b..bb515b3b9 100644 --- a/lib/client/connectionsManager/mainWatcher.ts +++ b/lib/client/connectionsManager/mainWatcher.ts @@ -5,6 +5,7 @@ import { log as logger } from '../../logs/logger'; import { createWebSocketConnectionPairs } from '../socket'; import { runStartupPlugins } from '../brokerClientPlugins/pluginManager'; import { addTimerToTerminalHandlers } from '../../common/utils/signals'; +import { shutDownConnectionPair } from './connectionHelpers'; export const setMainWatcher = async ( clientOpts: LoadedClientOpts, websocketConnections: WebSocketConnection[], @@ -50,16 +51,7 @@ export const setMainWatcher = async ( }, `Shutting down unused connection`, ); - websocketConnections[currentWebsocketConnectionIndex].end(); - websocketConnections[currentWebsocketConnectionIndex].destroy(); - websocketConnections.splice(currentWebsocketConnectionIndex, 1); - const secondTunnelIndex = websocketConnections.findIndex( - (websocketConnection) => - websocketConnection.friendlyName == integrationsKeys[i], - ); - websocketConnections[secondTunnelIndex].end(); - websocketConnections[secondTunnelIndex].destroy(); - websocketConnections.splice(secondTunnelIndex, 1); + shutDownConnectionPair(websocketConnections, i); } else { logger.info( { @@ -81,6 +73,28 @@ export const setMainWatcher = async ( ); await runStartupPlugins(clientOpts, integrationsKeys[i]); + await createWebSocketConnectionPairs( + websocketConnections, + clientOpts, + globalIdentifyingMetadata, + integrationsKeys[i], + ); + } else if ( + // Token rotation for the connection at hand + clientOpts.config.connections[`${integrationsKeys[i]}`].identifier != + websocketConnections[currentWebsocketConnectionIndex].identifier + ) { + logger.info( + { connectionName: integrationsKeys[i] }, + 'Updating configured connection for new identifier.', + ); + // shut down previous tunnels + shutDownConnectionPair(websocketConnections, i); + + // setup new tunnels + + await runStartupPlugins(clientOpts, integrationsKeys[i]); + await createWebSocketConnectionPairs( websocketConnections, clientOpts, diff --git a/lib/client/socket.ts b/lib/client/socket.ts index 89e0fbb76..03fce778f 100644 --- a/lib/client/socket.ts +++ b/lib/client/socket.ts @@ -54,10 +54,18 @@ export const createWebSocketConnectionPairs = async ( ); } if (serverId === null) { - logger.warn({}, 'could not receive server id from Broker Dispatcher'); + if (clientOpts.config.BROKER_HA_MODE_ENABLED == 'true') { + logger.warn({}, 'could not receive server id from Broker Dispatcher'); + } serverId = ''; } else { - logger.info({ serverId }, 'received server id'); + logger.info( + { + connection: socketIdentifyingMetadata.friendlyName, + serverId: serverId, + }, + 'received server id', + ); clientOpts.config.connections[ `${socketIdentifyingMetadata.friendlyName}` ].serverId = serverId;