Skip to content

Commit

Permalink
Cooperative Rebalance (#1081)
Browse files Browse the repository at this point in the history
  • Loading branch information
serj026 authored Nov 18, 2024
1 parent fd8039e commit 1b33604
Show file tree
Hide file tree
Showing 9 changed files with 348 additions and 15 deletions.
59 changes: 59 additions & 0 deletions e2e/both.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,65 @@ describe('Consumer/Producer', function() {
});
});

describe('Cooperative sticky', function() {
var consumer;

beforeEach(function(done) {
var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');

var consumerOpts = {
'metadata.broker.list': kafkaBrokerList,
'group.id': grp,
'fetch.wait.max.ms': 1000,
'session.timeout.ms': 10000,
'enable.auto.commit': false,
'debug': 'all',
'partition.assignment.strategy': 'cooperative-sticky'
};

consumer = new Kafka.KafkaConsumer(consumerOpts, {
'auto.offset.reset': 'largest',
});

consumer.connect({}, function(err, d) {
t.ifError(err);
t.equal(typeof d, 'object', 'metadata should be returned');
done();
});

eventListener(consumer);
});

afterEach(function(done) {
consumer.disconnect(function() {
done();
});
});

it('should be able to produce and consume messages', function (done) {
var key = 'key';

crypto.randomBytes(4096, function(ex, buffer) {
producer.setPollInterval(10);

consumer.on('data', function(message) {
t.equal(buffer.toString(), message.value.toString(), 'invalid message value');
t.equal(key, message.key, 'invalid message key');
t.equal(topic, message.topic, 'invalid message topic');
t.ok(message.offset >= 0, 'invalid message offset');
done();
});

consumer.subscribe([topic]);
consumer.consume();

setTimeout(function() {
producer.produce(topic, null, buffer, key);
}, 2000);
});
});
});

function assert_headers_match(expectedHeaders, messageHeaders) {
t.equal(expectedHeaders.length, messageHeaders.length, 'Headers length does not match expected length');
for (var i = 0; i < expectedHeaders.length; i++) {
Expand Down
39 changes: 39 additions & 0 deletions e2e/consumer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -344,4 +344,43 @@ describe('Consumer', function() {
});

});

describe('rebalance protocol', function () {
var strategies = {
'undefined': 'EAGER',
'range': 'EAGER',
'roundrobin': 'EAGER',
'cooperative-sticky': 'COOPERATIVE',
};

Object.keys(strategies).forEach(function (strategy) {
it('should return ' + strategies[strategy] + ' for ' + strategy, function(done) {
var consumer = new KafkaConsumer({
...gcfg,
...(strategy !== 'undefined' && { 'partition.assignment.strategy': strategy })
}, {});

t.equal(consumer.rebalanceProtocol(), 'NONE');

consumer.connect({ timeout: 2000 }, function(err) {
t.ifError(err);

consumer.subscribe([topic]);

consumer.on('rebalance', function (err) {
if (err.code === -175) {
t.equal(consumer.rebalanceProtocol(), strategies[strategy]);
consumer.disconnect(done);
}
});

consumer.consume(1, function(err) {
t.ifError(err);
});
});

eventListener(consumer);
});
});
});
});
4 changes: 4 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
constructor(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig);

assign(assignments: Assignment[]): this;
incrementalAssign(assignments: Assignment[]): this;

assignments(): Assignment[];

Expand Down Expand Up @@ -248,12 +249,15 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
subscription(): string[];

unassign(): this;
incrementalUnassign(assignments: Assignment[]): this;

unsubscribe(): this;

offsetsForTimes(topicPartitions: TopicPartitionTime[], timeout: number, cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
offsetsForTimes(topicPartitions: TopicPartitionTime[], cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;

rebalanceProtocol(): string;

static createReadStream(conf: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;
}

Expand Down
48 changes: 45 additions & 3 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,20 @@ function KafkaConsumer(conf, topicConf) {
// Emit the event
self.emit('rebalance', err, assignment);

// That's it
// That's it.
try {
if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
self.assign(assignment);
if (self.rebalanceProtocol() === 'COOPERATIVE') {
self.incrementalAssign(assignment);
} else {
self.assign(assignment);
}
} else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
self.unassign();
if (self.rebalanceProtocol() === 'COOPERATIVE') {
self.incrementalUnassign(assignment);
} else {
self.unassign();
}
}
} catch (e) {
// Ignore exceptions if we are not connected
Expand Down Expand Up @@ -275,6 +283,40 @@ KafkaConsumer.prototype.unassign = function() {
return this;
};

/**
* Assign the consumer specific partitions and topics. Used for
* cooperative rebalancing.
*
* @param {array} assignments - Assignments array. Should contain
* objects with topic and partition set. Assignments are additive.
* @return {Client} - Returns itself
*/
KafkaConsumer.prototype.incrementalAssign = function(assignments) {
this._client.incrementalAssign(TopicPartition.map(assignments));
return this;
};

/**
* Unassign the consumer specific partitions and topics. Used for
* cooperative rebalancing.
*
* @param {array} assignments - Assignments array. Should contain
* objects with topic and partition set. Assignments are subtractive.
* @return {Client} - Returns itself
*/
KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
this._client.incrementalUnassign(TopicPartition.map(assignments));
return this;
};

/**
* Get the type of rebalance protocol used in the consumer group.
*
* @returns "NONE", "COOPERATIVE" or "EAGER".
*/
KafkaConsumer.prototype.rebalanceProtocol = function() {
return this._client.rebalanceProtocol();
}

/**
* Get the assignments for the consumer
Expand Down
12 changes: 12 additions & 0 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ Connection::~Connection() {
}
}

Baton Connection::rdkafkaErrorToBaton(RdKafka::Error* error) {
if ( NULL == error) {
return Baton(RdKafka::ERR_NO_ERROR);
}
else {
Baton result(error->code(), error->str(), error->is_fatal(),
error->is_retriable(), error->txn_requires_abort());
delete error;
return result;
}
}

RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
return RdKafka::TopicPartition::create(topic, RdKafka::Topic::PARTITION_UA);
}
Expand Down
1 change: 1 addition & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class Connection : public Nan::ObjectWrap {

static Nan::Persistent<v8::Function> constructor;
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
static Baton rdkafkaErrorToBaton(RdKafka::Error* error);

bool m_has_been_disconnected;
bool m_is_closing;
Expand Down
Loading

0 comments on commit 1b33604

Please sign in to comment.