Skip to content

Commit

Permalink
feat: make queue service reconnect on queue failure
Browse files Browse the repository at this point in the history
  • Loading branch information
mathnogueira committed Nov 16, 2023
1 parent ea9d2b1 commit fc4535e
Showing 1 changed file with 30 additions and 1 deletion.
31 changes: 30 additions & 1 deletion api/src/services/queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,33 @@ class RabbitQueueService<T> implements QueueService<T> {
}

private async connect(useCache: boolean = true): Promise<ampqlib.Channel> {
let lastError;
for (let i = 0; i < 10; i++) {
try {
return await this._connect(useCache)
} catch (ex) {
lastError = ex
await new Promise(r => setTimeout(r, 2000));
}
}

throw new Error(`could not connect after 10 tries: ${lastError?.message}`)
}

private async _connect(useCache: boolean = true): Promise<ampqlib.Channel> {
if (useCache && this.channel) {
return this.channel;
}

this.channel?.recover

try {
const connection = await ampqlib.connect(`amqp://${RABBITMQ_HOST}`);
const channel = await connection.createChannel();
await channel.assertQueue(this.messageGroup);
this.channel = channel;
} catch (ex) {
await this.handleException(ex);
throw new Error(`could not connect to queue service: ${ex}`);
}

Expand All @@ -135,6 +152,7 @@ class RabbitQueueService<T> implements QueueService<T> {
await channel.close();
return true;
} catch (ex) {
await this.handleException(ex);
return false;
}
}
Expand All @@ -146,26 +164,37 @@ class RabbitQueueService<T> implements QueueService<T> {
const messageSent = channel.sendToQueue(this.messageGroup, Buffer.from(JSON.stringify(message)), { headers });
resolve(messageSent);
} catch (ex) {
this.handleException(ex);
reject(false);
}
});
}

public async subscribe(callback: Function): Promise<void> {
const channel = await this.connect();
const channel = await this.connect(false);
const onConsume = async message => {
if (message) {
try {
await callback(message);
channel.ack(message);
} catch (ex) {
this.handleException(ex);
channel.nack(message);
throw ex;
}
}
};
const reconnect = () => setTimeout(() => this.subscribe(callback), 1000);
channel.on("close", reconnect);
channel.on("error", reconnect);
channel.consume(this.messageGroup, onConsume);
}

private async handleException(ex: Error): Promise<void> {
if (ex.message == "Channel closed") {
this.channel = null;
}
}
}

export { createQueueService };

0 comments on commit fc4535e

Please sign in to comment.