Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove db migration #192

Merged
merged 3 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions impl/pkg/service/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *DHTService) PublishDHT(ctx context.Context, id string, record dht.BEP44
if err = s.cache.Set(id, recordBytes); err != nil {
return err
}
logrus.WithContext(ctx).WithField("record", id).Debug("added dht record to cache and db")
logrus.WithContext(ctx).WithField("record_id", id).Debug("added dht record to cache and db")

// return here and put it in the DHT asynchronously
go func() {
Expand All @@ -117,9 +117,9 @@ func (s *DHTService) PublishDHT(ctx context.Context, id string, record dht.BEP44
defer cancel()

if _, err = s.dht.Put(putCtx, record.Put()); err != nil {
logrus.WithContext(ctx).WithError(err).Errorf("error from dht.Put for record: %s", id)
logrus.WithContext(ctx).WithField("record_id", id).WithError(err).Warnf("error from dht.Put for record: %s", id)
} else {
logrus.WithContext(ctx).WithField("record", id).Debug("put record to DHT")
logrus.WithContext(ctx).WithField("record_id", id).Debug("put record to DHT")
}
}()

Expand Down Expand Up @@ -148,7 +148,7 @@ func (s *DHTService) GetDHT(ctx context.Context, id string) (*dht.BEP44Response,
logrus.WithContext(ctx).WithField("record_id", id).Info("resolved record from cache")
return &resp, nil
}
logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get record from cache, falling back to dht")
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Warn("failed to get record from cache, falling back to dht")
}

// next do a dht lookup with a timeout of 10 seconds
Expand All @@ -158,28 +158,28 @@ func (s *DHTService) GetDHT(ctx context.Context, id string) (*dht.BEP44Response,
got, err := s.dht.GetFull(getCtx, id)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
logrus.WithContext(ctx).WithField("record", id).Warn("dht lookup timed out, attempting to resolve from storage")
logrus.WithContext(ctx).WithField("record_id", id).Warn("dht lookup timed out, attempting to resolve from storage")
} else {
logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get record from dht, attempting to resolve from storage")
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Warn("failed to get record from dht, attempting to resolve from storage")
}

record, err := s.db.ReadRecord(ctx, id)
if err != nil || record == nil {
logrus.WithContext(ctx).WithError(err).WithField("record", id).Error("failed to resolve record from storage; adding to badGetCache")
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to resolve record from storage; adding to badGetCache")

// add the key to the badGetCache to prevent spamming the DHT
if err = s.badGetCache.Set(id, []byte{0}); err != nil {
logrus.WithContext(ctx).WithError(err).WithField("record", id).Error("failed to set key in badGetCache")
logrus.WithContext(ctx).WithError(err).WithField("record_id", id).Error("failed to set key in badGetCache")
}

return nil, err
}

logrus.WithContext(ctx).WithField("record", id).Info("resolved record from storage")
logrus.WithContext(ctx).WithField("record_id", id).Info("resolved record from storage")
resp := record.Response()
// add the record back to the cache for future lookups
if err = s.addRecordToCache(id, record.Response()); err != nil {
logrus.WithError(err).WithField("record", id).Error("failed to set record in cache")
logrus.WithError(err).WithField("record_id", id).Error("failed to set record in cache")
}

return &resp, err
Expand All @@ -202,9 +202,9 @@ func (s *DHTService) GetDHT(ctx context.Context, id string) (*dht.BEP44Response,

// add the record to cache, do it here to avoid duplicate calculations
if err = s.addRecordToCache(id, resp); err != nil {
logrus.WithContext(ctx).WithField("record", id).WithError(err).Error("failed to set record in cache")
logrus.WithContext(ctx).WithField("record_id", id).WithError(err).Error("failed to set record in cache")
} else {
logrus.WithContext(ctx).WithField("record", id).Info("added record back to cache")
logrus.WithContext(ctx).WithField("record_id", id).Info("added record back to cache")
}

return &resp, nil
Expand Down
67 changes: 5 additions & 62 deletions impl/pkg/storage/db/bolt/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"time"

"github.com/goccy/go-json"
Expand All @@ -19,7 +18,6 @@ import (

const (
dhtNamespace = "dht"
oldDHTNamespace = "pkarr"
failedNamespace = "failed"
)

Expand All @@ -40,52 +38,9 @@ func NewBolt(path string) (*Bolt, error) {
if err != nil {
return nil, err
}

// Perform the migration
go migrate(db)

return &Bolt{db: db}, nil
}

func migrate(db *bolt.DB) {
// Perform the migration within a write transaction
err := db.Update(func(tx *bolt.Tx) error {
// Create the new namespace bucket
newBucket, err := tx.CreateBucketIfNotExists([]byte(dhtNamespace))
if err != nil {
return fmt.Errorf("failed to create new namespace bucket: %v", err)
}

// Get the old namespace bucket
oldBucket := tx.Bucket([]byte(oldDHTNamespace))
if oldBucket == nil {
// If the old namespace bucket doesn't exist, there's nothing to migrate
return nil
}

// Iterate over the key-value pairs in the old namespace bucket
err = oldBucket.ForEach(func(k, v []byte) error {
// Copy each key-value pair to the new namespace bucket
err = newBucket.Put(k, v)
if err != nil {
return fmt.Errorf("failed to copy key-value pair to new namespace: %v", err)
}
return nil
})
if err != nil {
return err
}

return nil
})

if err != nil {
logrus.WithError(err).Error("failed to migrate records")
} else {
logrus.Info("migration completed successfully")
}
}

// WriteRecord writes the given record to the storage
// TODO: don't overwrite existing records, store unique seq numbers
func (b *Bolt) WriteRecord(ctx context.Context, record dht.BEP44Record) error {
Expand All @@ -98,27 +53,15 @@ func (b *Bolt) WriteRecord(ctx context.Context, record dht.BEP44Record) error {
return err
}

// write to both the old and new namespaces for now
errOld := b.write(ctx, oldDHTNamespace, record.ID(), recordBytes)
errNew := b.write(ctx, dhtNamespace, record.ID(), recordBytes)
if errOld == nil && errNew == nil {
return nil
}
if errOld != nil && errNew != nil {
return errors.New(fmt.Sprintf("old: %v, new: %v", errOld, errNew))
}
if errOld != nil {
return errOld
}
return errNew
return b.write(ctx, dhtNamespace, record.ID(), recordBytes)
}

// ReadRecord reads the record with the given id from the storage
func (b *Bolt) ReadRecord(ctx context.Context, id string) (*dht.BEP44Record, error) {
ctx, span := telemetry.GetTracer().Start(ctx, "bolt.ReadRecord")
defer span.End()

recordBytes, err := b.read(ctx, oldDHTNamespace, id)
recordBytes, err := b.read(ctx, dhtNamespace, id)
if err != nil {
return nil, err
}
Expand All @@ -144,7 +87,7 @@ func (b *Bolt) ListRecords(ctx context.Context, nextPageToken []byte, pageSize i
ctx, span := telemetry.GetTracer().Start(ctx, "bolt.ListRecords")
defer span.End()

boltRecords, err := b.readSeveral(ctx, oldDHTNamespace, nextPageToken, pageSize)
boltRecords, err := b.readSeveral(ctx, dhtNamespace, nextPageToken, pageSize)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -268,9 +211,9 @@ func (b *Bolt) RecordCount(ctx context.Context) (int, error) {

var count int
err := b.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(oldDHTNamespace))
bucket := tx.Bucket([]byte(dhtNamespace))
if bucket == nil {
logrus.WithContext(ctx).WithField("namespace", oldDHTNamespace).Warn("namespace does not exist")
logrus.WithContext(ctx).WithField("namespace", dhtNamespace).Warn("namespace does not exist")
return nil
}
count = bucket.Stats().KeyN
Expand Down
Loading