From 6bff689614806e95323b9e088f32791982748934 Mon Sep 17 00:00:00 2001 From: Thiago Date: Tue, 22 May 2018 16:03:52 -0300 Subject: [PATCH] Fix update metadata and async push --- package.json | 2 +- src/node-kafka-consumer.js | 4 ++-- tests/node-kafka-consumer.test.js | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 16d93bf..2cb9da6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "quintoandar-kafka", - "version": "0.0.9", + "version": "0.0.10", "description": "Default Kafka NodeJS lib for QuintoAndar", "main": "src/main.js", "dependencies": { diff --git a/src/node-kafka-consumer.js b/src/node-kafka-consumer.js index ab842bc..f296cc1 100644 --- a/src/node-kafka-consumer.js +++ b/src/node-kafka-consumer.js @@ -11,7 +11,7 @@ class KafkaConsumer { this.configs.autoCommit = false; _.defaults(this.configs, { sessionTimeout: 15000 }); _.defaults(this.configs, { protocol: ['roundrobin'] }); - _.defaults(this.configs, { asyncPush: false }); + _.defaults(this.configs, { asyncPush: true }); _.defaults(this.configs, { fromOffset: 'latest' }); _.defaults(this.configs, { outOfRangeOffset: 'latest' }); _.defaults(this.configs, { fetchMaxBytes: 1024 * 1024 }); @@ -45,7 +45,7 @@ class KafkaConsumer { this.consumer.commit(msg, true); }); }); - setInterval(this.refreshMetadata.bind(this), this.updateMetadata); + setInterval(this.refreshMetadata.bind(this), this.configs.updateMetadata); logger.info('ConsumerGroupStream started'); } diff --git a/tests/node-kafka-consumer.test.js b/tests/node-kafka-consumer.test.js index 93ed5ab..50e4c2c 100644 --- a/tests/node-kafka-consumer.test.js +++ b/tests/node-kafka-consumer.test.js @@ -65,7 +65,7 @@ describe('Kafka Consumer', () => { autoCommit: false, sessionTimeout: 15000, protocol: ['roundrobin'], - asyncPush: false, + asyncPush: true, fromOffset: 'latest', outOfRangeOffset: 'latest', fetchMaxBytes: 1024 * 1024,