diff --git a/impl/cmd/main.go b/impl/cmd/main.go index 0a20fcb1..7c42163c 100644 --- a/impl/cmd/main.go +++ b/impl/cmd/main.go @@ -31,7 +31,9 @@ import ( // @license.name Apache 2.0 // @license.url http://www.apache.org/licenses/LICENSE-2.0.html func main() { - logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetFormatter(&logrus.JSONFormatter{ + PrettyPrint: true, + }) logrus.SetReportCaller(true) log := logrus.NewEntry(logrus.StandardLogger()).WithField("version", config.Version) diff --git a/impl/go.mod b/impl/go.mod index 12d17762..949d2383 100644 --- a/impl/go.mod +++ b/impl/go.mod @@ -4,7 +4,7 @@ go 1.22 require ( github.com/BurntSushi/toml v1.3.2 - github.com/TBD54566975/ssi-sdk v0.0.4-alpha.0.20240402005820-2c6b20991baa + github.com/TBD54566975/ssi-sdk v0.0.4-alpha.0.20240410030603-dcd73d6ce8b3 github.com/allegro/bigcache/v3 v3.1.0 github.com/anacrolix/dht/v2 v2.21.1 github.com/anacrolix/log v0.15.2 diff --git a/impl/go.sum b/impl/go.sum index 35738452..3a7a10b3 100644 --- a/impl/go.sum +++ b/impl/go.sum @@ -24,8 +24,8 @@ github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrX github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/TBD54566975/ssi-sdk v0.0.4-alpha.0.20240402005820-2c6b20991baa h1:1kJozfMxe8fRI0jjKUbKhj7/o16d1oTDOfZJLOMTU28= -github.com/TBD54566975/ssi-sdk v0.0.4-alpha.0.20240402005820-2c6b20991baa/go.mod h1:nyTjplXnrari2nQg63ztI4C0rgMb7Jjn3gfn0OM656g= +github.com/TBD54566975/ssi-sdk v0.0.4-alpha.0.20240410030603-dcd73d6ce8b3 h1:JgCGL4PSKuSHVRQL9onEf+e3Ti1DpCnLWTIP2tg9iDg= +github.com/TBD54566975/ssi-sdk v0.0.4-alpha.0.20240410030603-dcd73d6ce8b3/go.mod h1:nyTjplXnrari2nQg63ztI4C0rgMb7Jjn3gfn0OM656g= github.com/alecthomas/assert/v2 v2.0.0-alpha3 h1:pcHeMvQ3OMstAWgaeaXIAL8uzB9xMm2zlxt+/4ml8lk= github.com/alecthomas/assert/v2 v2.0.0-alpha3/go.mod h1:+zD0lmDXTeQj7TgDgCt0ePWxb0hMC1G+PGTsTCv1B9o= github.com/alecthomas/atomic v0.1.0-alpha2 h1:dqwXmax66gXvHhsOS4pGPZKqYOlTkapELkLb3MNdlH8= diff --git a/impl/pkg/dht/dht.go b/impl/pkg/dht/dht.go index 43d5074a..cb05ffbf 100644 --- a/impl/pkg/dht/dht.go +++ b/impl/pkg/dht/dht.go @@ -10,7 +10,6 @@ import ( "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/bep44" "github.com/anacrolix/dht/v2/exts/getput" - "github.com/anacrolix/log" "github.com/anacrolix/torrent/types/infohash" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -39,7 +38,6 @@ func NewDHT(bootstrapPeers []string) (*DHT, error) { return nil, errutil.LoggingErrorMsg(err, "failed to listen on udp port 6881") } c.Conn = conn - c.Logger = log.NewLogger().WithFilterLevel(log.Debug) c.Logger.SetHandlers(logHandler{}) c.StartingNodes = func() ([]dht.Addr, error) { return dht.ResolveHostPorts(bootstrapPeers) } // set up rate limiter - 100 requests per second, 500 requests burst @@ -97,9 +95,9 @@ func (d *DHT) Put(ctx context.Context, request bep44.Put) (string, error) { }) if err != nil { if t == nil { - return "", errutil.LoggingNewErrorf("failed to put key[%s] into dht: %v", key, err) + return "", errutil.LoggingCtxNewErrorf(ctx, "failed to put key[%s] into dht: %v", key, err) } - return "", errutil.LoggingNewErrorf("failed to put key[%s] into dht, tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) + return "", errutil.LoggingCtxNewErrorf(ctx, "failed to put key[%s] into dht, tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) } else { logrus.WithContext(ctx).WithField("key", key).Debug("successfully put key into dht") } @@ -114,11 +112,11 @@ func (d *DHT) Get(ctx context.Context, key string) (*getput.GetResult, error) { z32Decoded, err := util.Z32Decode(key) if err != nil { - return nil, errutil.LoggingErrorMsgf(err, "failed to decode key [%s]", key) + return nil, errutil.LoggingCtxErrorMsgf(ctx, err, "failed to decode key [%s]", key) } res, t, err := getput.Get(ctx, infohash.HashBytes(z32Decoded), d.Server, nil, nil) if err != nil { - return nil, errutil.LoggingNewErrorf("failed to get key[%s] from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) + return nil, errutil.LoggingCtxNewErrorf(ctx, "failed to get key[%s] from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) } return &res, nil } @@ -132,25 +130,11 @@ func (d *DHT) GetFull(ctx context.Context, key string) (*dhtint.FullGetResult, e z32Decoded, err := util.Z32Decode(key) if err != nil { - return nil, errutil.LoggingErrorMsgf(err, "failed to decode key [%s]", key) + return nil, errutil.LoggingCtxErrorMsgf(ctx, err, "failed to decode key [%s]", key) } res, t, err := dhtint.Get(ctx, infohash.HashBytes(z32Decoded), d.Server, nil, nil) if err != nil { - return nil, errutil.LoggingNewErrorf("failed to get key[%s] from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) + return nil, errutil.LoggingCtxNewErrorf(ctx, "failed to get key[%s] from dht; tried %d nodes, got %d responses", key, t.NumAddrsTried, t.NumResponses) } return &res, nil } - -type logHandler struct{} - -var levels = map[log.Level]logrus.Level{ - log.Debug: logrus.DebugLevel, - log.Info: logrus.InfoLevel, - log.Warning: logrus.WarnLevel, - log.Error: logrus.ErrorLevel, - log.Critical: logrus.ErrorLevel, -} - -func (logHandler) Handle(record log.Record) { - logrus.WithField("names", record.Names).Log(levels[record.Level], record.Msg.String()) -} diff --git a/impl/pkg/dht/logger.go b/impl/pkg/dht/logger.go new file mode 100644 index 00000000..6caec9ba --- /dev/null +++ b/impl/pkg/dht/logger.go @@ -0,0 +1,29 @@ +package dht + +import ( + "github.com/anacrolix/log" + "github.com/sirupsen/logrus" +) + +type logHandler struct{} + +func (logHandler) Handle(record log.Record) { + switch record.Level { + case log.Debug: + logrus.WithFields(logrus.Fields{ + "names": record.Names, + }).Debug(record.Msg.Text()) + case log.Info: + logrus.WithFields(logrus.Fields{ + "names": record.Names, + }).Info(record.Msg.Text()) + case log.Warning: + logrus.WithFields(logrus.Fields{ + "names": record.Names, + }).Warn(record.Msg.Text()) + default: + logrus.WithFields(logrus.Fields{ + "names": record.Names, + }).Error(record.Msg.Text()) + } +} diff --git a/impl/pkg/service/pkarr.go b/impl/pkg/service/pkarr.go index ed1b68da..0d796388 100644 --- a/impl/pkg/service/pkarr.go +++ b/impl/pkg/service/pkarr.go @@ -156,7 +156,7 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response } var payload string if err = bencode.Unmarshal(bBytes, &payload); err != nil { - return nil, ssiutil.LoggingErrorMsg(err, "failed to unmarshal bencoded payload") + return nil, ssiutil.LoggingCtxErrorMsg(ctx, err, "failed to unmarshal bencoded payload") } resp := pkarr.Response{ V: []byte(payload), @@ -190,34 +190,35 @@ func (s *PkarrService) republish() { recordCnt, err := s.db.RecordCount(ctx) if err != nil { - logrus.WithError(err).Error("failed to get record count") + logrus.WithContext(ctx).WithError(err).Error("failed to get record count") } else { - logrus.WithField("record_count", recordCnt).Info("republishing records") + logrus.WithContext(ctx).WithField("record_count", recordCnt).Info("republishing records") } var nextPageToken []byte - var allRecords []pkarr.Record - errCnt, successCnt, batchCnt := 0, 0, 0 + var recordsBatch []pkarr.Record + seenRecords, errCnt, successCnt, batchCnt := 0, 0, 0, 0 for { - allRecords, nextPageToken, err = s.db.ListRecords(ctx, nextPageToken, 1000) + recordsBatch, nextPageToken, err = s.db.ListRecords(ctx, nextPageToken, 1000) if err != nil { - logrus.WithError(err).Error("failed to list record(s) for republishing") + logrus.WithContext(ctx).WithError(err).Error("failed to list record(s) for republishing") return } + seenRecords += len(recordsBatch) - if len(allRecords) == 0 { - logrus.Info("no records to republish") + if len(recordsBatch) == 0 { + logrus.WithContext(ctx).Info("no records to republish") return } - logrus.WithField("record_count", len(allRecords)).Infof("republishing records in batch: %d", batchCnt) + logrus.WithContext(ctx).WithField("record_count", len(recordsBatch)).Infof("republishing records in batch: %d", batchCnt) batchCnt++ - for _, record := range allRecords { + for _, record := range recordsBatch { recordID := zbase32.EncodeToString(record.Key[:]) - logrus.Debugf("republishing record: %s", recordID) + logrus.WithContext(ctx).Debugf("republishing record: %s", recordID) if _, err = s.dht.Put(ctx, record.BEP44()); err != nil { - logrus.WithError(err).Errorf("failed to republish record: %s", recordID) + logrus.WithContext(ctx).WithError(err).Errorf("failed to republish record: %s", recordID) errCnt++ continue } @@ -228,9 +229,10 @@ func (s *PkarrService) republish() { break } } - logrus.WithFields(logrus.Fields{ - "success": len(allRecords) - errCnt, + + logrus.WithContext(ctx).WithFields(logrus.Fields{ + "success": seenRecords - errCnt, "errors": errCnt, - "total": len(allRecords), + "total": seenRecords, }).Infof("republishing complete with [%d] batches", batchCnt) }