Stop using Session Context, since it gets canceled on rebalancing
K4L1Ma committed Oct 4, 2020
1 parent 66798a7 commit ef4e4c3
Showing 4 changed files with 81 additions and 48 deletions.
2 changes: 2 additions & 0 deletions guard/message_handler.go
@@ -11,6 +11,7 @@ func HaveTwoAttributes(handler interface{}) bool {

func IsACtx(handler interface{}, attributePosition int) bool {
ctxInterface := reflect.TypeOf((*context.Context)(nil)).Elem()

return reflect.TypeOf(handler).In(attributePosition).Kind() == reflect.Interface &&
reflect.TypeOf(handler).In(attributePosition).Implements(ctxInterface)
}
@@ -25,6 +26,7 @@ func IsAFunc(handler interface{}) bool {

func FuncReturnError(handler interface{}) bool {
errorInterface := reflect.TypeOf((*error)(nil)).Elem()

return reflect.TypeOf(handler).Out(0).Implements(errorInterface)
}

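The guard checks above hinge on getting an interface's reflect.Type from a typed nil pointer and then testing a handler's parameter and return types against it. A minimal standalone sketch of that pattern, with a hypothetical handler signature (not taken from this repository):

package main

import (
	"context"
	"fmt"
	"reflect"
)

func main() {
	// Hypothetical handler shaped the way the guard package expects:
	// two attributes, the first a context.Context, and a single error return.
	handler := func(ctx context.Context, payload []byte) error { return nil }

	ctxInterface := reflect.TypeOf((*context.Context)(nil)).Elem()
	errorInterface := reflect.TypeOf((*error)(nil)).Elem()

	t := reflect.TypeOf(handler)

	fmt.Println(t.NumIn() == 2)                      // true: the handler has two attributes
	fmt.Println(t.In(0).Implements(ctxInterface))    // true: the first attribute is a context
	fmt.Println(t.Out(0).Implements(errorInterface)) // true: the handler returns an error
}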
4 changes: 3 additions & 1 deletion guard/message_handler_test.go
@@ -1,10 +1,12 @@
package guard
package guard_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"

. "github.com/chiguirez/kfk/v2/guard"
)

func TestMessageHandler(t *testing.T) {
11 changes: 7 additions & 4 deletions kafka.go
@@ -22,6 +22,7 @@ type FallbackFunc func(context.Context, []byte) error

func NewHandler(handlerFunc interface{}) MessageHandler {
guard.MessageHandler(handlerFunc)

return MessageHandler{reflect.TypeOf(handlerFunc), reflect.ValueOf(handlerFunc)}
}

@@ -146,12 +147,12 @@ func (c *KafkaConsumer) AddFallback(fn FallbackFunc) {
}

func (c *KafkaConsumer) Start(ctx context.Context) error {
defer func() {
_ = c.consumerGroup.Close()
}()
c.consumer.ctx = ctx

for {
select {
case err := <-c.consumerGroup.Errors():
return err
case <-ctx.Done():
if !errors.Is(ctx.Err(), context.Canceled) {
return ctx.Err()
@@ -187,10 +188,12 @@ type consumer struct {
handlerList messageHandlerList
topics []string
fallbackHandler FallbackFunc
ctx context.Context // the sarama session closes its context when rebalancing even though it keeps consuming messages, so this is the quickest way to maintain a non-closed copy of the main context
}

func (c *consumer) Setup(sarama.ConsumerGroupSession) error {
close(c.ready)

return nil
}

@@ -212,7 +215,7 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
ContextTopic := contextKey("Topic")

for message := range claim.Messages() {
ctx := context.WithValue(session.Context(), ContextTopic, message.Topic)
ctx := context.WithValue(c.ctx, ContextTopic, message.Topic)

err := c.handlerList.Handle(ctx, message)
if errors.Is(err, errHandlerNotFound) && c.fallbackHandler != nil {
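This hunk is the heart of the commit: ConsumeClaim now builds each message context from a copy of the caller's context kept on the consumer struct, instead of from session.Context(), which sarama cancels on every rebalance. A rough sketch of the same pattern in isolation, assuming sarama's consumer-group API; the rebalanceSafeConsumer name and the handle callback are hypothetical:

package kfkexample

import (
	"context"

	"github.com/Shopify/sarama"
)

type contextKey string

type rebalanceSafeConsumer struct {
	ctx    context.Context                               // non-canceled copy of the caller's context
	handle func(ctx context.Context, value []byte) error // hypothetical per-message callback
}

// Start stores the parent context before handing control to sarama, then
// rejoins the group in a loop until that context is done.
func (c *rebalanceSafeConsumer) Start(ctx context.Context, group sarama.ConsumerGroup, topics []string) error {
	c.ctx = ctx

	for {
		if err := group.Consume(ctx, topics, c); err != nil {
			return err
		}

		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
}

func (c *rebalanceSafeConsumer) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (c *rebalanceSafeConsumer) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (c *rebalanceSafeConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// Derive per-message values from the stored parent context rather than
		// session.Context(), so in-flight handling survives a rebalance.
		ctx := context.WithValue(c.ctx, contextKey("Topic"), message.Topic)

		if err := c.handle(ctx, message.Value); err != nil {
			return err
		}

		session.MarkMessage(message, "")
	}

	return nil
}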
112 changes: 69 additions & 43 deletions kafka_test.go
@@ -1,4 +1,4 @@
package kfk
package kfk_test

import (
"context"
@@ -11,7 +11,8 @@ import (

"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"
"github.com/wvanbergen/kazoo-go"

. "github.com/chiguirez/kfk/v2"
)

const (
Expand All @@ -24,6 +25,7 @@ type sentTestingMessage struct {
Name string `json:"name"`
}

//nolint:funlen
func TestKafkaProduceAndConsume(t *testing.T) {
var (
kafkaConsumer *KafkaConsumer
@@ -43,11 +45,12 @@ func TestKafkaProduceAndConsume(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

setup := func() {
setup := func(t *testing.T) (*KafkaProducer, func(t *testing.T)) {
messageHandler := NewHandler(func(ctx context.Context, s sentTestingMessage) error {
stChan <- s
topicFromContext, _ := TopicFromContext(ctx)
topicChan <- topicFromContext

return nil
})

@@ -60,25 +63,41 @@ func TestKafkaProduceAndConsume(t *testing.T) {

kafkaConsumer.AddHandler("sentTestingMessage", messageHandler)

waitChan := make(chan struct{})

go func() {
err = kafkaConsumer.Start(ctx)
require.NoError(t, err)
waitChan <- struct{}{}
}()

kafkaProducer, err = NewKafkaProducer([]string{kafkaBroker})
require.NoError(t, err)
}

tearDown := func() {
cancel()
return kafkaProducer, func(t *testing.T) {
cancel()
<-waitChan

config := sarama.NewConfig()
config.Version = sarama.V1_1_0_0
admin, err := sarama.NewClusterAdmin([]string{broker}, config)
require.NoError(t, err)
config := sarama.NewConfig()
config.Version = sarama.V1_1_0_0

admin, err := sarama.NewClusterAdmin([]string{kafkaBroker}, config)
require.NoError(t, err)

err = admin.DeleteTopic(topic)
require.NoError(t, err)

_ = admin.DeleteTopic(topic)
_ = admin.DeleteConsumerGroup(groupID)
// let's wait a few seconds for Kafka to realize we are not consuming anymore
time.Sleep(10 * time.Second)

err = admin.DeleteConsumerGroup(groupID)
require.NoError(t, err)
}
}

t.Run("Given a producer and a message is sent to kafka topic", func(t *testing.T) {
setup()
defer tearDown()
kafkaProducer, tearDown := setup(t)
defer tearDown(t)

msg := sentTestingMessage{
ID: "testing-message-id",
Expand All @@ -88,11 +107,6 @@ func TestKafkaProduceAndConsume(t *testing.T) {
require.NoError(t, err)

t.Run("When the consumer is started", func(t *testing.T) {
go func() {
err = kafkaConsumer.Start(ctx)
require.NoError(t, err)
}()

t.Run("Then the message is consumed and the message information can be retrieved", func(t *testing.T) {
message := <-stChan

@@ -114,13 +128,8 @@ func TestKafkaProduceAndConsume(t *testing.T) {
})
}

//nolint:funlen
func TestKafkaFallbackConsume(t *testing.T) {
var (
kafkaConsumer *KafkaConsumer
kafkaProducer *KafkaProducer
err error
)

stChan := make(chan sentTestingMessage)
topic := "topic-name-with-fallback"
groupID := groupID
@@ -130,9 +139,9 @@ func TestKafkaFallbackConsume(t *testing.T) {
kafkaBroker = broker
}

ctx, cancel := context.WithCancel(context.Background())
setup := func(t *testing.T) (*KafkaConsumer, *KafkaProducer, func(t *testing.T)) {
ctx, cancel := context.WithCancel(context.Background())

setup := func() {
messageHandler := func(ctx context.Context, s []byte) error {
message := &sentTestingMessage{}
if err := json.Unmarshal(s, message); err != nil {
@@ -143,7 +152,7 @@ func TestKafkaFallbackConsume(t *testing.T) {
return nil
}

kafkaConsumer, err = NewKafkaConsumer(
kafkaConsumer, err := NewKafkaConsumer(
[]string{kafkaBroker},
groupID,
[]string{topic},
@@ -152,35 +161,50 @@ func TestKafkaFallbackConsume(t *testing.T) {

kafkaConsumer.AddFallback(messageHandler)

kafkaProducer, err = NewKafkaProducer([]string{kafkaBroker})
kafkaProducer, err := NewKafkaProducer([]string{kafkaBroker})
require.NoError(t, err)
}

tearDown := func() {
newKazoo, _ := kazoo.NewKazoo([]string{"localhost:2181"}, nil)
_ = newKazoo.DeleteTopicSync(topic, time.Second*5)
_ = newKazoo.Consumergroup(groupID).Delete()
waitChan := make(chan struct{})

go func() {
err := kafkaConsumer.Start(ctx)
require.NoError(t, err)
waitChan <- struct{}{}
}()

return kafkaConsumer, kafkaProducer, func(t *testing.T) {
cancel()
<-waitChan

config := sarama.NewConfig()
config.Version = sarama.V1_1_0_0

admin, err := sarama.NewClusterAdmin([]string{kafkaBroker}, config)
require.NoError(t, err)

err = admin.DeleteTopic(topic)
require.NoError(t, err)

// let's wait a few seconds for Kafka to realize we are not consuming anymore
time.Sleep(10 * time.Second)

cancel()
err = admin.DeleteConsumerGroup(groupID)
require.NoError(t, err)
}
}

t.Run("Given a valid consumer and a message is sent to kafka topic", func(t *testing.T) {
setup()
defer tearDown()
kafkaConsumer, kafkaProducer, tearDown := setup(t)
defer tearDown(t)

msg := sentTestingMessage{
ID: "testing-message-id",
Name: "testing-message-name",
}
err = kafkaProducer.Send(topic, msg.ID, msg)
err := kafkaProducer.Send(topic, msg.ID, msg)
require.NoError(t, err)

t.Run("When the consumer is started", func(t *testing.T) {
go func() {
err = kafkaConsumer.Start(ctx)
require.NoError(t, err)
}()

t.Run("Then the message is consumed by the fallback and the message information can be retrieved", func(t *testing.T) {
message := <-stChan

@@ -211,6 +235,7 @@ func (c *CustomEncodingDecodingMessage) UnmarshallKFK(data []byte) error {

func (c CustomEncodingDecodingMessage) MarshalKFK() ([]byte, error) {
customEncoding := fmt.Sprintf("%s;%s", c.id, c.name)

return []byte(customEncoding), nil
}

@@ -247,6 +272,7 @@ func TestCustomEncodeDecode(t *testing.T) {
go func() {
messageChan <- message
}()

return nil
}))

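The test rewrites above drop the kazoo-go Zookeeper cleanup in favor of sarama's cluster admin and switch to a setup function that hands back its own teardown closure. A small sketch of that test shape under the same assumptions (broker address, topic, and group name are placeholders):

package kfkexample_test

import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/stretchr/testify/require"
)

// newTearDown returns a closure that removes the topic and the consumer group
// through sarama's ClusterAdmin once a test has finished with them.
func newTearDown(t *testing.T, broker, topic, groupID string) func(t *testing.T) {
	t.Helper()

	return func(t *testing.T) {
		config := sarama.NewConfig()
		config.Version = sarama.V1_1_0_0

		admin, err := sarama.NewClusterAdmin([]string{broker}, config)
		require.NoError(t, err)

		require.NoError(t, admin.DeleteTopic(topic))
		require.NoError(t, admin.DeleteConsumerGroup(groupID))
		require.NoError(t, admin.Close())
	}
}

func TestWithTearDown(t *testing.T) {
	tearDown := newTearDown(t, "localhost:9092", "example-topic", "example-group")
	defer tearDown(t)

	// produce and consume against the topic here
}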
