From ba348854cd729233bc37495d5e5aca406fae1d33 Mon Sep 17 00:00:00 2001 From: Thiago Pinto Date: Wed, 13 Jun 2018 12:38:48 -0300 Subject: [PATCH] Change send interface to receive topic --- package.json | 2 +- src/node-kafka-producer.js | 10 +++------- tests/node-kafka-producer.test.js | 21 +++++++-------------- types/index.d.ts | 3 +-- 4 files changed, 12 insertions(+), 24 deletions(-) diff --git a/package.json b/package.json index 003bcc6..28e011f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "quintoandar-kafka", - "version": "0.0.13", + "version": "0.1.0", "description": "Default Kafka NodeJS lib for QuintoAndar", "main": "src/main.js", "types": "types/index.d.ts", diff --git a/src/node-kafka-producer.js b/src/node-kafka-producer.js index 731229d..55f8178 100644 --- a/src/node-kafka-producer.js +++ b/src/node-kafka-producer.js @@ -3,8 +3,7 @@ const logger = require('quintoandar-logger').getLogger(module); const _ = require('lodash'); class KafkaProducer { - constructor({ configs, topic }) { - this.topic = topic; + constructor({ configs }) { this.configs = configs; this.ready = false; this.validateConfigs(); @@ -18,9 +17,6 @@ class KafkaProducer { if (missingConfigs.length > 0) { throw new Error(`Missing Producer Configs ${missingConfigs}`); } - if (!this.topic) { - throw new Error('Missing param: topic'); - } } init() { @@ -39,9 +35,9 @@ class KafkaProducer { }); } - send(msg) { + send(topic, msg) { const sendPromisse = new Promise((resolve, reject) => { - const payload = { topic: this.topic, messages: [msg] }; + const payload = { topic, messages: [msg] }; this.readyPromisse.then(() => { this.producer.send([payload], (err, data) => { if (err) { diff --git a/tests/node-kafka-producer.test.js b/tests/node-kafka-producer.test.js index d2090cd..afdab33 100644 --- a/tests/node-kafka-producer.test.js +++ b/tests/node-kafka-producer.test.js @@ -12,13 +12,6 @@ describe('Kafka Prducer Configs Validation', () => { }).toThrow('kafkaHost'); done(); }); - - it('should throw error when kafka topic is missing', (done) => { - expect(() => { - new KafkaProducer({ configs: { kafkaHost: 'localhost:9092' } }); - }).toThrow('topic'); - done(); - }); }); describe('Kafka Producer', () => { @@ -31,15 +24,15 @@ describe('Kafka Producer', () => { }; it('should configure corretly kafka lib', (done) => { - const producer = new KafkaProducer({ configs, topic }); + const producer = new KafkaProducer({ configs }); expect(producer.client.configs).toEqual(fullConfigs); expect(producer.producer.client).toBe(producer.client); done(); }); it('should produce when ready', (done) => { - const producer = new KafkaProducer({ configs, topic }); - producer.send(msg).then(() => { + const producer = new KafkaProducer({ configs }); + producer.send(topic, msg).then(() => { expect(producer.producer.send.mock.calls[0][0]).toEqual([{ topic, messages: [msg], @@ -51,9 +44,9 @@ describe('Kafka Producer', () => { }); it('should produce when ready', (done) => { - const producer = new KafkaProducer({ configs, topic }); + const producer = new KafkaProducer({ configs }); producer.producer.emit('ready'); - producer.send(msg).then(() => { + producer.send(topic, msg).then(() => { expect(producer.producer.send.mock.calls[0][0]).toEqual([{ topic, messages: [msg], @@ -64,12 +57,12 @@ describe('Kafka Producer', () => { }); it('should reject promise on error', (done) => { - const producer = new KafkaProducer({ configs, topic }); + const producer = new KafkaProducer({ configs }); producer.producer.emit('ready'); producer.producer.send = jest.fn().mockImplementation((payload, cb) => { cb(new Error('some error')); }); - producer.send(msg).catch((err) => { + producer.send(topic, msg).catch((err) => { expect(err).toEqual(new Error('some error')); done(); }); diff --git a/types/index.d.ts b/types/index.d.ts index 8530073..846328d 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -16,11 +16,10 @@ export class KafkaConsumer { export class KafkaProducerOptions { configs: KafkaClientOptions; - topic: string; } export class KafkaProducer { constructor(options: KafkaProducerOptions) - send(msg: string): Promise; + send(topic: string, msg: string): Promise; } \ No newline at end of file