diff --git a/kafka/kafkatest/utils.go b/kafka/kafkatest/utils.go index fcded51..6d35664 100644 --- a/kafka/kafkatest/utils.go +++ b/kafka/kafkatest/utils.go @@ -6,13 +6,14 @@ import ( "sync" "time" - "github.com/confluentinc/confluent-kafka-go/kafka" + kafkalib "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/netlify/netlify-commons/kafka" "github.com/netlify/netlify-commons/util" "github.com/sirupsen/logrus" ) func KafkaPipe(log logrus.FieldLogger) (*FakeKafkaConsumer, *FakeKafkaProducer) { - distri := make(chan *kafka.Message, 200) + distri := make(chan *kafkalib.Message, 200) rdr := NewFakeKafkaConsumer(log, distri) wtr := NewFakeKafkaProducer(distri) wtr.commits = rdr.commits @@ -20,23 +21,18 @@ func KafkaPipe(log logrus.FieldLogger) (*FakeKafkaConsumer, *FakeKafkaProducer) } type FakeKafkaConsumer struct { - messages []*kafka.Message + messages []*kafkalib.Message msgMu sync.Mutex offset int64 notify chan struct{} - commits chan *kafka.Message + commits chan *kafkalib.Message log logrus.FieldLogger } -func (f *FakeKafkaConsumer) Close() error { - close(f.commits) - return nil -} - type FakeKafkaProducer struct { - distris []chan<- *kafka.Message + distris []chan<- *kafkalib.Message distrisMu sync.Mutex - commits <-chan *kafka.Message + commits <-chan *kafkalib.Message closed util.AtomicBool } @@ -53,19 +49,19 @@ func (f *FakeKafkaProducer) Close() error { return nil } -func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafka.Message) *FakeKafkaConsumer { +func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafkalib.Message) *FakeKafkaConsumer { r := &FakeKafkaConsumer{ - messages: make([]*kafka.Message, 0), + messages: make([]*kafkalib.Message, 0), offset: 0, notify: make(chan struct{}), log: log, - commits: make(chan *kafka.Message, 1000), + commits: make(chan *kafkalib.Message, 1000), } go func() { for msg := range distri { r.msgMu.Lock() - msg.TopicPartition.Offset = kafka.Offset(r.offset + 1) + msg.TopicPartition.Offset = kafkalib.Offset(r.offset + 1) r.messages = append(r.messages, setMsgDefaults(msg)) r.msgMu.Unlock() r.notify <- struct{}{} @@ -75,7 +71,7 @@ func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafka.Message) return r } -func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafka.Message, error) { +func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error) { for { f.msgMu.Lock() if int64(len(f.messages)) > f.offset { @@ -89,13 +85,13 @@ func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafka.Message, e select { case <-ctx.Done(): - return &kafka.Message{}, ctx.Err() + return &kafkalib.Message{}, ctx.Err() case <-f.notify: } } } -func (f *FakeKafkaConsumer) CommitMessage(msg *kafka.Message) error { +func (f *FakeKafkaConsumer) CommitMessage(msg *kafkalib.Message) error { f.msgMu.Lock() f.log.WithField("offset", msg.TopicPartition.Offset).Trace("commiting message...") if int64(msg.TopicPartition.Offset) > f.offset { @@ -117,27 +113,52 @@ func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error { return nil } -func (f *FakeKafkaConsumer) Seek(offset int64, _ time.Duration) error { +func (f *FakeKafkaConsumer) Seek(offset int64) error { f.msgMu.Lock() f.offset = offset f.msgMu.Unlock() return nil } -func NewFakeKafkaProducer(distris ...chan<- *kafka.Message) *FakeKafkaProducer { +func (f *FakeKafkaConsumer) AssignPartitionByKey(key string, algorithm kafka.PartitionerAlgorithm) error { + return nil // noop +} + +func (f *FakeKafkaConsumer) AssignPartitionByID(id int32) error { + return nil // noop +} + +func (f *FakeKafkaConsumer) GetMetadata(allTopics bool) (*kafkalib.Metadata, error) { + return &kafkalib.Metadata{}, nil // noop +} + +func (f *FakeKafkaConsumer) GetPartitions() ([]int32, error) { + return []int32{}, nil // noop +} + +func (f *FakeKafkaConsumer) SeekToTime(t time.Time) error { + return nil // noop +} + +func (f *FakeKafkaConsumer) Close() error { + close(f.commits) + return nil +} + +func NewFakeKafkaProducer(distris ...chan<- *kafkalib.Message) *FakeKafkaProducer { return &FakeKafkaProducer{ distris: distris, closed: util.NewAtomicBool(false), } } -func (f *FakeKafkaProducer) AddDistri(d chan<- *kafka.Message) { +func (f *FakeKafkaProducer) AddDistri(d chan<- *kafkalib.Message) { f.distrisMu.Lock() f.distris = append(f.distris, d) f.distrisMu.Unlock() } -func (f *FakeKafkaProducer) Produce(ctx context.Context, msgs ...*kafka.Message) error { +func (f *FakeKafkaProducer) Produce(ctx context.Context, msgs ...*kafkalib.Message) error { f.distrisMu.Lock() for _, msg := range msgs { for _, d := range f.distris { @@ -162,7 +183,7 @@ func (f *FakeKafkaProducer) WaitForKey(key []byte) (gotKey bool) { return false } -func setMsgDefaults(msg *kafka.Message) *kafka.Message { +func setMsgDefaults(msg *kafkalib.Message) *kafkalib.Message { if msg.TopicPartition.Topic == nil { topicName := "local-test" msg.TopicPartition.Topic = &topicName