Skip to content

Commit

Permalink
fix(microservice): connect to kafka with producerOnlyMode per default
Browse files Browse the repository at this point in the history
resolves: #1690
  • Loading branch information
BrunnerLivio committed Sep 14, 2023
1 parent bc6bec1 commit 3e44d7a
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 75 deletions.
34 changes: 32 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,43 @@ services:
ports:
- 27017:27017

redis_test:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: 'rabbitmq'
networks:
- overlay
ports:
- 5672:5672
- 15672:15672

redis:
image: redis:latest
hostname: redis_test
networks:
- overlay
ports:
- 6379:6379

kafka_zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
container_name: kafka_zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka_broker:
image: confluentinc/cp-kafka:7.0.1
container_name: broker
ports:
- "9092:9092"
depends_on:
- kafka_zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'kafka_zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka_broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
networks:
overlay:
194 changes: 125 additions & 69 deletions e2e/health-checks/microservice.health.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ import {
bootstrapTestingModule,
DynamicHealthEndpointFn,
} from '../helper';
import { Transport } from '@nestjs/microservices';
import {
KafkaOptions,
RmqOptions,
TcpClientOptions,
Transport,
} from '@nestjs/microservices';

describe('MicroserviceHealthIndicator', () => {
let app: INestApplication;
Expand All @@ -20,80 +25,131 @@ describe('MicroserviceHealthIndicator', () => {
microservice = await bootstrapMicroservice();
});

it('should check if the microservice is available', async () => {
app = await setHealthEndpoint(({ healthCheck, microservice }) =>
healthCheck.check([
async () =>
microservice.pingCheck('tcp', {
transport: Transport.TCP,
options: {
host: '0.0.0.0',
port: 8889,
},
}),
]),
).start();

const details = { tcp: { status: 'up' } };
return request(app.getHttpServer()).get('/health').expect(200).expect({
status: 'ok',
info: details,
error: {},
details,
describe('TCP', () => {
it('should connect', async () => {
app = await setHealthEndpoint(({ healthCheck, microservice }) =>
healthCheck.check([
async () =>
microservice.pingCheck('tcp', {
transport: Transport.TCP,
options: {
host: '0.0.0.0',
port: 8889,
},
}),
]),
).start();

const details = { tcp: { status: 'up' } };
return request(app.getHttpServer()).get('/health').expect(200).expect({
status: 'ok',
info: details,
error: {},
details,
});
});

it('should throw an error if is not reachable', async () => {
app = await setHealthEndpoint(({ healthCheck, microservice }) =>
healthCheck.check([
async () =>
microservice.pingCheck<TcpClientOptions>('tcp', {
transport: Transport.TCP,
options: {
host: '0.0.0.0',
port: 8889,
},
}),
]),
).start();

await microservice.close();

const details = {
tcp: { status: 'down', message: 'connect ECONNREFUSED 0.0.0.0:8889' },
};
return request(app.getHttpServer()).get('/health').expect(503).expect({
status: 'error',
info: {},
error: details,
details,
});
});
});

it('should throw an error if the service is not reachable', async () => {
app = await setHealthEndpoint(({ healthCheck, microservice }) =>
healthCheck.check([
async () =>
microservice.pingCheck('tcp', {
transport: Transport.TCP,
options: {
host: '0.0.0.0',
port: 8889,
},
}),
]),
).start();

await microservice.close();

const details = {
tcp: { status: 'down', message: 'connect ECONNREFUSED 0.0.0.0:8889' },
};
return request(app.getHttpServer()).get('/health').expect(503).expect({
status: 'error',
info: {},
error: details,
details,
describe('RMQ', () => {
it('should connect', async () => {
app = await setHealthEndpoint(({ healthCheck, microservice }) =>
healthCheck.check([
async () =>
microservice.pingCheck<RmqOptions>('rmq', {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
},
}),
]),
).start();

const details = { rmq: { status: 'up' } };
return request(app.getHttpServer()).get('/health').expect(200).expect({
status: 'ok',
info: details,
error: {},
details,
});
});

it('should throw an error if is not reachable', async () => {
app = await setHealthEndpoint(({ healthCheck, microservice }) =>
healthCheck.check([
async () =>
microservice.pingCheck<RmqOptions>('rmq', {
transport: Transport.RMQ,
options: {
urls: ['amqp://0.0.0.0:8889'],
},
}),
]),
).start();

await microservice.close();

const details = {
rmq: { status: 'down', message: 'rmq is not available' },
};
return request(app.getHttpServer()).get('/health').expect(503).expect({
status: 'error',
info: {},
error: details,
details,
});
});
});

it('should throw an error if an RMQ microservice is not reachable', async () => {
app = await setHealthEndpoint(({ healthCheck, microservice }) =>
healthCheck.check([
async () =>
microservice.pingCheck('rmq', {
transport: Transport.RMQ,
options: {
host: '0.0.0.0',
port: 8889,
},
}),
]),
).start();

await microservice.close();

const details = {
rmq: { status: 'down', message: 'rmq is not available' },
};
return request(app.getHttpServer()).get('/health').expect(503).expect({
status: 'error',
info: {},
error: details,
details,
describe('Kafka', () => {
it('should connect', async () => {
app = await setHealthEndpoint(({ healthCheck, microservice }) =>
healthCheck.check([
async () =>
microservice.pingCheck<KafkaOptions>('kafka', {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
}),
]),
).start();

const details = { kafka: { status: 'up' } };
return request(app.getHttpServer()).get('/health').expect(200).expect({
status: 'ok',
info: details,
error: {},
details,
});
});
});

Expand Down
11 changes: 10 additions & 1 deletion lib/health-indicator/microservice/microservice.health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export class MicroserviceHealthIndicator extends HealthIndicator {
* @throws {HealthCheckError} If the microservice is not reachable
*
* @example
* microservice.pingCheck('tcp', {
* microservice.pingCheck<TcpClientOptions>('tcp', {
* transport: Transport.TCP,
* options: { host: 'localhost', port: 3001 },
* })
Expand All @@ -124,6 +124,15 @@ export class MicroserviceHealthIndicator extends HealthIndicator {
let isHealthy = false;
const timeout = options.timeout || 1000;

if (options.transport === NestJSMicroservices.Transport.KAFKA) {
options.options = {
// We need to set the producerOnlyMode to true in order to speed
// up the connection process. https://github.com/nestjs/terminus/issues/1690
producerOnlyMode: true,
...options.options,
};
}

try {
await promiseTimeout(timeout, this.pingMicroservice(options));
isHealthy = true;
Expand Down
16 changes: 16 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
"husky": "8.0.3",
"ioredis": "5.3.2",
"jest": "29.7.0",
"kafkajs": "^2.2.4",
"lint-staged": "13.2.3",
"mongoose": "7.3.3",
"mysql2": "3.6.1",
Expand Down
10 changes: 7 additions & 3 deletions sample/002-microservice-app/src/health/health.controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { Controller, Get } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';
import {
RedisOptions,
TcpClientOptions,
Transport,
} from '@nestjs/microservices';
import {
HealthCheck,
HealthCheckService,
Expand All @@ -18,12 +22,12 @@ export class HealthController {
check() {
return this.health.check([
async () =>
this.microservice.pingCheck('tcp', {
this.microservice.pingCheck<TcpClientOptions>('tcp', {
transport: Transport.TCP,
options: { host: 'localhost', port: 8889 },
}),
async () =>
this.microservice.pingCheck('redis', {
this.microservice.pingCheck<RedisOptions>('redis', {
transport: Transport.REDIS,
options: {
host: 'localhost',
Expand Down

0 comments on commit 3e44d7a

Please sign in to comment.