Skip to content

Commit

Permalink
Added periodic client refresh for e2e monitoring.
Browse files Browse the repository at this point in the history
Fixed consumer group monitoring flag
Fixed topic management reconciliation
  • Loading branch information
azun committed Sep 25, 2023
1 parent caa5c0a commit 2a2054c
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 63 deletions.
44 changes: 42 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
version: '2.1'
version: '3.9'

services:

Expand All @@ -12,6 +12,8 @@ services:
ZOOKEEPER_TICK_TIME: 2000
container_name: zookeeper
hostname: zookeeper
networks:
- kminion

kafka:
image: confluentinc/cp-kafka:latest
Expand All @@ -30,6 +32,10 @@ services:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
links:
- zookeeper
networks:
- kminion

kafka-minion:
build:
Expand All @@ -44,4 +50,38 @@ services:
- 8080:8080
environment:
KAFKA_BROKERS: kafka:29092
restart: unless-stopped
restart: unless-stopped
links:
- kafka
networks:
- kminion

grafana:
image: grafana/grafana-oss
ports:
- '3000:3000'
volumes:
- "/tmp/grafana:/var/lib/grafana"
container_name: grafana
hostname: grafana
networks:
- kminion

prometheus:
image: prom/prometheus
ports:
- '9090:9090'
configs:
- source: prometheus
target: /etc/prometheus/prometheus.yml
container_name: prometheus
hostname: prometheus
networks:
- kminion
configs:
prometheus:
file: example/sample_prometheus.yml


networks:
kminion:
12 changes: 7 additions & 5 deletions e2e/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import (
)

type Config struct {
Enabled bool `koanf:"enabled"`
TopicManagement EndToEndTopicConfig `koanf:"topicManagement"`
ProbeInterval time.Duration `koanf:"probeInterval"`
Producer EndToEndProducerConfig `koanf:"producer"`
Consumer EndToEndConsumerConfig `koanf:"consumer"`
Enabled bool `koanf:"enabled"`
TopicManagement EndToEndTopicConfig `koanf:"topicManagement"`
ProbeInterval time.Duration `koanf:"probeInterval"`
ReconnectInterval time.Duration `koanf:"reconnectInterval"`
Producer EndToEndProducerConfig `koanf:"producer"`
Consumer EndToEndConsumerConfig `koanf:"consumer"`
}

