diff --git a/kafka.go b/kafka.go index 825dc8e..3e1e052 100644 --- a/kafka.go +++ b/kafka.go @@ -147,8 +147,6 @@ func (c *KafkaConsumer) AddFallback(fn FallbackFunc) { } func (c *KafkaConsumer) Start(ctx context.Context) error { - c.consumer.ctx = ctx - for { select { case err := <-c.consumerGroup.Errors(): @@ -188,7 +186,6 @@ type consumer struct { handlerList messageHandlerList topics []string fallbackHandler FallbackFunc - ctx context.Context // sarama sessions closes context when re balancing even thou it keeps consuming messages so this is the faster way to mantain a non-closed copy of the main context } func (c *consumer) Setup(sarama.ConsumerGroupSession) error { @@ -215,14 +212,19 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram ContextTopic := contextKey("Topic") for message := range claim.Messages() { - ctx := context.WithValue(c.ctx, ContextTopic, message.Topic) + select { + case <-session.Context().Done(): + return nil + default: + ctx := context.WithValue(session.Context(), ContextTopic, message.Topic) - err := c.handlerList.Handle(ctx, message) - if errors.Is(err, errHandlerNotFound) && c.fallbackHandler != nil { - _ = c.fallbackHandler(ctx, message.Value) - } + err := c.handlerList.Handle(ctx, message) + if errors.Is(err, errHandlerNotFound) && c.fallbackHandler != nil { + _ = c.fallbackHandler(ctx, message.Value) + } - session.MarkMessage(message, "") + session.MarkMessage(message, "") + } } return nil