From 7f13c88df0d707b63400137530e68a10713f10f5 Mon Sep 17 00:00:00 2001 From: Gabe <7622243+decentralgabe@users.noreply.github.com> Date: Mon, 22 Apr 2024 13:22:51 -0700 Subject: [PATCH] fix republish and migration (#195) * fix republish and migration * update cron --- impl/pkg/service/dht.go | 94 ++++++++++++++++++-------------- impl/pkg/storage/db/bolt/bolt.go | 70 +++++++++++++++++++++++- 2 files changed, 120 insertions(+), 44 deletions(-) diff --git a/impl/pkg/service/dht.go b/impl/pkg/service/dht.go index 566db4a5..489495e5 100644 --- a/impl/pkg/service/dht.go +++ b/impl/pkg/service/dht.go @@ -165,11 +165,11 @@ func (s *DHTService) GetDHT(ctx context.Context, id string) (*dht.BEP44Response, record, err := s.db.ReadRecord(ctx, id) if err != nil || record == nil { - logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to resolve record from storage; adding to badGetCache") + logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to resolve record from storage; adding to bad get cache") // add the key to the badGetCache to prevent spamming the DHT if err = s.badGetCache.Set(id, []byte{0}); err != nil { - logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to set key in badGetCache") + logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to set key in bad get cache") } return nil, err @@ -228,6 +228,7 @@ type failedRecord struct { } // TODO(gabe) make this more efficient. create a publish schedule based on each individual record, not all records +// republish republishes all records in the db func (s *DHTService) republish() { ctx, span := telemetry.GetTracer().Start(context.Background(), "DHTService.republish") defer span.End() @@ -241,43 +242,53 @@ func (s *DHTService) republish() { // republish all records in the db and handle failed records up to 3 times failedRecords := s.republishRecords(ctx) + + // handle failed records + logrus.WithContext(ctx).WithField("failed_record_count", len(failedRecords)).Info("handling failed records") s.handleFailedRecords(ctx, failedRecords) } -// republishRecords republishes all records in the db to the DHT and returns a list of failed records +// republishRecords republishes all records in the db and returns a list of failed records to be retried func (s *DHTService) republishRecords(ctx context.Context) []failedRecord { var nextPageToken []byte var seenRecords, batchCnt int32 var failedRecords []failedRecord + var recordsBatch []dht.BEP44Record + var err error + + var wg sync.WaitGroup for { - recordsBatch, nextPageToken, err := s.db.ListRecords(ctx, nextPageToken, 1000) + recordsBatch, nextPageToken, err = s.db.ListRecords(ctx, nextPageToken, 1000) if err != nil { logrus.WithContext(ctx).WithError(err).Error("failed to list record(s) for republishing") - return failedRecords + continue } batchSize := len(recordsBatch) seenRecords += int32(batchSize) if batchSize == 0 { logrus.WithContext(ctx).Info("no records to republish") - return failedRecords + break } logrus.WithContext(ctx).WithFields(logrus.Fields{ "record_count": batchSize, "batch_number": batchCnt, "total_seen": seenRecords, - }).Infof("republishing batch [%d] of [%d] records", batchCnt, batchSize) + }).Debugf("republishing batch [%d] of [%d] records", batchCnt, batchSize) batchCnt++ - failedRecords = append(failedRecords, s.republishBatch(ctx, recordsBatch)...) + batchFailedRecords := s.republishBatch(ctx, &wg, recordsBatch) + failedRecords = append(failedRecords, batchFailedRecords...) if nextPageToken == nil { break } } + wg.Wait() + successRate := float64(seenRecords-int32(len(failedRecords))) / float64(seenRecords) * 100 logrus.WithContext(ctx).WithFields(logrus.Fields{ "success": seenRecords - int32(len(failedRecords)), @@ -288,67 +299,66 @@ func (s *DHTService) republishRecords(ctx context.Context) []failedRecord { return failedRecords } -// republishBatch republishes a batch of records to the DHT and returns a list of failed records -func (s *DHTService) republishBatch(ctx context.Context, recordsBatch []dht.BEP44Record) []failedRecord { - var wg sync.WaitGroup +// republishBatch republishes a batch of records and returns a list of failed records to be retried +func (s *DHTService) republishBatch(ctx context.Context, wg *sync.WaitGroup, recordsBatch []dht.BEP44Record) []failedRecord { + failedRecordsChan := make(chan failedRecord, len(recordsBatch)) var failedRecords []failedRecord for _, record := range recordsBatch { wg.Add(1) - go func(ctx context.Context, record dht.BEP44Record) { + go func(record dht.BEP44Record) { defer wg.Done() id := record.ID() - logrus.WithContext(ctx).WithField("record_id", id).Debug("republishing record") - putCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - if _, putErr := s.dht.Put(putCtx, record.Put()); putErr != nil { - logrus.WithContext(putCtx).WithField("record_id", id).WithError(putErr).Debug("failed to republish record") - failedRecords = append(failedRecords, failedRecord{ + _, putErr := s.dht.Put(putCtx, record.Put()) + if putErr != nil { + if errors.Is(putErr, context.DeadlineExceeded) { + logrus.WithContext(putCtx).WithField("record_id", id).Info("republish timeout exceeded") + } else { + logrus.WithContext(putCtx).WithField("record_id", id).WithError(putErr).Info("failed to republish record") + } + failedRecordsChan <- failedRecord{ record: record, failureCnt: 1, - }) + } } - }(ctx, record) + }(record) } wg.Wait() + close(failedRecordsChan) + + for fr := range failedRecordsChan { + failedRecords = append(failedRecords, fr) + } return failedRecords } // handleFailedRecords attempts to republish failed records up to 3 times func (s *DHTService) handleFailedRecords(ctx context.Context, failedRecords []failedRecord) { - for i := 0; i < 3; i++ { - var remainingFailedRecords []failedRecord - for _, fr := range failedRecords { + for _, fr := range failedRecords { + retryCount := 0 + for retryCount < 3 { id := fr.record.ID() putCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if _, putErr := s.dht.Put(putCtx, fr.record.Put()); putErr != nil { - logrus.WithContext(putCtx).WithField("record_id", id).WithError(putErr).Debugf("failed to re-republish [%s], attempt: %d", id, i+1) - fr.failureCnt++ - if fr.failureCnt <= 3 { - remainingFailedRecords = append(remainingFailedRecords, fr) - } else { - logrus.WithContext(ctx).WithField("record_id", id).Errorf("record failed to republish after 3 attempts") - } + logrus.WithContext(putCtx).WithField("record_id", id).WithError(putErr).Debugf("failed to re-republish [%s], attempt: %d", id, retryCount+1) + retryCount++ + } else { + break } } - failedRecords = remainingFailedRecords - if len(failedRecords) == 0 { - logrus.WithContext(ctx).Info("all failed records successfully republished") - break - } - if i == 2 { - logrus.WithContext(ctx).WithField("failed_records", failedRecords).Error("failed to republish all records after 3 attempts") - for _, fr := range failedRecords { - id := fr.record.ID() - if err := s.db.WriteFailedRecord(ctx, id); err != nil { - logrus.WithContext(ctx).WithField("record_id", id).WithError(err).Warn("failed to write failed record to db") - } + + if retryCount == 3 { + id := fr.record.ID() + logrus.WithContext(ctx).WithField("record_id", id).Error("record failed to republish after 3 attempts") + if err := s.db.WriteFailedRecord(ctx, id); err != nil { + logrus.WithContext(ctx).WithField("record_id", id).WithError(err).Warn("failed to write failed record to db") } } } @@ -376,7 +386,7 @@ func (s *DHTService) Close() { } if s.badGetCache != nil { if err := s.badGetCache.Close(); err != nil { - logrus.WithError(err).Error("failed to close badGetCache") + logrus.WithError(err).Error("failed to close bad get cache") } } if err := s.db.Close(); err != nil { diff --git a/impl/pkg/storage/db/bolt/bolt.go b/impl/pkg/storage/db/bolt/bolt.go index 2f25a39a..c4c4c22a 100644 --- a/impl/pkg/storage/db/bolt/bolt.go +++ b/impl/pkg/storage/db/bolt/bolt.go @@ -17,7 +17,8 @@ import ( ) const ( - dhtNamespace = "pkarr" + dhtNamespace = "dht" + oldDHTNamespace = "pkarr" failedNamespace = "failed" ) @@ -38,7 +39,72 @@ func NewBolt(path string) (*Bolt, error) { if err != nil { return nil, err } - return &Bolt{db: db}, nil + + b := Bolt{db: db} + b.migrate(context.Background()) + return &b, nil +} + +func (b *Bolt) migrate(ctx context.Context) { + _, span := telemetry.GetTracer().Start(ctx, "bolt.migrate") + defer span.End() + + // Delete new namespace + err := b.db.Update(func(tx *bolt.Tx) error { + return tx.DeleteBucket([]byte(dhtNamespace)) + }) + if err != nil { + logrus.WithContext(ctx).WithError(err).Error("error deleting new namespace") + } + + // Migrate old namespace to new namespace + var nextPageToken []byte + var migratedCount int + var failedCount int + for { + pageSize := 1000 + oldRecords, err := b.readSeveral(ctx, oldDHTNamespace, nextPageToken, pageSize) + if err != nil { + logrus.WithContext(ctx).WithError(err).Error("error reading old namespace") + return + } + + for _, oldRecord := range oldRecords { + key := string(oldRecord.key) + var encodedRecord base64BEP44Record + if err = json.Unmarshal(oldRecord.value, &encodedRecord); err != nil { + logrus.WithContext(ctx).WithError(err).Errorf("error decoding[%s]", key) + continue + } + record, err := encodedRecord.Decode() + if err != nil { + logrus.WithContext(ctx).WithError(err).Errorf("error decoding[%s]", key) + continue + } + if err = b.write(ctx, dhtNamespace, record.ID(), oldRecord.value); err != nil { + logrus.WithContext(ctx).WithError(err).Errorf("error writing[%s] to new namespace", key) + failedCount++ + } else { + migratedCount++ + } + } + + if len(oldRecords) == pageSize { + nextPageToken = oldRecords[len(oldRecords)-1].key + } else { + break + } + } + + logrus.WithContext(ctx).Infof("migrated %d records, failed %d records", migratedCount, failedCount) + if failedCount == 0 { + err = b.db.Update(func(tx *bolt.Tx) error { + return tx.DeleteBucket([]byte(oldDHTNamespace)) + }) + if err != nil { + logrus.WithContext(ctx).WithError(err).Error("error deleting old namespace") + } + } } // WriteRecord writes the given record to the storage