From bd2b67aa2b2c9ec9841f6f359546caec98567d85 Mon Sep 17 00:00:00 2001 From: Nir Rozenbaum Date: Wed, 29 Dec 2021 11:52:59 +0200 Subject: [PATCH] removed offset committing of spec topic. this should be done only when there is a topic per leaf hub, will be added again when RBAC is introduced. Signed-off-by: Nir Rozenbaum --- pkg/bundle/bundle.go | 8 --- pkg/controller/bundles/bundles_syncer.go | 2 - pkg/transport/bundle_metadata.go | 4 -- pkg/transport/kafka/consumer.go | 83 +--------------------- pkg/transport/sync-service/sync_service.go | 78 +------------------- pkg/transport/transport.go | 2 - 6 files changed, 2 insertions(+), 175 deletions(-) delete mode 100644 pkg/transport/bundle_metadata.go diff --git a/pkg/bundle/bundle.go b/pkg/bundle/bundle.go index b21be74..8e86eca 100644 --- a/pkg/bundle/bundle.go +++ b/pkg/bundle/bundle.go @@ -1,7 +1,6 @@ package bundle import ( - "github.com/open-cluster-management/leaf-hub-spec-sync/pkg/transport" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) @@ -14,11 +13,4 @@ func NewBundle() *Bundle { type Bundle struct { Objects []*unstructured.Unstructured `json:"objects"` DeletedObjects []*unstructured.Unstructured `json:"deletedObjects"` - transport.BundleMetadata -} - -// WithMetadata adds metadata to the bundle object. -func (bundle *Bundle) WithMetadata(metadata transport.BundleMetadata) *Bundle { - bundle.BundleMetadata = metadata - return bundle } diff --git a/pkg/controller/bundles/bundles_syncer.go b/pkg/controller/bundles/bundles_syncer.go index 3308fe0..ba5c19f 100644 --- a/pkg/controller/bundles/bundles_syncer.go +++ b/pkg/controller/bundles/bundles_syncer.go @@ -92,8 +92,6 @@ func (syncer *BundleSpecSync) sync(ctx context.Context) { } // ensure all updates and deletes have finished before reading next bundle syncer.bundleProcessingWaitingGroup.Wait() - // mark bundle as committed since it was fully processed - syncer.transport.CommitAsync(receivedBundle.BundleMetadata) } } } diff --git a/pkg/transport/bundle_metadata.go b/pkg/transport/bundle_metadata.go deleted file mode 100644 index 3898b17..0000000 --- a/pkg/transport/bundle_metadata.go +++ /dev/null @@ -1,4 +0,0 @@ -package transport - -// BundleMetadata may include metadata that relates to transport - e.g. commit offset. -type BundleMetadata interface{} diff --git a/pkg/transport/kafka/consumer.go b/pkg/transport/kafka/consumer.go index 61cca8b..140fbbb 100644 --- a/pkg/transport/kafka/consumer.go +++ b/pkg/transport/kafka/consumer.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "sync" - "time" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/go-logr/logr" @@ -27,7 +26,6 @@ const ( envVarKafkaSSLCA = "KAFKA_SSL_CA" envVarKafkaTopic = "KAFKA_TOPIC" defaultCompressionType = compressor.NoOp - committerInterval = time.Second * 20 ) var ( @@ -139,7 +137,6 @@ type Consumer struct { // Start function starts the consumer. func (c *Consumer) Start() { c.startOnce.Do(func() { - go c.handleCommits(c.ctx) go c.handleKafkaMessages(c.ctx) }) } @@ -153,84 +150,6 @@ func (c *Consumer) Stop() { }) } -// CommitAsync commits a transported message that was processed locally. -func (c *Consumer) CommitAsync(metadata transport.BundleMetadata) { - topicPartition, ok := metadata.(kafka.TopicPartition) - if !ok { - return // shouldn't happen - } - - c.lock.Lock() - defer c.lock.Unlock() - - if currentOffset, found := c.partitionToOffsetToCommitMap[topicPartition.Partition]; found && - topicPartition.Offset <= currentOffset { - return - } - // given offset is greater than the current one. update offset to commit. - c.partitionToOffsetToCommitMap[topicPartition.Partition] = topicPartition.Offset -} - -func (c *Consumer) handleCommits(ctx context.Context) { - ticker := time.NewTicker(committerInterval) - - for { - select { - case <-ctx.Done(): - return - - case <-ticker.C: - c.commitConsumedOffsets() - } - } -} - -func (c *Consumer) commitConsumedOffsets() { - if offsets := c.getOffsetsToCommit(); offsets != nil { - // commit offsets - if _, err := c.kafkaConsumer.Consumer().CommitOffsets(offsets); err != nil { - c.log.Error(err, "failed to commit offsets", "Offsets", offsets) - return - } - // update offsets map, delete what's been committed - c.removeCommittedOffsetsFromMap(offsets) - } -} - -func (c *Consumer) getOffsetsToCommit() []kafka.TopicPartition { - c.lock.Lock() - defer c.lock.Unlock() - - if len(c.partitionToOffsetToCommitMap) == 0 { - return nil - } - - topicPartitions := make([]kafka.TopicPartition, 0, len(c.partitionToOffsetToCommitMap)) - - // prepare batch for committing - for partition, highestOffset := range c.partitionToOffsetToCommitMap { - topicPartitions = append(topicPartitions, kafka.TopicPartition{ - Topic: &c.topic, - Partition: partition, - Offset: highestOffset + 1, // kafka re-processes the committed offset on restart, so +1 to avoid that. - }) - } - - return topicPartitions -} - -func (c *Consumer) removeCommittedOffsetsFromMap(topicPartitions []kafka.TopicPartition) { - c.lock.Lock() - defer c.lock.Unlock() - - for _, topicPartition := range topicPartitions { - if c.partitionToOffsetToCommitMap[topicPartition.Partition] == topicPartition.Offset { - // no new offsets processed on this partition, delete from map - delete(c.partitionToOffsetToCommitMap, topicPartition.Partition) - } - } -} - func (c *Consumer) handleKafkaMessages(ctx context.Context) { for { select { @@ -274,7 +193,7 @@ func (c *Consumer) processMessage(msg *kafka.Message) { return } - c.bundlesUpdatesChan <- receivedBundle.WithMetadata(msg.TopicPartition) + c.bundlesUpdatesChan <- receivedBundle default: c.log.Error(errReceivedUnsupportedBundleType, "skipped received message", "MessageID", transportMsg.ID, "MessageType", transportMsg.MsgType, "Version", transportMsg.Version) diff --git a/pkg/transport/sync-service/sync_service.go b/pkg/transport/sync-service/sync_service.go index c0d968f..55e1bd2 100644 --- a/pkg/transport/sync-service/sync_service.go +++ b/pkg/transport/sync-service/sync_service.go @@ -10,14 +10,12 @@ import ( "strconv" "strings" "sync" - "time" "github.com/go-logr/logr" datatypes "github.com/open-cluster-management/hub-of-hubs-data-types" compressor "github.com/open-cluster-management/hub-of-hubs-message-compression" "github.com/open-cluster-management/hub-of-hubs-message-compression/compressors" "github.com/open-cluster-management/leaf-hub-spec-sync/pkg/bundle" - "github.com/open-cluster-management/leaf-hub-spec-sync/pkg/transport" "github.com/open-horizon/edge-sync-service-client/client" ) @@ -28,7 +26,6 @@ const ( envVarSyncServicePollingInterval = "SYNC_SERVICE_POLLING_INTERVAL" compressionHeaderTokensLength = 2 defaultCompressionType = compressor.NoOp - committerInterval = time.Second * 60 ) var ( @@ -120,7 +117,6 @@ type SyncService struct { // Start function starts sync service. func (s *SyncService) Start() { s.startOnce.Do(func() { - go s.handleCommits(s.ctx) go s.handleBundles(s.ctx) }) } @@ -133,78 +129,6 @@ func (s *SyncService) Stop() { }) } -// CommitAsync commits a transported message that was processed locally. -func (s *SyncService) CommitAsync(metadata transport.BundleMetadata) { - objectMetadata, ok := metadata.(*client.ObjectMetaData) - if !ok { - return // shouldn't happen - } - - s.lock.Lock() - defer s.lock.Unlock() - - s.commitMap[fmt.Sprintf("%s.%s", objectMetadata.ObjectType, objectMetadata.ObjectID)] = objectMetadata -} - -func (s *SyncService) handleCommits(ctx context.Context) { - ticker := time.NewTicker(committerInterval) - - for { - select { - case <-ctx.Done(): - return - - case <-ticker.C: - s.commitHandledBundles() - } - } -} - -func (s *SyncService) commitHandledBundles() { - if objectMetadataToCommit := s.getObjectMetadataToCommit(); objectMetadataToCommit != nil { - // commit object-metadata - for _, objectMetadata := range objectMetadataToCommit { - if err := s.client.MarkObjectConsumed(objectMetadata); err != nil { - // if one fails, return and retry in next cycle - s.log.Error(err, "failed to commit", "ObjectMetadata", objectMetadata) - return - } - } - - // update metadata map, delete what's been committed - s.removeCommittedObjectMetadataFromMap(objectMetadataToCommit) - } -} - -func (s *SyncService) getObjectMetadataToCommit() []*client.ObjectMetaData { - s.lock.Lock() - defer s.lock.Unlock() - - if len(s.commitMap) == 0 { - return nil - } - - objectMetadataToCommit := make([]*client.ObjectMetaData, 0, len(s.commitMap)) - for _, objectMetadata := range s.commitMap { - objectMetadataToCommit = append(objectMetadataToCommit, objectMetadata) - } - - return objectMetadataToCommit -} - -func (s *SyncService) removeCommittedObjectMetadataFromMap(committedObjectMetadata []*client.ObjectMetaData) { - s.lock.Lock() - defer s.lock.Unlock() - - for _, objectMetadata := range committedObjectMetadata { - objectIdentifier := fmt.Sprintf("%s.%s", objectMetadata.ObjectType, objectMetadata.ObjectID) - if s.commitMap[objectIdentifier] == objectMetadata { - // no new object processed for this type, delete from map - delete(s.commitMap, objectIdentifier) - } - } -} - func (s *SyncService) handleBundles(ctx context.Context) { // register for updates for spec bundles and config objects, includes all types of spec bundles. s.client.StartPollingForUpdates(datatypes.SpecBundle, s.pollingInterval, s.bundlesMetaDataChan) @@ -246,7 +170,7 @@ func (s *SyncService) handleBundles(ctx context.Context) { continue } - s.bundlesUpdatesChan <- receivedBundle.WithMetadata(objectMetaData) + s.bundlesUpdatesChan <- receivedBundle if err := s.client.MarkObjectReceived(objectMetaData); err != nil { s.logError(err, "failed to report object received to sync service", objectMetaData) diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index 594a50c..02ab357 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -2,8 +2,6 @@ package transport // Transport is an interface for transport layer. type Transport interface { - // CommitAsync marks a transported bundle as processed. - CommitAsync(metadata BundleMetadata) // Start starts the transport. Start() // Stop stops the transport.