Skip to content

Commit

Permalink
removed offset committing of spec topic.
Browse files Browse the repository at this point in the history
this should be done only when there is a topic per leaf hub,
will be added again when RBAC is introduced.

Signed-off-by: Nir Rozenbaum <[email protected]>
  • Loading branch information
nirrozenbaum committed Dec 29, 2021
1 parent 219eca1 commit bd2b67a
Show file tree
Hide file tree
Showing 6 changed files with 2 additions and 175 deletions.
8 changes: 0 additions & 8 deletions pkg/bundle/bundle.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package bundle

import (
"github.com/open-cluster-management/leaf-hub-spec-sync/pkg/transport"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

Expand All @@ -14,11 +13,4 @@ func NewBundle() *Bundle {
type Bundle struct {
Objects []*unstructured.Unstructured `json:"objects"`
DeletedObjects []*unstructured.Unstructured `json:"deletedObjects"`
transport.BundleMetadata
}

// WithMetadata adds metadata to the bundle object.
func (bundle *Bundle) WithMetadata(metadata transport.BundleMetadata) *Bundle {
bundle.BundleMetadata = metadata
return bundle
}
2 changes: 0 additions & 2 deletions pkg/controller/bundles/bundles_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ func (syncer *BundleSpecSync) sync(ctx context.Context) {
}
// ensure all updates and deletes have finished before reading next bundle
syncer.bundleProcessingWaitingGroup.Wait()
// mark bundle as committed since it was fully processed
syncer.transport.CommitAsync(receivedBundle.BundleMetadata)
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/transport/bundle_metadata.go

This file was deleted.

83 changes: 1 addition & 82 deletions pkg/transport/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"os"
"sync"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/go-logr/logr"
Expand All @@ -27,7 +26,6 @@ const (
envVarKafkaSSLCA = "KAFKA_SSL_CA"
envVarKafkaTopic = "KAFKA_TOPIC"
defaultCompressionType = compressor.NoOp
committerInterval = time.Second * 20
)

var (
Expand Down Expand Up @@ -139,7 +137,6 @@ type Consumer struct {
// Start function starts the consumer.
func (c *Consumer) Start() {
c.startOnce.Do(func() {
go c.handleCommits(c.ctx)
go c.handleKafkaMessages(c.ctx)
})
}
Expand All @@ -153,84 +150,6 @@ func (c *Consumer) Stop() {
})
}

// CommitAsync commits a transported message that was processed locally.
func (c *Consumer) CommitAsync(metadata transport.BundleMetadata) {
topicPartition, ok := metadata.(kafka.TopicPartition)
if !ok {
return // shouldn't happen
}

c.lock.Lock()
defer c.lock.Unlock()

if currentOffset, found := c.partitionToOffsetToCommitMap[topicPartition.Partition]; found &&
topicPartition.Offset <= currentOffset {
return
}
// given offset is greater than the current one. update offset to commit.
c.partitionToOffsetToCommitMap[topicPartition.Partition] = topicPartition.Offset
}

func (c *Consumer) handleCommits(ctx context.Context) {
ticker := time.NewTicker(committerInterval)

for {
select {
case <-ctx.Done():
return

case <-ticker.C:
c.commitConsumedOffsets()
}
}
}

func (c *Consumer) commitConsumedOffsets() {
if offsets := c.getOffsetsToCommit(); offsets != nil {
// commit offsets
if _, err := c.kafkaConsumer.Consumer().CommitOffsets(offsets); err != nil {
c.log.Error(err, "failed to commit offsets", "Offsets", offsets)
return
}
// update offsets map, delete what's been committed
c.removeCommittedOffsetsFromMap(offsets)
}
}

func (c *Consumer) getOffsetsToCommit() []kafka.TopicPartition {
c.lock.Lock()
defer c.lock.Unlock()

if len(c.partitionToOffsetToCommitMap) == 0 {
return nil
}

topicPartitions := make([]kafka.TopicPartition, 0, len(c.partitionToOffsetToCommitMap))

// prepare batch for committing
for partition, highestOffset := range c.partitionToOffsetToCommitMap {
topicPartitions = append(topicPartitions, kafka.TopicPartition{
Topic: &c.topic,
Partition: partition,
Offset: highestOffset + 1, // kafka re-processes the committed offset on restart, so +1 to avoid that.
})
}

return topicPartitions
}

func (c *Consumer) removeCommittedOffsetsFromMap(topicPartitions []kafka.TopicPartition) {
c.lock.Lock()
defer c.lock.Unlock()

for _, topicPartition := range topicPartitions {
if c.partitionToOffsetToCommitMap[topicPartition.Partition] == topicPartition.Offset {
// no new offsets processed on this partition, delete from map
delete(c.partitionToOffsetToCommitMap, topicPartition.Partition)
}
}
}

func (c *Consumer) handleKafkaMessages(ctx context.Context) {
for {
select {
Expand Down Expand Up @@ -274,7 +193,7 @@ func (c *Consumer) processMessage(msg *kafka.Message) {
return
}

c.bundlesUpdatesChan <- receivedBundle.WithMetadata(msg.TopicPartition)
c.bundlesUpdatesChan <- receivedBundle
default:
c.log.Error(errReceivedUnsupportedBundleType, "skipped received message", "MessageID", transportMsg.ID,
"MessageType", transportMsg.MsgType, "Version", transportMsg.Version)
Expand Down
78 changes: 1 addition & 77 deletions pkg/transport/sync-service/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/go-logr/logr"
datatypes "github.com/open-cluster-management/hub-of-hubs-data-types"
compressor "github.com/open-cluster-management/hub-of-hubs-message-compression"
"github.com/open-cluster-management/hub-of-hubs-message-compression/compressors"
"github.com/open-cluster-management/leaf-hub-spec-sync/pkg/bundle"
"github.com/open-cluster-management/leaf-hub-spec-sync/pkg/transport"
"github.com/open-horizon/edge-sync-service-client/client"
)

Expand All @@ -28,7 +26,6 @@ const (
envVarSyncServicePollingInterval = "SYNC_SERVICE_POLLING_INTERVAL"
compressionHeaderTokensLength = 2
defaultCompressionType = compressor.NoOp
committerInterval = time.Second * 60
)

var (
Expand Down Expand Up @@ -120,7 +117,6 @@ type SyncService struct {
// Start function starts sync service.
func (s *SyncService) Start() {
s.startOnce.Do(func() {
go s.handleCommits(s.ctx)
go s.handleBundles(s.ctx)
})
}
Expand All @@ -133,78 +129,6 @@ func (s *SyncService) Stop() {
})
}

// CommitAsync commits a transported message that was processed locally.
func (s *SyncService) CommitAsync(metadata transport.BundleMetadata) {
objectMetadata, ok := metadata.(*client.ObjectMetaData)
if !ok {
return // shouldn't happen
}

s.lock.Lock()
defer s.lock.Unlock()

s.commitMap[fmt.Sprintf("%s.%s", objectMetadata.ObjectType, objectMetadata.ObjectID)] = objectMetadata
}

func (s *SyncService) handleCommits(ctx context.Context) {
ticker := time.NewTicker(committerInterval)

for {
select {
case <-ctx.Done():
return

case <-ticker.C:
s.commitHandledBundles()
}
}
}

func (s *SyncService) commitHandledBundles() {
if objectMetadataToCommit := s.getObjectMetadataToCommit(); objectMetadataToCommit != nil {
// commit object-metadata
for _, objectMetadata := range objectMetadataToCommit {
if err := s.client.MarkObjectConsumed(objectMetadata); err != nil {
// if one fails, return and retry in next cycle
s.log.Error(err, "failed to commit", "ObjectMetadata", objectMetadata)
return
}
}

// update metadata map, delete what's been committed
s.removeCommittedObjectMetadataFromMap(objectMetadataToCommit)
}
}

func (s *SyncService) getObjectMetadataToCommit() []*client.ObjectMetaData {
s.lock.Lock()
defer s.lock.Unlock()

if len(s.commitMap) == 0 {
return nil
}

objectMetadataToCommit := make([]*client.ObjectMetaData, 0, len(s.commitMap))
for _, objectMetadata := range s.commitMap {
objectMetadataToCommit = append(objectMetadataToCommit, objectMetadata)
}

return objectMetadataToCommit
}

func (s *SyncService) removeCommittedObjectMetadataFromMap(committedObjectMetadata []*client.ObjectMetaData) {
s.lock.Lock()
defer s.lock.Unlock()

for _, objectMetadata := range committedObjectMetadata {
objectIdentifier := fmt.Sprintf("%s.%s", objectMetadata.ObjectType, objectMetadata.ObjectID)
if s.commitMap[objectIdentifier] == objectMetadata {
// no new object processed for this type, delete from map
delete(s.commitMap, objectIdentifier)
}
}
}

func (s *SyncService) handleBundles(ctx context.Context) {
// register for updates for spec bundles and config objects, includes all types of spec bundles.
s.client.StartPollingForUpdates(datatypes.SpecBundle, s.pollingInterval, s.bundlesMetaDataChan)
Expand Down Expand Up @@ -246,7 +170,7 @@ func (s *SyncService) handleBundles(ctx context.Context) {
continue
}

s.bundlesUpdatesChan <- receivedBundle.WithMetadata(objectMetaData)
s.bundlesUpdatesChan <- receivedBundle

if err := s.client.MarkObjectReceived(objectMetaData); err != nil {
s.logError(err, "failed to report object received to sync service", objectMetaData)
Expand Down
2 changes: 0 additions & 2 deletions pkg/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package transport

// Transport is an interface for transport layer.
type Transport interface {
// CommitAsync marks a transported bundle as processed.
CommitAsync(metadata BundleMetadata)
// Start starts the transport.
Start()
// Stop stops the transport.
Expand Down

0 comments on commit bd2b67a

Please sign in to comment.