Skip to content

Commit

Permalink
Merge pull request #7 from quintoandar/change-producer-send-interface
Browse files Browse the repository at this point in the history
Change send interface to receive topic
  • Loading branch information
thspinto authored Jun 13, 2018
2 parents 92955d3 + ba34885 commit af46999
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 24 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "quintoandar-kafka",
"version": "0.0.13",
"version": "0.1.0",
"description": "Default Kafka NodeJS lib for QuintoAndar",
"main": "src/main.js",
"types": "types/index.d.ts",
Expand Down
10 changes: 3 additions & 7 deletions src/node-kafka-producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ const logger = require('quintoandar-logger').getLogger(module);
const _ = require('lodash');

class KafkaProducer {
constructor({ configs, topic }) {
this.topic = topic;
constructor({ configs }) {
this.configs = configs;
this.ready = false;
this.validateConfigs();
Expand All @@ -18,9 +17,6 @@ class KafkaProducer {
if (missingConfigs.length > 0) {
throw new Error(`Missing Producer Configs ${missingConfigs}`);
}
if (!this.topic) {
throw new Error('Missing param: topic');
}
}

init() {
Expand All @@ -39,9 +35,9 @@ class KafkaProducer {
});
}

send(msg) {
send(topic, msg) {
const sendPromisse = new Promise((resolve, reject) => {
const payload = { topic: this.topic, messages: [msg] };
const payload = { topic, messages: [msg] };
this.readyPromisse.then(() => {
this.producer.send([payload], (err, data) => {
if (err) {
Expand Down
21 changes: 7 additions & 14 deletions tests/node-kafka-producer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ describe('Kafka Prducer Configs Validation', () => {
}).toThrow('kafkaHost');
done();
});

it('should throw error when kafka topic is missing', (done) => {
expect(() => {
new KafkaProducer({ configs: { kafkaHost: 'localhost:9092' } });
}).toThrow('topic');
done();
});
});

describe('Kafka Producer', () => {
Expand All @@ -31,15 +24,15 @@ describe('Kafka Producer', () => {
};

it('should configure corretly kafka lib', (done) => {
const producer = new KafkaProducer({ configs, topic });
const producer = new KafkaProducer({ configs });
expect(producer.client.configs).toEqual(fullConfigs);
expect(producer.producer.client).toBe(producer.client);
done();
});

it('should produce when ready', (done) => {
const producer = new KafkaProducer({ configs, topic });
producer.send(msg).then(() => {
const producer = new KafkaProducer({ configs });
producer.send(topic, msg).then(() => {
expect(producer.producer.send.mock.calls[0][0]).toEqual([{
topic,
messages: [msg],
Expand All @@ -51,9 +44,9 @@ describe('Kafka Producer', () => {
});

it('should produce when ready', (done) => {
const producer = new KafkaProducer({ configs, topic });
const producer = new KafkaProducer({ configs });
producer.producer.emit('ready');
producer.send(msg).then(() => {
producer.send(topic, msg).then(() => {
expect(producer.producer.send.mock.calls[0][0]).toEqual([{
topic,
messages: [msg],
Expand All @@ -64,12 +57,12 @@ describe('Kafka Producer', () => {
});

it('should reject promise on error', (done) => {
const producer = new KafkaProducer({ configs, topic });
const producer = new KafkaProducer({ configs });
producer.producer.emit('ready');
producer.producer.send = jest.fn().mockImplementation((payload, cb) => {
cb(new Error('some error'));
});
producer.send(msg).catch((err) => {
producer.send(topic, msg).catch((err) => {
expect(err).toEqual(new Error('some error'));
done();
});
Expand Down
3 changes: 1 addition & 2 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ export class KafkaConsumer {

export class KafkaProducerOptions {
configs: KafkaClientOptions;
topic: string;
}

export class KafkaProducer {
constructor(options: KafkaProducerOptions)

send(msg: string): Promise<string>;
send(topic: string, msg: string): Promise<string>;
}

0 comments on commit af46999

Please sign in to comment.