diff --git a/src/channel/guaranteedChannel.ts b/src/channel/guaranteedChannel.ts index a2f264ca..a65b47e0 100644 --- a/src/channel/guaranteedChannel.ts +++ b/src/channel/guaranteedChannel.ts @@ -99,7 +99,7 @@ export class GuaranteedChannel implements OutputChannel { () => { if (this.closed) { // Cancel delay immediately when the channel is closed - abort(MessageDeliveryError.nonRetryable('Connection deliberately closed.')); + abort(MessageDeliveryError.retryable('Connection deliberately closed.')); } }, 0, diff --git a/src/channel/queuedChannel.ts b/src/channel/queuedChannel.ts index fd68a0d1..e87b718d 100644 --- a/src/channel/queuedChannel.ts +++ b/src/channel/queuedChannel.ts @@ -39,18 +39,10 @@ export class QueuedChannel implements OutputChannel { } if (this.pending === undefined) { - this.pending = this.queue.isEmpty() - ? Promise.resolve() - : Promise.reject(MessageDeliveryError.retryable('The queue must be flushed.')); + this.pending = this.requeue(); } - this.enqueue(message); - - this.pending = this.pending.then( - () => this.channel - .publish(message) - .then(this.dequeue.bind(this)), - ); + this.pending = this.chainNext(this.pending, message, true); return this.pending; } @@ -86,16 +78,55 @@ export class QueuedChannel implements OutputChannel { this.logger.debug(`Queue length: ${length}`); for (const message of this.queue.all()) { - this.pending = this.pending.then( - () => this.channel - .publish(message) - .then(this.dequeue.bind(this)), - ); + this.pending = this.chainNext(this.pending, message); } return this.pending; } + private async chainNext(previous: Promise, message: T, enqueue = false): Promise { + if (enqueue) { + this.enqueue(message); + } + + try { + await previous; + } catch (error) { + if (error instanceof MessageDeliveryError && error.retryable) { + // If the previous message failed to deliver, requeue all messages + // including the current one that was just enqueued + return this.requeue(); + } + + throw error; + } + + if (this.closed) { + throw MessageDeliveryError.retryable('Connection deliberately closed.'); + } + + try { + const result = await this.channel.publish(message); + + this.dequeue(); + + return result; + } catch (error) { + if (!(error instanceof MessageDeliveryError) || !error.retryable) { + // Discard the message if it's non-retryable + this.dequeue(); + + if (!enqueue) { + // If the message was not enqueued, suppress the error + // so that the next message in the queue can be immediately + return; + } + } + + throw error; + } + } + public async close(): Promise { this.closed = true; diff --git a/src/channel/retryChannel.ts b/src/channel/retryChannel.ts index daec4b0c..24687013 100644 --- a/src/channel/retryChannel.ts +++ b/src/channel/retryChannel.ts @@ -42,7 +42,7 @@ export class RetryChannel implements OutputChannel { while (this.retryPolicy.shouldRetry(attempt, message, error)) { if (this.closed) { - throw MessageDeliveryError.nonRetryable('Connection deliberately closed.'); + throw MessageDeliveryError.retryable('Connection deliberately closed.'); } const delay = this.retryPolicy.getDelay(attempt); @@ -59,7 +59,7 @@ export class RetryChannel implements OutputChannel { // Cancel delay immediately when the channel is closed window.clearInterval(closeWatcher); - reject(MessageDeliveryError.nonRetryable('Connection deliberately closed.')); + reject(MessageDeliveryError.retryable('Connection deliberately closed.')); } }, 0, diff --git a/test/channel/guaranteedChannel.test.ts b/test/channel/guaranteedChannel.test.ts index 59aaa3cc..ab2d858a 100644 --- a/test/channel/guaranteedChannel.test.ts +++ b/test/channel/guaranteedChannel.test.ts @@ -66,7 +66,7 @@ describe('A guaranteed channel', () => { await channel.close(); await expect(promise).rejects.toThrowWithMessage(MessageDeliveryError, 'Connection deliberately closed.'); - await expect(promise).rejects.toHaveProperty('retryable', false); + await expect(promise).rejects.toHaveProperty('retryable', true); }); it('should close the output channel on close', async () => { diff --git a/test/channel/queuedChannel.test.ts b/test/channel/queuedChannel.test.ts index deae745a..4eb22239 100644 --- a/test/channel/queuedChannel.test.ts +++ b/test/channel/queuedChannel.test.ts @@ -1,12 +1,13 @@ import {InMemoryQueue, CapacityRestrictedQueue} from '../../src/queue'; import {QueuedChannel, OutputChannel, MessageDeliveryError} from '../../src/channel'; +import {Logger} from '../../src/logging'; describe('A queued channel', () => { afterEach(() => { jest.restoreAllMocks(); }); - it('should resume flushing from the last failed message', async () => { + it('should resume flushing from the oldest message', async () => { const outputChannel: OutputChannel = { close: jest.fn().mockResolvedValue(undefined), publish: jest.fn() @@ -14,21 +15,29 @@ describe('A queued channel', () => { .mockRejectedValueOnce(new MessageDeliveryError('Rejected', true)) .mockResolvedValue(undefined), }; - const channel = new QueuedChannel(outputChannel, new InMemoryQueue('foo', 'bar')); + const queue = new InMemoryQueue('foo', 'bar'); + const channel = new QueuedChannel(outputChannel, queue); + + await expect(channel.flush()).rejects.toThrowWithMessage(MessageDeliveryError, 'Rejected'); - await expect(channel.flush()).rejects.toEqual(expect.any(Error)); expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo'); expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar'); + expect(outputChannel.publish).toHaveBeenCalledTimes(2); + expect(queue.isEmpty()).toBe(false); + expect(queue.peek()).toBe('bar'); + await channel.flush(); - expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar'); + expect(outputChannel.publish).toHaveBeenCalledTimes(3); + expect(queue.isEmpty()).toBe(true); - await channel.flush(); + await expect(channel.publish('baz')).resolves.toBeUndefined(); - expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar'); + expect(outputChannel.publish).toHaveBeenNthCalledWith(4, 'baz'); - expect(outputChannel.publish).toHaveBeenCalledTimes(3); + expect(outputChannel.publish).toHaveBeenCalledTimes(4); + expect(queue.isEmpty()).toBe(true); }); it('should do nothing when flushing an empty queue', async () => { @@ -123,30 +132,30 @@ describe('A queued channel', () => { await expect(promise).rejects.toHaveProperty('retryable', false); }); - it('should fail to publish messages if queue has pending messages', async () => { + it('should automatically requeue messages on the first publish', async () => { const outputChannel: OutputChannel = { close: jest.fn().mockResolvedValue(undefined), publish: jest.fn().mockResolvedValue(undefined), }; - const channel = new QueuedChannel(outputChannel, new InMemoryQueue('foo')); - - const promise = channel.publish('bar'); - await expect(promise).rejects.toThrowWithMessage(MessageDeliveryError, 'The queue must be flushed.'); - await expect(promise).rejects.toHaveProperty('retryable', true); + const queue = new InMemoryQueue('foo', 'bar'); + const channel = new QueuedChannel(outputChannel, queue); - await channel.flush(); + await expect(channel.publish('baz')).resolves.toBeUndefined(); expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo'); expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar'); + expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'baz'); - await channel.publish('baz'); + expect(queue.isEmpty()).toBe(true); - expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'baz'); - expect(outputChannel.publish).toHaveBeenCalledTimes(3); + await channel.publish('qux'); + + expect(outputChannel.publish).toHaveBeenNthCalledWith(4, 'qux'); + expect(outputChannel.publish).toHaveBeenCalledTimes(4); }); - it('should publish messages if queue has no pending messages', async () => { + it('should publish messages immediately if queue has no pending messages', async () => { const outputChannel: OutputChannel = { close: jest.fn().mockResolvedValue(undefined), publish: jest.fn().mockResolvedValue(undefined), @@ -166,16 +175,169 @@ describe('A queued channel', () => { expect(outputChannel.publish).toHaveBeenCalledTimes(2); }); - it('should close the output channel and wait for pending messages', async () => { + it('should require a flush after a failed, non-retryable message', async () => { + const logger: Logger = { + debug: jest.fn(), + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + }; + const outputChannel: OutputChannel = { close: jest.fn().mockResolvedValue(undefined), - publish: jest.fn().mockResolvedValue(undefined), + publish: jest.fn() + .mockImplementationOnce( + () => new Promise((_, reject) => { + setTimeout(() => reject(new Error('Failed')), 1); + }), + ) + .mockResolvedValue(undefined), }; - const channel = new QueuedChannel(outputChannel, new InMemoryQueue('foo')); + + const channel = new QueuedChannel(outputChannel, new InMemoryQueue(), logger); + + await expect(channel.publish('foo')).rejects.toEqual(expect.any(Error)); + await expect(channel.publish('bar')).rejects.toEqual(expect.any(Error)); + + expect(outputChannel.publish).toHaveBeenCalledTimes(1); + expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo'); + + await channel.flush(); + + expect(outputChannel.publish).toHaveBeenCalledTimes(2); + expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar'); + }); + + it('should resume processing on re-enqueued message errors', async () => { + const outputChannel: OutputChannel = { + close: jest.fn().mockResolvedValue(undefined), + publish: jest.fn() + .mockRejectedValueOnce(new Error('Failed')) + .mockRejectedValueOnce(new Error('Failed')) + .mockResolvedValue(undefined), + }; + + const queue = new InMemoryQueue('foo', 'bar'); + const channel = new QueuedChannel(outputChannel, queue); + + await expect(channel.publish('baz')).resolves.toBeUndefined(); + + expect(outputChannel.publish).toHaveBeenCalledTimes(3); + expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo'); + expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar'); + expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'baz'); + + expect(queue.isEmpty()).toBe(true); + }); + + it('should flush all non-retryable messages even if an error occurs', async () => { + const outputChannel: OutputChannel = { + close: jest.fn().mockResolvedValue(undefined), + publish: jest.fn() + .mockRejectedValueOnce(new Error('Failed')) + .mockRejectedValueOnce(new Error('Failed')), + }; + + const queue = new InMemoryQueue('foo', 'bar'); + const channel = new QueuedChannel(outputChannel, queue); await expect(channel.flush()).resolves.toBeUndefined(); - await channel.close(); + expect(outputChannel.publish).toHaveBeenCalledTimes(2); + expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo'); + expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar'); + + expect(queue.isEmpty()).toBe(true); + }); + + it('should not dequeue messages if an retryable error occurs', async () => { + const outputChannel: OutputChannel = { + close: jest.fn().mockResolvedValue(undefined), + publish: jest.fn() + .mockResolvedValueOnce(undefined) + .mockRejectedValueOnce(new MessageDeliveryError('Rejected', true)) + .mockResolvedValueOnce(undefined), + }; + + const queue = new InMemoryQueue('foo'); + const channel = new QueuedChannel(outputChannel, queue); + + const promise = channel.publish('bar'); + + await expect(promise).rejects.toEqual(expect.any(MessageDeliveryError)); + + expect(outputChannel.publish).toHaveBeenCalledTimes(2); + expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo'); + expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar'); + + expect(queue.length()).toBe(1); + expect(queue.peek()).toBe('bar'); + + await expect(channel.flush()).resolves.toBeUndefined(); + + expect(outputChannel.publish).toHaveBeenCalledTimes(3); + expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar'); + + expect(queue.isEmpty()).toBe(true); + }); + + it('should requeue messages if a retryable error occurs', async () => { + const outputChannel: OutputChannel = { + close: jest.fn().mockResolvedValue(undefined), + publish: jest.fn() + .mockResolvedValueOnce(undefined) + .mockRejectedValueOnce(new MessageDeliveryError('Rejected', true)) + .mockResolvedValue(undefined), + }; + + const queue = new InMemoryQueue('foo'); + const channel = new QueuedChannel(outputChannel, queue); + + await expect(channel.publish('bar')).rejects.toEqual(expect.any(MessageDeliveryError)); + + expect(outputChannel.publish).toHaveBeenCalledTimes(2); + expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo'); + expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar'); + + expect(queue.length()).toBe(1); + expect(queue.peek()).toBe('bar'); + + await expect(channel.publish('baz')).resolves.toBeUndefined(); + + expect(outputChannel.publish).toHaveBeenCalledTimes(4); + expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar'); + expect(outputChannel.publish).toHaveBeenNthCalledWith(4, 'baz'); + + expect(queue.isEmpty()).toBe(true); + }); + + it('should close the output channel and wait for pending messages', async () => { + const outputChannel: OutputChannel = { + close: jest.fn().mockResolvedValue(undefined), + publish: jest.fn() + .mockImplementationOnce( + () => new Promise(resolve => { + setTimeout(resolve, 2); + }), + ), + }; + + const queue = new InMemoryQueue(); + const channel = new QueuedChannel(outputChannel, queue); + + const firstPromise = channel.publish('foo'); + const secondPromise = channel.publish('bar'); + + const close = new Promise(resolve => { setTimeout(() => resolve(channel.close()), 1); }); + + await expect(firstPromise).resolves.toBeUndefined(); + await expect(secondPromise).rejects.toThrowWithMessage(MessageDeliveryError, 'Connection deliberately closed.'); + await expect(close).resolves.toBeUndefined(); + + expect(outputChannel.publish).toHaveBeenCalledTimes(1); + + expect(queue.length()).toBe(1); + expect(queue.peek()).toBe('bar'); expect(outputChannel.close).toHaveBeenCalled(); }); diff --git a/test/channel/retryChannel.test.ts b/test/channel/retryChannel.test.ts index 92b68dbd..fb9e918e 100644 --- a/test/channel/retryChannel.test.ts +++ b/test/channel/retryChannel.test.ts @@ -76,7 +76,7 @@ describe('A retry channel', () => { await channel.close(); await expect(promise).rejects.toThrowWithMessage(MessageDeliveryError, 'Connection deliberately closed.'); - await expect(promise).rejects.toHaveProperty('retryable', false); + await expect(promise).rejects.toHaveProperty('retryable', true); }); it('should fail to publish a message if the channel is closed while retrying', async () => { @@ -96,7 +96,7 @@ describe('A retry channel', () => { await channel.close(); await expect(promise).rejects.toThrowWithMessage(MessageDeliveryError, 'Connection deliberately closed.'); - await expect(promise).rejects.toHaveProperty('retryable', false); + await expect(promise).rejects.toHaveProperty('retryable', true); }); it('should fail to publish a message if maximum retry attempts is reached', async () => {