Skip to content

Commit

Permalink
use errors.Is instead of switch
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Nov 3, 2024
1 parent d2169c1 commit 7bed85e
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions e2e/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package e2e

import (
"context"
"errors"
"fmt"
"github.com/pkg/errors"
"math"
"time"

Expand All @@ -14,12 +14,14 @@ import (

// Check our end-to-end test topic and adapt accordingly if something does not match our expectations.
// - does it exist?
//
// - is it configured correctly?
// - does it have enough partitions?
// - is the replicationFactor correct?
// - does it have enough partitions?
// - is the replicationFactor correct?
//
// - are assignments good?
// - is each broker leading at least one partition?
// - are replicas distributed correctly?
// - is each broker leading at least one partition?
// - are replicas distributed correctly?
func (s *Service) validateManagementTopic(ctx context.Context) error {
s.logger.Debug("validating end-to-end topic...")

Expand All @@ -30,10 +32,10 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {

typedErr := kerr.TypedErrorForCode(meta.Topics[0].ErrorCode)
topicExists := false
switch typedErr {
case nil:
switch {
case typedErr == nil:
topicExists = true
case kerr.UnknownTopicOrPartition:
case errors.Is(typedErr, kerr.UnknownTopicOrPartition):
// UnknownTopicOrPartition (Error code 3) means that the topic does not exist.
// When the topic doesn't exist, continue to create it further down in the code.
topicExists = false
Expand Down Expand Up @@ -72,8 +74,10 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
return s.updatePartitionCount(ctx)
}

// The partition count must be updated after topic validation because the validation process may lead to the
// creation of new partitions. This can occur when new brokers are added to the cluster.
// updatePartitionCount retrieves metadata to inform kminion about the updated
// partition count of its e2e topic. It must be updated after topic validation
// because the validation process may lead to the creation of new partitions.
// This can occur when new brokers are added to the cluster.
func (s *Service) updatePartitionCount(ctx context.Context) error {
retryTicker := time.NewTicker(1 * time.Second)
defer retryTicker.Stop()
Expand All @@ -98,9 +102,9 @@ func (s *Service) updatePartitionCount(ctx context.Context) error {
return fmt.Errorf("unexpected error while updating partition count: %w", typedErr)
}
s.logger.Warn("updatePartitionCount: received UNKNOWN_TOPIC_OR_PARTITION error, possibly due to timing issue. Retrying...")
// The UNKNOWN_TOPIC_OR_PARTITION error occurs occasionally even though the topic is created
// in the validateManagementTopic function. It appears to be a timing issue where the topic metadata
// is not immediately available after creation. In practice, waiting for a short period and then retrying
// The UNKNOWN_TOPIC_OR_PARTITION error occurs occasionally even though the topic is created
// in the validateManagementTopic function. It appears to be a timing issue where the topic metadata
// is not immediately available after creation. In practice, waiting for a short period and then retrying
// the operation resolves the issue.
}
}
Expand Down

0 comments on commit 7bed85e

Please sign in to comment.