From 959f04bfabf72d53a4287769673fab418fa0fab3 Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Fri, 6 Dec 2024 12:25:23 -0300 Subject: [PATCH 01/11] chore: makes instance-status works along with Moleculer lifecyle --- .../server/local-services/instance/service.ts | 87 ++++---- .../server/database/watchCollections.ts | 3 +- .../server/models/raw/InstanceStatus.ts | 11 +- packages/instance-status/src/index.ts | 185 ++++++++---------- .../src/models/IInstanceStatusModel.ts | 3 + 5 files changed, 128 insertions(+), 161 deletions(-) diff --git a/apps/meteor/ee/server/local-services/instance/service.ts b/apps/meteor/ee/server/local-services/instance/service.ts index a7e921511a62..e67ed9ce9505 100644 --- a/apps/meteor/ee/server/local-services/instance/service.ts +++ b/apps/meteor/ee/server/local-services/instance/service.ts @@ -1,7 +1,7 @@ import os from 'os'; import { License, ServiceClassInternal } from '@rocket.chat/core-services'; -import { InstanceStatus } from '@rocket.chat/instance-status'; +import { InstanceStatus, defaultPingInterval, indexExpire } from '@rocket.chat/instance-status'; import { InstanceStatus as InstanceStatusRaw } from '@rocket.chat/models'; import EJSON from 'ejson'; import type { BrokerNode } from 'moleculer'; @@ -42,26 +42,41 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe constructor() { super(); - const tx = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT, extra: process.env.TRANSPORTER_EXTRA }); - if (typeof tx === 'string') { - this.transporter = new Transporters.NATS({ url: tx }); - this.isTransporterTCP = false; - } else { - this.transporter = new Transporters.TCP(tx); - } + const transporter = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT, extra: process.env.TRANSPORTER_EXTRA }); + this.isTransporterTCP = typeof transporter !== 'string'; + + const activeInstances = InstanceStatusRaw.getActiveInstancesAddress() + .then((instances) => instances) + .catch(() => []); + + this.transporter = this.isTransporterTCP + ? new Transporters.TCP({ ...transporter, urls: activeInstances }) + : new Transporters.NATS({ url: transporter }); + + this.broker = new ServiceBroker({ + nodeID: InstanceStatus.id(), + transporter: this.transporter, + serializer: new EJSONSerializer(), + heartbeatInterval: defaultPingInterval, + heartbeatTimeout: indexExpire, + ...getLogger(process.env), + }); if (this.isTransporterTCP) { - this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise => { - if (clientAction === 'removed') { - (this.broker.transit?.tx as any).nodes.disconnected(data?._id, false); - (this.broker.transit?.tx as any).nodes.nodes.delete(data?._id); - return; - } - - if (clientAction === 'inserted' && data?.extraInformation?.tcpPort) { - this.connectNode(data); - } + this.broker.localBus.on("$node.connected", (node) => { + console.log(`[${this.broker.nodeID}] connected to ${node.node.id}`); + }); + + this.broker.localBus.on("$node.disconnected", (node) => { + console.log(`[${this.broker.nodeID}] disconnected from ${node.node.id}`); }); + + // setInterval(async () => { + // const nodes: Array<{ id: string }> = await this.broker.call('$node.list', { onlyAvailable: true }); + // const localNode = this.broker.getLocalNodeInfo(); + // console.log(nodes.map((node) => node.id)); + // console.log(localNode.ipList[0], localNode.instanceID, localNode.port); + // }, 10000); } this.onEvent('license.module', async ({ module, valid }) => { @@ -93,17 +108,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe } async created() { - this.broker = new ServiceBroker({ - nodeID: InstanceStatus.id(), - transporter: this.transporter, - serializer: new EJSONSerializer(), - ...getLogger(process.env), - }); - - if ((this.broker.transit?.tx as any)?.nodes?.localNode) { - (this.broker.transit?.tx as any).nodes.localNode.ipList = [hostIP]; - } - this.broker.createService({ name: 'matrix', events: { @@ -176,31 +180,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe this.broadcastStarted = true; StreamerCentral.on('broadcast', this.sendBroadcast.bind(this)); - - if (this.isTransporterTCP) { - await InstanceStatusRaw.find( - { - 'extraInformation.tcpPort': { - $exists: true, - }, - }, - { - sort: { - _createdAt: -1, - }, - }, - ).forEach(this.connectNode.bind(this)); - } - } - - private connectNode(record: any) { - if (record._id === InstanceStatus.id()) { - return; - } - - const { host, tcpPort } = record.extraInformation; - - (this.broker?.transit?.tx as any).addOfflineNode(record._id, host, tcpPort); } private sendBroadcast(streamName: string, eventName: string, args: unknown[]) { diff --git a/apps/meteor/server/database/watchCollections.ts b/apps/meteor/server/database/watchCollections.ts index 6dd173d5d323..b7bdf13c28b8 100644 --- a/apps/meteor/server/database/watchCollections.ts +++ b/apps/meteor/server/database/watchCollections.ts @@ -29,10 +29,11 @@ const onlyCollections = DBWATCHER_ONLY_COLLECTIONS.split(',') .filter(Boolean); export function getWatchCollections(): string[] { - const collections = [InstanceStatus.getCollectionName()]; + const collections = []; // add back to the list of collections in case db watchers are enabled if (!dbWatchersDisabled) { + collections.push(InstanceStatus.getCollectionName()); collections.push(Users.getCollectionName()); collections.push(Messages.getCollectionName()); collections.push(LivechatInquiry.getCollectionName()); diff --git a/apps/meteor/server/models/raw/InstanceStatus.ts b/apps/meteor/server/models/raw/InstanceStatus.ts index 039d8fa18afc..e9264788b2e4 100644 --- a/apps/meteor/server/models/raw/InstanceStatus.ts +++ b/apps/meteor/server/models/raw/InstanceStatus.ts @@ -1,6 +1,6 @@ import type { IInstanceStatus } from '@rocket.chat/core-typings'; import type { IInstanceStatusModel } from '@rocket.chat/model-typings'; -import type { Db } from 'mongodb'; +import type { Db, UpdateResult } from 'mongodb'; import { BaseRaw } from './BaseRaw'; @@ -17,4 +17,13 @@ export class InstanceStatusRaw extends BaseRaw implements IInst async getActiveInstanceCount(): Promise { return this.col.countDocuments({ _updatedAt: { $gt: new Date(Date.now() - process.uptime() * 1000 - 2000) } }); } + + async getActiveInstancesAddress(): Promise { + const instances = await this.find({}, { projection: { _id: 1, extraInformation: { host: 1, port: 1 } } }).toArray(); + return instances.map((instance) => `${instance.extraInformation.host}:${instance.extraInformation.port}/${instance._id}`); + } + + async setDocumentHeartbeat(documentId: string): Promise { + return this.updateOne({ _id: documentId }, { $currentDate: { _updatedAt: true } }); + } } diff --git a/packages/instance-status/src/index.ts b/packages/instance-status/src/index.ts index 38109e01626d..59e1dd1e5393 100644 --- a/packages/instance-status/src/index.ts +++ b/packages/instance-status/src/index.ts @@ -1,26 +1,41 @@ -// import { IInstanceStatus } from '@rocket.chat/core-typings'; -import { EventEmitter } from 'events'; - import { InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; -import { tracerSpan } from '@rocket.chat/tracing'; import { v4 as uuidv4 } from 'uuid'; -const events = new EventEmitter(); +export const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; // default to 10s +export const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60; + +const ID = uuidv4(); + +const currentInstance = { + name: '', + extraInformation: {}, +}; + +let pingInterval: NodeJS.Timeout | null; + +export function id() { + return ID; +} -const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; // default to 10s +function start() { + stop(); + pingInterval = setInterval(async () => ping(), defaultPingInterval * 1000); +} -// if not set via env var ensures at least 3 ticks before expiring (multiple of 60s) -const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60; +function stop() { + if (!pingInterval) { + return; + } + clearInterval(pingInterval); + pingInterval = null; +} let createIndexes = async () => { await InstanceStatusModel.col .indexes() - .catch(function () { - // the collection should not exists yet, return empty then - return []; - }) - .then(function (result) { - return result.some(function (index) { + .catch(() => []) + .then((result) => + result.some((index) => { if (index.key && index.key._updatedAt === 1) { if (index.expireAfterSeconds !== indexExpire) { InstanceStatusModel.col.dropIndex(index.name); @@ -29,115 +44,86 @@ let createIndexes = async () => { return true; } return false; - }); - }) - .then(function (created) { + }), + ) + .then((created) => { if (!created) { InstanceStatusModel.col.createIndex({ _updatedAt: 1 }, { expireAfterSeconds: indexExpire }); } }); - createIndexes = async () => { - // no op - }; + createIndexes = async () => {}; }; -const ID = uuidv4(); - -function id() { - return ID; +async function updateInstanceOnDB(instance: any) { + try { + return InstanceStatusModel.findOneAndUpdate( + { _id: ID }, + { + $set: instance, + $currentDate: { _createdAt: true, _updatedAt: true }, + }, + { upsert: true, returnDocument: 'after' }, + ); + } catch (e) { + return e; + } } -const currentInstance = { - name: '', - extraInformation: {}, -}; - -async function registerInstance(name: string, extraInformation: Record): Promise { - createIndexes(); - - currentInstance.name = name; - currentInstance.extraInformation = extraInformation; - - // if (ID === undefined || ID === null) { - // return console.error('[multiple-instances-status] only can be called after Meteor.startup'); - // } - - const instance = { - $set: { - pid: process.pid, - name, - ...(extraInformation && { extraInformation }), +async function updateConnections(conns: number) { + await InstanceStatusModel.updateOne( + { + _id: ID, }, - $currentDate: { - _createdAt: true, - _updatedAt: true, + { + $set: { + 'extraInformation.conns': conns, + }, }, - }; + ); +} +async function deleteInstanceOnDB() { try { - await InstanceStatusModel.updateOne({ _id: ID }, instance as any, { upsert: true }); + await InstanceStatusModel.deleteOne({ _id: ID }); + } catch (e) { + return e; + } +} - const result = await InstanceStatusModel.findOne({ _id: ID }); +export async function registerInstance(name: string, extraInformation: Record): Promise { + createIndexes(); - start(); + currentInstance.name = name; + currentInstance.extraInformation = extraInformation; - events.emit('registerInstance', result, instance); + const result = await updateInstanceOnDB({ + pid: process.pid, + name, + ...(extraInformation && { extraInformation }), + }); - process.on('exit', onExit); + start(); + process.on('exit', onExit); - return result; - } catch (e) { - return e; - } + return result; } async function unregisterInstance() { try { - const result = await InstanceStatusModel.deleteOne({ _id: ID }); + const result = await deleteInstanceOnDB(); stop(); - - events.emit('unregisterInstance', ID); - process.removeListener('exit', onExit); - return result; } catch (e) { return e; } } -let pingInterval: NodeJS.Timeout | null; - -function start(interval?: number) { - stop(); - - interval = interval || defaultPingInterval; - - pingInterval = setInterval(async function () { - await tracerSpan('InstanceStatus.ping', {}, () => ping()); - }, interval * 1000); -} - -function stop() { - if (!pingInterval) { - return; - } - clearInterval(pingInterval); - pingInterval = null; -} - async function ping() { - const result = await InstanceStatusModel.updateOne( - { - _id: ID, - }, - { - $currentDate: { - _updatedAt: true, - }, - }, - ); + const result = await InstanceStatusModel.setDocumentHeartbeat(ID); + + console.log(`[${ID}] ping`, result.modifiedCount); if (result.modifiedCount === 0) { await registerInstance(currentInstance.name, currentInstance.extraInformation); @@ -148,21 +134,10 @@ async function onExit() { await unregisterInstance(); } -async function updateConnections(conns: number) { - await InstanceStatusModel.updateOne( - { - _id: ID, - }, - { - $set: { - 'extraInformation.conns': conns, - }, - }, - ); -} - export const InstanceStatus = { id, registerInstance, updateConnections, + defaultPingInterval, + indexExpire, }; diff --git a/packages/model-typings/src/models/IInstanceStatusModel.ts b/packages/model-typings/src/models/IInstanceStatusModel.ts index dd6aa4d76bc7..b27f6f2db4fd 100644 --- a/packages/model-typings/src/models/IInstanceStatusModel.ts +++ b/packages/model-typings/src/models/IInstanceStatusModel.ts @@ -1,7 +1,10 @@ import type { IInstanceStatus } from '@rocket.chat/core-typings'; import type { IBaseModel } from './IBaseModel'; +import type { UpdateResult } from 'mongodb'; export interface IInstanceStatusModel extends IBaseModel { getActiveInstanceCount(): Promise; + getActiveInstancesAddress(): Promise; + setDocumentHeartbeat(documentId: string): Promise; } From 560a0ab3feef9047d856c6c1ed2b7cdd95006cc9 Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Fri, 6 Dec 2024 13:29:33 -0300 Subject: [PATCH 02/11] chore: ignore lastDoc validation when db watchers disabled --- .../ee/server/local-services/instance/service.ts | 10 +++++++--- apps/meteor/server/startup/watchDb.ts | 14 ++++++++------ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/apps/meteor/ee/server/local-services/instance/service.ts b/apps/meteor/ee/server/local-services/instance/service.ts index e67ed9ce9505..861ccc4006f5 100644 --- a/apps/meteor/ee/server/local-services/instance/service.ts +++ b/apps/meteor/ee/server/local-services/instance/service.ts @@ -42,7 +42,11 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe constructor() { super(); - const transporter = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT, extra: process.env.TRANSPORTER_EXTRA }); + const transporter = getTransporter({ + transporter: process.env.TRANSPORTER, + port: process.env.TCP_PORT, + extra: process.env.TRANSPORTER_EXTRA, + }); this.isTransporterTCP = typeof transporter !== 'string'; const activeInstances = InstanceStatusRaw.getActiveInstancesAddress() @@ -63,11 +67,11 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe }); if (this.isTransporterTCP) { - this.broker.localBus.on("$node.connected", (node) => { + this.broker.localBus.on('$node.connected', (node) => { console.log(`[${this.broker.nodeID}] connected to ${node.node.id}`); }); - this.broker.localBus.on("$node.disconnected", (node) => { + this.broker.localBus.on('$node.disconnected', (node) => { console.log(`[${this.broker.nodeID}] disconnected from ${node.node.id}`); }); diff --git a/apps/meteor/server/startup/watchDb.ts b/apps/meteor/server/startup/watchDb.ts index 5a0dd0417f63..bf45af8dcd97 100644 --- a/apps/meteor/server/startup/watchDb.ts +++ b/apps/meteor/server/startup/watchDb.ts @@ -1,4 +1,4 @@ -import { api } from '@rocket.chat/core-services'; +import { api, dbWatchersDisabled } from '@rocket.chat/core-services'; import { Logger } from '@rocket.chat/logger'; import { MongoInternals } from 'meteor/mongo'; @@ -19,11 +19,13 @@ watcher.watch().catch((err: Error) => { process.exit(1); }); -setInterval(function _checkDatabaseWatcher() { - if (watcher.isLastDocDelayed()) { - SystemLogger.error('No real time data received recently'); - } -}, 20000); +if (!dbWatchersDisabled) { + setInterval(function _checkDatabaseWatcher() { + if (watcher.isLastDocDelayed()) { + SystemLogger.error('No real time data received recently'); + } + }, 20000); +} export function isLastDocDelayed(): boolean { return watcher.isLastDocDelayed(); From e40a2c4b48d3556e6bdb19ad56c9924bc617a3c1 Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Fri, 6 Dec 2024 13:57:40 -0300 Subject: [PATCH 03/11] chore: disable linter for empty line --- packages/instance-status/src/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/instance-status/src/index.ts b/packages/instance-status/src/index.ts index 59e1dd1e5393..4a24887ecdd0 100644 --- a/packages/instance-status/src/index.ts +++ b/packages/instance-status/src/index.ts @@ -52,6 +52,7 @@ let createIndexes = async () => { } }); + // eslint-disable-next-line @typescript-eslint/no-empty-function createIndexes = async () => {}; }; From c883c1aad73581dc32f313c2996ece01d627af43 Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Fri, 6 Dec 2024 14:06:37 -0300 Subject: [PATCH 04/11] linter fix --- packages/model-typings/src/models/IInstanceStatusModel.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/model-typings/src/models/IInstanceStatusModel.ts b/packages/model-typings/src/models/IInstanceStatusModel.ts index b27f6f2db4fd..5525ce6c9b3d 100644 --- a/packages/model-typings/src/models/IInstanceStatusModel.ts +++ b/packages/model-typings/src/models/IInstanceStatusModel.ts @@ -1,7 +1,7 @@ import type { IInstanceStatus } from '@rocket.chat/core-typings'; +import type { UpdateResult } from 'mongodb'; import type { IBaseModel } from './IBaseModel'; -import type { UpdateResult } from 'mongodb'; export interface IInstanceStatusModel extends IBaseModel { getActiveInstanceCount(): Promise; From 3b606df209a61b96cf975f6fd4e5d301925dbc66 Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Mon, 9 Dec 2024 07:41:51 -0300 Subject: [PATCH 05/11] remove unused logs --- .../server/local-services/instance/service.ts | 22 ++++--------------- .../server/models/raw/InstanceStatus.ts | 4 ++-- packages/instance-status/src/index.ts | 2 -- 3 files changed, 6 insertions(+), 22 deletions(-) diff --git a/apps/meteor/ee/server/local-services/instance/service.ts b/apps/meteor/ee/server/local-services/instance/service.ts index 861ccc4006f5..cd4fcab21b99 100644 --- a/apps/meteor/ee/server/local-services/instance/service.ts +++ b/apps/meteor/ee/server/local-services/instance/service.ts @@ -50,7 +50,10 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe this.isTransporterTCP = typeof transporter !== 'string'; const activeInstances = InstanceStatusRaw.getActiveInstancesAddress() - .then((instances) => instances) + .then((instances) => { + console.info(`Found ${instances.length} active instances`); + return instances; + }) .catch(() => []); this.transporter = this.isTransporterTCP @@ -66,23 +69,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe ...getLogger(process.env), }); - if (this.isTransporterTCP) { - this.broker.localBus.on('$node.connected', (node) => { - console.log(`[${this.broker.nodeID}] connected to ${node.node.id}`); - }); - - this.broker.localBus.on('$node.disconnected', (node) => { - console.log(`[${this.broker.nodeID}] disconnected from ${node.node.id}`); - }); - - // setInterval(async () => { - // const nodes: Array<{ id: string }> = await this.broker.call('$node.list', { onlyAvailable: true }); - // const localNode = this.broker.getLocalNodeInfo(); - // console.log(nodes.map((node) => node.id)); - // console.log(localNode.ipList[0], localNode.instanceID, localNode.port); - // }, 10000); - } - this.onEvent('license.module', async ({ module, valid }) => { if (module === 'scalability' && valid) { await this.startBroadcast(); diff --git a/apps/meteor/server/models/raw/InstanceStatus.ts b/apps/meteor/server/models/raw/InstanceStatus.ts index e9264788b2e4..c8763c431781 100644 --- a/apps/meteor/server/models/raw/InstanceStatus.ts +++ b/apps/meteor/server/models/raw/InstanceStatus.ts @@ -19,8 +19,8 @@ export class InstanceStatusRaw extends BaseRaw implements IInst } async getActiveInstancesAddress(): Promise { - const instances = await this.find({}, { projection: { _id: 1, extraInformation: { host: 1, port: 1 } } }).toArray(); - return instances.map((instance) => `${instance.extraInformation.host}:${instance.extraInformation.port}/${instance._id}`); + const instances = await this.find({}, { projection: { _id: 1, extraInformation: { host: 1, tcpPort: 1 } } }).toArray(); + return instances.map((instance) => `${instance.extraInformation.host}:${instance.extraInformation.tcpPort}/${instance._id}`); } async setDocumentHeartbeat(documentId: string): Promise { diff --git a/packages/instance-status/src/index.ts b/packages/instance-status/src/index.ts index 4a24887ecdd0..b2375cb267e4 100644 --- a/packages/instance-status/src/index.ts +++ b/packages/instance-status/src/index.ts @@ -124,8 +124,6 @@ async function unregisterInstance() { async function ping() { const result = await InstanceStatusModel.setDocumentHeartbeat(ID); - console.log(`[${ID}] ping`, result.modifiedCount); - if (result.modifiedCount === 0) { await registerInstance(currentInstance.name, currentInstance.extraInformation); } From d71f2678dea27d681f192dc52ec7445111919c20 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Mon, 9 Dec 2024 17:35:57 -0300 Subject: [PATCH 06/11] doc is never delayed if db watchers disabled --- apps/meteor/server/startup/watchDb.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/meteor/server/startup/watchDb.ts b/apps/meteor/server/startup/watchDb.ts index bf45af8dcd97..f60f1e5949fd 100644 --- a/apps/meteor/server/startup/watchDb.ts +++ b/apps/meteor/server/startup/watchDb.ts @@ -28,5 +28,8 @@ if (!dbWatchersDisabled) { } export function isLastDocDelayed(): boolean { + if (dbWatchersDisabled) { + return true; + } return watcher.isLastDocDelayed(); } From 9661ea38a62c5b4af4fd55dbe4f09fa2ba6ac522 Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Tue, 10 Dec 2024 16:06:33 -0300 Subject: [PATCH 07/11] chore: move db calls to model definition --- .../server/local-services/instance/service.ts | 51 +++++++--------- apps/meteor/ee/server/startup/presence.ts | 6 +- .../server/models/raw/InstanceStatus.ts | 38 +++++++++++- ee/apps/ddp-streamer/src/DDPStreamer.ts | 4 +- packages/instance-status/src/index.ts | 61 ++++--------------- .../src/models/IInstanceStatusModel.ts | 5 +- 6 files changed, 81 insertions(+), 84 deletions(-) diff --git a/apps/meteor/ee/server/local-services/instance/service.ts b/apps/meteor/ee/server/local-services/instance/service.ts index cd4fcab21b99..93d6d0c45e98 100644 --- a/apps/meteor/ee/server/local-services/instance/service.ts +++ b/apps/meteor/ee/server/local-services/instance/service.ts @@ -33,8 +33,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe private transporter: Transporters.TCP | Transporters.NATS; - private isTransporterTCP = true; - private broker: ServiceBroker; private troubleshootDisableInstanceBroadcast = false; @@ -42,33 +40,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe constructor() { super(); - const transporter = getTransporter({ - transporter: process.env.TRANSPORTER, - port: process.env.TCP_PORT, - extra: process.env.TRANSPORTER_EXTRA, - }); - this.isTransporterTCP = typeof transporter !== 'string'; - - const activeInstances = InstanceStatusRaw.getActiveInstancesAddress() - .then((instances) => { - console.info(`Found ${instances.length} active instances`); - return instances; - }) - .catch(() => []); - - this.transporter = this.isTransporterTCP - ? new Transporters.TCP({ ...transporter, urls: activeInstances }) - : new Transporters.NATS({ url: transporter }); - - this.broker = new ServiceBroker({ - nodeID: InstanceStatus.id(), - transporter: this.transporter, - serializer: new EJSONSerializer(), - heartbeatInterval: defaultPingInterval, - heartbeatTimeout: indexExpire, - ...getLogger(process.env), - }); - this.onEvent('license.module', async ({ module, valid }) => { if (module === 'scalability' && valid) { await this.startBroadcast(); @@ -98,6 +69,28 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe } async created() { + const transporter = getTransporter({ + transporter: process.env.TRANSPORTER, + port: process.env.TCP_PORT, + extra: process.env.TRANSPORTER_EXTRA, + }); + + const activeInstances = InstanceStatusRaw.getActiveInstancesAddress(); + + this.transporter = + typeof transporter !== 'string' + ? new Transporters.TCP({ ...transporter, urls: activeInstances }) + : new Transporters.NATS({ url: transporter }); + + this.broker = new ServiceBroker({ + nodeID: InstanceStatus.id(), + transporter: this.transporter, + serializer: new EJSONSerializer(), + heartbeatInterval: defaultPingInterval, + heartbeatTimeout: indexExpire, + ...getLogger(process.env), + }); + this.broker.createService({ name: 'matrix', events: { diff --git a/apps/meteor/ee/server/startup/presence.ts b/apps/meteor/ee/server/startup/presence.ts index e0756e4b4c59..933399fc6cb6 100644 --- a/apps/meteor/ee/server/startup/presence.ts +++ b/apps/meteor/ee/server/startup/presence.ts @@ -1,12 +1,14 @@ import { Presence } from '@rocket.chat/core-services'; -import { InstanceStatus } from '@rocket.chat/instance-status'; +import { id as instanceId } from '@rocket.chat/instance-status'; +import { InstanceStatus } from '@rocket.chat/models'; import { Accounts } from 'meteor/accounts-base'; import { Meteor } from 'meteor/meteor'; import { throttle } from 'underscore'; // update connections count every 30 seconds const updateConns = throttle(function _updateConns() { - void InstanceStatus.updateConnections(Meteor.server.sessions.size); + const nodeId = instanceId(); + void InstanceStatus.updateConnections(nodeId, Meteor.server.sessions.size); }, 30000); Meteor.startup(() => { diff --git a/apps/meteor/server/models/raw/InstanceStatus.ts b/apps/meteor/server/models/raw/InstanceStatus.ts index c8763c431781..86aed7fbc128 100644 --- a/apps/meteor/server/models/raw/InstanceStatus.ts +++ b/apps/meteor/server/models/raw/InstanceStatus.ts @@ -1,6 +1,6 @@ import type { IInstanceStatus } from '@rocket.chat/core-typings'; import type { IInstanceStatusModel } from '@rocket.chat/model-typings'; -import type { Db, UpdateResult } from 'mongodb'; +import type { Db, ModifyResult, UpdateResult, DeleteResult } from 'mongodb'; import { BaseRaw } from './BaseRaw'; @@ -23,7 +23,43 @@ export class InstanceStatusRaw extends BaseRaw implements IInst return instances.map((instance) => `${instance.extraInformation.host}:${instance.extraInformation.tcpPort}/${instance._id}`); } + async removeInstanceById(_id: IInstanceStatus['_id']): Promise { + return this.deleteOne({ _id }); + } + async setDocumentHeartbeat(documentId: string): Promise { return this.updateOne({ _id: documentId }, { $currentDate: { _updatedAt: true } }); } + + async upsertInstance(instance: Partial): Promise> { + return this.findOneAndUpdate( + { + _id: instance._id, + }, + { + $set: instance, + $currentDate: { + _createdAt: true, + _updatedAt: true, + }, + }, + { + upsert: true, + returnDocument: 'after', + }, + ); + } + + async updateConnections(_id: IInstanceStatus['_id'], conns: number) { + return this.updateOne( + { + _id, + }, + { + $set: { + 'extraInformation.conns': conns, + }, + }, + ); + } } diff --git a/ee/apps/ddp-streamer/src/DDPStreamer.ts b/ee/apps/ddp-streamer/src/DDPStreamer.ts index 1c880929650d..416afa5f5cc2 100644 --- a/ee/apps/ddp-streamer/src/DDPStreamer.ts +++ b/ee/apps/ddp-streamer/src/DDPStreamer.ts @@ -2,7 +2,7 @@ import crypto from 'crypto'; import { MeteorService, Presence, ServiceClass } from '@rocket.chat/core-services'; import { InstanceStatus } from '@rocket.chat/instance-status'; -import { Users } from '@rocket.chat/models'; +import { Users, InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; import polka from 'polka'; import { throttle } from 'underscore'; import WebSocket from 'ws'; @@ -66,7 +66,7 @@ export class DDPStreamer extends ServiceClass { // update connections count every 30 seconds updateConnections = throttle(() => { - InstanceStatus.updateConnections(this.wss?.clients.size ?? 0); + InstanceStatusModel.updateConnections(InstanceStatus.id(), this.wss?.clients.size ?? 0); }, 30000); async created(): Promise { diff --git a/packages/instance-status/src/index.ts b/packages/instance-status/src/index.ts index b2375cb267e4..10128c302b62 100644 --- a/packages/instance-status/src/index.ts +++ b/packages/instance-status/src/index.ts @@ -1,10 +1,12 @@ +import type { IInstanceStatus } from '@rocket.chat/core-typings'; import { InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; import { v4 as uuidv4 } from 'uuid'; -export const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; // default to 10s -export const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60; +const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; +const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60; const ID = uuidv4(); +const id = (): IInstanceStatus['_id'] => ID; const currentInstance = { name: '', @@ -13,10 +15,6 @@ const currentInstance = { let pingInterval: NodeJS.Timeout | null; -export function id() { - return ID; -} - function start() { stop(); pingInterval = setInterval(async () => ping(), defaultPingInterval * 1000); @@ -52,56 +50,22 @@ let createIndexes = async () => { } }); - // eslint-disable-next-line @typescript-eslint/no-empty-function - createIndexes = async () => {}; + createIndexes = async () => { + // noop + }; }; -async function updateInstanceOnDB(instance: any) { - try { - return InstanceStatusModel.findOneAndUpdate( - { _id: ID }, - { - $set: instance, - $currentDate: { _createdAt: true, _updatedAt: true }, - }, - { upsert: true, returnDocument: 'after' }, - ); - } catch (e) { - return e; - } -} - -async function updateConnections(conns: number) { - await InstanceStatusModel.updateOne( - { - _id: ID, - }, - { - $set: { - 'extraInformation.conns': conns, - }, - }, - ); -} - -async function deleteInstanceOnDB() { - try { - await InstanceStatusModel.deleteOne({ _id: ID }); - } catch (e) { - return e; - } -} - -export async function registerInstance(name: string, extraInformation: Record): Promise { +async function registerInstance(name: string, extraInformation: Partial): Promise { createIndexes(); currentInstance.name = name; currentInstance.extraInformation = extraInformation; - const result = await updateInstanceOnDB({ + const result = await InstanceStatusModel.upsertInstance({ + _id: id(), pid: process.pid, name, - ...(extraInformation && { extraInformation }), + extraInformation: extraInformation as IInstanceStatus['extraInformation'], }); start(); @@ -112,7 +76,7 @@ export async function registerInstance(name: string, extraInformation: Record { getActiveInstanceCount(): Promise; getActiveInstancesAddress(): Promise; + removeInstanceById(_id: IInstanceStatus['_id']): Promise; setDocumentHeartbeat(documentId: string): Promise; + upsertInstance(instance: Partial): Promise>; + updateConnections(_id: IInstanceStatus['_id'], conns: number): Promise; } From b9c20d01fbed74b754424ff52a9c8171167a2b9e Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Tue, 10 Dec 2024 16:10:33 -0300 Subject: [PATCH 08/11] chore: move db calls to model definition --- apps/meteor/ee/server/startup/presence.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/apps/meteor/ee/server/startup/presence.ts b/apps/meteor/ee/server/startup/presence.ts index 933399fc6cb6..daf0e4b6da48 100644 --- a/apps/meteor/ee/server/startup/presence.ts +++ b/apps/meteor/ee/server/startup/presence.ts @@ -1,14 +1,13 @@ import { Presence } from '@rocket.chat/core-services'; -import { id as instanceId } from '@rocket.chat/instance-status'; -import { InstanceStatus } from '@rocket.chat/models'; +import { InstanceStatus } from '@rocket.chat/instance-status'; +import { InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; import { Accounts } from 'meteor/accounts-base'; import { Meteor } from 'meteor/meteor'; import { throttle } from 'underscore'; // update connections count every 30 seconds const updateConns = throttle(function _updateConns() { - const nodeId = instanceId(); - void InstanceStatus.updateConnections(nodeId, Meteor.server.sessions.size); + void InstanceStatusModel.updateConnections(InstanceStatus.id(), Meteor.server.sessions.size); }, 30000); Meteor.startup(() => { From 8f3d074a155d6374686dca7f293de68d18225e3e Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Tue, 10 Dec 2024 16:30:50 -0300 Subject: [PATCH 09/11] chore: bring back exported default values --- packages/instance-status/src/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/instance-status/src/index.ts b/packages/instance-status/src/index.ts index 10128c302b62..463f391a1b5b 100644 --- a/packages/instance-status/src/index.ts +++ b/packages/instance-status/src/index.ts @@ -2,8 +2,8 @@ import type { IInstanceStatus } from '@rocket.chat/core-typings'; import { InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; import { v4 as uuidv4 } from 'uuid'; -const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; -const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60; +export const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; +export const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60; const ID = uuidv4(); const id = (): IInstanceStatus['_id'] => ID; From b55808b1ab431c2fa7093bb1c6dff8ffc5b221e3 Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Mon, 16 Dec 2024 18:08:50 -0300 Subject: [PATCH 10/11] chore: move updateConnections to instance-status package --- apps/meteor/ee/server/startup/presence.ts | 3 +-- ee/apps/ddp-streamer/src/DDPStreamer.ts | 3 +-- packages/instance-status/src/index.ts | 9 +++++++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/apps/meteor/ee/server/startup/presence.ts b/apps/meteor/ee/server/startup/presence.ts index daf0e4b6da48..e0756e4b4c59 100644 --- a/apps/meteor/ee/server/startup/presence.ts +++ b/apps/meteor/ee/server/startup/presence.ts @@ -1,13 +1,12 @@ import { Presence } from '@rocket.chat/core-services'; import { InstanceStatus } from '@rocket.chat/instance-status'; -import { InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; import { Accounts } from 'meteor/accounts-base'; import { Meteor } from 'meteor/meteor'; import { throttle } from 'underscore'; // update connections count every 30 seconds const updateConns = throttle(function _updateConns() { - void InstanceStatusModel.updateConnections(InstanceStatus.id(), Meteor.server.sessions.size); + void InstanceStatus.updateConnections(Meteor.server.sessions.size); }, 30000); Meteor.startup(() => { diff --git a/ee/apps/ddp-streamer/src/DDPStreamer.ts b/ee/apps/ddp-streamer/src/DDPStreamer.ts index 416afa5f5cc2..31f8c7087a85 100644 --- a/ee/apps/ddp-streamer/src/DDPStreamer.ts +++ b/ee/apps/ddp-streamer/src/DDPStreamer.ts @@ -2,7 +2,6 @@ import crypto from 'crypto'; import { MeteorService, Presence, ServiceClass } from '@rocket.chat/core-services'; import { InstanceStatus } from '@rocket.chat/instance-status'; -import { Users, InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; import polka from 'polka'; import { throttle } from 'underscore'; import WebSocket from 'ws'; @@ -66,7 +65,7 @@ export class DDPStreamer extends ServiceClass { // update connections count every 30 seconds updateConnections = throttle(() => { - InstanceStatusModel.updateConnections(InstanceStatus.id(), this.wss?.clients.size ?? 0); + InstanceStatus.updateConnections(this.wss?.clients.size ?? 0); }, 30000); async created(): Promise { diff --git a/packages/instance-status/src/index.ts b/packages/instance-status/src/index.ts index 463f391a1b5b..86f638e089c3 100644 --- a/packages/instance-status/src/index.ts +++ b/packages/instance-status/src/index.ts @@ -85,6 +85,10 @@ async function unregisterInstance() { } } +async function updateConnections(connections: number) { + await InstanceStatusModel.updateConnections(id(), connections); +} + async function ping() { const result = await InstanceStatusModel.setDocumentHeartbeat(ID); @@ -98,8 +102,9 @@ async function onExit() { } export const InstanceStatus = { - id, - registerInstance, defaultPingInterval, + id, indexExpire, + registerInstance, + updateConnections, }; From 515a70a6d713eb1bceaa9f27587b8e1897bd475b Mon Sep 17 00:00:00 2001 From: Ricardo Garim Date: Mon, 16 Dec 2024 18:10:57 -0300 Subject: [PATCH 11/11] chore: move updateConnections to instance-status package --- ee/apps/ddp-streamer/src/DDPStreamer.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/ee/apps/ddp-streamer/src/DDPStreamer.ts b/ee/apps/ddp-streamer/src/DDPStreamer.ts index 31f8c7087a85..1c880929650d 100644 --- a/ee/apps/ddp-streamer/src/DDPStreamer.ts +++ b/ee/apps/ddp-streamer/src/DDPStreamer.ts @@ -2,6 +2,7 @@ import crypto from 'crypto'; import { MeteorService, Presence, ServiceClass } from '@rocket.chat/core-services'; import { InstanceStatus } from '@rocket.chat/instance-status'; +import { Users } from '@rocket.chat/models'; import polka from 'polka'; import { throttle } from 'underscore'; import WebSocket from 'ws';