Skip to content

Commit

Permalink
Fix queued channel failure logic (#436)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospassos authored Oct 17, 2024
1 parent cbc2027 commit cb8d2d8
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 43 deletions.
2 changes: 1 addition & 1 deletion src/channel/guaranteedChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export class GuaranteedChannel<M, S> implements OutputChannel<M> {
() => {
if (this.closed) {
// Cancel delay immediately when the channel is closed
abort(MessageDeliveryError.nonRetryable('Connection deliberately closed.'));
abort(MessageDeliveryError.retryable('Connection deliberately closed.'));
}
},
0,
Expand Down
61 changes: 46 additions & 15 deletions src/channel/queuedChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,10 @@ export class QueuedChannel<T> implements OutputChannel<T> {
}

if (this.pending === undefined) {
this.pending = this.queue.isEmpty()
? Promise.resolve()
: Promise.reject(MessageDeliveryError.retryable('The queue must be flushed.'));
this.pending = this.requeue();
}

this.enqueue(message);

this.pending = this.pending.then(
() => this.channel
.publish(message)
.then(this.dequeue.bind(this)),
);
this.pending = this.chainNext(this.pending, message, true);

return this.pending;
}
Expand Down Expand Up @@ -86,16 +78,55 @@ export class QueuedChannel<T> implements OutputChannel<T> {
this.logger.debug(`Queue length: ${length}`);

for (const message of this.queue.all()) {
this.pending = this.pending.then(
() => this.channel
.publish(message)
.then(this.dequeue.bind(this)),
);
this.pending = this.chainNext(this.pending, message);
}

return this.pending;
}

private async chainNext(previous: Promise<void>, message: T, enqueue = false): Promise<void> {
if (enqueue) {
this.enqueue(message);
}

try {
await previous;
} catch (error) {
if (error instanceof MessageDeliveryError && error.retryable) {
// If the previous message failed to deliver, requeue all messages
// including the current one that was just enqueued
return this.requeue();
}

throw error;
}

if (this.closed) {
throw MessageDeliveryError.retryable('Connection deliberately closed.');
}

try {
const result = await this.channel.publish(message);

this.dequeue();

return result;
} catch (error) {
if (!(error instanceof MessageDeliveryError) || !error.retryable) {
// Discard the message if it's non-retryable
this.dequeue();

if (!enqueue) {
// If the message was not enqueued, suppress the error
// so that the next message in the queue can be immediately
return;
}
}

throw error;
}
}

public async close(): Promise<void> {
this.closed = true;

Expand Down
4 changes: 2 additions & 2 deletions src/channel/retryChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class RetryChannel<T> implements OutputChannel<T> {

while (this.retryPolicy.shouldRetry(attempt, message, error)) {
if (this.closed) {
throw MessageDeliveryError.nonRetryable('Connection deliberately closed.');
throw MessageDeliveryError.retryable('Connection deliberately closed.');
}

const delay = this.retryPolicy.getDelay(attempt);
Expand All @@ -59,7 +59,7 @@ export class RetryChannel<T> implements OutputChannel<T> {
// Cancel delay immediately when the channel is closed
window.clearInterval(closeWatcher);

reject(MessageDeliveryError.nonRetryable('Connection deliberately closed.'));
reject(MessageDeliveryError.retryable('Connection deliberately closed.'));
}
},
0,
Expand Down
2 changes: 1 addition & 1 deletion test/channel/guaranteedChannel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ describe('A guaranteed channel', () => {
await channel.close();

await expect(promise).rejects.toThrowWithMessage(MessageDeliveryError, 'Connection deliberately closed.');
await expect(promise).rejects.toHaveProperty('retryable', false);
await expect(promise).rejects.toHaveProperty('retryable', true);
});

it('should close the output channel on close', async () => {
Expand Down
206 changes: 184 additions & 22 deletions test/channel/queuedChannel.test.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,43 @@
import {InMemoryQueue, CapacityRestrictedQueue} from '../../src/queue';
import {QueuedChannel, OutputChannel, MessageDeliveryError} from '../../src/channel';
import {Logger} from '../../src/logging';

describe('A queued channel', () => {
afterEach(() => {
jest.restoreAllMocks();
});

it('should resume flushing from the last failed message', async () => {
it('should resume flushing from the oldest message', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockResolvedValueOnce(undefined)
.mockRejectedValueOnce(new MessageDeliveryError('Rejected', true))
.mockResolvedValue(undefined),
};
const channel = new QueuedChannel(outputChannel, new InMemoryQueue('foo', 'bar'));
const queue = new InMemoryQueue('foo', 'bar');
const channel = new QueuedChannel(outputChannel, queue);

await expect(channel.flush()).rejects.toThrowWithMessage(MessageDeliveryError, 'Rejected');

await expect(channel.flush()).rejects.toEqual(expect.any(Error));
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');

expect(outputChannel.publish).toHaveBeenCalledTimes(2);
expect(queue.isEmpty()).toBe(false);
expect(queue.peek()).toBe('bar');

await channel.flush();

expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar');
expect(outputChannel.publish).toHaveBeenCalledTimes(3);
expect(queue.isEmpty()).toBe(true);

await channel.flush();
await expect(channel.publish('baz')).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar');
expect(outputChannel.publish).toHaveBeenNthCalledWith(4, 'baz');

expect(outputChannel.publish).toHaveBeenCalledTimes(3);
expect(outputChannel.publish).toHaveBeenCalledTimes(4);
expect(queue.isEmpty()).toBe(true);
});

it('should do nothing when flushing an empty queue', async () => {
Expand Down Expand Up @@ -123,30 +132,30 @@ describe('A queued channel', () => {
await expect(promise).rejects.toHaveProperty('retryable', false);
});

it('should fail to publish messages if queue has pending messages', async () => {
it('should automatically requeue messages on the first publish', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn().mockResolvedValue(undefined),
};
const channel = new QueuedChannel(outputChannel, new InMemoryQueue('foo'));

const promise = channel.publish('bar');

await expect(promise).rejects.toThrowWithMessage(MessageDeliveryError, 'The queue must be flushed.');
await expect(promise).rejects.toHaveProperty('retryable', true);
const queue = new InMemoryQueue('foo', 'bar');
const channel = new QueuedChannel(outputChannel, queue);

await channel.flush();
await expect(channel.publish('baz')).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');
expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'baz');

await channel.publish('baz');
expect(queue.isEmpty()).toBe(true);

expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'baz');
expect(outputChannel.publish).toHaveBeenCalledTimes(3);
await channel.publish('qux');

expect(outputChannel.publish).toHaveBeenNthCalledWith(4, 'qux');
expect(outputChannel.publish).toHaveBeenCalledTimes(4);
});

it('should publish messages if queue has no pending messages', async () => {
it('should publish messages immediately if queue has no pending messages', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn().mockResolvedValue(undefined),
Expand All @@ -166,16 +175,169 @@ describe('A queued channel', () => {
expect(outputChannel.publish).toHaveBeenCalledTimes(2);
});

it('should close the output channel and wait for pending messages', async () => {
it('should require a flush after a failed, non-retryable message', async () => {
const logger: Logger = {
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};

const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockImplementationOnce(
() => new Promise((_, reject) => {
setTimeout(() => reject(new Error('Failed')), 1);
}),
)
.mockResolvedValue(undefined),
};
const channel = new QueuedChannel(outputChannel, new InMemoryQueue('foo'));

const channel = new QueuedChannel(outputChannel, new InMemoryQueue(), logger);

await expect(channel.publish('foo')).rejects.toEqual(expect.any(Error));
await expect(channel.publish('bar')).rejects.toEqual(expect.any(Error));

expect(outputChannel.publish).toHaveBeenCalledTimes(1);
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');

await channel.flush();

expect(outputChannel.publish).toHaveBeenCalledTimes(2);
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');
});

it('should resume processing on re-enqueued message errors', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockRejectedValueOnce(new Error('Failed'))
.mockRejectedValueOnce(new Error('Failed'))
.mockResolvedValue(undefined),
};

const queue = new InMemoryQueue('foo', 'bar');
const channel = new QueuedChannel(outputChannel, queue);

await expect(channel.publish('baz')).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenCalledTimes(3);
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');
expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'baz');

expect(queue.isEmpty()).toBe(true);
});

it('should flush all non-retryable messages even if an error occurs', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockRejectedValueOnce(new Error('Failed'))
.mockRejectedValueOnce(new Error('Failed')),
};

const queue = new InMemoryQueue('foo', 'bar');
const channel = new QueuedChannel(outputChannel, queue);

await expect(channel.flush()).resolves.toBeUndefined();

await channel.close();
expect(outputChannel.publish).toHaveBeenCalledTimes(2);
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');

expect(queue.isEmpty()).toBe(true);
});

it('should not dequeue messages if an retryable error occurs', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockResolvedValueOnce(undefined)
.mockRejectedValueOnce(new MessageDeliveryError('Rejected', true))
.mockResolvedValueOnce(undefined),
};

const queue = new InMemoryQueue('foo');
const channel = new QueuedChannel(outputChannel, queue);

const promise = channel.publish('bar');

await expect(promise).rejects.toEqual(expect.any(MessageDeliveryError));

expect(outputChannel.publish).toHaveBeenCalledTimes(2);
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');

expect(queue.length()).toBe(1);
expect(queue.peek()).toBe('bar');

await expect(channel.flush()).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenCalledTimes(3);
expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar');

expect(queue.isEmpty()).toBe(true);
});

it('should requeue messages if a retryable error occurs', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockResolvedValueOnce(undefined)
.mockRejectedValueOnce(new MessageDeliveryError('Rejected', true))
.mockResolvedValue(undefined),
};

const queue = new InMemoryQueue('foo');
const channel = new QueuedChannel(outputChannel, queue);

await expect(channel.publish('bar')).rejects.toEqual(expect.any(MessageDeliveryError));

expect(outputChannel.publish).toHaveBeenCalledTimes(2);
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');

expect(queue.length()).toBe(1);
expect(queue.peek()).toBe('bar');

await expect(channel.publish('baz')).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenCalledTimes(4);
expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar');
expect(outputChannel.publish).toHaveBeenNthCalledWith(4, 'baz');

expect(queue.isEmpty()).toBe(true);
});

it('should close the output channel and wait for pending messages', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockImplementationOnce(
() => new Promise(resolve => {
setTimeout(resolve, 2);
}),
),
};

const queue = new InMemoryQueue();
const channel = new QueuedChannel(outputChannel, queue);

const firstPromise = channel.publish('foo');
const secondPromise = channel.publish('bar');

const close = new Promise(resolve => { setTimeout(() => resolve(channel.close()), 1); });

await expect(firstPromise).resolves.toBeUndefined();
await expect(secondPromise).rejects.toThrowWithMessage(MessageDeliveryError, 'Connection deliberately closed.');
await expect(close).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenCalledTimes(1);

expect(queue.length()).toBe(1);
expect(queue.peek()).toBe('bar');

expect(outputChannel.close).toHaveBeenCalled();
});
Expand Down
Loading

0 comments on commit cb8d2d8

Please sign in to comment.