Skip to content

Commit

Permalink
feat: consume from specific partition
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnimhoff committed Nov 6, 2024
1 parent 8a3d40c commit 7bb054d
Show file tree
Hide file tree
Showing 8 changed files with 419 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ The following table lists important methods for this API.
|`consumer.unsubscribe()` | Unsubscribes from the currently subscribed topics. <br><br>You cannot subscribe to different topics without calling the `unsubscribe()` method first. |
|`consumer.consume(cb)` | Gets messages from the existing subscription as quickly as possible. If `cb` is specified, invokes `cb(err, message)`. <br><br>This method keeps a background thread running to do the work. Note that the number of threads in nodejs process is limited by `UV_THREADPOOL_SIZE` (default value is 4) and using up all of them blocks other parts of the application that need threads. If you need multiple consumers then consider increasing `UV_THREADPOOL_SIZE` or using `consumer.consume(number, cb)` instead. |
|`consumer.consume(number, cb)` | Gets `number` of messages from the existing subscription. If `cb` is specified, invokes `cb(err, message)`. |
|`consumer.consume(number, topic, partition, cb)` | Gets `number` of messages from a partition of the given topic. The topic must have a subscription. If `cb` is specified, invokes `cb(err, message)`.
|`consumer.commit()` | Commits all locally stored offsets |
|`consumer.commit(topicPartition)` | Commits offsets specified by the topic partition |
|`consumer.commitMessage(message)` | Commits the offsets specified by the message |
Expand Down
130 changes: 130 additions & 0 deletions examples/consumer-per-partition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
A consumer that is subscribed to multiple partitions can control the mix of messages consumed from each partition. How this is done is explained [here](https://github.com/confluentinc/librdkafka/wiki/FAQ#what-are-partition-queues-and-why-are-some-partitions-slower-than-others).

The example below simulates a partition 0 which is slow (2s per consume). Other partitions consume at a rate of 0.5s. To use the example, create a topic "test" with two partitions. Produce 500 message to both partitions. This example does not require an active producer. Run the example to see the result. Run multiple instances to see the rebalancing take effect.

```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

var Kafka = require('../');

var consumer = new Kafka.KafkaConsumer({
//'debug': 'all',
'metadata.broker.list': 'localhost:9092',
'group.id': 'test-group-' + Math.random(),
'enable.auto.commit': false,
'rebalance_cb': true,
}, {
'auto.offset.reset': 'earliest', // start from the beginning
});

var topicName = 'test';

// Keep track of which partitions are assigned.
var assignments = [];

//logging debug messages, if debug is enabled
consumer.on('event.log', function(log) {
console.log(log);
});

//logging all errors
consumer.on('event.error', function(err) {
console.error('Error from consumer');
console.error(err);
});

consumer.on('ready', function(arg) {
console.log('consumer ready: ' + JSON.stringify(arg));

consumer.subscribe([topicName]);

// start a regular consume loop in flowing mode. This won't result in any
// messages because will we start consuming from a partition directly.
// This is required to serve the rebalancing events
consumer.consume();
});

// Start our own consume loops for all newly assigned partitions
consumer.on('rebalance', function(err, updatedAssignments) {
console.log('rebalancing done, got partitions assigned: ', updatedAssignments.map(function(a) {
return a.partition;
}));

// Normally messages are forwarded to a general queue, which contains messages from all assigned partitions.
// however we want to consume per partitions, for this we need to disable forwarding.
updatedAssignments.forEach(function (assignment) {
consumer.disableQueueForwarding(assignment);
});

// find new assignments
var newAssignments = updatedAssignments.filter(function (updatedAssignment) {
return !assignments.some(function (assignment) {
return assignment.partition === updatedAssignment.partition;
});
});

// update global assignments array
assignments = updatedAssignments;

// then start consume loops for the new assignments
newAssignments.forEach(function (assignment) {
startConsumeMessages(assignment.partition);
});
});

function startConsumeMessages(partition) {
console.log('partition: ' + partition + ' starting to consume');

function consume() {
var isPartitionAssigned = assignments.some(function(assignment) {
return assignment.partition === partition;
});

if (!isPartitionAssigned) {
console.log('partition: ' + partition + ' stop consuming');
return;
}

// consume per 5 messages
consumer.consume(5, topicName, partition, callback);
}

function callback(err, messages) {
messages.forEach(function(message) {
// consume the message
console.log('partition ' + message.partition + ' value ' + message.value.toString());
consumer.commitMessage(message);
});

if (messages.length > 0) {
consumer.commitMessage(messages.pop());
}

// simulate performance
setTimeout(consume, partition === 0 ? 2000 : 500);
}

// kick-off recursive consume loop
consume();
}

consumer.on('disconnected', function(arg) {
console.log('consumer disconnected. ' + JSON.stringify(arg));
});

//starting the consumer
consumer.connect();

//stopping this example after 30s
setTimeout(function() {
consumer.disconnect();
}, 30000);

```
7 changes: 5 additions & 2 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,9 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
committed(toppars: TopicPartition[], timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;

consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(number: number, topic: string, partition: number, cb?: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
consume(number: number, cb?: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
consume(cb: (err: LibrdKafkaError | null, messages: Message[] | undefined) => void): void;
consume(): void;

getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
Expand All @@ -239,6 +240,8 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {

seek(toppar: TopicPartitionOffset, timeout: number | null, cb: (err: LibrdKafkaError) => void): this;

disableQueueForwarding(topicPartition: TopicPartition): this;

setDefaultConsumeTimeout(timeoutMs: number): void;

setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;
Expand Down
2 changes: 1 addition & 1 deletion lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ function LibrdKafkaError(e) {
this.origin = 'kafka';
}
Error.captureStackTrace(this, this.constructor);
} else if (!util.isError(e)) {
} else if (!(Object.prototype.toString(e) === "[object Error]" || e instanceof Error)) {
// This is the better way
this.message = e.message;
this.code = e.code;
Expand Down
80 changes: 78 additions & 2 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ KafkaConsumer.prototype.setDefaultConsumeLoopTimeoutDelay = function(intervalMs)
this._consumeLoopTimeoutDelay = intervalMs;
};

KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) {
this._client.disableQueueForwarding(topicPartition);
return this;
};

/**
* Get a stream representation of this KafkaConsumer
*
Expand Down Expand Up @@ -361,6 +366,20 @@ KafkaConsumer.prototype.unsubscribe = function() {
};

/**
* Read a number of messages from a specific topic and partition.
*
* Can be useful if the consume performance differs per partition. Consuming
* per partition could prevent slow performance on one partition from affecting
* the consumption of other partitions.
*
* To select the right partition it is required to set a topic param, because a
* consumer can be subscribed to multiple topics.
*
* @param {number} size - Number of messages to read
* @param {string} topic - Name of topic to read
* @param {number} partition - Identifier of partition to read
* @param {KafkaConsumer~readCallback} cb - Callback to return when work is done.
*//**
* Read a number of messages from Kafka.
*
* This method is similar to the main one, except that it reads a number
Expand All @@ -384,12 +403,22 @@ KafkaConsumer.prototype.unsubscribe = function() {
* @param {KafkaConsumer~readCallback} cb - Callback to return when a message
* is fetched.
*/
KafkaConsumer.prototype.consume = function(number, cb) {
KafkaConsumer.prototype.consume = function(number, topic, partition, cb) {
var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT;
var self = this;

if ((number && typeof number === 'number') || (number && cb)) {
if ((number && typeof number === 'number') && typeof topic === 'string' && typeof partition === 'number') {

if (cb === undefined) {
cb = function() {};
} else if (typeof cb !== 'function') {
throw new TypeError('Callback must be a function');
}

this._consumeNumOfPartition(timeoutMs, number, topic, partition, cb);
} else if ((number && typeof number === 'number') || (number && topic)) {
// topic is given as the cb
cb = topic;
if (cb === undefined) {
cb = function() {};
} else if (typeof cb !== 'function') {
Expand Down Expand Up @@ -499,6 +528,53 @@ KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {

};

/**
* Consume a number of messages from a specific topic and partition
* Wrapped in a try catch with proper error reporting. Should not be
* called directly, and instead should be called using consume.
*
* @private
* @see consume
*/
KafkaConsumer.prototype._consumeNumOfPartition = function(timeoutMs, numMessages, topic, partition, cb, onlyApplyTimeoutToFirstMessage) {
var self = this;

this._client.consume(timeoutMs, numMessages, topic, partition, function(err, messages, eofEvents) {
if (err) {
err = LibrdKafkaError.create(err);
if (cb) {
cb(err);
}
return;
}

var currentEofEventsIndex = 0;

function emitEofEventsFor(messageIndex) {
while (currentEofEventsIndex < eofEvents.length && eofEvents[currentEofEventsIndex].messageIndex === messageIndex) {
delete eofEvents[currentEofEventsIndex].messageIndex;
self.emit('partition.eof', eofEvents[currentEofEventsIndex])
++currentEofEventsIndex;
}
}

emitEofEventsFor(-1);

for (var i = 0; i < messages.length; i++) {
self.emit('data', messages[i]);
emitEofEventsFor(i);
}

emitEofEventsFor(messages.length);

if (cb) {
cb(null, messages);
}

}, onlyApplyTimeoutToFirstMessage);

};

/**
* This callback returns the message read from Kafka.
*
Expand Down
3 changes: 3 additions & 0 deletions src/kafka-consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class KafkaConsumer : public Connection {

Baton Assign(std::vector<RdKafka::TopicPartition*>);
Baton Unassign();

Baton DisableQueueForwarding(RdKafka::TopicPartition*);

Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);

Expand Down Expand Up @@ -107,6 +109,7 @@ class KafkaConsumer : public Connection {
static NAN_METHOD(NodeAssign);
static NAN_METHOD(NodeUnassign);
static NAN_METHOD(NodeAssignments);
static NAN_METHOD(NodeDisableQueueForwarding);
static NAN_METHOD(NodeUnsubscribe);
static NAN_METHOD(NodeCommit);
static NAN_METHOD(NodeCommitSync);
Expand Down
Loading

0 comments on commit 7bb054d

Please sign in to comment.