diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index 0f5e32ed..ea9110fc 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -487,6 +487,30 @@ Baton KafkaConsumer::RefreshAssignments() { } } +Baton KafkaConsumer::DisableQueueForwarding(RdKafka::TopicPartition * toppar) { + if (!IsConnected()) { + return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected"); + } + + // Disable forwarding for own partition + RdKafka::Queue *queue = m_client->get_partition_queue(toppar); + + if (queue == NULL) { + return Baton(RdKafka::ERR__STATE, + "TopicPartition has an invalid queue."); + } + + RdKafka::ErrorCode err = queue->forward(NULL); + if (err != RdKafka::ERR_NO_ERROR) { + delete queue; + return Baton(RdKafka::ERR__STATE, + "Could not disable queue for given partition."); + } + + delete queue; + return Baton(err); +} + std::string KafkaConsumer::Name() { if (!IsConnected()) { return std::string(""); @@ -548,6 +572,7 @@ void KafkaConsumer::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "assign", NodeAssign); Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign); Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments); + Nan::SetPrototypeMethod(tpl, "disableQueueForwarding", NodeDisableQueueForwarding); // NOLINT Nan::SetPrototypeMethod(tpl, "commit", NodeCommit); Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync); @@ -720,6 +745,36 @@ NAN_METHOD(KafkaConsumer::NodeAssignments) { Conversion::TopicPartition::ToV8Array(consumer->m_partitions)); } +NAN_METHOD(KafkaConsumer::NodeDisableQueueForwarding) { + Nan::HandleScope scope; + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + + if (!consumer->IsConnected()) { + Nan::ThrowError("KafkaConsumer is disconnected"); + return; + } + + if (info[0]->IsObject()) { + RdKafka::TopicPartition * toppar = + Conversion::TopicPartition::FromV8Object(info[0].As()); + + if (toppar == NULL) { + Nan::ThrowError("Invalid topic partition provided"); + return; + } + + Baton b = consumer->DisableQueueForwarding(toppar); + + delete toppar; + } else { + Nan::ThrowError("First parameter must be an object"); + return; + } + + info.GetReturnValue().Set(Nan::Null()); +} + NAN_METHOD(KafkaConsumer::NodeAssign) { Nan::HandleScope scope;