
Test #1

Closed
wants to merge 23 commits

Commits (23)
b5dec74
feat: add support for named default handler (#785)
y-nk Sep 13, 2024
7c05a48
remove publish definition (#784)
y-nk Sep 16, 2024
e9560d7
fix: fixed nullable mocks issue (#787)
VonRehberg Sep 16, 2024
9119827
refactor(rabbitmq): update error handling behaviour + clean up Subscr…
ckfngod Sep 17, 2024
b89249f
refactor(rabbitmq): add more context for resumeConsumer error
ckfngod Sep 18, 2024
01f9dcc
refactor(rabbitmq): add batch option validations / update JSDoc + ref…
ckfngod Sep 18, 2024
c008820
build(deps): bump vite from 5.2.8 to 5.4.6 (#788)
dependabot[bot] Sep 18, 2024
e4e0f43
fix(rabbitmq): prevent setting batch size of 1
ckfngod Sep 18, 2024
7a8226b
test(rabbitmq): adds integration tests for message batching behaviour
ckfngod Sep 18, 2024
6e709a0
test(rabbitmq): adds more integration tests for batching behaviour
ckfngod Sep 19, 2024
774d512
refactor(rabbitmq): refactor ConsumerHandler type
ckfngod Sep 19, 2024
b8992ba
refactor(rabbitmq): fall back on top-level `errorHandler` for batch m…
ckfngod Sep 20, 2024
dfb1e13
docs(rabbitmq): adds documentation for the message batching behaviour
ckfngod Sep 20, 2024
1fe3daa
refactor(rabbitmq): remove `handleMessage` and `handleMessages`
ckfngod Sep 20, 2024
df41f5f
refactor(rabbitmq): use common `setupChannel` func in `createRpc`
ckfngod Sep 20, 2024
e7be878
refactor(rabbitmq): improve tests + minor clean up
ckfngod Sep 20, 2024
8898269
fix(rabbitmq): fix e2e test uri
ckfngod Sep 20, 2024
c81e1f4
refactor(rabbitmq): pr feedback
ckfngod Sep 23, 2024
3899070
Merge branch 'master' into feature/rabbitmq-message-batching
underfisk Sep 23, 2024
6bed50c
refactor(rabbitmq): remove precondition error code comment
ckfngod Sep 23, 2024
043bd7a
build(deps): bump rollup from 4.21.3 to 4.22.4 (#790)
dependabot[bot] Sep 24, 2024
26f271c
Merge branch 'master' into feature/rabbitmq-message-batching
underfisk Sep 24, 2024
3d1d574
refactor: test
ckfngod Sep 24, 2024
integration/rabbitmq/e2e/subscribe.e2e-spec.ts: 272 changes (262 additions, 10 deletions)
@@ -7,6 +7,7 @@ import { INestApplication, Injectable, LoggerService } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { flatten, times } from 'lodash';
import { createMock } from '@golevelup/ts-jest';
import { setTimeout } from 'node:timers/promises';

const testHandler = jest.fn();

@@ -35,6 +36,15 @@ const deleteHandler = jest.fn();
const FANOUT = 'fanout';
const fanoutHandler = jest.fn();

const BATCH_SIZE = 10;
const BATCH_TIMEOUT = 200;
const batchHandler = jest.fn();
const batchRoutingKey = 'testSubscribeBatch';
const batchQueue = 'testSubscribeBatchQueue';
const batchErrorHandler = jest.fn();
const batchErrorRoutingKey = 'testSubscribeBatchError';
const batchErrorQueue = 'testSubscribeBatchErrorQueue';

@Injectable()
class SubscribeService {
@RabbitSubscribe({
@@ -139,6 +149,33 @@ class SubscribeService {
subscriberThatReturns() {
return Promise.resolve(true);
}

@RabbitSubscribe({
exchange,
routingKey: batchRoutingKey,
queue: batchQueue,
batchOptions: {
size: BATCH_SIZE,
timeout: BATCH_TIMEOUT,
},
})
batchSubscriber(messages) {
batchHandler(messages);
}

@RabbitSubscribe({
exchange,
routingKey: batchErrorRoutingKey,
queue: batchErrorQueue,
batchOptions: {
size: BATCH_SIZE,
timeout: BATCH_TIMEOUT,
errorHandler: batchErrorHandler,
},
})
batchErrorSubscriber() {
throw new Error();
}
}

describe('Rabbit Subscribe', () => {
@@ -211,7 +248,7 @@ describe('Rabbit Subscribe', () => {
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(3);
expect(testHandler).toHaveBeenCalledWith(`testMessage-0`);
@@ -222,7 +259,7 @@
it('should receive messages when subscribed via handler name', async () => {
await amqpConnection.publish(exchange, routingKey3, 'testMessage');

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(1);
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
@@ -232,7 +269,7 @@
await amqpConnection.publish(exchange, routingKey4, 'testMessage');
await amqpConnection.publish(exchange, routingKey5, 'testMessage2');

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(2);
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
@@ -249,7 +286,7 @@
);

await Promise.all(promises);
await new Promise((resolve) => setTimeout(resolve, 150));
await setTimeout(150);

expect(createHandler).toHaveBeenCalledTimes(100);
times(100).forEach((x) => expect(createHandler).toHaveBeenCalledWith(x));
@@ -268,7 +305,7 @@
Buffer.from('{a:'),
);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(3);
expect(testHandler).toHaveBeenNthCalledWith(1, '');
@@ -286,7 +323,7 @@
amqpConnection.publish(amqDefaultExchange, preExistingQueue, message2);
amqpConnection.publish(amqDefaultExchange, preExistingQueue, message3);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(3);
expect(testHandler).toHaveBeenCalledWith(message1);
@@ -299,7 +336,7 @@
// publish to the default exchange, using the queue as routing key
amqpConnection.publish(amqDefaultExchange, nonExistingQueue, message);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(testHandler).toHaveBeenCalledTimes(1);
expect(testHandler).toHaveBeenCalledWith(message);
@@ -309,7 +346,7 @@
const message = '{"key2":"value2"}';
amqpConnection.publish(amqDefaultExchange, nonExistingQueue, message);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);
expect(testHandler).toHaveBeenCalledTimes(1);
const msg = testHandler.mock.calls[0][1];
expect(msg.fields.consumerTag).toEqual(preDefinedConsumerTag);
@@ -319,7 +356,7 @@
const message = { message: 'message' };
amqpConnection.publish(FANOUT, '', message);

await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(fanoutHandler).toHaveBeenCalledTimes(1);
expect(fanoutHandler).toHaveBeenCalledWith(message);
@@ -331,10 +368,225 @@
// publish and expect to acknowledge but not throw
const warnSpy = jest.spyOn(customLogger, 'warn');
amqpConnection.publish(exchange, 'infinite-loop', message);
await new Promise((resolve) => setTimeout(resolve, 50));
await setTimeout(50);

expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining('Subscribe handlers should only return void'),
);
});

describe('Message Batching', () => {
const publishMessages = async (
size: number,
ex: string,
rk: string,
prefix = '',
) => {
const messages: string[] = [];
for (let i = 0; i < size; i++) {
const testMessage = `${prefix}testMessage${i}`;
await amqpConnection.publish(ex, rk, testMessage);
messages.push(testMessage);
}
return messages;
};

const parseMessages = (messages) => {
return messages.map((message) => JSON.parse(message.content.toString()));
};

const mockErrorHandler = (channel, messages) => {
for (const msg of messages) {
channel.ack(msg);
}
};

const paddedBatchTimeout = BATCH_TIMEOUT + 10;

it('should return a full message batch immediately', async () => {
const testMessages = await publishMessages(
BATCH_SIZE,
exchange,
batchRoutingKey,
);

expect(batchHandler).toHaveBeenCalledTimes(1);
expect(batchHandler).toHaveBeenCalledWith(testMessages);
});

it('should return a partial message batch after timeout', async () => {
const testMessages = await publishMessages(1, exchange, batchRoutingKey);

await setTimeout(paddedBatchTimeout);
expect(batchHandler).toHaveBeenCalledTimes(1);
expect(batchHandler).toHaveBeenCalledWith(testMessages);
});

it('should return multiple batches of differing sizes', async () => {
const testMessageBatches: string[][] = [];

for (const [index, size] of [BATCH_SIZE, BATCH_SIZE, 1].entries()) {
testMessageBatches.push(
await publishMessages(
size,
exchange,
batchRoutingKey,
`batch${index}-`,
),
);
}

// two full batches should be immediately handled
expect(batchHandler).toHaveBeenCalledTimes(2);
for (const index of [0, 1]) {
expect(batchHandler).toHaveBeenNthCalledWith(
index + 1,
testMessageBatches[index],
);
}

await setTimeout(paddedBatchTimeout);
expect(batchHandler).toHaveBeenCalledTimes(3);
expect(batchHandler).toHaveBeenLastCalledWith(testMessageBatches[2]);
});

it('should return a full batch to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

const testMessages = await publishMessages(
BATCH_SIZE,
exchange,
batchErrorRoutingKey,
);

// should be enough to place this after async error handling on the call stack
await setTimeout(1);

expect(batchErrorHandler).toHaveBeenCalledTimes(1);
expect(parseMessages(batchErrorHandler.mock.calls[0][1])).toEqual(
testMessages,
);
});

it('should return a partial batch to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

const testMessages = await publishMessages(
1,
exchange,
batchErrorRoutingKey,
);

await setTimeout(paddedBatchTimeout);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
expect(parseMessages(batchErrorHandler.mock.calls[0][1])).toEqual(
testMessages,
);
});

it('should return multiple batches of differing sizes to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

const testMessageBatches: string[][] = [];

for (const [index, size] of [BATCH_SIZE, BATCH_SIZE, 1].entries()) {
testMessageBatches.push(
await publishMessages(
size,
exchange,
batchErrorRoutingKey,
`batch${index}-`,
),
);
}

// should be enough to place this after async error handling on the call stack
await setTimeout(1);

// two full batches should be immediately handled
expect(batchErrorHandler).toHaveBeenCalledTimes(2);
for (const index of [0, 1]) {
expect(parseMessages(batchErrorHandler.mock.calls[index][1])).toEqual(
testMessageBatches[index],
);
}

await setTimeout(paddedBatchTimeout);
expect(batchErrorHandler).toHaveBeenCalledTimes(3);
expect(parseMessages(batchErrorHandler.mock.calls[2][1])).toEqual(
testMessageBatches[2],
);
});

it('should not return another full batch after batch timeout', async () => {
await publishMessages(BATCH_SIZE, exchange, batchRoutingKey);
expect(batchHandler).toHaveBeenCalledTimes(1);

await setTimeout(paddedBatchTimeout);
expect(batchHandler).toHaveBeenCalledTimes(1);
});

it('should not return a partial batch before batch timeout', async () => {
await publishMessages(1, exchange, batchRoutingKey);

await setTimeout(paddedBatchTimeout * 0.9);
expect(batchHandler).toHaveBeenCalledTimes(0);

await setTimeout(paddedBatchTimeout * 0.2);
expect(batchHandler).toHaveBeenCalledTimes(1);
});

it('should not return another partial batch after first batch timeout', async () => {
await publishMessages(1, exchange, batchRoutingKey);

await setTimeout(paddedBatchTimeout);
expect(batchHandler).toHaveBeenCalledTimes(1);

await setTimeout(paddedBatchTimeout);
expect(batchHandler).toHaveBeenCalledTimes(1);
});

it('should not return another full batch after batch timeout to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

await publishMessages(BATCH_SIZE, exchange, batchErrorRoutingKey);

// should be enough to place this after async error handling on the call stack
await setTimeout(1);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);

await setTimeout(paddedBatchTimeout);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
});

it('should not return a partial batch before batch timeout to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

await publishMessages(1, exchange, batchErrorRoutingKey);

await setTimeout(paddedBatchTimeout * 0.9);
expect(batchErrorHandler).toHaveBeenCalledTimes(0);

await setTimeout(paddedBatchTimeout * 0.2);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
});

it('should not return another partial batch after first batch timeout to the custom error handler', async () => {
// have to do this here because `resetAllMocks`
batchErrorHandler.mockImplementation(mockErrorHandler);

await publishMessages(1, exchange, batchErrorRoutingKey);

await setTimeout(paddedBatchTimeout);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);

await setTimeout(paddedBatchTimeout);
expect(batchErrorHandler).toHaveBeenCalledTimes(1);
});
});
});
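
For reference, a minimal sketch of how a consumer might opt into the batching behaviour exercised by these tests, assuming the decorator shape shown in this diff (`batchOptions` with `size`, `timeout`, and an optional `errorHandler` that receives the channel and the batched messages, as asserted on `batchErrorHandler` above). The service, exchange, routing key, queue names, and batch values below are illustrative and not part of this PR:

import { Injectable } from '@nestjs/common';
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';

@Injectable()
export class ExampleBatchService {
  // Deliver up to 50 messages at once, or whatever has accumulated
  // after 500 ms, as a single array argument to the handler.
  @RabbitSubscribe({
    exchange: 'example-exchange',   // illustrative exchange name
    routingKey: 'example.batch',    // illustrative routing key
    queue: 'example-batch-queue',   // illustrative queue name
    batchOptions: {
      size: 50,
      timeout: 500,
      // Optional per-handler error handler; mirrors the (channel, messages)
      // arguments used by the mock error handler in the tests above.
      errorHandler: (channel, messages) => {
        for (const message of messages) {
          channel.ack(message);
        }
      },
    },
  })
  handleBatch(messages: unknown[]) {
    for (const message of messages) {
      // process each decoded message in the batch
    }
  }
}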