Skip to content

Commit

Permalink
feat: make queue service reconnect on queue failure (#31)
Browse files Browse the repository at this point in the history
* feat: make queue service reconnect on queue failure

* clean up
  • Loading branch information
mathnogueira authored Nov 16, 2023
1 parent ea9d2b1 commit 7420847
Showing 1 changed file with 28 additions and 1 deletion.
29 changes: 28 additions & 1 deletion api/src/services/queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@ class RabbitQueueService<T> implements QueueService<T> {
}

private async connect(useCache: boolean = true): Promise<ampqlib.Channel> {
let lastError;
for (let i = 0; i < 10; i++) {
try {
return await this._connect(useCache)
} catch (ex) {
lastError = ex
await new Promise(r => setTimeout(r, 2000));
}
}

throw new Error(`could not connect after 10 tries: ${lastError?.message}`)
}

private async _connect(useCache: boolean = true): Promise<ampqlib.Channel> {
if (useCache && this.channel) {
return this.channel;
}
Expand All @@ -122,6 +136,7 @@ class RabbitQueueService<T> implements QueueService<T> {
await channel.assertQueue(this.messageGroup);
this.channel = channel;
} catch (ex) {
await this.handleException(ex);
throw new Error(`could not connect to queue service: ${ex}`);
}

Expand All @@ -135,6 +150,7 @@ class RabbitQueueService<T> implements QueueService<T> {
await channel.close();
return true;
} catch (ex) {
await this.handleException(ex);
return false;
}
}
Expand All @@ -146,26 +162,37 @@ class RabbitQueueService<T> implements QueueService<T> {
const messageSent = channel.sendToQueue(this.messageGroup, Buffer.from(JSON.stringify(message)), { headers });
resolve(messageSent);
} catch (ex) {
this.handleException(ex);
reject(false);
}
});
}

public async subscribe(callback: Function): Promise<void> {
const channel = await this.connect();
const channel = await this.connect(false);
const onConsume = async message => {
if (message) {
try {
await callback(message);
channel.ack(message);
} catch (ex) {
this.handleException(ex);
channel.nack(message);
throw ex;
}
}
};
const reconnect = () => setTimeout(() => this.subscribe(callback), 1000);
channel.on("close", reconnect);
channel.on("error", reconnect);
channel.consume(this.messageGroup, onConsume);
}

private async handleException(ex: Error): Promise<void> {
if (ex.message == "Channel closed") {
this.channel = null;
}
}
}

export { createQueueService };

0 comments on commit 7420847

Please sign in to comment.