diff --git a/impl/internal/dht/getput.go b/impl/internal/dht/getput.go index 52cbe9c1..cdb8b6fb 100644 --- a/impl/internal/dht/getput.go +++ b/impl/internal/dht/getput.go @@ -6,6 +6,7 @@ import ( "errors" "math" "sync" + "time" k_nearest_nodes "github.com/anacrolix/dht/v2/k-nearest-nodes" "github.com/anacrolix/torrent/bencode" @@ -37,7 +38,10 @@ func startGetTraversal( Alpha: 15, Target: target, DoQuery: func(ctx context.Context, addr krpc.NodeAddr) traversal.QueryResult { - res := s.Get(ctx, dht.NewAddr(addr.UDP()), target, seq, dht.QueryRateLimiting{}) + queryCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // Adjust the timeout as needed + defer cancel() + + res := s.Get(queryCtx, dht.NewAddr(addr.UDP()), target, seq, dht.QueryRateLimiting{}) err := res.ToError() if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, dht.TransactionTimeout) { logrus.WithContext(ctx).WithError(err).Debugf("error querying %v", addr) @@ -52,7 +56,7 @@ func startGetTraversal( Sig: r.Sig, Mutable: false, }: - case <-ctx.Done(): + case <-queryCtx.Done(): } } else if sha1.Sum(append(r.K[:], salt...)) == target && bep44.Verify(r.K[:], salt, *r.Seq, bv, r.Sig[:]) { select { @@ -62,15 +66,13 @@ func startGetTraversal( Sig: r.Sig, Mutable: true, }: - case <-ctx.Done(): + case <-queryCtx.Done(): } } else if rv != nil { logrus.WithContext(ctx).Debugf("get response item hash didn't match target: %q", rv) } } tqr := res.TraversalQueryResult(addr) - // Filter replies from nodes that don't have a string token. This doesn't look prettier - // with generics. "The token value should be a short binary string." ¯\_(ツ)_/¯ (BEP 5). 
tqr.ClosestData, _ = tqr.ClosestData.(string) if tqr.ClosestData == nil { tqr.ResponseFrom = nil @@ -80,7 +82,7 @@ func startGetTraversal( NodeFilter: s.TraversalNodeFilter, }) - // list for context cancellation or stalled traversal + // Listen for context cancellation or stalled traversal go func() { select { case <-ctx.Done(): diff --git a/impl/internal/did/client_test.go b/impl/internal/did/client_test.go index b207c56a..bb845d1e 100644 --- a/impl/internal/did/client_test.go +++ b/impl/internal/did/client_test.go @@ -5,7 +5,6 @@ import ( "time" "github.com/anacrolix/dht/v2/bep44" - "github.com/goccy/go-json" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -43,20 +42,6 @@ func TestClient(t *testing.T) { t.Logf("time to put and get: %s", since) } -func TestGet(t *testing.T) { - client, err := NewGatewayClient("https://diddht.tbddev.org") - - require.NoError(t, err) - require.NotNil(t, client) - - doc, _, _, err := client.GetDIDDocument("did:dht:9m71tgxibxtkr9f8tfueqasuyqncpyxect18zosntkrqygckqcwo") - require.NoError(t, err) - require.NotNil(t, doc) - - b, _ := json.Marshal(doc) - println(string(b)) -} - func TestClientInvalidGateway(t *testing.T) { g, err := NewGatewayClient("\n") assert.Error(t, err) diff --git a/impl/pkg/service/pkarr.go b/impl/pkg/service/pkarr.go index e46622f8..e8c50530 100644 --- a/impl/pkg/service/pkarr.go +++ b/impl/pkg/service/pkarr.go @@ -59,7 +59,7 @@ func NewPkarrService(cfg *config.Config, db storage.Storage, d *dht.DHT) (*Pkarr cache: cache, scheduler: &scheduler, } - if err = scheduler.Schedule(cfg.PkarrConfig.RepublishCRON, svc.republish); err != nil { + if err = scheduler.Schedule("* * * * *", svc.republish); err != nil { return nil, ssiutil.LoggingErrorMsg(err, "failed to start republisher") } return &svc, nil @@ -210,7 +210,7 @@ func (s *PkarrService) republish() { var nextPageToken []byte var recordsBatch []pkarr.Record - var seenRecords, batchCnt, successCnt, errCnt int32 = 0, 0, 0, 0 + var 
seenRecords, batchCnt, successCnt, errCnt int32 = 0, 1, 0, 0 for { recordsBatch, nextPageToken, err = s.db.ListRecords(ctx, nextPageToken, 1000) @@ -229,26 +229,23 @@ func (s *PkarrService) republish() { "record_count": batchSize, "batch_number": batchCnt, "total_seen": seenRecords, - }).Infof("republishing batch [%d] of [%d] records", batchSize, batchCnt) + }).Infof("republishing batch [%d] of [%d] records", batchCnt, batchSize) batchCnt++ var wg sync.WaitGroup wg.Add(batchSize) - var batchSuccessCnt, batchErrCnt int32 = 0, 0 + var batchErrCnt, batchSuccessCnt int32 = 0, 0 for _, record := range recordsBatch { - // Pass the parent context to the goroutine go func(ctx context.Context, record pkarr.Record) { defer wg.Done() recordID := zbase32.EncodeToString(record.Key[:]) logrus.WithContext(ctx).Debugf("republishing record: %s", recordID) - // Create a new context with a timeout of 10 seconds based on the parent context putCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - // Use a local variable for the error to avoid shadowing if _, putErr := s.dht.Put(putCtx, record.BEP44()); putErr != nil { logrus.WithContext(putCtx).WithError(putErr).Errorf("failed to republish record: %s", recordID) atomic.AddInt32(&batchErrCnt, 1) @@ -258,27 +255,41 @@ func (s *PkarrService) republish() { }(ctx, record) } + // Wait for all goroutines in this batch to finish before moving on to the next batch wg.Wait() - atomic.AddInt32(&errCnt, batchErrCnt) + // Update the success and error counts atomic.AddInt32(&successCnt, batchSuccessCnt) + atomic.AddInt32(&errCnt, batchErrCnt) + + successRate := float64(batchSuccessCnt) / float64(batchSize) logrus.WithContext(ctx).WithFields(logrus.Fields{ "batch_number": batchCnt, - "success": batchSuccessCnt, - "errors": batchErrCnt, - }).Infof("batch completed with [%d] successes and [%d] errors", batchSuccessCnt, batchErrCnt) + "success": successCnt, + "errors": errCnt, + }).Infof("batch [%d] completed with a [%.2f] percent 
success rate", batchCnt, successRate*100) + + if successRate < 0.8 { + logrus.WithContext(ctx).WithFields(logrus.Fields{ + "batch_number": batchCnt, + "success": successCnt, + "errors": errCnt, + }).Errorf("batch [%d] failed to meet success rate threshold; exiting republishing early", batchCnt) + break + } if nextPageToken == nil { break } } + successRate := float64(successCnt) / float64(seenRecords) logrus.WithContext(ctx).WithFields(logrus.Fields{ "success": seenRecords - errCnt, "errors": errCnt, "total": seenRecords, - }).Infof("republishing complete with [%d] batches", batchCnt) + }).Infof("republishing complete with [%d] batches of [%d] total records with an [%.2f] percent success rate", batchCnt, seenRecords, successRate*100) } // Close closes the Pkarr service gracefully