Skip to content

Commit

Permalink
fix(kafkatest): add missing fake consumer methods (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
smoya authored Oct 21, 2020
1 parent 8f699f8 commit fb50904
Showing 1 changed file with 44 additions and 23 deletions.
67 changes: 44 additions & 23 deletions kafka/kafkatest/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,33 @@ import (
"sync"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
kafkalib "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/netlify/netlify-commons/kafka"
"github.com/netlify/netlify-commons/util"
"github.com/sirupsen/logrus"
)

func KafkaPipe(log logrus.FieldLogger) (*FakeKafkaConsumer, *FakeKafkaProducer) {
distri := make(chan *kafka.Message, 200)
distri := make(chan *kafkalib.Message, 200)
rdr := NewFakeKafkaConsumer(log, distri)
wtr := NewFakeKafkaProducer(distri)
wtr.commits = rdr.commits
return rdr, wtr
}

type FakeKafkaConsumer struct {
messages []*kafka.Message
messages []*kafkalib.Message
msgMu sync.Mutex
offset int64
notify chan struct{}
commits chan *kafka.Message
commits chan *kafkalib.Message
log logrus.FieldLogger
}

func (f *FakeKafkaConsumer) Close() error {
close(f.commits)
return nil
}

type FakeKafkaProducer struct {
distris []chan<- *kafka.Message
distris []chan<- *kafkalib.Message
distrisMu sync.Mutex
commits <-chan *kafka.Message
commits <-chan *kafkalib.Message
closed util.AtomicBool
}

Expand All @@ -53,19 +49,19 @@ func (f *FakeKafkaProducer) Close() error {
return nil
}

func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafka.Message) *FakeKafkaConsumer {
func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafkalib.Message) *FakeKafkaConsumer {
r := &FakeKafkaConsumer{
messages: make([]*kafka.Message, 0),
messages: make([]*kafkalib.Message, 0),
offset: 0,
notify: make(chan struct{}),
log: log,
commits: make(chan *kafka.Message, 1000),
commits: make(chan *kafkalib.Message, 1000),
}

go func() {
for msg := range distri {
r.msgMu.Lock()
msg.TopicPartition.Offset = kafka.Offset(r.offset + 1)
msg.TopicPartition.Offset = kafkalib.Offset(r.offset + 1)
r.messages = append(r.messages, setMsgDefaults(msg))
r.msgMu.Unlock()
r.notify <- struct{}{}
Expand All @@ -75,7 +71,7 @@ func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafka.Message)
return r
}

func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafka.Message, error) {
func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error) {
for {
f.msgMu.Lock()
if int64(len(f.messages)) > f.offset {
Expand All @@ -89,13 +85,13 @@ func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafka.Message, e

select {
case <-ctx.Done():
return &kafka.Message{}, ctx.Err()
return &kafkalib.Message{}, ctx.Err()
case <-f.notify:
}
}
}

func (f *FakeKafkaConsumer) CommitMessage(msg *kafka.Message) error {
func (f *FakeKafkaConsumer) CommitMessage(msg *kafkalib.Message) error {
f.msgMu.Lock()
f.log.WithField("offset", msg.TopicPartition.Offset).Trace("commiting message...")
if int64(msg.TopicPartition.Offset) > f.offset {
Expand All @@ -117,27 +113,52 @@ func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error {
return nil
}

func (f *FakeKafkaConsumer) Seek(offset int64, _ time.Duration) error {
func (f *FakeKafkaConsumer) Seek(offset int64) error {
f.msgMu.Lock()
f.offset = offset
f.msgMu.Unlock()
return nil
}

func NewFakeKafkaProducer(distris ...chan<- *kafka.Message) *FakeKafkaProducer {
func (f *FakeKafkaConsumer) AssignPartitionByKey(key string, algorithm kafka.PartitionerAlgorithm) error {
return nil // noop
}

func (f *FakeKafkaConsumer) AssignPartitionByID(id int32) error {
return nil // noop
}

func (f *FakeKafkaConsumer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error) {
return &kafkalib.Metadata{}, nil // noop
}

func (f *FakeKafkaConsumer) GetPartitions() ([]int32, error) {
return []int32{}, nil // noop
}

func (f *FakeKafkaConsumer) SeekToTime(t time.Time) error {
return nil // noop
}

func (f *FakeKafkaConsumer) Close() error {
close(f.commits)
return nil
}

func NewFakeKafkaProducer(distris ...chan<- *kafkalib.Message) *FakeKafkaProducer {
return &FakeKafkaProducer{
distris: distris,
closed: util.NewAtomicBool(false),
}
}

func (f *FakeKafkaProducer) AddDistri(d chan<- *kafka.Message) {
func (f *FakeKafkaProducer) AddDistri(d chan<- *kafkalib.Message) {
f.distrisMu.Lock()
f.distris = append(f.distris, d)
f.distrisMu.Unlock()
}

func (f *FakeKafkaProducer) Produce(ctx context.Context, msgs ...*kafka.Message) error {
func (f *FakeKafkaProducer) Produce(ctx context.Context, msgs ...*kafkalib.Message) error {
f.distrisMu.Lock()
for _, msg := range msgs {
for _, d := range f.distris {
Expand All @@ -162,7 +183,7 @@ func (f *FakeKafkaProducer) WaitForKey(key []byte) (gotKey bool) {
return false
}

func setMsgDefaults(msg *kafka.Message) *kafka.Message {
func setMsgDefaults(msg *kafkalib.Message) *kafkalib.Message {
if msg.TopicPartition.Topic == nil {
topicName := "local-test"
msg.TopicPartition.Topic = &topicName
Expand Down

0 comments on commit fb50904

Please sign in to comment.