diff --git a/api/src/services/queue.service.ts b/api/src/services/queue.service.ts index a347977..c7ec547 100644 --- a/api/src/services/queue.service.ts +++ b/api/src/services/queue.service.ts @@ -112,6 +112,20 @@ class RabbitQueueService implements QueueService { } private async connect(useCache: boolean = true): Promise { + let lastError; + for (let i = 0; i < 10; i++) { + try { + return await this._connect(useCache) + } catch (ex) { + lastError = ex + await new Promise(r => setTimeout(r, 2000)); + } + } + + throw new Error(`could not connect after 10 tries: ${lastError?.message}`) + } + + private async _connect(useCache: boolean = true): Promise { if (useCache && this.channel) { return this.channel; } @@ -122,6 +136,7 @@ class RabbitQueueService implements QueueService { await channel.assertQueue(this.messageGroup); this.channel = channel; } catch (ex) { + await this.handleException(ex); throw new Error(`could not connect to queue service: ${ex}`); } @@ -135,6 +150,7 @@ class RabbitQueueService implements QueueService { await channel.close(); return true; } catch (ex) { + await this.handleException(ex); return false; } } @@ -146,26 +162,37 @@ class RabbitQueueService implements QueueService { const messageSent = channel.sendToQueue(this.messageGroup, Buffer.from(JSON.stringify(message)), { headers }); resolve(messageSent); } catch (ex) { + this.handleException(ex); reject(false); } }); } public async subscribe(callback: Function): Promise { - const channel = await this.connect(); + const channel = await this.connect(false); const onConsume = async message => { if (message) { try { await callback(message); channel.ack(message); } catch (ex) { + this.handleException(ex); channel.nack(message); throw ex; } } }; + const reconnect = () => setTimeout(() => this.subscribe(callback), 1000); + channel.on("close", reconnect); + channel.on("error", reconnect); channel.consume(this.messageGroup, onConsume); } + + private async handleException(ex: Error): Promise { + if (ex.message == "Channel closed") { + this.channel = null; + } + } } export { createQueueService };