Skip to content

Commit

Permalink
simple cache (#61)
Browse files Browse the repository at this point in the history
* simple cache

* tested caching

* update codeowners
  • Loading branch information
decentralgabe authored Nov 30, 2023
1 parent abfe68e commit dc88e5b
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 67 deletions.
4 changes: 2 additions & 2 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
# The format is described: https://github.blog/2017-07-06-introducing-code-owners/

# These owners will be the default owners for everything in the repo.
* @decentralgabe @csuwildcat

* @decentralgabe
spec @csuwildcat

# -----------------------------------------------
# BELOW THIS LINE ARE TEMPLATES, UNUSED
Expand Down
10 changes: 6 additions & 4 deletions impl/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (e EnvironmentVariable) String() string {
type Config struct {
ServerConfig ServerConfig `toml:"server"`
DHTConfig DHTServiceConfig `toml:"dht"`
PKARRConfig PKARRServiceConfig `toml:"pkarr"`
PkarrConfig PKARRServiceConfig `toml:"pkarr"`
}

type ServerConfig struct {
Expand All @@ -56,7 +56,8 @@ type DHTServiceConfig struct {
}

type PKARRServiceConfig struct {
RepublishCRON string `toml:"republish_cron"`
RepublishCRON string `toml:"republish_cron"`
CacheTTLMinutes int64 `toml:"cache_ttl_minutes"`
}

func GetDefaultConfig() Config {
Expand All @@ -73,8 +74,9 @@ func GetDefaultConfig() Config {
DHTConfig: DHTServiceConfig{
BootstrapPeers: GetDefaultBootstrapPeers(),
},
PKARRConfig: PKARRServiceConfig{
RepublishCRON: "0 */2 * * *",
PkarrConfig: PKARRServiceConfig{
RepublishCRON: "0 */2 * * *",
CacheTTLMinutes: 10,
},
}
}
Expand Down
3 changes: 2 additions & 1 deletion impl/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ bootstrap_peers = ["router.magnets.im:6881", "router.bittorrent.com:6881", "dht.
"router.utorrent.com:6881", "router.nuh.dev:6881"]

[pkarr]
republish_cron = "0 */2 * * *" # every 2 hours
republish_cron = "0 */2 * * *" # every 2 hours
cache_ttl_minutes = 10
1 change: 1 addition & 0 deletions impl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ require (
github.com/hyperledger/aries-framework-go/component/models v0.0.0-20230501135648-a9a7ad029347 // indirect
github.com/hyperledger/aries-framework-go/spi v0.0.0-20230427134832-0c9969493bd3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kilic/bls12-381 v0.1.1-0.20210503002446-7b7597926c69 // indirect
Expand Down
2 changes: 2 additions & 0 deletions impl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jellydator/ttlcache/v3 v3.1.0 h1:0gPFG0IHHP6xyUyXq+JaD8fwkDCqgqwohXNJBcYE71g=
github.com/jellydator/ttlcache/v3 v3.1.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
Expand Down
1 change: 1 addition & 0 deletions impl/pkg/server/gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package server
10 changes: 5 additions & 5 deletions impl/pkg/server/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (

// PKARRRouter is the router for the PKARR API
type PKARRRouter struct {
service *service.PKARRService
service *service.PkarrService
}

// NewPKARRRouter returns a new instance of the Relay router
func NewPKARRRouter(service *service.PKARRService) (*PKARRRouter, error) {
func NewPKARRRouter(service *service.PkarrService) (*PKARRRouter, error) {
return &PKARRRouter{service: service}, nil
}

Expand All @@ -42,7 +42,7 @@ func (r *PKARRRouter) GetRecord(c *gin.Context) {
return
}

resp, err := r.service.GetPKARR(c, *id)
resp, err := r.service.GetPkarr(c, *id)
if err != nil {
LoggingRespondErrWithMsg(c, err, "failed to get pkarr record", http.StatusInternalServerError)
return
Expand Down Expand Up @@ -109,13 +109,13 @@ func (r *PKARRRouter) PutRecord(c *gin.Context) {
bytes := body[:64]
sigBytes := [64]byte(bytes)
seq := int64(binary.BigEndian.Uint64(body[64:72]))
request := service.PublishPKARRRequest{
request := service.PublishPkarrRequest{
V: vBytes,
K: keyBytes,
Sig: sigBytes,
Seq: seq,
}
if err = r.service.PublishPKARR(c, request); err != nil {
if err = r.service.PublishPkarr(c, *id, request); err != nil {
LoggingRespondErrWithMsg(c, err, "failed to publish pkarr record", http.StatusInternalServerError)
return
}
Expand Down
20 changes: 15 additions & 5 deletions impl/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Server struct {
shutdown chan os.Signal

cfg *config.Config
svc *service.PKARRService
svc *service.PkarrService
}

// NewServer returns a new instance of Server with the given db and host.
Expand All @@ -43,7 +43,7 @@ func NewServer(cfg *config.Config, shutdown chan os.Signal) (*Server, error) {
return nil, util.LoggingErrorMsg(err, "failed to instantiate storage")
}

pkarrService, err := service.NewPKARRService(cfg, db)
pkarrService, err := service.NewPkarrService(cfg, db)
if err != nil {
return nil, util.LoggingErrorMsg(err, "could not instantiate pkarr service")
}
Expand All @@ -57,7 +57,7 @@ func NewServer(cfg *config.Config, shutdown chan os.Signal) (*Server, error) {
handler.GET("/swagger/*any", ginswagger.WrapHandler(swaggerfiles.Handler, ginswagger.URL("/swagger.yaml")))

// root relay API
if err = PKARRAPI(&handler.RouterGroup, pkarrService); err != nil {
if err = PkarrAPI(&handler.RouterGroup, pkarrService); err != nil {
return nil, util.LoggingErrorMsg(err, "could not setup pkarr API")
}
return &Server{
Expand Down Expand Up @@ -112,8 +112,8 @@ func setupHandler(env config.Environment) *gin.Engine {
return handler
}

// PKARRAPI sets up the relay API routes according to https://github.com/Nuhvi/pkarr/blob/main/design/relays.md
func PKARRAPI(rg *gin.RouterGroup, service *service.PKARRService) error {
// PkarrAPI sets up the relay API routes according to https://github.com/Nuhvi/pkarr/blob/main/design/relays.md
func PkarrAPI(rg *gin.RouterGroup, service *service.PkarrService) error {
relayRouter, err := NewPKARRRouter(service)
if err != nil {
return util.LoggingErrorMsg(err, "could not instantiate relay router")
Expand All @@ -123,3 +123,13 @@ func PKARRAPI(rg *gin.RouterGroup, service *service.PKARRService) error {
rg.GET("/:id", relayRouter.GetRecord)
return nil
}

// func GatewayAPI(rg *gin.RouterGroup, service *service.PkarrService) error {
// gatewayRouter, err := NewGatewayRouter(service)
// if err != nil {
// return util.LoggingErrorMsg(err, "could not instantiate gateway router")
// }
//
// rg.GET("/did", gatewayRouter.GetRecord)
// return nil
// }
4 changes: 2 additions & 2 deletions impl/pkg/server/server_pkarr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func TestPKARRRouter(t *testing.T) {
})
}

func testPKARRService(t *testing.T) service.PKARRService {
func testPKARRService(t *testing.T) service.PkarrService {
defaultConfig := config.GetDefaultConfig()
db, err := storage.NewStorage(defaultConfig.ServerConfig.DBFile)
require.NoError(t, err)
require.NotEmpty(t, db)
pkarrService, err := service.NewPKARRService(&defaultConfig, db)
pkarrService, err := service.NewPkarrService(&defaultConfig, db)
require.NoError(t, err)
require.NotEmpty(t, pkarrService)
return *pkarrService
Expand Down
74 changes: 48 additions & 26 deletions impl/pkg/service/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"encoding/base64"
"errors"
"time"

"github.com/TBD54566975/ssi-sdk/util"
"github.com/anacrolix/dht/v2/bep44"
"github.com/anacrolix/torrent/bencode"
"github.com/jellydator/ttlcache/v3"
"github.com/sirupsen/logrus"

"github.com/TBD54566975/did-dht-method/config"
Expand All @@ -16,16 +18,17 @@ import (
"github.com/TBD54566975/did-dht-method/pkg/storage"
)

// PKARRService is the PKARR service responsible for managing the PKARR DHT and reading/writing records
type PKARRService struct {
// PkarrService is the Pkarr service responsible for managing the Pkarr DHT and reading/writing records
type PkarrService struct {
cfg *config.Config
db *storage.Storage
dht *dht.DHT
cache *ttlcache.Cache[string, storage.PkarrRecord]
scheduler *dhtint.Scheduler
}

// NewPKARRService returns a new instance of the PKARR service
func NewPKARRService(cfg *config.Config, db *storage.Storage) (*PKARRService, error) {
// NewPkarrService returns a new instance of the Pkarr service
func NewPkarrService(cfg *config.Config, db *storage.Storage) (*PkarrService, error) {
if cfg == nil {
return nil, util.LoggingNewError("config is required")
}
Expand All @@ -36,30 +39,37 @@ func NewPKARRService(cfg *config.Config, db *storage.Storage) (*PKARRService, er
if err != nil {
return nil, util.LoggingErrorMsg(err, "failed to instantiate dht")
}

// create and start cache and scheduler
ttl := time.Duration(cfg.PkarrConfig.CacheTTLMinutes) * time.Minute
cache := ttlcache.New[string, storage.PkarrRecord](
ttlcache.WithTTL[string, storage.PkarrRecord](ttl),
)
scheduler := dhtint.NewScheduler()
service := PKARRService{
service := PkarrService{
cfg: cfg,
db: db,
dht: d,
cache: cache,
scheduler: &scheduler,
}
if err = scheduler.Schedule(cfg.PKARRConfig.RepublishCRON, service.republish); err != nil {
go cache.Start()
if err = scheduler.Schedule(cfg.PkarrConfig.RepublishCRON, service.republish); err != nil {
return nil, util.LoggingErrorMsg(err, "failed to start republisher")
}
return &service, nil
}

// PublishPKARRRequest is the request to publish a PKARR record
type PublishPKARRRequest struct {
// PublishPkarrRequest is the request to publish a Pkarr record
type PublishPkarrRequest struct {
V []byte `validate:"required"`
K [32]byte `validate:"required"`
Sig [64]byte `validate:"required"`
Seq int64 `validate:"required"`
}

// isValid returns an error if the request is invalid
// also validates the signature
func (p PublishPKARRRequest) isValid() error {
// isValid returns an error if the request is invalid; also validates the signature
func (p PublishPkarrRequest) isValid() error {
if err := util.IsValidStruct(p); err != nil {
return err
}
Expand All @@ -74,26 +84,29 @@ func (p PublishPKARRRequest) isValid() error {
return nil
}

func (p PublishPKARRRequest) toRecord() storage.PKARRRecord {
func (p PublishPkarrRequest) toRecord() storage.PkarrRecord {
encoding := base64.RawURLEncoding
return storage.PKARRRecord{
return storage.PkarrRecord{
V: encoding.EncodeToString(p.V),
K: encoding.EncodeToString(p.K[:]),
Sig: encoding.EncodeToString(p.Sig[:]),
Seq: p.Seq,
}
}

// PublishPKARR stores the record in the db, publishes the given PKARR to the DHT, and returns the z-base-32 encoded ID
func (s *PKARRService) PublishPKARR(ctx context.Context, request PublishPKARRRequest) error {
// PublishPkarr stores the record in the db, publishes the given Pkarr record to the DHT, and returns the z-base-32 encoded ID
func (s *PkarrService) PublishPkarr(ctx context.Context, id string, request PublishPkarrRequest) error {
if err := request.isValid(); err != nil {
return err
}

// write to db and cache
// TODO(gabe): if putting to the DHT fails we should note that in the db and retry later
if err := s.db.WriteRecord(request.toRecord()); err != nil {
record := request.toRecord()
if err := s.db.WriteRecord(record); err != nil {
return err
}
s.cache.Set(id, record, ttlcache.DefaultTTL)

// return here and put it in the DHT asynchronously
go s.dht.Put(ctx, bep44.Put{
Expand All @@ -106,14 +119,14 @@ func (s *PKARRService) PublishPKARR(ctx context.Context, request PublishPKARRReq
return nil
}

// GetPKARRResponse is the response to a get PKARR request
type GetPKARRResponse struct {
// GetPkarrResponse is the response to a get Pkarr request
type GetPkarrResponse struct {
V []byte `validate:"required"`
Seq int64 `validate:"required"`
Sig [64]byte `validate:"required"`
}

func fromPKARRRecord(record storage.PKARRRecord) (*GetPKARRResponse, error) {
func fromPkarrRecord(record storage.PkarrRecord) (*GetPkarrResponse, error) {
encoding := base64.RawURLEncoding
vBytes, err := encoding.DecodeString(record.V)
if err != nil {
Expand All @@ -123,15 +136,24 @@ func fromPKARRRecord(record storage.PKARRRecord) (*GetPKARRResponse, error) {
if err != nil {
return nil, err
}
return &GetPKARRResponse{
return &GetPkarrResponse{
V: vBytes,
Seq: record.Seq,
Sig: [64]byte(sigBytes),
}, nil
}

// GetPKARR returns the full PKARR (including sig data) for the given z-base-32 encoded ID
func (s *PKARRService) GetPKARR(ctx context.Context, id string) (*GetPKARRResponse, error) {
// GetPkarr returns the full Pkarr record (including sig data) for the given z-base-32 encoded ID
func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*GetPkarrResponse, error) {
// first do a cache lookup
if s.cache.Has(id) {
cacheItem := s.cache.Get(id)
until := cacheItem.ExpiresAt().Sub(time.Now())
logrus.Debugf("resolved pkarr record<%s> from cache, with remaining TTL: %s", id, until)
return fromPkarrRecord(cacheItem.Value())
}

// next do a dht lookup
got, err := s.dht.GetFull(ctx, id)
if err != nil {
// try to resolve from storage before returning and error
Expand All @@ -143,7 +165,7 @@ func (s *PKARRService) GetPKARR(ctx context.Context, id string) (*GetPKARRRespon
return nil, err
}
logrus.Debugf("resolved pkarr<%s> from storage", id)
return fromPKARRRecord(*record)
return fromPkarrRecord(*record)
}
bBytes, err := got.V.MarshalBencode()
if err != nil {
Expand All @@ -153,15 +175,15 @@ func (s *PKARRService) GetPKARR(ctx context.Context, id string) (*GetPKARRRespon
if err = bencode.Unmarshal(bBytes, &payload); err != nil {
return nil, err
}
return &GetPKARRResponse{
return &GetPkarrResponse{
V: []byte(payload),
Seq: got.Seq,
Sig: got.Sig,
}, nil
}

// TODO(gabe) make this more efficient. create a publish schedule based on each individual record, not all records
func (s *PKARRService) republish() {
func (s *PkarrService) republish() {
allRecords, err := s.db.ListRecords()
if err != nil {
logrus.WithError(err).Error("failed to list record(s) for republishing")
Expand Down Expand Up @@ -189,7 +211,7 @@ func (s *PKARRService) republish() {
logrus.Infof("Republishing complete. Successfully republished %d out of %d record(s)", len(allRecords)-errCnt, len(allRecords))
}

func recordToBEP44Put(record storage.PKARRRecord) (*bep44.Put, error) {
func recordToBEP44Put(record storage.PkarrRecord) (*bep44.Put, error) {
encoding := base64.RawURLEncoding
vBytes, err := encoding.DecodeString(record.V)
if err != nil {
Expand Down
Loading

0 comments on commit dc88e5b

Please sign in to comment.