
Commit

better
gabe committed Apr 11, 2024
1 parent 5fd7d05 commit 3af1f87
Showing 3 changed files with 31 additions and 33 deletions.
14 changes: 8 additions & 6 deletions impl/internal/dht/getput.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"math"
 	"sync"
+	"time"
 
 	k_nearest_nodes "github.com/anacrolix/dht/v2/k-nearest-nodes"
 	"github.com/anacrolix/torrent/bencode"
@@ -37,7 +38,10 @@ func startGetTraversal(
 		Alpha: 15,
 		Target: target,
 		DoQuery: func(ctx context.Context, addr krpc.NodeAddr) traversal.QueryResult {
-			res := s.Get(ctx, dht.NewAddr(addr.UDP()), target, seq, dht.QueryRateLimiting{})
+			queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // Adjust the timeout as needed
+			defer cancel()
+
+			res := s.Get(queryCtx, dht.NewAddr(addr.UDP()), target, seq, dht.QueryRateLimiting{})
 			err := res.ToError()
 			if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, dht.TransactionTimeout) {
 				logrus.WithContext(ctx).WithError(err).Debugf("error querying %v", addr)
@@ -52,7 +56,7 @@
 					Sig: r.Sig,
 					Mutable: false,
 				}:
-				case <-ctx.Done():
+				case <-queryCtx.Done():
 				}
 			} else if sha1.Sum(append(r.K[:], salt...)) == target && bep44.Verify(r.K[:], salt, *r.Seq, bv, r.Sig[:]) {
 				select {
@@ -62,15 +66,13 @@
 					Sig: r.Sig,
 					Mutable: true,
 				}:
-				case <-ctx.Done():
+				case <-queryCtx.Done():
 				}
 			} else if rv != nil {
 				logrus.WithContext(ctx).Debugf("get response item hash didn't match target: %q", rv)
 			}
 		}
 		tqr := res.TraversalQueryResult(addr)
-		// Filter replies from nodes that don't have a string token. This doesn't look prettier
-		// with generics. "The token value should be a short binary string." ¯\_(ツ)_/¯ (BEP 5).
 		tqr.ClosestData, _ = tqr.ClosestData.(string)
 		if tqr.ClosestData == nil {
 			tqr.ResponseFrom = nil
@@ -80,7 +82,7 @@
 		NodeFilter: s.TraversalNodeFilter,
 	})
 
-	// list for context cancellation or stalled traversal
+	// Listen for context cancellation or stalled traversal
 	go func() {
 		select {
 		case <-ctx.Done():
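The getput.go change wraps each DHT query in its own context.WithTimeout, so a single unresponsive node can no longer stall the whole get traversal: the parent ctx still governs overall cancellation, while queryCtx bounds one query, and the result-channel sends now select on queryCtx.Done(). Below is a minimal, self-contained sketch of that per-query timeout pattern; queryWithTimeout and its callback signature are hypothetical stand-ins, not code from this repository, and only the context handling mirrors the diff.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// queryWithTimeout illustrates the per-query timeout pattern from getput.go:
// derive a short-lived child context for each outbound query and treat a
// deadline as a soft, per-node failure rather than aborting the traversal.
func queryWithTimeout(ctx context.Context, addr string, query func(context.Context, string) error) error {
	queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel() // release the timer as soon as this query returns

	err := query(queryCtx, addr)
	if errors.Is(err, context.DeadlineExceeded) {
		// One slow peer should not abort the traversal; report it as non-fatal.
		return nil
	}
	return err
}

func main() {
	slowPeer := func(ctx context.Context, addr string) error {
		select {
		case <-time.After(10 * time.Second): // simulate a peer that never answers in time
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	// Prints <nil> after roughly five seconds: the timed-out peer is skipped.
	fmt.Println(queryWithTimeout(context.Background(), "203.0.113.7:6881", slowPeer))
}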
15 changes: 0 additions & 15 deletions impl/internal/did/client_test.go
@@ -5,7 +5,6 @@ import (
 	"time"
 
 	"github.com/anacrolix/dht/v2/bep44"
-	"github.com/goccy/go-json"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -43,20 +42,6 @@ func TestClient(t *testing.T) {
 	t.Logf("time to put and get: %s", since)
 }
 
-func TestGet(t *testing.T) {
-	client, err := NewGatewayClient("https://diddht.tbddev.org")
-
-	require.NoError(t, err)
-	require.NotNil(t, client)
-
-	doc, _, _, err := client.GetDIDDocument("did:dht:9m71tgxibxtkr9f8tfueqasuyqncpyxect18zosntkrqygckqcwo")
-	require.NoError(t, err)
-	require.NotNil(t, doc)
-
-	b, _ := json.Marshal(doc)
-	println(string(b))
-}
-
 func TestClientInvalidGateway(t *testing.T) {
 	g, err := NewGatewayClient("\n")
 	assert.Error(t, err)
35 changes: 23 additions & 12 deletions impl/pkg/service/pkarr.go
@@ -59,7 +59,7 @@ func NewPkarrService(cfg *config.Config, db storage.Storage, d *dht.DHT) (*Pkarr
 		cache: cache,
 		scheduler: &scheduler,
 	}
-	if err = scheduler.Schedule(cfg.PkarrConfig.RepublishCRON, svc.republish); err != nil {
+	if err = scheduler.Schedule("* * * * *", svc.republish); err != nil {
 		return nil, ssiutil.LoggingErrorMsg(err, "failed to start republisher")
 	}
 	return &svc, nil
@@ -210,7 +210,7 @@ func (s *PkarrService) republish() {
 
 	var nextPageToken []byte
 	var recordsBatch []pkarr.Record
-	var seenRecords, batchCnt, successCnt, errCnt int32 = 0, 0, 0, 0
+	var seenRecords, batchCnt, successCnt, errCnt int32 = 0, 1, 0, 0
 
 	for {
 		recordsBatch, nextPageToken, err = s.db.ListRecords(ctx, nextPageToken, 1000)
@@ -229,26 +229,23 @@
 			"record_count": batchSize,
 			"batch_number": batchCnt,
 			"total_seen": seenRecords,
-		}).Infof("republishing batch [%d] of [%d] records", batchSize, batchCnt)
+		}).Infof("republishing batch [%d] of [%d] records", batchCnt, batchSize)
 		batchCnt++
 
 		var wg sync.WaitGroup
 		wg.Add(batchSize)
 
-		var batchSuccessCnt, batchErrCnt int32 = 0, 0
+		var batchErrCnt, batchSuccessCnt int32 = 0, 0
 		for _, record := range recordsBatch {
-			// Pass the parent context to the goroutine
 			go func(ctx context.Context, record pkarr.Record) {
 				defer wg.Done()
 
 				recordID := zbase32.EncodeToString(record.Key[:])
 				logrus.WithContext(ctx).Debugf("republishing record: %s", recordID)
 
-				// Create a new context with a timeout of 10 seconds based on the parent context
 				putCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
 				defer cancel()
 
-				// Use a local variable for the error to avoid shadowing
 				if _, putErr := s.dht.Put(putCtx, record.BEP44()); putErr != nil {
 					logrus.WithContext(putCtx).WithError(putErr).Errorf("failed to republish record: %s", recordID)
 					atomic.AddInt32(&batchErrCnt, 1)
@@ -258,27 +255,41 @@
 			}(ctx, record)
 		}
 
-		// Wait for all goroutines in this batch to finish before moving on to the next batch
 		wg.Wait()
 
-		atomic.AddInt32(&errCnt, batchErrCnt)
-		// Update the success and error counts
 		atomic.AddInt32(&successCnt, batchSuccessCnt)
+		atomic.AddInt32(&errCnt, batchErrCnt)
+
+		successRate := float64(batchSuccessCnt) / float64(batchSize)
 
 		logrus.WithContext(ctx).WithFields(logrus.Fields{
 			"batch_number": batchCnt,
-			"success": batchSuccessCnt,
-			"errors": batchErrCnt,
-		}).Infof("batch completed with [%d] successes and [%d] errors", batchSuccessCnt, batchErrCnt)
+			"success": successCnt,
+			"errors": errCnt,
+		}).Infof("batch [%d] completed with a [%02f] percent success rate", batchCnt, successRate)
+
+		if successRate < 0.8 {
+			logrus.WithContext(ctx).WithFields(logrus.Fields{
+				"batch_number": batchCnt,
+				"success": successCnt,
+				"errors": errCnt,
+			}).Errorf("batch [%d] failed to meet success rate threshold; exiting republishing early", batchCnt)
+			break
+		}
 
 		if nextPageToken == nil {
 			break
 		}
 	}
 
+	successRate := float64(successCnt) / float64(seenRecords)
 	logrus.WithContext(ctx).WithFields(logrus.Fields{
 		"success": seenRecords - errCnt,
 		"errors": errCnt,
 		"total": seenRecords,
-	}).Infof("republishing complete with [%d] batches", batchCnt)
+	}).Infof("republishing complete with [%d] batches of [%d] total records with an [%02f] percent success rate", batchCnt, seenRecords, successRate*100)
 }
 
 // Close closes the Pkarr service gracefully
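With the scheduler now on a hardcoded "* * * * *" cron expression, republish runs every minute, pages through records 1000 at a time, publishes each batch concurrently, tallies outcomes with atomic counters, and stops early if a batch's success rate drops below 0.8. The sketch below shows that fan-out-and-threshold pattern in isolation; republishBatch and flakyPublish are hypothetical, with flakyPublish standing in for s.dht.Put.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
)

// republishBatch sketches the pattern in pkarr.go's republish loop: fan out
// one goroutine per record, count successes and errors atomically, and
// report the batch's success rate so the caller can decide to stop early.
func republishBatch(records []string, publish func(string) error) (successes, errs int32, rate float64) {
	var wg sync.WaitGroup
	wg.Add(len(records))

	for _, rec := range records {
		go func(rec string) {
			defer wg.Done()
			if err := publish(rec); err != nil {
				atomic.AddInt32(&errs, 1)
				return
			}
			atomic.AddInt32(&successes, 1)
		}(rec)
	}
	wg.Wait() // wait for the whole batch before computing the rate

	rate = float64(successes) / float64(len(records))
	return successes, errs, rate
}

func main() {
	records := []string{"rec-a", "rec-b", "rec-c", "rec-d", "rec-e"}
	flakyPublish := func(string) error { // placeholder for s.dht.Put
		if rand.Intn(10) < 7 {
			return nil
		}
		return fmt.Errorf("put failed")
	}

	ok, failed, rate := republishBatch(records, flakyPublish)
	fmt.Printf("batch: %d ok, %d failed, %.0f%% success\n", ok, failed, rate*100)
	if rate < 0.8 {
		fmt.Println("below threshold; a real caller would stop republishing early")
	}
}

Waiting on the WaitGroup before reading the counters is what makes the plain reads of successes and errs safe after the concurrent atomic.AddInt32 calls, which is the same ordering the diff relies on before logging each batch's result.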
