diff --git a/apps/meteor/ee/server/local-services/instance/service.ts b/apps/meteor/ee/server/local-services/instance/service.ts index a7e921511a62..93d6d0c45e98 100644 --- a/apps/meteor/ee/server/local-services/instance/service.ts +++ b/apps/meteor/ee/server/local-services/instance/service.ts @@ -1,7 +1,7 @@ import os from 'os'; import { License, ServiceClassInternal } from '@rocket.chat/core-services'; -import { InstanceStatus } from '@rocket.chat/instance-status'; +import { InstanceStatus, defaultPingInterval, indexExpire } from '@rocket.chat/instance-status'; import { InstanceStatus as InstanceStatusRaw } from '@rocket.chat/models'; import EJSON from 'ejson'; import type { BrokerNode } from 'moleculer'; @@ -33,8 +33,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe private transporter: Transporters.TCP | Transporters.NATS; - private isTransporterTCP = true; - private broker: ServiceBroker; private troubleshootDisableInstanceBroadcast = false; @@ -42,28 +40,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe constructor() { super(); - const tx = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT, extra: process.env.TRANSPORTER_EXTRA }); - if (typeof tx === 'string') { - this.transporter = new Transporters.NATS({ url: tx }); - this.isTransporterTCP = false; - } else { - this.transporter = new Transporters.TCP(tx); - } - - if (this.isTransporterTCP) { - this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise => { - if (clientAction === 'removed') { - (this.broker.transit?.tx as any).nodes.disconnected(data?._id, false); - (this.broker.transit?.tx as any).nodes.nodes.delete(data?._id); - return; - } - - if (clientAction === 'inserted' && data?.extraInformation?.tcpPort) { - this.connectNode(data); - } - }); - } - this.onEvent('license.module', async ({ module, valid }) => { if (module === 'scalability' && valid) { await this.startBroadcast(); @@ -93,17 +69,28 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe } async created() { + const transporter = getTransporter({ + transporter: process.env.TRANSPORTER, + port: process.env.TCP_PORT, + extra: process.env.TRANSPORTER_EXTRA, + }); + + const activeInstances = InstanceStatusRaw.getActiveInstancesAddress(); + + this.transporter = + typeof transporter !== 'string' + ? new Transporters.TCP({ ...transporter, urls: activeInstances }) + : new Transporters.NATS({ url: transporter }); + this.broker = new ServiceBroker({ nodeID: InstanceStatus.id(), transporter: this.transporter, serializer: new EJSONSerializer(), + heartbeatInterval: defaultPingInterval, + heartbeatTimeout: indexExpire, ...getLogger(process.env), }); - if ((this.broker.transit?.tx as any)?.nodes?.localNode) { - (this.broker.transit?.tx as any).nodes.localNode.ipList = [hostIP]; - } - this.broker.createService({ name: 'matrix', events: { @@ -176,31 +163,6 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe this.broadcastStarted = true; StreamerCentral.on('broadcast', this.sendBroadcast.bind(this)); - - if (this.isTransporterTCP) { - await InstanceStatusRaw.find( - { - 'extraInformation.tcpPort': { - $exists: true, - }, - }, - { - sort: { - _createdAt: -1, - }, - }, - ).forEach(this.connectNode.bind(this)); - } - } - - private connectNode(record: any) { - if (record._id === InstanceStatus.id()) { - return; - } - - const { host, tcpPort } = record.extraInformation; - - (this.broker?.transit?.tx as any).addOfflineNode(record._id, host, tcpPort); } private sendBroadcast(streamName: string, eventName: string, args: unknown[]) { diff --git a/apps/meteor/ee/server/startup/presence.ts b/apps/meteor/ee/server/startup/presence.ts index e0756e4b4c59..daf0e4b6da48 100644 --- a/apps/meteor/ee/server/startup/presence.ts +++ b/apps/meteor/ee/server/startup/presence.ts @@ -1,12 +1,13 @@ import { Presence } from '@rocket.chat/core-services'; import { InstanceStatus } from '@rocket.chat/instance-status'; +import { InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; import { Accounts } from 'meteor/accounts-base'; import { Meteor } from 'meteor/meteor'; import { throttle } from 'underscore'; // update connections count every 30 seconds const updateConns = throttle(function _updateConns() { - void InstanceStatus.updateConnections(Meteor.server.sessions.size); + void InstanceStatusModel.updateConnections(InstanceStatus.id(), Meteor.server.sessions.size); }, 30000); Meteor.startup(() => { diff --git a/apps/meteor/server/database/watchCollections.ts b/apps/meteor/server/database/watchCollections.ts index 6dd173d5d323..b7bdf13c28b8 100644 --- a/apps/meteor/server/database/watchCollections.ts +++ b/apps/meteor/server/database/watchCollections.ts @@ -29,10 +29,11 @@ const onlyCollections = DBWATCHER_ONLY_COLLECTIONS.split(',') .filter(Boolean); export function getWatchCollections(): string[] { - const collections = [InstanceStatus.getCollectionName()]; + const collections = []; // add back to the list of collections in case db watchers are enabled if (!dbWatchersDisabled) { + collections.push(InstanceStatus.getCollectionName()); collections.push(Users.getCollectionName()); collections.push(Messages.getCollectionName()); collections.push(LivechatInquiry.getCollectionName()); diff --git a/apps/meteor/server/models/raw/InstanceStatus.ts b/apps/meteor/server/models/raw/InstanceStatus.ts index 039d8fa18afc..86aed7fbc128 100644 --- a/apps/meteor/server/models/raw/InstanceStatus.ts +++ b/apps/meteor/server/models/raw/InstanceStatus.ts @@ -1,6 +1,6 @@ import type { IInstanceStatus } from '@rocket.chat/core-typings'; import type { IInstanceStatusModel } from '@rocket.chat/model-typings'; -import type { Db } from 'mongodb'; +import type { Db, ModifyResult, UpdateResult, DeleteResult } from 'mongodb'; import { BaseRaw } from './BaseRaw'; @@ -17,4 +17,49 @@ export class InstanceStatusRaw extends BaseRaw implements IInst async getActiveInstanceCount(): Promise { return this.col.countDocuments({ _updatedAt: { $gt: new Date(Date.now() - process.uptime() * 1000 - 2000) } }); } + + async getActiveInstancesAddress(): Promise { + const instances = await this.find({}, { projection: { _id: 1, extraInformation: { host: 1, tcpPort: 1 } } }).toArray(); + return instances.map((instance) => `${instance.extraInformation.host}:${instance.extraInformation.tcpPort}/${instance._id}`); + } + + async removeInstanceById(_id: IInstanceStatus['_id']): Promise { + return this.deleteOne({ _id }); + } + + async setDocumentHeartbeat(documentId: string): Promise { + return this.updateOne({ _id: documentId }, { $currentDate: { _updatedAt: true } }); + } + + async upsertInstance(instance: Partial): Promise> { + return this.findOneAndUpdate( + { + _id: instance._id, + }, + { + $set: instance, + $currentDate: { + _createdAt: true, + _updatedAt: true, + }, + }, + { + upsert: true, + returnDocument: 'after', + }, + ); + } + + async updateConnections(_id: IInstanceStatus['_id'], conns: number) { + return this.updateOne( + { + _id, + }, + { + $set: { + 'extraInformation.conns': conns, + }, + }, + ); + } } diff --git a/apps/meteor/server/startup/watchDb.ts b/apps/meteor/server/startup/watchDb.ts index 5a0dd0417f63..f60f1e5949fd 100644 --- a/apps/meteor/server/startup/watchDb.ts +++ b/apps/meteor/server/startup/watchDb.ts @@ -1,4 +1,4 @@ -import { api } from '@rocket.chat/core-services'; +import { api, dbWatchersDisabled } from '@rocket.chat/core-services'; import { Logger } from '@rocket.chat/logger'; import { MongoInternals } from 'meteor/mongo'; @@ -19,12 +19,17 @@ watcher.watch().catch((err: Error) => { process.exit(1); }); -setInterval(function _checkDatabaseWatcher() { - if (watcher.isLastDocDelayed()) { - SystemLogger.error('No real time data received recently'); - } -}, 20000); +if (!dbWatchersDisabled) { + setInterval(function _checkDatabaseWatcher() { + if (watcher.isLastDocDelayed()) { + SystemLogger.error('No real time data received recently'); + } + }, 20000); +} export function isLastDocDelayed(): boolean { + if (dbWatchersDisabled) { + return true; + } return watcher.isLastDocDelayed(); } diff --git a/ee/apps/ddp-streamer/src/DDPStreamer.ts b/ee/apps/ddp-streamer/src/DDPStreamer.ts index 1c880929650d..416afa5f5cc2 100644 --- a/ee/apps/ddp-streamer/src/DDPStreamer.ts +++ b/ee/apps/ddp-streamer/src/DDPStreamer.ts @@ -2,7 +2,7 @@ import crypto from 'crypto'; import { MeteorService, Presence, ServiceClass } from '@rocket.chat/core-services'; import { InstanceStatus } from '@rocket.chat/instance-status'; -import { Users } from '@rocket.chat/models'; +import { Users, InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; import polka from 'polka'; import { throttle } from 'underscore'; import WebSocket from 'ws'; @@ -66,7 +66,7 @@ export class DDPStreamer extends ServiceClass { // update connections count every 30 seconds updateConnections = throttle(() => { - InstanceStatus.updateConnections(this.wss?.clients.size ?? 0); + InstanceStatusModel.updateConnections(InstanceStatus.id(), this.wss?.clients.size ?? 0); }, 30000); async created(): Promise { diff --git a/packages/instance-status/src/index.ts b/packages/instance-status/src/index.ts index 38109e01626d..463f391a1b5b 100644 --- a/packages/instance-status/src/index.ts +++ b/packages/instance-status/src/index.ts @@ -1,26 +1,39 @@ -// import { IInstanceStatus } from '@rocket.chat/core-typings'; -import { EventEmitter } from 'events'; - +import type { IInstanceStatus } from '@rocket.chat/core-typings'; import { InstanceStatus as InstanceStatusModel } from '@rocket.chat/models'; -import { tracerSpan } from '@rocket.chat/tracing'; import { v4 as uuidv4 } from 'uuid'; -const events = new EventEmitter(); +export const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; +export const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60; + +const ID = uuidv4(); +const id = (): IInstanceStatus['_id'] => ID; -const defaultPingInterval = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL)) || 10; // default to 10s +const currentInstance = { + name: '', + extraInformation: {}, +}; -// if not set via env var ensures at least 3 ticks before expiring (multiple of 60s) -const indexExpire = (parseInt(String(process.env.MULTIPLE_INSTANCES_EXPIRE)) || Math.ceil((defaultPingInterval * 3) / 60)) * 60; +let pingInterval: NodeJS.Timeout | null; + +function start() { + stop(); + pingInterval = setInterval(async () => ping(), defaultPingInterval * 1000); +} + +function stop() { + if (!pingInterval) { + return; + } + clearInterval(pingInterval); + pingInterval = null; +} let createIndexes = async () => { await InstanceStatusModel.col .indexes() - .catch(function () { - // the collection should not exists yet, return empty then - return []; - }) - .then(function (result) { - return result.some(function (index) { + .catch(() => []) + .then((result) => + result.some((index) => { if (index.key && index.key._updatedAt === 1) { if (index.expireAfterSeconds !== indexExpire) { InstanceStatusModel.col.dropIndex(index.name); @@ -29,115 +42,51 @@ let createIndexes = async () => { return true; } return false; - }); - }) - .then(function (created) { + }), + ) + .then((created) => { if (!created) { InstanceStatusModel.col.createIndex({ _updatedAt: 1 }, { expireAfterSeconds: indexExpire }); } }); createIndexes = async () => { - // no op + // noop }; }; -const ID = uuidv4(); - -function id() { - return ID; -} - -const currentInstance = { - name: '', - extraInformation: {}, -}; - -async function registerInstance(name: string, extraInformation: Record): Promise { +async function registerInstance(name: string, extraInformation: Partial): Promise { createIndexes(); currentInstance.name = name; currentInstance.extraInformation = extraInformation; - // if (ID === undefined || ID === null) { - // return console.error('[multiple-instances-status] only can be called after Meteor.startup'); - // } - - const instance = { - $set: { - pid: process.pid, - name, - ...(extraInformation && { extraInformation }), - }, - $currentDate: { - _createdAt: true, - _updatedAt: true, - }, - }; - - try { - await InstanceStatusModel.updateOne({ _id: ID }, instance as any, { upsert: true }); - - const result = await InstanceStatusModel.findOne({ _id: ID }); - - start(); + const result = await InstanceStatusModel.upsertInstance({ + _id: id(), + pid: process.pid, + name, + extraInformation: extraInformation as IInstanceStatus['extraInformation'], + }); - events.emit('registerInstance', result, instance); + start(); + process.on('exit', onExit); - process.on('exit', onExit); - - return result; - } catch (e) { - return e; - } + return result; } async function unregisterInstance() { try { - const result = await InstanceStatusModel.deleteOne({ _id: ID }); + const result = await InstanceStatusModel.removeInstanceById(id()); stop(); - - events.emit('unregisterInstance', ID); - process.removeListener('exit', onExit); - return result; } catch (e) { return e; } } -let pingInterval: NodeJS.Timeout | null; - -function start(interval?: number) { - stop(); - - interval = interval || defaultPingInterval; - - pingInterval = setInterval(async function () { - await tracerSpan('InstanceStatus.ping', {}, () => ping()); - }, interval * 1000); -} - -function stop() { - if (!pingInterval) { - return; - } - clearInterval(pingInterval); - pingInterval = null; -} - async function ping() { - const result = await InstanceStatusModel.updateOne( - { - _id: ID, - }, - { - $currentDate: { - _updatedAt: true, - }, - }, - ); + const result = await InstanceStatusModel.setDocumentHeartbeat(ID); if (result.modifiedCount === 0) { await registerInstance(currentInstance.name, currentInstance.extraInformation); @@ -148,21 +97,9 @@ async function onExit() { await unregisterInstance(); } -async function updateConnections(conns: number) { - await InstanceStatusModel.updateOne( - { - _id: ID, - }, - { - $set: { - 'extraInformation.conns': conns, - }, - }, - ); -} - export const InstanceStatus = { id, registerInstance, - updateConnections, + defaultPingInterval, + indexExpire, }; diff --git a/packages/model-typings/src/models/IInstanceStatusModel.ts b/packages/model-typings/src/models/IInstanceStatusModel.ts index dd6aa4d76bc7..f514ba074666 100644 --- a/packages/model-typings/src/models/IInstanceStatusModel.ts +++ b/packages/model-typings/src/models/IInstanceStatusModel.ts @@ -1,7 +1,13 @@ import type { IInstanceStatus } from '@rocket.chat/core-typings'; +import type { DeleteResult, ModifyResult, UpdateResult } from 'mongodb'; import type { IBaseModel } from './IBaseModel'; export interface IInstanceStatusModel extends IBaseModel { getActiveInstanceCount(): Promise; + getActiveInstancesAddress(): Promise; + removeInstanceById(_id: IInstanceStatus['_id']): Promise; + setDocumentHeartbeat(documentId: string): Promise; + upsertInstance(instance: Partial): Promise>; + updateConnections(_id: IInstanceStatus['_id'], conns: number): Promise; }