From fc4535ec5bb4a4d1caeda9e253b8644b93c547e3 Mon Sep 17 00:00:00 2001 From: Matheus Nogueira Date: Thu, 16 Nov 2023 15:55:12 -0300 Subject: [PATCH 1/2] feat: make queue service reconnect on queue failure --- api/src/services/queue.service.ts | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/api/src/services/queue.service.ts b/api/src/services/queue.service.ts index a347977..aefea5f 100644 --- a/api/src/services/queue.service.ts +++ b/api/src/services/queue.service.ts @@ -112,16 +112,33 @@ class RabbitQueueService implements QueueService { } private async connect(useCache: boolean = true): Promise { + let lastError; + for (let i = 0; i < 10; i++) { + try { + return await this._connect(useCache) + } catch (ex) { + lastError = ex + await new Promise(r => setTimeout(r, 2000)); + } + } + + throw new Error(`could not connect after 10 tries: ${lastError?.message}`) + } + + private async _connect(useCache: boolean = true): Promise { if (useCache && this.channel) { return this.channel; } + this.channel?.recover + try { const connection = await ampqlib.connect(`amqp://${RABBITMQ_HOST}`); const channel = await connection.createChannel(); await channel.assertQueue(this.messageGroup); this.channel = channel; } catch (ex) { + await this.handleException(ex); throw new Error(`could not connect to queue service: ${ex}`); } @@ -135,6 +152,7 @@ class RabbitQueueService implements QueueService { await channel.close(); return true; } catch (ex) { + await this.handleException(ex); return false; } } @@ -146,26 +164,37 @@ class RabbitQueueService implements QueueService { const messageSent = channel.sendToQueue(this.messageGroup, Buffer.from(JSON.stringify(message)), { headers }); resolve(messageSent); } catch (ex) { + this.handleException(ex); reject(false); } }); } public async subscribe(callback: Function): Promise { - const channel = await this.connect(); + const channel = await this.connect(false); const onConsume = async message => { if (message) { try { await callback(message); channel.ack(message); } catch (ex) { + this.handleException(ex); channel.nack(message); throw ex; } } }; + const reconnect = () => setTimeout(() => this.subscribe(callback), 1000); + channel.on("close", reconnect); + channel.on("error", reconnect); channel.consume(this.messageGroup, onConsume); } + + private async handleException(ex: Error): Promise { + if (ex.message == "Channel closed") { + this.channel = null; + } + } } export { createQueueService }; From e1b7ca3ac8bc9ad0862c4256fa151f10f0f79205 Mon Sep 17 00:00:00 2001 From: Matheus Nogueira Date: Thu, 16 Nov 2023 15:55:53 -0300 Subject: [PATCH 2/2] clean up --- api/src/services/queue.service.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/src/services/queue.service.ts b/api/src/services/queue.service.ts index aefea5f..c7ec547 100644 --- a/api/src/services/queue.service.ts +++ b/api/src/services/queue.service.ts @@ -130,8 +130,6 @@ class RabbitQueueService implements QueueService { return this.channel; } - this.channel?.recover - try { const connection = await ampqlib.connect(`amqp://${RABBITMQ_HOST}`); const channel = await connection.createChannel();