Skip to content

Commit

Permalink
fix: add missing method
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnimhoff committed Nov 6, 2024
1 parent 7bb054d commit d9394a4
Showing 1 changed file with 55 additions and 0 deletions.
55 changes: 55 additions & 0 deletions src/kafka-consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,30 @@ Baton KafkaConsumer::RefreshAssignments() {
}
}

Baton KafkaConsumer::DisableQueueForwarding(RdKafka::TopicPartition * toppar) {
if (!IsConnected()) {
return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected");
}

// Disable forwarding for own partition
RdKafka::Queue *queue = m_client->get_partition_queue(toppar);

if (queue == NULL) {
return Baton(RdKafka::ERR__STATE,
"TopicPartition has an invalid queue.");
}

RdKafka::ErrorCode err = queue->forward(NULL);
if (err != RdKafka::ERR_NO_ERROR) {
delete queue;
return Baton(RdKafka::ERR__STATE,
"Could not disable queue for given partition.");
}

delete queue;
return Baton(err);
}

std::string KafkaConsumer::Name() {
if (!IsConnected()) {
return std::string("");
Expand Down Expand Up @@ -548,6 +572,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {
Nan::SetPrototypeMethod(tpl, "assign", NodeAssign);
Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign);
Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments);
Nan::SetPrototypeMethod(tpl, "disableQueueForwarding", NodeDisableQueueForwarding); // NOLINT

Nan::SetPrototypeMethod(tpl, "commit", NodeCommit);
Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync);
Expand Down Expand Up @@ -720,6 +745,36 @@ NAN_METHOD(KafkaConsumer::NodeAssignments) {
Conversion::TopicPartition::ToV8Array(consumer->m_partitions));
}

NAN_METHOD(KafkaConsumer::NodeDisableQueueForwarding) {
Nan::HandleScope scope;

KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

if (!consumer->IsConnected()) {
Nan::ThrowError("KafkaConsumer is disconnected");
return;
}

if (info[0]->IsObject()) {
RdKafka::TopicPartition * toppar =
Conversion::TopicPartition::FromV8Object(info[0].As<v8::Object>());

if (toppar == NULL) {
Nan::ThrowError("Invalid topic partition provided");
return;
}

Baton b = consumer->DisableQueueForwarding(toppar);

delete toppar;
} else {
Nan::ThrowError("First parameter must be an object");
return;
}

info.GetReturnValue().Set(Nan::Null());
}

NAN_METHOD(KafkaConsumer::NodeAssign) {
Nan::HandleScope scope;

Expand Down

0 comments on commit d9394a4

Please sign in to comment.