diff --git a/impl/cmd/main.go b/impl/cmd/main.go index 2c341752..83b6a9c2 100644 --- a/impl/cmd/main.go +++ b/impl/cmd/main.go @@ -109,8 +109,8 @@ func configureLogger(level string) { if level != "" { logLevel, err := logrus.ParseLevel(level) if err != nil { - logrus.WithError(err).WithField("level", level).Error("could not parse log level, setting to info") - logrus.SetLevel(logrus.InfoLevel) + logrus.WithError(err).WithField("level", level).Error("could not parse log level, setting to debug") + logrus.SetLevel(logrus.DebugLevel) } else { logrus.SetLevel(logLevel) } diff --git a/impl/config/config.go b/impl/config/config.go index a95ec23e..e17d5d5e 100644 --- a/impl/config/config.go +++ b/impl/config/config.go @@ -89,7 +89,7 @@ func GetDefaultConfig() Config { CacheSizeLimitMB: 500, }, Log: LogConfig{ - Level: logrus.InfoLevel.String(), + Level: logrus.DebugLevel.String(), }, } } diff --git a/impl/integrationtest/main.go b/impl/integrationtest/main.go index bad1df58..5765c773 100644 --- a/impl/integrationtest/main.go +++ b/impl/integrationtest/main.go @@ -22,7 +22,7 @@ var ( ) func main() { - logrus.SetLevel(logrus.InfoLevel) + logrus.SetLevel(logrus.DebugLevel) if len(os.Args) < 2 { logrus.Fatal("must specify 1 argument (server URL)") } diff --git a/impl/internal/did/client.go b/impl/internal/did/client.go index d945e21b..8e904d86 100644 --- a/impl/internal/did/client.go +++ b/impl/internal/did/client.go @@ -6,7 +6,6 @@ import ( "io" "net/http" "net/url" - "time" "github.com/TBD54566975/ssi-sdk/did" "github.com/anacrolix/dht/v2/bep44" @@ -26,7 +25,7 @@ func NewGatewayClient(gatewayURL string) (*GatewayClient, error) { return nil, err } client := http.DefaultClient - client.Timeout = time.Second * 10 + // client.Timeout = time.Second * 10 return &GatewayClient{ gatewayURL: gatewayURL, client: client, diff --git a/impl/internal/did/client_test.go b/impl/internal/did/client_test.go index 42ceecd3..b207c56a 100644 --- a/impl/internal/did/client_test.go +++ b/impl/internal/did/client_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/anacrolix/dht/v2/bep44" + "github.com/goccy/go-json" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -12,7 +13,7 @@ import ( ) func TestClient(t *testing.T) { - client, err := NewGatewayClient("https://diddht.tbddev.org") + client, err := NewGatewayClient("http://localhost:8305") require.NoError(t, err) require.NotNil(t, client) @@ -42,6 +43,20 @@ func TestClient(t *testing.T) { t.Logf("time to put and get: %s", since) } +func TestGet(t *testing.T) { + client, err := NewGatewayClient("https://diddht.tbddev.org") + + require.NoError(t, err) + require.NotNil(t, client) + + doc, _, _, err := client.GetDIDDocument("did:dht:9m71tgxibxtkr9f8tfueqasuyqncpyxect18zosntkrqygckqcwo") + require.NoError(t, err) + require.NotNil(t, doc) + + b, _ := json.Marshal(doc) + println(string(b)) +} + func TestClientInvalidGateway(t *testing.T) { g, err := NewGatewayClient("\n") assert.Error(t, err) diff --git a/impl/pkg/dht/logging.go b/impl/pkg/dht/logging.go index f54fb3c1..47e3af2a 100644 --- a/impl/pkg/dht/logging.go +++ b/impl/pkg/dht/logging.go @@ -1,12 +1,13 @@ package dht import ( + "strings" + "github.com/anacrolix/log" "github.com/sirupsen/logrus" ) func init() { - logrus.SetFormatter(&logrus.JSONFormatter{}) log.Default.Handlers = []log.Handler{logrusHandler{}} } @@ -16,7 +17,7 @@ type logrusHandler struct{} // It intentionally downgrades the log level to reduce verbosity. func (logrusHandler) Handle(record log.Record) { entry := logrus.WithFields(logrus.Fields{"names": record.Names}) - msg := record.Msg.String() + msg := strings.Replace(record.Msg.String(), "\n", "", -1) switch record.Level { case log.Debug: diff --git a/impl/pkg/server/pkarr.go b/impl/pkg/server/pkarr.go index fc61c500..d78293cf 100644 --- a/impl/pkg/server/pkarr.go +++ b/impl/pkg/server/pkarr.go @@ -43,7 +43,18 @@ func (r *PkarrRouter) GetRecord(c *gin.Context) { return } - resp, err := r.service.GetPkarr(c.Request.Context(), *id) + // make sure the key is valid + key, err := util.Z32Decode(*id) + if err != nil { + LoggingRespondErrWithMsg(c, err, "invalid record id", http.StatusInternalServerError) + return + } + if len(key) != ed25519.PublicKeySize { + LoggingRespondErrMsg(c, "invalid z32 encoded ed25519 public key", http.StatusBadRequest) + return + } + + resp, err := r.service.GetPkarr(c, *id) if err != nil { LoggingRespondErrWithMsg(c, err, "failed to get pkarr record", http.StatusInternalServerError) return @@ -82,7 +93,7 @@ func (r *PkarrRouter) PutRecord(c *gin.Context) { } key, err := util.Z32Decode(*id) if err != nil { - LoggingRespondErrWithMsg(c, err, "failed to read id", http.StatusInternalServerError) + LoggingRespondErrWithMsg(c, err, "invalid record id", http.StatusInternalServerError) return } if len(key) != ed25519.PublicKeySize { @@ -114,7 +125,7 @@ func (r *PkarrRouter) PutRecord(c *gin.Context) { return } - if err = r.service.PublishPkarr(c.Request.Context(), *id, *request); err != nil { + if err = r.service.PublishPkarr(c, *id, *request); err != nil { LoggingRespondErrWithMsg(c, err, "failed to publish pkarr record", http.StatusInternalServerError) return } diff --git a/impl/pkg/service/pkarr.go b/impl/pkg/service/pkarr.go index 781384c8..e46622f8 100644 --- a/impl/pkg/service/pkarr.go +++ b/impl/pkg/service/pkarr.go @@ -70,6 +70,11 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr ctx, span := telemetry.GetTracer().Start(ctx, "PkarrService.PublishPkarr") defer span.End() + // make sure the key is valid + if _, err := util.Z32Decode(id); err != nil { + return ssiutil.LoggingCtxErrorMsgf(ctx, err, "failed to decode z-base-32 encoded ID: %s", id) + } + if err := record.IsValid(); err != nil { return err } @@ -115,6 +120,11 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response ctx, span := telemetry.GetTracer().Start(ctx, "PkarrService.GetPkarr") defer span.End() + // make sure the key is valid + if _, err := util.Z32Decode(id); err != nil { + return nil, ssiutil.LoggingCtxErrorMsgf(ctx, err, "failed to decode z-base-32 encoded ID: %s", id) + } + // first do a cache lookup if got, err := s.cache.Get(id); err == nil { var resp pkarr.Response @@ -193,6 +203,7 @@ func (s *PkarrService) republish() { recordCnt, err := s.db.RecordCount(ctx) if err != nil { logrus.WithContext(ctx).WithError(err).Error("failed to get record count before republishing") + return } else { logrus.WithContext(ctx).WithField("record_count", recordCnt).Info("republishing records") } @@ -200,50 +211,64 @@ func (s *PkarrService) republish() { var nextPageToken []byte var recordsBatch []pkarr.Record var seenRecords, batchCnt, successCnt, errCnt int32 = 0, 0, 0, 0 + for { recordsBatch, nextPageToken, err = s.db.ListRecords(ctx, nextPageToken, 1000) if err != nil { logrus.WithContext(ctx).WithError(err).Error("failed to list record(s) for republishing") return } - seenRecords += int32(len(recordsBatch)) - if len(recordsBatch) == 0 { + batchSize := len(recordsBatch) + seenRecords += int32(batchSize) + if batchSize == 0 { logrus.WithContext(ctx).Info("no records to republish") return } logrus.WithContext(ctx).WithFields(logrus.Fields{ - "record_count": len(recordsBatch), + "record_count": batchSize, "batch_number": batchCnt, "total_seen": seenRecords, - }).Infof("republishing next batch of records") + }).Infof("republishing batch [%d] of [%d] records", batchSize, batchCnt) batchCnt++ var wg sync.WaitGroup - wg.Add(len(recordsBatch)) + wg.Add(batchSize) + var batchSuccessCnt, batchErrCnt int32 = 0, 0 for _, record := range recordsBatch { - go func(record pkarr.Record) { + // Pass the parent context to the goroutine + go func(ctx context.Context, record pkarr.Record) { defer wg.Done() recordID := zbase32.EncodeToString(record.Key[:]) logrus.WithContext(ctx).Debugf("republishing record: %s", recordID) - // Create a new context with a timeout of 10 seconds - putCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + // Create a new context with a timeout of 10 seconds based on the parent context + putCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - if _, err = s.dht.Put(putCtx, record.BEP44()); err != nil { - logrus.WithContext(ctx).WithError(err).Errorf("failed to republish record: %s", recordID) - atomic.AddInt32(&errCnt, 1) + // Use a local variable for the error to avoid shadowing + if _, putErr := s.dht.Put(putCtx, record.BEP44()); putErr != nil { + logrus.WithContext(putCtx).WithError(putErr).Errorf("failed to republish record: %s", recordID) + atomic.AddInt32(&batchErrCnt, 1) } else { - atomic.AddInt32(&successCnt, 1) + atomic.AddInt32(&batchSuccessCnt, 1) } - }(record) + }(ctx, record) } wg.Wait() + atomic.AddInt32(&errCnt, batchErrCnt) + atomic.AddInt32(&successCnt, batchSuccessCnt) + + logrus.WithContext(ctx).WithFields(logrus.Fields{ + "batch_number": batchCnt, + "success": batchSuccessCnt, + "errors": batchErrCnt, + }).Infof("batch completed with [%d] successes and [%d] errors", batchSuccessCnt, batchErrCnt) + if nextPageToken == nil { break }