From 1b33604c31c16bcd08f738c8124f0533611e7c94 Mon Sep 17 00:00:00 2001
From: Serhii Franchuk <44800855+serj026@users.noreply.github.com>
Date: Mon, 18 Nov 2024 21:35:44 +0100
Subject: [PATCH] Cooperative Rebalance (#1081)

---
 e2e/both.spec.js      |  59 ++++++++++++++
 e2e/consumer.spec.js  |  39 +++++++++
 index.d.ts            |   4 +
 lib/kafka-consumer.js |  48 ++++++++++-
 src/connection.cc     |  12 +++
 src/connection.h      |   1 +
 src/kafka-consumer.cc | 181 ++++++++++++++++++++++++++++++++++++++++++
 src/kafka-consumer.h  |   7 ++
 src/producer.cc       |  12 ---
 9 files changed, 348 insertions(+), 15 deletions(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index a8289ec3..4663f3a2 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -614,6 +614,65 @@ describe('Consumer/Producer', function() {
     });
   });
 
+  describe('Cooperative sticky', function() {
+    var consumer;
+
+    beforeEach(function(done) {
+      var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
+
+      var consumerOpts = {
+        'metadata.broker.list': kafkaBrokerList,
+        'group.id': grp,
+        'fetch.wait.max.ms': 1000,
+        'session.timeout.ms': 10000,
+        'enable.auto.commit': false,
+        'debug': 'all',
+        'partition.assignment.strategy': 'cooperative-sticky'
+      };
+
+      consumer = new Kafka.KafkaConsumer(consumerOpts, {
+        'auto.offset.reset': 'largest',
+      });
+
+      consumer.connect({}, function(err, d) {
+        t.ifError(err);
+        t.equal(typeof d, 'object', 'metadata should be returned');
+        done();
+      });
+
+      eventListener(consumer);
+    });
+
+    afterEach(function(done) {
+      consumer.disconnect(function() {
+        done();
+      });
+    });
+
+    it('should be able to produce and consume messages', function(done) {
+      var key = 'key';
+
+      crypto.randomBytes(4096, function(ex, buffer) {
+        producer.setPollInterval(10);
+
+        consumer.on('data', function(message) {
+          t.equal(buffer.toString(), message.value.toString(), 'invalid message value');
+          t.equal(key, message.key, 'invalid message key');
+          t.equal(topic, message.topic, 'invalid message topic');
+          t.ok(message.offset >= 0, 'invalid message offset');
+          done();
+        });
+
+        consumer.subscribe([topic]);
+        consumer.consume();
+
+        setTimeout(function() {
+          producer.produce(topic, null, buffer, key);
+        }, 2000);
+      });
+    });
+  });
+
   function assert_headers_match(expectedHeaders, messageHeaders) {
     t.equal(expectedHeaders.length, messageHeaders.length, 'Headers length does not match expected length');
     for (var i = 0; i < expectedHeaders.length; i++) {
diff --git a/e2e/consumer.spec.js b/e2e/consumer.spec.js
index a167483f..972d5af6 100644
--- a/e2e/consumer.spec.js
+++ b/e2e/consumer.spec.js
@@ -344,4 +344,43 @@ describe('Consumer', function() {
     });
   });
 
+  describe('rebalance protocol', function() {
+    var strategies = {
+      'undefined': 'EAGER',
+      'range': 'EAGER',
+      'roundrobin': 'EAGER',
+      'cooperative-sticky': 'COOPERATIVE',
+    };
+
+    Object.keys(strategies).forEach(function(strategy) {
+      it('should return ' + strategies[strategy] + ' for ' + strategy, function(done) {
+        var consumer = new KafkaConsumer({
+          ...gcfg,
+          ...(strategy !== 'undefined' && { 'partition.assignment.strategy': strategy })
+        }, {});
+
+        t.equal(consumer.rebalanceProtocol(), 'NONE');
+
+        consumer.connect({ timeout: 2000 }, function(err) {
+          t.ifError(err);
+
+          consumer.subscribe([topic]);
+
+          consumer.on('rebalance', function(err) {
+            if (err.code === -175) {
+              t.equal(consumer.rebalanceProtocol(), strategies[strategy]);
+              consumer.disconnect(done);
+            }
+          });
+
+          consumer.consume(1, function(err) {
+            t.ifError(err);
+          });
+        });
+
+        eventListener(consumer);
+      });
+    });
+  });
 });
diff --git a/index.d.ts b/index.d.ts
index 43bedfc9..f1d458fe 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -208,6 +208,7 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
   constructor(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig);
 
   assign(assignments: Assignment[]): this;
+  incrementalAssign(assignments: Assignment[]): this;
 
   assignments(): Assignment[];
 
@@ -248,12 +249,15 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
   subscription(): string[];
 
   unassign(): this;
+  incrementalUnassign(assignments: Assignment[]): this;
 
   unsubscribe(): this;
 
   offsetsForTimes(topicPartitions: TopicPartitionTime[], timeout: number, cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
   offsetsForTimes(topicPartitions: TopicPartitionTime[], cb?: (err: LibrdKafkaError, offsets: TopicPartitionOffset[]) => any): void;
 
+  rebalanceProtocol(): string;
+
   static createReadStream(conf: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, streamOptions: ReadStreamOptions | number): ConsumerStream;
 }
diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js
index c479240f..875a3779 100644
--- a/lib/kafka-consumer.js
+++ b/lib/kafka-consumer.js
@@ -58,12 +58,20 @@ function KafkaConsumer(conf, topicConf) {
       // Emit the event
       self.emit('rebalance', err, assignment);
 
-      // That's it
+      // That's it.
      try {
         if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) {
-          self.assign(assignment);
+          if (self.rebalanceProtocol() === 'COOPERATIVE') {
+            self.incrementalAssign(assignment);
+          } else {
+            self.assign(assignment);
+          }
         } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) {
-          self.unassign();
+          if (self.rebalanceProtocol() === 'COOPERATIVE') {
+            self.incrementalUnassign(assignment);
+          } else {
+            self.unassign();
+          }
         }
       } catch (e) {
         // Ignore exceptions if we are not connected
@@ -275,6 +283,40 @@ KafkaConsumer.prototype.unassign = function() {
   return this;
 };
 
+/**
+ * Assign the consumer specific partitions and topics. Used for
+ * cooperative rebalancing.
+ *
+ * @param {array} assignments - Assignments array. Should contain
+ * objects with topic and partition set. Assignments are additive.
+ * @return {Client} - Returns itself
+ */
+KafkaConsumer.prototype.incrementalAssign = function(assignments) {
+  this._client.incrementalAssign(TopicPartition.map(assignments));
+  return this;
+};
+
+/**
+ * Unassign the consumer specific partitions and topics. Used for
+ * cooperative rebalancing.
+ *
+ * @param {array} assignments - Assignments array. Should contain
+ * objects with topic and partition set. Assignments are subtractive.
+ * @return {Client} - Returns itself
+ */
+KafkaConsumer.prototype.incrementalUnassign = function(assignments) {
+  this._client.incrementalUnassign(TopicPartition.map(assignments));
+  return this;
+};
+
+/**
+ * Get the type of rebalance protocol used in the consumer group.
+ *
+ * @returns {string} "NONE", "COOPERATIVE" or "EAGER".
+ */
+KafkaConsumer.prototype.rebalanceProtocol = function() {
+  return this._client.rebalanceProtocol();
+};
 
 /**
  * Get the assignments for the consumer
diff --git a/src/connection.cc b/src/connection.cc
index cfb8ba23..f76e012f 100644
--- a/src/connection.cc
+++ b/src/connection.cc
@@ -68,6 +68,18 @@ Connection::~Connection() {
   }
 }
 
+Baton Connection::rdkafkaErrorToBaton(RdKafka::Error* error) {
+  if (NULL == error) {
+    return Baton(RdKafka::ERR_NO_ERROR);
+  }
+  else {
+    Baton result(error->code(), error->str(), error->is_fatal(),
+                 error->is_retriable(), error->txn_requires_abort());
+    delete error;
+    return result;
+  }
+}
+
 RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
   return RdKafka::TopicPartition::create(topic, RdKafka::Topic::PARTITION_UA);
 }
diff --git a/src/connection.h b/src/connection.h
index 8ee3f052..bba8a7c7 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -80,6 +80,7 @@ class Connection : public Nan::ObjectWrap {
   static Nan::Persistent<v8::Function> constructor;
   static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
 
+  static Baton rdkafkaErrorToBaton(RdKafka::Error* error);
   bool m_has_been_disconnected;
   bool m_is_closing;
diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
index 0f5e32ed..89528ceb 100644
--- a/src/kafka-consumer.cc
+++ b/src/kafka-consumer.cc
@@ -219,6 +219,59 @@ Baton KafkaConsumer::Unassign() {
   return Baton(RdKafka::ERR_NO_ERROR);
 }
 
+Baton KafkaConsumer::IncrementalAssign(std::vector<RdKafka::TopicPartition*> partitions) {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected");
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+
+  RdKafka::Error* error = consumer->incremental_assign(partitions);
+
+  if (error == NULL) {
+    m_partition_cnt += partitions.size();
+    m_partitions.insert(m_partitions.end(), partitions.begin(), partitions.end());
+  } else {
+    RdKafka::TopicPartition::destroy(partitions);
+  }
+
+  return rdkafkaErrorToBaton(error);
+}
+
+Baton KafkaConsumer::IncrementalUnassign(std::vector<RdKafka::TopicPartition*> partitions) {
+  if (!IsClosing() && !IsConnected()) {
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+
+  RdKafka::Error* error = consumer->incremental_unassign(partitions);
+
+  std::vector<RdKafka::TopicPartition*> delete_partitions;
+
+  if (error == NULL) {
+    for (unsigned int i = 0; i < partitions.size(); i++) {
+      for (unsigned int j = 0; j < m_partitions.size(); j++) {
+        if (partitions[i]->partition() == m_partitions[j]->partition() &&
+            partitions[i]->topic() == m_partitions[j]->topic()) {
+          delete_partitions.push_back(m_partitions[j]);
+          m_partitions.erase(m_partitions.begin() + j);
+          m_partition_cnt--;
+          break;
+        }
+      }
+    }
+  }
+
+  RdKafka::TopicPartition::destroy(delete_partitions);
+
+  RdKafka::TopicPartition::destroy(partitions);
+
+  return rdkafkaErrorToBaton(error);
+}
+
 Baton KafkaConsumer::Commit(std::vector<RdKafka::TopicPartition*> toppars) {
   if (!IsConnected()) {
     return Baton(RdKafka::ERR__STATE);
@@ -494,6 +547,17 @@ std::string KafkaConsumer::Name() {
   return std::string(m_client->name());
 }
 
+std::string KafkaConsumer::RebalanceProtocol() {
+  if (!IsConnected()) {
+    return std::string("NONE");
+  }
+
+  RdKafka::KafkaConsumer* consumer =
+    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
+
+  return consumer->rebalance_protocol();
+}
+
 Nan::Persistent<v8::Function> KafkaConsumer::constructor;
 
 void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
@@ -547,7 +611,10 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
   Nan::SetPrototypeMethod(tpl, "position", NodePosition);
 
   Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
   Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
+  Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign);
+  Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign);
   Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);
+  Nan::SetPrototypeMethod(tpl, "rebalanceProtocol", NodeRebalanceProtocol);
 
   Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
   Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
@@ -720,6 +787,12 @@ NAN_METHOD(KafkaConsumer::NodeAssignments) {
     Conversion::TopicPartition::ToV8Array(consumer->m_partitions));
 }
 
+NAN_METHOD(KafkaConsumer::NodeRebalanceProtocol) {
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+  std::string protocol = consumer->RebalanceProtocol();
+  info.GetReturnValue().Set(Nan::New(protocol).ToLocalChecked());
+}
+
 NAN_METHOD(KafkaConsumer::NodeAssign) {
   Nan::HandleScope scope;
 
@@ -798,6 +871,114 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) {
   info.GetReturnValue().Set(Nan::True());
 }
 
+NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) {
+  Nan::HandleScope scope;
+
+  if (info.Length() < 1 || !info[0]->IsArray()) {
+    return Nan::ThrowError("Need to specify an array of partitions");
+  }
+
+  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
+  std::vector<RdKafka::TopicPartition*> topic_partitions;
+
+  for (unsigned int i = 0; i < partitions->Length(); ++i) {
+    v8::Local<v8::Value> partition_obj_value;
+    if (!(
+        Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
+        partition_obj_value->IsObject())) {
+      Nan::ThrowError("Must pass topic-partition objects");
+    }
+
+    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
+
+    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
+    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
+
+    if (!topic.empty()) {
+      RdKafka::TopicPartition* part;
+
+      if (partition < 0) {
+        part = Connection::GetPartition(topic);
+      } else {
+        part = Connection::GetPartition(topic, partition);
+      }
+
+      int64_t offset = GetParameter<int64_t>(
+        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
+      if (offset != RdKafka::Topic::OFFSET_INVALID) {
+        part->set_offset(offset);
+      }
+
+      topic_partitions.push_back(part);
+    }
+  }
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+  Baton b = consumer->IncrementalAssign(topic_partitions);
+
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    v8::Local<v8::Value> errorObject = b.ToObject();
+    Nan::ThrowError(errorObject);
+  }
+
+  info.GetReturnValue().Set(Nan::True());
+}
+
+NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) {
+  Nan::HandleScope scope;
+
+  if (info.Length() < 1 || !info[0]->IsArray()) {
+    return Nan::ThrowError("Need to specify an array of partitions");
+  }
+
+  v8::Local<v8::Array> partitions = info[0].As<v8::Array>();
+  std::vector<RdKafka::TopicPartition*> topic_partitions;
+
+  for (unsigned int i = 0; i < partitions->Length(); ++i) {
+    v8::Local<v8::Value> partition_obj_value;
+    if (!(
+        Nan::Get(partitions, i).ToLocal(&partition_obj_value) &&
+        partition_obj_value->IsObject())) {
+      Nan::ThrowError("Must pass topic-partition objects");
+    }
+
+    v8::Local<v8::Object> partition_obj = partition_obj_value.As<v8::Object>();
+
+    int64_t partition = GetParameter<int64_t>(partition_obj, "partition", -1);
+    std::string topic = GetParameter<std::string>(partition_obj, "topic", "");
+
+    if (!topic.empty()) {
+      RdKafka::TopicPartition* part;
+
+      if (partition < 0) {
+        part = Connection::GetPartition(topic);
+      } else {
+        part = Connection::GetPartition(topic, partition);
+      }
+
+      int64_t offset = GetParameter<int64_t>(
+        partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID);
+      if (offset != RdKafka::Topic::OFFSET_INVALID) {
+        part->set_offset(offset);
+      }
+
+      topic_partitions.push_back(part);
+    }
+  }
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+
+  Baton b = consumer->IncrementalUnassign(topic_partitions);
+
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    v8::Local<v8::Value> errorObject = b.ToObject();
+    Nan::ThrowError(errorObject);
+  }
+
+  info.GetReturnValue().Set(Nan::True());
+}
+
 NAN_METHOD(KafkaConsumer::NodeUnsubscribe) {
   Nan::HandleScope scope;
 
diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h
index c91590ec..dfb45b9a 100644
--- a/src/kafka-consumer.h
+++ b/src/kafka-consumer.h
@@ -74,9 +74,13 @@ class KafkaConsumer : public Connection {
   Baton Assign(std::vector<RdKafka::TopicPartition*>);
   Baton Unassign();
 
+  Baton IncrementalAssign(std::vector<RdKafka::TopicPartition*>);
+  Baton IncrementalUnassign(std::vector<RdKafka::TopicPartition*>);
+
   Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);
 
   std::string Name();
+  std::string RebalanceProtocol();
 
   Baton Subscribe(std::vector<std::string>);
   Baton Consume(int timeout_ms);
@@ -106,6 +110,9 @@ class KafkaConsumer : public Connection {
   static NAN_METHOD(NodeDisconnect);
   static NAN_METHOD(NodeAssign);
   static NAN_METHOD(NodeUnassign);
+  static NAN_METHOD(NodeIncrementalAssign);
+  static NAN_METHOD(NodeIncrementalUnassign);
+  static NAN_METHOD(NodeRebalanceProtocol);
   static NAN_METHOD(NodeAssignments);
   static NAN_METHOD(NodeUnsubscribe);
   static NAN_METHOD(NodeCommit);
diff --git a/src/producer.cc b/src/producer.cc
index c8e3e632..8e20320a 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -370,18 +370,6 @@ void Producer::ConfigureCallback(const std::string &string_key, const v8::Local<v8::Function> &cb, bool add) {
   }
 }
 
-Baton rdkafkaErrorToBaton(RdKafka::Error* error) {
-  if ( NULL == error) {
-    return Baton(RdKafka::ERR_NO_ERROR);
-  }
-  else {
-    Baton result(error->code(), error->str(), error->is_fatal(),
-      error->is_retriable(), error->txn_requires_abort());
-    delete error;
-    return result;
-  }
-}
-
 Baton Producer::InitTransactions(int32_t timeout_ms) {
   if (!IsConnected()) {
     return Baton(RdKafka::ERR__STATE);
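
Reviewer note: the default rebalance handler in lib/kafka-consumer.js (changed above) now branches on rebalanceProtocol() automatically, so consumers relying on the built-in callback only need to set 'partition.assignment.strategy' to 'cooperative-sticky'. For applications that install their own rebalance_cb, the sketch below shows the equivalent logic; it mirrors the patch's default handler, and the broker list, group id, and topic name are placeholders, not part of this patch.

    var Kafka = require('node-rdkafka');

    var consumer = new Kafka.KafkaConsumer({
      'metadata.broker.list': 'localhost:9092',  // placeholder broker
      'group.id': 'example-group',               // placeholder group id
      'partition.assignment.strategy': 'cooperative-sticky',
      'rebalance_cb': function(err, assignment) {
        // Same branching as the default handler: cooperative groups get
        // incremental (additive/subtractive) updates, while eager groups
        // keep the classic assign/unassign behavior.
        if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
          if (this.rebalanceProtocol() === 'COOPERATIVE') {
            this.incrementalAssign(assignment);
          } else {
            this.assign(assignment);
          }
        } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
          if (this.rebalanceProtocol() === 'COOPERATIVE') {
            this.incrementalUnassign(assignment);
          } else {
            this.unassign();
          }
        }
      }
    }, {});

    consumer.connect();
    consumer.on('ready', function() {
      consumer.subscribe(['example-topic']);  // placeholder topic
      consumer.consume();
    });

Unlike eager strategies, a cooperative rebalance only revokes the partitions that actually move, so incrementalUnassign receives the revoked subset rather than clearing the whole assignment.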