feat(kafka) add detached consumer (#178)
smoya authored Sep 24, 2020
1 parent 101e3b3 commit d090a31
Showing 1 changed file with 73 additions and 4 deletions.
77 changes: 73 additions & 4 deletions kafka/consume.go
@@ -56,6 +56,75 @@ type ConfluentConsumer struct {
eventChan chan kafkalib.Event
}

// NewDetachedConsumer creates a Consumer detached from Consumer Groups for partition assignment and rebalance (see NOTE).
// - NOTE Either a partition or a partition key must be set.
// A detached consumer works outside of consumer groups for partition assignment and rebalance; however, it still needs
// permission on the group coordinator to manage commits, so a consumer group must exist in the broker.
// To keep things simple, the consumer group id defaults to the configured topic name, so make sure you have a
// policy that grants permission to that consumer group.
func NewDetachedConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error) {
// See Reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
kafkaConf := conf.baseKafkaConfig()
_ = kafkaConf.SetKey("enable.auto.offset.store", false) // manually StoreOffset after processing a message. It is mandatory for detached consumers.

// In case we try to assign an offset out of range (greater than the log-end-offset), the consumer will start consuming from offset zero.
_ = kafkaConf.SetKey("auto.offset.reset", "earliest")

conf.Consumer.GroupID = conf.Topic // Defaults to the topic name. See NOTE above.

conf.Consumer.Apply(kafkaConf)
for _, opt := range opts {
opt(kafkaConf)
}

if err := conf.configureAuth(kafkaConf); err != nil {
return nil, errors.Wrap(err, "error configuring auth for the Kafka consumer")
}

consumer, err := kafkalib.NewConsumer(kafkaConf)
if err != nil {
return nil, err
}

if conf.RequestTimeout == 0 {
conf.RequestTimeout = DefaultTimeout
}

cc := &ConfluentConsumer{
c: consumer,
conf: conf,
log: log,
}

logFields := logrus.Fields{"kafka_topic": cc.conf.Topic}

if cc.conf.Consumer.Partition == nil && cc.conf.Consumer.PartitionKey == "" {
return nil, errors.New("Either a partition or a partition key is required for creating a detached consumer")
}

logFields["kafka_partition_key"] = cc.conf.Consumer.PartitionKey
logFields["kafka_partition"] = cc.conf.Consumer.Partition

if cc.conf.Consumer.Partition != nil {
cc.log.WithFields(logFields).Debug("Assigning specified partition")
pt := []kafkalib.TopicPartition{
{
Topic: &cc.conf.Topic,
Partition: *cc.conf.Consumer.Partition,
},
}
return cc, cc.c.Assign(pt)
}

if cc.conf.Consumer.PartitionerAlgorithm == "" {
cc.conf.Consumer.PartitionerAlgorithm = PartitionerMurMur2
}

cc.log.WithFields(logFields).Debug("Assigning partition by partition key")

return cc, cc.AssignPartitionByKey(cc.conf.Consumer.PartitionKey, cc.conf.Consumer.PartitionerAlgorithm)
}
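
For illustration only, here is a minimal sketch of how a caller might construct a detached consumer pinned to a partition key. The import path, logger setup, topic name, and key are assumptions; only the Config fields and the NewDetachedConsumer signature come from the code above.

package main

import (
	"github.com/sirupsen/logrus"

	kafka "github.com/example/yourproject/kafka" // hypothetical import path for this package
)

func main() {
	log := logrus.New()

	conf := kafka.Config{
		Topic: "payments.events", // the consumer group id defaults to this topic name (see NOTE above)
	}
	// Either a partition or a partition key must be set for a detached consumer.
	conf.Consumer.PartitionKey = "customer-1234"
	// Leaving PartitionerAlgorithm empty falls back to PartitionerMurMur2.

	c, err := kafka.NewDetachedConsumer(log, conf)
	if err != nil {
		log.WithError(err).Fatal("creating detached consumer")
	}
	_ = c // consume as usual, storing offsets manually (see the loop sketched below)
}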

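Because enable.auto.offset.store is disabled above, offsets must be stored explicitly after each message is processed. A hedged sketch of that loop, written against the raw confluent-kafka-go consumer rather than this library's wrapper; process is a hypothetical application handler.

package example

import (
	kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" // assumed underlying client
)

// consumeAndStore processes messages and stores offsets manually, as required
// when enable.auto.offset.store is disabled.
func consumeAndStore(c *kafkalib.Consumer, process func(*kafkalib.Message) error) error {
	for {
		msg, err := c.ReadMessage(-1) // block until a message or an error arrives
		if err != nil {
			return err
		}
		if err := process(msg); err != nil {
			return err
		}
		// Only after successful processing is the offset marked as safe to commit.
		tp := msg.TopicPartition
		tp.Offset++ // the stored offset is the next offset to consume
		if _, err := c.StoreOffsets([]kafkalib.TopicPartition{tp}); err != nil {
			return err
		}
	}
}
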
// NewConsumer creates a ConfluentConsumer based on config.
// - NOTE if the partition is set and the partition key is not set in config we have no way
// of knowing where to assign the consumer to in the case of a rebalance
@@ -107,7 +176,7 @@ func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consum
logFields["kafka_partition"] = cc.conf.Consumer.Partition
}

cc.setupRebalanceHandler(cc.conf.Consumer.InitialOffset)
cc.setupRebalanceHandler()
cc.log.WithFields(logFields).Debug("Subscribing to Kafka topic")
if serr := cc.c.Subscribe(cc.conf.Topic, cc.rebalanceHandler); serr != nil {
err = errors.Wrap(serr, "error subscribing to topic")
@@ -155,7 +224,7 @@ func (cc *ConfluentConsumer) SeekToTime(t time.Time) error {
}

// setupRebalanceHandler sets up the rebalance handler
func (cc *ConfluentConsumer) setupRebalanceHandler(offset *int64) {
func (cc *ConfluentConsumer) setupRebalanceHandler() {
cc.rebalanceHandlerMutex.Lock()
defer cc.rebalanceHandlerMutex.Unlock()

@@ -176,8 +245,8 @@ func (cc *ConfluentConsumer) setupRebalanceHandler(offset *int64) {
// if we have an initial offset we need to set it
if cc.conf.Consumer.InitialOffset != nil {
once.Do(func() {
log.WithField("kafka_offset", *offset).Debug("Skipping Kafka assignment given by coordinator after rebalance in favor of resetting the offset")
partitions = kafkalib.TopicPartitions{{Topic: &cc.conf.Topic, Offset: kafkalib.Offset(*offset)}}
log.WithField("kafka_offset", *cc.conf.Consumer.InitialOffset).Debug("Skipping Kafka assignment given by coordinator after rebalance in favor of resetting the offset")
partitions = kafkalib.TopicPartitions{{Topic: &cc.conf.Topic, Offset: kafkalib.Offset(*cc.conf.Consumer.InitialOffset)}}
})
}
log.WithField("kafka_partitions", partitions).Debug("Assigning Kafka partitions after rebalance")