func (c *Config) SetDefaults() {
c.Enabled = false
c.ProbeInterval = 100 * time.Millisecond
c.ReconnectInterval = 0 * time.Second
c.TopicManagement.SetDefaults()
c.Producer.SetDefaults()
c.Consumer.SetDefaults()
Expand Down
20 changes: 17 additions & 3 deletions e2e/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,42 @@ import (

func (s *Service) startConsumeMessages(ctx context.Context, initializedCh chan<- bool) {
client := s.client
logger := s.logger.Named("consumer")

s.logger.Info("Starting to consume end-to-end topic",
logger.Info("Starting to consume end-to-end topic",
zap.String("topic_name", s.config.TopicManagement.Name),
zap.String("group_id", s.groupId))

isInitialized := false
for {
if ctx.Err() != nil {
break
}
fetches := client.PollFetches(ctx)
if !isInitialized {
isInitialized = true
initializedCh <- true
}

if fetches == nil {
break
}

logger.Debug("fetching messages", zap.Any("fetches", fetches))
// Log all errors and continue afterwards as we might get errors and still have some fetch results
errors := fetches.Errors()
for _, err := range errors {
s.logger.Error("kafka fetch error",
logger.Error("kafka fetch error",
zap.String("topic", err.Topic),
zap.Int32("partition", err.Partition),
zap.Error(err.Err))
}

fetches.EachRecord(s.processMessage)
}

client.LeaveGroup()
logger.Info("Consumer thread exited")
}

func (s *Service) commitOffsets(ctx context.Context) {
Expand Down Expand Up @@ -75,14 +87,16 @@ func (s *Service) commitOffsets(ctx context.Context) {
// - checks if it is from us, or from another kminion process running somewhere else
// - hands it off to the service, which then reports metrics on it
func (s *Service) processMessage(record *kgo.Record) {
logger := s.logger.Named("consumer")

if record.Value == nil {
// Init messages have nil values - we want to skip these. They are only used to make sure a consumer is ready.
return
}

var msg EndToEndMessage
if jerr := json.Unmarshal(record.Value, &msg); jerr != nil {
s.logger.Error("failed to unmarshal message value", zap.Error(jerr))
logger.Error("failed to unmarshal message value", zap.Error(jerr))
return // maybe older version
}

Expand Down
3 changes: 3 additions & 0 deletions e2e/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func (s *Service) produceMessagesToAllPartitions(ctx context.Context) {
// it will add it to the message tracker. If producing fails a message will be logged and the respective metrics
// will be incremented.
func (s *Service) produceMessage(ctx context.Context, partition int) {
logger := s.logger.Named("producer")

topicName := s.config.TopicManagement.Name
record, msg := createEndToEndRecord(s.minionID, topicName, partition)

Expand All @@ -34,6 +36,7 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {
pID := strconv.Itoa(partition)
s.messagesProducedInFlight.WithLabelValues(pID).Inc()
s.messageTracker.addToTracker(msg)
logger.Debug("producing message", zap.Any("record", record))
s.client.Produce(childCtx, record, func(r *kgo.Record, err error) {
defer cancel()
ackDuration := time.Since(startTime)
Expand Down
106 changes: 80 additions & 26 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ type Service struct {
config Config
logger *zap.Logger

kafkaSvc *kafka.Service // creates kafka client for us
client *kgo.Client
kafkaSvc *kafka.Service // creates kafka client for us
client *kgo.Client
adminClient *kgo.Client
kgoOpts []kgo.Opt

// Service
minionID string // unique identifier, reported in metrics, in case multiple instances run at the same time
Expand Down Expand Up @@ -57,14 +59,17 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite())
}
kgoOpts = append(kgoOpts, kgo.ProduceRequestTimeout(3*time.Second))
kgoOpts = append(kgoOpts, kgo.ClientID("kminion"))

// Consumer configs
kgoOpts = append(kgoOpts,
kgo.ConsumerGroup(groupID),
kgo.ConsumeTopics(cfg.TopicManagement.Name),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.DisableAutoCommit(),
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
kgo.InstanceID(groupID),
)

// Prepare hooks
hooks := newEndToEndClientHooks(logger)
Expand All @@ -73,29 +78,17 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
// We use the manual partitioner so that the records' partition id will be used as target partition
kgoOpts = append(kgoOpts, kgo.RecordPartitioner(kgo.ManualPartitioner()))

// Create kafka service and check if client can successfully connect to Kafka cluster
logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata",
zap.String("seed_brokers", strings.Join(kafkaSvc.Brokers(), ",")))
client, err := kafkaSvc.CreateAndTestClient(ctx, logger, kgoOpts)
if err != nil {
return nil, fmt.Errorf("failed to create kafka client for e2e: %w", err)
}
logger.Info("successfully connected to kafka cluster")

svc := &Service{
config: cfg,
logger: logger.Named("e2e"),
kafkaSvc: kafkaSvc,
client: client,
kgoOpts: kgoOpts,

minionID: minionID,
groupId: groupID,
clientHooks: hooks,
}

svc.groupTracker = newGroupTracker(cfg, logger, client, groupID)
svc.messageTracker = newMessageTracker(svc)

makeCounterVec := func(name string, labelNames []string, help string) *prometheus.CounterVec {
cv := prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: "end_to_end",
Expand Down Expand Up @@ -145,24 +138,85 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
return svc, nil
}

// Start starts the service (wow)
func (s *Service) Start(ctx context.Context) error {
// Ensure topic exists and is configured correctly
if err := s.validateManagementTopic(ctx); err != nil {
return fmt.Errorf("could not validate end-to-end topic: %w", err)
func (s *Service) initKafka(ctx context.Context) error {
// Create kafka service and check if client can successfully connect to Kafka cluster
s.logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata",
zap.String("seed_brokers", strings.Join(s.kafkaSvc.Brokers(), ",")))
client, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, s.kgoOpts)
if err != nil {
return fmt.Errorf("failed to create kafka client for e2e: %w", err)
}
s.logger.Info("successfully connected to kafka cluster")

// Get up-to-date metadata and inform our custom partitioner about the partition count
topicMetadata, err := s.getTopicMetadata(ctx)
s.client = client
s.groupTracker = newGroupTracker(s.config, s.logger, client, s.groupId)
s.messageTracker = newMessageTracker(s)

return nil
}

func (s *Service) initReconcile(ctx context.Context) error {
s.logger.Info("Starting reconcile")
adminClient, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, []kgo.Opt{})
if err != nil {
return fmt.Errorf("could not get topic metadata after validation: %w", err)
return fmt.Errorf("failed to create kafka client for e2e: %w", err)
}

s.adminClient = adminClient

// Ensure topic exists and is configured correctly
if err := s.validateManagementTopic(ctx); err != nil {
return fmt.Errorf("could not validate end-to-end topic: %w", err)
}
partitions := len(topicMetadata.Topics[0].Partitions)
s.partitionCount = partitions

// finally start everything else (producing, consuming, continuous validation, consumer group tracking)
go s.startReconciliation(ctx)

return nil
}

// Start starts the service (wow)
func (s *Service) Start(ctx context.Context) error {
if err := s.initReconcile(ctx); err != nil {
return err
}
if s.config.ReconnectInterval > 0*time.Second {
go s.reconnectLoop(ctx)
} else {
if err := s.run(ctx); err != nil {
return err
}

}
return nil
}

// Stop stops the service
func (s *Service) Stop() {
s.logger.Info("Stopping e2e service")
s.client.Close()
}

func (s *Service) reconnectLoop(pctx context.Context) {
for {
ctx, _ := context.WithTimeout(pctx, s.config.ReconnectInterval)
s.run(ctx)
select {
case <-ctx.Done():
s.Stop()
fmt.Println("Restarting e2e service")
case <-pctx.Done():
s.Stop()
return
}
}
}

func (s *Service) run(ctx context.Context) error {
if err := s.initKafka(ctx); err != nil {
return err
}

// Start consumer and wait until we've received a response for the first poll which would indicate that the
// consumer is ready. Only if the consumer is ready we want to start the producer to ensure that we will not
// miss messages because the consumer wasn't ready.
Expand Down
Loading

0 comments on commit 2a2054c

Please sign in to comment.