Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
gabe committed Apr 11, 2024
1 parent 9eb197a commit 5fd7d05
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 25 deletions.
4 changes: 2 additions & 2 deletions impl/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func configureLogger(level string) {
if level != "" {
logLevel, err := logrus.ParseLevel(level)
if err != nil {
logrus.WithError(err).WithField("level", level).Error("could not parse log level, setting to info")
logrus.SetLevel(logrus.InfoLevel)
logrus.WithError(err).WithField("level", level).Error("could not parse log level, setting to debug")
logrus.SetLevel(logrus.DebugLevel)
} else {
logrus.SetLevel(logLevel)
}
Expand Down
2 changes: 1 addition & 1 deletion impl/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func GetDefaultConfig() Config {
CacheSizeLimitMB: 500,
},
Log: LogConfig{
Level: logrus.InfoLevel.String(),
Level: logrus.DebugLevel.String(),
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion impl/integrationtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
)

func main() {
logrus.SetLevel(logrus.InfoLevel)
logrus.SetLevel(logrus.DebugLevel)
if len(os.Args) < 2 {
logrus.Fatal("must specify 1 argument (server URL)")
}
Expand Down
3 changes: 1 addition & 2 deletions impl/internal/did/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"net/http"
"net/url"
"time"

"github.com/TBD54566975/ssi-sdk/did"
"github.com/anacrolix/dht/v2/bep44"
Expand All @@ -26,7 +25,7 @@ func NewGatewayClient(gatewayURL string) (*GatewayClient, error) {
return nil, err
}
client := http.DefaultClient
client.Timeout = time.Second * 10
// client.Timeout = time.Second * 10
return &GatewayClient{
gatewayURL: gatewayURL,
client: client,
Expand Down
17 changes: 16 additions & 1 deletion impl/internal/did/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"time"

"github.com/anacrolix/dht/v2/bep44"
"github.com/goccy/go-json"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/TBD54566975/did-dht-method/pkg/dht"
)

func TestClient(t *testing.T) {
client, err := NewGatewayClient("https://diddht.tbddev.org")
client, err := NewGatewayClient("http://localhost:8305")

require.NoError(t, err)
require.NotNil(t, client)
Expand Down Expand Up @@ -42,6 +43,20 @@ func TestClient(t *testing.T) {
t.Logf("time to put and get: %s", since)
}

func TestGet(t *testing.T) {
client, err := NewGatewayClient("https://diddht.tbddev.org")

require.NoError(t, err)
require.NotNil(t, client)

doc, _, _, err := client.GetDIDDocument("did:dht:9m71tgxibxtkr9f8tfueqasuyqncpyxect18zosntkrqygckqcwo")
require.NoError(t, err)
require.NotNil(t, doc)

b, _ := json.Marshal(doc)
println(string(b))
}

func TestClientInvalidGateway(t *testing.T) {
g, err := NewGatewayClient("\n")
assert.Error(t, err)
Expand Down
5 changes: 3 additions & 2 deletions impl/pkg/dht/logging.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package dht

import (
"strings"

"github.com/anacrolix/log"
"github.com/sirupsen/logrus"
)

func init() {
logrus.SetFormatter(&logrus.JSONFormatter{})
log.Default.Handlers = []log.Handler{logrusHandler{}}
}

Expand All @@ -16,7 +17,7 @@ type logrusHandler struct{}
// It intentionally downgrades the log level to reduce verbosity.
func (logrusHandler) Handle(record log.Record) {
entry := logrus.WithFields(logrus.Fields{"names": record.Names})
msg := record.Msg.String()
msg := strings.Replace(record.Msg.String(), "\n", "", -1)

switch record.Level {
case log.Debug:
Expand Down
17 changes: 14 additions & 3 deletions impl/pkg/server/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,18 @@ func (r *PkarrRouter) GetRecord(c *gin.Context) {
return
}

resp, err := r.service.GetPkarr(c.Request.Context(), *id)
// make sure the key is valid
key, err := util.Z32Decode(*id)
if err != nil {
LoggingRespondErrWithMsg(c, err, "invalid record id", http.StatusInternalServerError)
return
}
if len(key) != ed25519.PublicKeySize {
LoggingRespondErrMsg(c, "invalid z32 encoded ed25519 public key", http.StatusBadRequest)
return
}

resp, err := r.service.GetPkarr(c, *id)
if err != nil {
LoggingRespondErrWithMsg(c, err, "failed to get pkarr record", http.StatusInternalServerError)
return
Expand Down Expand Up @@ -82,7 +93,7 @@ func (r *PkarrRouter) PutRecord(c *gin.Context) {
}
key, err := util.Z32Decode(*id)
if err != nil {
LoggingRespondErrWithMsg(c, err, "failed to read id", http.StatusInternalServerError)
LoggingRespondErrWithMsg(c, err, "invalid record id", http.StatusInternalServerError)
return
}
if len(key) != ed25519.PublicKeySize {
Expand Down Expand Up @@ -114,7 +125,7 @@ func (r *PkarrRouter) PutRecord(c *gin.Context) {
return
}

if err = r.service.PublishPkarr(c.Request.Context(), *id, *request); err != nil {
if err = r.service.PublishPkarr(c, *id, *request); err != nil {
LoggingRespondErrWithMsg(c, err, "failed to publish pkarr record", http.StatusInternalServerError)
return
}
Expand Down
51 changes: 38 additions & 13 deletions impl/pkg/service/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr
ctx, span := telemetry.GetTracer().Start(ctx, "PkarrService.PublishPkarr")
defer span.End()

// make sure the key is valid
if _, err := util.Z32Decode(id); err != nil {
return ssiutil.LoggingCtxErrorMsgf(ctx, err, "failed to decode z-base-32 encoded ID: %s", id)
}

if err := record.IsValid(); err != nil {
return err
}
Expand Down Expand Up @@ -115,6 +120,11 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response
ctx, span := telemetry.GetTracer().Start(ctx, "PkarrService.GetPkarr")
defer span.End()

// make sure the key is valid
if _, err := util.Z32Decode(id); err != nil {
return nil, ssiutil.LoggingCtxErrorMsgf(ctx, err, "failed to decode z-base-32 encoded ID: %s", id)
}

// first do a cache lookup
if got, err := s.cache.Get(id); err == nil {
var resp pkarr.Response
Expand Down Expand Up @@ -193,57 +203,72 @@ func (s *PkarrService) republish() {
recordCnt, err := s.db.RecordCount(ctx)
if err != nil {
logrus.WithContext(ctx).WithError(err).Error("failed to get record count before republishing")
return
} else {
logrus.WithContext(ctx).WithField("record_count", recordCnt).Info("republishing records")
}

var nextPageToken []byte
var recordsBatch []pkarr.Record
var seenRecords, batchCnt, successCnt, errCnt int32 = 0, 0, 0, 0

for {
recordsBatch, nextPageToken, err = s.db.ListRecords(ctx, nextPageToken, 1000)
if err != nil {
logrus.WithContext(ctx).WithError(err).Error("failed to list record(s) for republishing")
return
}
seenRecords += int32(len(recordsBatch))
if len(recordsBatch) == 0 {
batchSize := len(recordsBatch)
seenRecords += int32(batchSize)
if batchSize == 0 {
logrus.WithContext(ctx).Info("no records to republish")
return
}

logrus.WithContext(ctx).WithFields(logrus.Fields{
"record_count": len(recordsBatch),
"record_count": batchSize,
"batch_number": batchCnt,
"total_seen": seenRecords,
}).Infof("republishing next batch of records")
}).Infof("republishing batch [%d] of [%d] records", batchSize, batchCnt)
batchCnt++

var wg sync.WaitGroup
wg.Add(len(recordsBatch))
wg.Add(batchSize)

var batchSuccessCnt, batchErrCnt int32 = 0, 0
for _, record := range recordsBatch {
go func(record pkarr.Record) {
// Pass the parent context to the goroutine
go func(ctx context.Context, record pkarr.Record) {
defer wg.Done()

recordID := zbase32.EncodeToString(record.Key[:])
logrus.WithContext(ctx).Debugf("republishing record: %s", recordID)

// Create a new context with a timeout of 10 seconds
putCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// Create a new context with a timeout of 10 seconds based on the parent context
putCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

if _, err = s.dht.Put(putCtx, record.BEP44()); err != nil {
logrus.WithContext(ctx).WithError(err).Errorf("failed to republish record: %s", recordID)
atomic.AddInt32(&errCnt, 1)
// Use a local variable for the error to avoid shadowing
if _, putErr := s.dht.Put(putCtx, record.BEP44()); putErr != nil {
logrus.WithContext(putCtx).WithError(putErr).Errorf("failed to republish record: %s", recordID)
atomic.AddInt32(&batchErrCnt, 1)
} else {
atomic.AddInt32(&successCnt, 1)
atomic.AddInt32(&batchSuccessCnt, 1)
}
}(record)
}(ctx, record)
}

wg.Wait()

atomic.AddInt32(&errCnt, batchErrCnt)
atomic.AddInt32(&successCnt, batchSuccessCnt)

logrus.WithContext(ctx).WithFields(logrus.Fields{
"batch_number": batchCnt,
"success": batchSuccessCnt,
"errors": batchErrCnt,
}).Infof("batch completed with [%d] successes and [%d] errors", batchSuccessCnt, batchErrCnt)

if nextPageToken == nil {
break
}
Expand Down

0 comments on commit 5fd7d05

Please sign in to comment.