Kafka consumer reads same message again after consumer group rebalances #1079

Open
subhamKumar04 opened this issue May 23, 2024 · 0 comments


Environment Information

  • OS: Mac
  • Node version: 14.17.5
  • NPM version: 6.4.1
  • C++ toolchain:
  • node-rdkafka version: 2.18.0

Steps to Reproduce

I have two consumers running in my application, both using the same consumer group ID, G1.

consumer1 reads from topic1 and consumer2 reads from topic2. Auto-commit is disabled ('enable.auto.commit': false), and I manually commit each offset after the message has been processed.

Both topics have only one partition.

MAX_MESSAGE_POLL is set to 1, so I poll only one message at a time from the broker.
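A minimal sketch of the consume-then-commit loop described above, assuming a connected node-rdkafka KafkaConsumer created with 'enable.auto.commit': false. consumeOneAtATime is a hypothetical helper, not part of node-rdkafka; the consumer is passed in so the sketch does not hard-code the library. consume(1, cb) fetches at most one message, and, as I understand node-rdkafka, commitMessageSync(message) commits message.offset + 1 (the position of the next message) and waits for the commit to finish.

```javascript
// Hypothetical helper illustrating the loop described in this issue:
// fetch one message, process it, then commit before fetching the next one.
// `consumer` is assumed to be a connected node-rdkafka KafkaConsumer
// configured with 'enable.auto.commit': false.
function consumeOneAtATime(consumer, handleMessage, done) {
  consumer.consume(1, (err, messages) => {
    if (err) {
      done(err);
      return;
    }
    if (messages.length === 0) {
      // Nothing was available within the consume timeout.
      done(null, null);
      return;
    }
    const message = messages[0];
    try {
      handleMessage(message);
      // Assumption: commitMessageSync() commits message.offset + 1,
      // i.e. the offset of the NEXT message to read, and blocks until done.
      consumer.commitMessageSync(message);
    } catch (e) {
      // Processing or the synchronous commit failed.
      done(e);
      return;
    }
    done(null, message);
  });
}
```

Using a synchronous commit means the commit has either completed or thrown before the next message is requested, which should narrow the window in which a rebalance can see a stale committed offset.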

What I am noticing is that if consumer2 starts up while consumer1 is in the middle of processing a message, consumer1 re-reads the same message. I enabled rebalance_cb in the Kafka consumer config and found that the broker rebalances the consumer group whenever a new consumer joins it: it revokes the partition and then re-assigns the same partition (since there is only one partition).

  • Question 1: Why does consumer1 read the same message again even though it successfully committed the current offset?
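One thing worth ruling out for Question 1 (an assumption, since the issue does not show the commit call): Kafka's committed offset is the offset of the next message to consume, not the last one processed. The toy simulation below is plain JavaScript, not node-rdkafka; simulateRebalance and commitFor are hypothetical names. It replays a single-partition log, rebalances once after a given number of messages, and restarts fetching from the committed offset, as the re-assigned consumer would.

```javascript
// Toy model (NOT node-rdkafka): one partition whose offsets are 0..logLength-1,
// one consumer in group G1 that processes and commits one message at a time,
// and one revoke/re-assign rebalance after `processedBeforeRebalance` messages.
// `commitFor(offset)` returns the value committed after processing `offset`.
function simulateRebalance(logLength, processedBeforeRebalance, commitFor) {
  let committed = null; // no committed offset for the group yet
  let position = 0; // 'auto.offset.reset': 'earliest' starts at the first offset
  const delivered = [];

  const processNext = () => {
    delivered.push(position);
    committed = commitFor(position);
    position += 1;
  };

  for (let i = 0; i < processedBeforeRebalance && position < logLength; i++) {
    processNext();
  }

  // Rebalance: the partition is revoked and re-assigned, and fetching resumes
  // from the committed offset (or from the earliest offset if none exists).
  position = committed === null ? 0 : committed;

  while (position < logLength) {
    processNext();
  }
  return delivered;
}

// Committing the NEXT offset: every message is delivered once.
console.log(simulateRebalance(5, 3, (offset) => offset + 1)); // [ 0, 1, 2, 3, 4 ]
// Committing the offset just processed: message 2 is delivered twice.
console.log(simulateRebalance(5, 3, (offset) => offset)); // [ 0, 1, 2, 2, 3, 4 ]
```

As I understand it, node-rdkafka's commitMessage() and commitMessageSync() add the + 1 for you, whereas passing { topic, partition, offset: message.offset } to commit() commits that offset as-is and would produce the second pattern.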

Another thing I noticed concerns providing the default implementation of rebalance_cb from https://www.npmjs.com/package/node-rdkafka#rebalancing, i.e. the code below:

```js
'rebalance_cb': (err, assignment) => {
  if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
    // Note: this can throw when you are disconnected. Take care and wrap it in
    // a try catch if that matters to you
    this.consumer.assign(assignment);
  } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
    // Same as above
    this.consumer.unassign();
  } else {
    // We had a real error
    console.error(err);
  }
}
```

With this callback in place, the message is NOT read twice.

  • Question 2: Why does consumer1 NOT read the same message again when I provide this default implementation of rebalance_cb? I would expect node-rdkafka to follow the same strategy even without this function.

My expectation is that once an offset has been committed successfully, the same message should never be read again by the same consumer group.
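Committing after processing gives at-least-once delivery, so a message can still be redelivered if a rebalance lands between processing and a completed commit. A common defensive pattern, sketched here as an assumption rather than anything node-rdkafka provides, is to remember the last offset processed per topic-partition and skip anything at or below it. createDuplicateGuard is a hypothetical helper; messages are assumed to carry topic, partition, and offset fields, as node-rdkafka message objects do.

```javascript
// Hypothetical in-memory guard: remembers the highest offset processed for each
// topic-partition in THIS process and reports whether a message was already seen.
// It does not survive restarts or a partition moving to another process;
// durable deduplication would need external state.
function createDuplicateGuard() {
  const lastProcessed = new Map(); // "topic/partition" -> highest processed offset

  return {
    isDuplicate(message) {
      const key = `${message.topic}/${message.partition}`;
      return lastProcessed.has(key) && message.offset <= lastProcessed.get(key);
    },
    markProcessed(message) {
      const key = `${message.topic}/${message.partition}`;
      const previous = lastProcessed.get(key);
      if (previous === undefined || message.offset > previous) {
        lastProcessed.set(key, message.offset);
      }
    },
  };
}

// Usage: the same offset arriving again after a rebalance is skipped.
const guard = createDuplicateGuard();
const first = { topic: 'topic1', partition: 0, offset: 2 };
console.log(guard.isDuplicate(first)); // false
guard.markProcessed(first);
console.log(guard.isDuplicate({ topic: 'topic1', partition: 0, offset: 2 })); // true
console.log(guard.isDuplicate({ topic: 'topic1', partition: 0, offset: 3 })); // false
```

Note that this guard would also skip messages after a deliberate seek backwards, so it only fits consumers that always move forward.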

node-rdkafka Configuration Settings

```js
brokerConfig: {
  'metadata.broker.list': 'localhost:9092',
  'group.id': `group-name`,
  'event_cb': true,
  'compression.codec': 'snappy',
  'socket.keepalive.enable': true,
  'enable.auto.commit': false,
  'heartbeat.interval.ms': 250,
  'queued.min.messages': 100,
  'fetch.error.backoff.ms': 250,
  'queued.max.messages.kbytes': 50,
},
topicConfig: {
  'auto.offset.reset': 'earliest',
  'request.required.acks': 1
}
```