fix republish and migration
gabe committed Apr 22, 2024
1 parent 09dd037 commit bfd4ef3
Showing 2 changed files with 121 additions and 45 deletions.
96 changes: 53 additions & 43 deletions impl/pkg/service/dht.go
@@ -68,7 +68,7 @@ func NewDHTService(cfg *config.Config, db storage.Storage, d *dht.DHT) (*DHTServ
badGetCache: badGetCache,
scheduler: &scheduler,
}
if err = scheduler.Schedule(cfg.DHTConfig.RepublishCRON, svc.republish); err != nil {
if err = scheduler.Schedule("* * * * *", svc.republish); err != nil {
return nil, ssiutil.LoggingErrorMsg(err, "failed to start republisher")
}
return &svc, nil
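
One side of the hunk above reads the cron expression from cfg.DHTConfig.RepublishCRON while the other hard-codes "* * * * *", which in standard five-field cron syntax matches every minute. The repository's scheduler wrapper is not shown in this diff; purely as an illustrative sketch of the same idea, assuming the robfig/cron/v3 library (which this repo may not use):

package main

import (
	"log"

	"github.com/robfig/cron/v3"
)

// Illustrative sketch only; not part of this commit.
func main() {
	c := cron.New() // five-field spec: minute hour day-of-month month day-of-week

	// "* * * * *" matches every minute, so the job fires once a minute.
	if _, err := c.AddFunc("* * * * *", func() {
		log.Println("republish tick")
	}); err != nil {
		log.Fatalf("failed to schedule job: %v", err)
	}

	c.Start()
	select {} // block so the scheduler keeps running
}
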
@@ -165,11 +165,11 @@ func (s *DHTService) GetDHT(ctx context.Context, id string) (*dht.BEP44Response,

record, err := s.db.ReadRecord(ctx, id)
if err != nil || record == nil {
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to resolve record from storage; adding to badGetCache")
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to resolve record from storage; adding to bad get cache")

// add the key to the badGetCache to prevent spamming the DHT
if err = s.badGetCache.Set(id, []byte{0}); err != nil {
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to set key in badGetCache")
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to set key in bad get cache")
}

return nil, err
@@ -228,6 +228,7 @@ type failedRecord struct {
}

// TODO(gabe) make this more efficient. create a publish schedule based on each individual record, not all records
// republish republishes all records in the db
func (s *DHTService) republish() {
ctx, span := telemetry.GetTracer().Start(context.Background(), "DHTService.republish")
defer span.End()
@@ -241,43 +241,53 @@ func (s *DHTService) republish() {

// republish all records in the db and handle failed records up to 3 times
failedRecords := s.republishRecords(ctx)

// handle failed records
logrus.WithContext(ctx).WithField("failed_record_count", len(failedRecords)).Info("handling failed records")
s.handleFailedRecords(ctx, failedRecords)
}

// republishRecords republishes all records in the db to the DHT and returns a list of failed records
// republishRecords republishes all records in the db and returns a list of failed records to be retried
func (s *DHTService) republishRecords(ctx context.Context) []failedRecord {
var nextPageToken []byte
var seenRecords, batchCnt int32
var failedRecords []failedRecord
var recordsBatch []dht.BEP44Record
var err error

var wg sync.WaitGroup

for {
recordsBatch, nextPageToken, err := s.db.ListRecords(ctx, nextPageToken, 1000)
recordsBatch, nextPageToken, err = s.db.ListRecords(ctx, nextPageToken, 1000)
if err != nil {
logrus.WithContext(ctx).WithError(err).Error("failed to list record(s) for republishing")
return failedRecords
continue
}

batchSize := len(recordsBatch)
seenRecords += int32(batchSize)
if batchSize == 0 {
logrus.WithContext(ctx).Info("no records to republish")
return failedRecords
break
}

logrus.WithContext(ctx).WithFields(logrus.Fields{
"record_count": batchSize,
"batch_number": batchCnt,
"total_seen": seenRecords,
}).Infof("republishing batch [%d] of [%d] records", batchCnt, batchSize)
}).Debugf("republishing batch [%d] of [%d] records", batchCnt, batchSize)
batchCnt++

failedRecords = append(failedRecords, s.republishBatch(ctx, recordsBatch)...)
batchFailedRecords := s.republishBatch(ctx, &wg, recordsBatch)
failedRecords = append(failedRecords, batchFailedRecords...)

if nextPageToken == nil {
break
}
}

wg.Wait()

successRate := float64(seenRecords-int32(len(failedRecords))) / float64(seenRecords) * 100
logrus.WithContext(ctx).WithFields(logrus.Fields{
"success": seenRecords - int32(len(failedRecords)),
@@ -288,67 +299,66 @@ func (s *DHTService) republishRecords(ctx context.Context) []failedRecord {
return failedRecords
}

// republishBatch republishes a batch of records to the DHT and returns a list of failed records
func (s *DHTService) republishBatch(ctx context.Context, recordsBatch []dht.BEP44Record) []failedRecord {
var wg sync.WaitGroup
// republishBatch republishes a batch of records and returns a list of failed records to be retried
func (s *DHTService) republishBatch(ctx context.Context, wg *sync.WaitGroup, recordsBatch []dht.BEP44Record) []failedRecord {
failedRecordsChan := make(chan failedRecord, len(recordsBatch))
var failedRecords []failedRecord

for _, record := range recordsBatch {
wg.Add(1)
go func(ctx context.Context, record dht.BEP44Record) {
go func(record dht.BEP44Record) {
defer wg.Done()

id := record.ID()
logrus.WithContext(ctx).WithField("record_id", id).Debug("republishing record")

putCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

if _, putErr := s.dht.Put(putCtx, record.Put()); putErr != nil {
logrus.WithContext(putCtx).WithField("record_id", id).WithError(putErr).Debug("failed to republish record")
failedRecords = append(failedRecords, failedRecord{
_, putErr := s.dht.Put(putCtx, record.Put())
if putErr != nil {
if errors.Is(putErr, context.DeadlineExceeded) {
logrus.WithContext(putCtx).WithField("record_id", id).Info("republish timeout exceeded")
} else {
logrus.WithContext(putCtx).WithField("record_id", id).WithError(putErr).Info("failed to republish record")
}
failedRecordsChan <- failedRecord{
record: record,
failureCnt: 1,
})
}
}
}(ctx, record)
}(record)
}

wg.Wait()
close(failedRecordsChan)

for fr := range failedRecordsChan {
failedRecords = append(failedRecords, fr)
}
return failedRecords
}
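
The new republishBatch above collects failures through a buffered channel rather than appending to a slice shared across goroutines, which avoids a data race on failedRecords. A minimal, self-contained sketch of the same fan-out and collect pattern (the names below are illustrative, not from this repository):

package main

import (
	"fmt"
	"sync"
)

// Illustrative sketch only; not part of this commit.
func main() {
	items := []int{1, 2, 3, 4, 5}

	var wg sync.WaitGroup
	// Buffer to len(items) so no sender can block, even if every item fails.
	failed := make(chan int, len(items))

	for _, it := range items {
		wg.Add(1)
		go func(it int) {
			defer wg.Done()
			if it%2 == 0 { // pretend even items fail
				failed <- it
			}
		}(it)
	}

	wg.Wait()     // all senders are done...
	close(failed) // ...so it is safe to close and drain the channel

	var failures []int
	for f := range failed {
		failures = append(failures, f)
	}
	fmt.Println("failed items:", failures)
}
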

// handleFailedRecords attempts to republish failed records up to 3 times
func (s *DHTService) handleFailedRecords(ctx context.Context, failedRecords []failedRecord) {
for i := 0; i < 3; i++ {
var remainingFailedRecords []failedRecord
for _, fr := range failedRecords {
for _, fr := range failedRecords {
retryCount := 0
for retryCount < 3 {
id := fr.record.ID()
putCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

if _, putErr := s.dht.Put(putCtx, fr.record.Put()); putErr != nil {
logrus.WithContext(putCtx).WithField("record_id", id).WithError(putErr).Debugf("failed to re-republish [%s], attempt: %d", id, i+1)
fr.failureCnt++
if fr.failureCnt <= 3 {
remainingFailedRecords = append(remainingFailedRecords, fr)
} else {
logrus.WithContext(ctx).WithField("record_id", id).Errorf("record failed to republish after 3 attempts")
}
logrus.WithContext(putCtx).WithField("record_id", id).WithError(putErr).Debugf("failed to re-republish [%s], attempt: %d", id, retryCount+1)
retryCount++
} else {
break
}
}
failedRecords = remainingFailedRecords
if len(failedRecords) == 0 {
logrus.WithContext(ctx).Info("all failed records successfully republished")
break
}
if i == 2 {
logrus.WithContext(ctx).WithField("failed_records", failedRecords).Error("failed to republish all records after 3 attempts")
for _, fr := range failedRecords {
id := fr.record.ID()
if err := s.db.WriteFailedRecord(ctx, id); err != nil {
logrus.WithContext(ctx).WithField("record_id", id).WithError(err).Warn("failed to write failed record to db")
}

if retryCount == 3 {
id := fr.record.ID()
logrus.WithContext(ctx).WithField("record_id", id).Error("record failed to republish after 3 attempts")
if err := s.db.WriteFailedRecord(ctx, id); err != nil {
logrus.WithContext(ctx).WithField("record_id", id).WithError(err).Warn("failed to write failed record to db")
}
}
}
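
handleFailedRecords now retries each record individually up to three times and only writes it to the failed-records table once every attempt is exhausted, instead of making three passes over the whole failed set. The underlying retry-with-a-cap shape, as a hedged standalone sketch (the retry helper below is illustrative and does not exist in this repository):

package main

import (
	"errors"
	"fmt"
)

// Illustrative sketch only; not part of this commit.
// retry runs op up to maxAttempts times, stopping on the first success.
func retry(maxAttempts int, op func() error) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		fmt.Printf("attempt %d failed: %v\n", attempt, err)
	}
	return err
}

func main() {
	calls := 0
	err := retry(3, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println("result:", err) // succeeds on the third attempt
}
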
@@ -376,7 +386,7 @@ func (s *DHTService) Close() {
}
if s.badGetCache != nil {
if err := s.badGetCache.Close(); err != nil {
logrus.WithError(err).Error("failed to close badGetCache")
logrus.WithError(err).Error("failed to close bad get cache")
}
}
if err := s.db.Close(); err != nil {
70 changes: 68 additions & 2 deletions impl/pkg/storage/db/bolt/bolt.go
@@ -17,7 +17,8 @@ import (
)

const (
dhtNamespace = "pkarr"
dhtNamespace = "dht"
oldDHTNamespace = "pkarr"
failedNamespace = "failed"
)

@@ -38,7 +39,72 @@ func NewBolt(path string) (*Bolt, error) {
if err != nil {
return nil, err
}
return &Bolt{db: db}, nil

b := Bolt{db: db}
b.migrate(context.Background())
return &b, nil
}

func (b *Bolt) migrate(ctx context.Context) {
_, span := telemetry.GetTracer().Start(ctx, "bolt.migrate")
defer span.End()

// Delete new namespace
err := b.db.Update(func(tx *bolt.Tx) error {
return tx.DeleteBucket([]byte(dhtNamespace))
})
if err != nil {
logrus.WithContext(ctx).WithError(err).Error("error deleting new namespace")
}

// Migrate old namespace to new namespace
var nextPageToken []byte
var migratedCount int
var failedCount int
for {
pageSize := 1000
oldRecords, err := b.readSeveral(ctx, oldDHTNamespace, nextPageToken, pageSize)
if err != nil {
logrus.WithContext(ctx).WithError(err).Error("error reading old namespace")
return
}

for _, oldRecord := range oldRecords {
key := string(oldRecord.key)
var encodedRecord base64BEP44Record
if err = json.Unmarshal(oldRecord.value, &encodedRecord); err != nil {
logrus.WithContext(ctx).WithError(err).Errorf("error decoding[%s]", key)
continue
}
record, err := encodedRecord.Decode()
if err != nil {
logrus.WithContext(ctx).WithError(err).Errorf("error decoding[%s]", key)
continue
}
if err = b.write(ctx, dhtNamespace, record.ID(), oldRecord.value); err != nil {
logrus.WithContext(ctx).WithError(err).Errorf("error writing[%s] to new namespace", key)
failedCount++
} else {
migratedCount++
}
}

if len(oldRecords) == pageSize {
nextPageToken = oldRecords[len(oldRecords)-1].key
} else {
break
}
}

logrus.WithContext(ctx).Infof("migrated %d records, failed %d records", migratedCount, failedCount)
if failedCount == 0 {
err = b.db.Update(func(tx *bolt.Tx) error {
return tx.DeleteBucket([]byte(oldDHTNamespace))
})
if err != nil {
logrus.WithContext(ctx).WithError(err).Error("error deleting old namespace")
}
}
}
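
The migration above pages through the old "pkarr" bucket via readSeveral (a helper not shown in this diff) and uses the last key of a full page as the next page token. As a rough standalone sketch of the same paging done directly with go.etcd.io/bbolt cursors; the function name and exact paging semantics below are assumptions, not code from this repository:

package main

import (
	"log"

	bolt "go.etcd.io/bbolt"
)

// Illustrative sketch only; not part of this commit.
// readPage returns up to pageSize key/value pairs from bucketName,
// starting strictly after the key in `after` (nil means start at the beginning).
func readPage(db *bolt.DB, bucketName string, after []byte, pageSize int) (keys, values [][]byte, err error) {
	err = db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(bucketName))
		if b == nil {
			return nil // bucket does not exist yet; nothing to read
		}
		c := b.Cursor()

		var k, v []byte
		if after == nil {
			k, v = c.First()
		} else {
			// Seek to the token, then step past it so pages do not overlap.
			k, v = c.Seek(after)
			if k != nil && string(k) == string(after) {
				k, v = c.Next()
			}
		}

		for ; k != nil && len(keys) < pageSize; k, v = c.Next() {
			// Copy k and v: bbolt slices are only valid for the life of the transaction.
			keys = append(keys, append([]byte(nil), k...))
			values = append(values, append([]byte(nil), v...))
		}
		return nil
	})
	return keys, values, err
}

func main() {
	db, err := bolt.Open("example.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	keys, _, err := readPage(db, "dht", nil, 1000)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("read %d keys", len(keys))
}
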

// WriteRecord writes the given record to the storage
