Skip to content

Commit

Permalink
Use session context to avoid rebalancing issues
Browse files Browse the repository at this point in the history
  • Loading branch information
K4L1Ma committed Oct 5, 2020
1 parent ef4e4c3 commit 17ed724
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ func (c *KafkaConsumer) AddFallback(fn FallbackFunc) {
}

func (c *KafkaConsumer) Start(ctx context.Context) error {
c.consumer.ctx = ctx

for {
select {
case err := <-c.consumerGroup.Errors():
Expand Down Expand Up @@ -188,7 +186,6 @@ type consumer struct {
handlerList messageHandlerList
topics []string
fallbackHandler FallbackFunc
ctx context.Context // sarama sessions closes context when re balancing even thou it keeps consuming messages so this is the faster way to mantain a non-closed copy of the main context
}

func (c *consumer) Setup(sarama.ConsumerGroupSession) error {
Expand All @@ -215,14 +212,19 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
ContextTopic := contextKey("Topic")

for message := range claim.Messages() {
ctx := context.WithValue(c.ctx, ContextTopic, message.Topic)
select {
case <-session.Context().Done():
return nil
default:
ctx := context.WithValue(session.Context(), ContextTopic, message.Topic)

err := c.handlerList.Handle(ctx, message)
if errors.Is(err, errHandlerNotFound) && c.fallbackHandler != nil {
_ = c.fallbackHandler(ctx, message.Value)
}
err := c.handlerList.Handle(ctx, message)
if errors.Is(err, errHandlerNotFound) && c.fallbackHandler != nil {
_ = c.fallbackHandler(ctx, message.Value)
}

session.MarkMessage(message, "")
session.MarkMessage(message, "")
}
}

return nil
Expand Down

0 comments on commit 17ed724

Please sign in to comment.