Skip to content

Commit

Permalink
1. Expire non-provider records older than MaxAge
Browse files Browse the repository at this point in the history
2. Original publisher should republish putvalue records
3. Peers who receive a record will republish hourly
  • Loading branch information
aarshkshah1992 committed Sep 8, 2019
1 parent a12e621 commit b75fe58
Show file tree
Hide file tree
Showing 10 changed files with 399 additions and 14 deletions.
5 changes: 5 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type IpfsDHT struct {
routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
providers *providers.ProviderManager

nonProvRecordsManager *NonProvRecordsManager

birth time.Time // When this peer started up

Validator record.Validator
Expand Down Expand Up @@ -98,13 +100,15 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
})

dht.proc.AddChild(dht.providers.Process())
dht.proc.AddChild(dht.nonProvRecordsManager.Process())
dht.Validator = cfg.Validator

if !cfg.Client {
for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
}
}

return dht, nil
}

Expand Down Expand Up @@ -156,6 +160,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
}

dht.ctx = dht.newContextWithLocalTags(ctx)
dht.nonProvRecordsManager = NewNonProvRecordsManager(ctx, dht, dstore)

return dht
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/mr-tron/base58 v1.1.2
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.2
github.com/multiformats/go-multihash v0.0.5
github.com/multiformats/go-multistream v0.1.0
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
Expand Down
6 changes: 5 additions & 1 deletion handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
recordIsBad = true
}

if time.Since(recvtime) > MaxRecordAge {
if time.Since(recvtime) > maxNonProvRecordAge {
logger.Debug("old record found, tossing.")
recordIsBad = true
}
Expand Down Expand Up @@ -396,3 +396,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
func convertToDsKey(s []byte) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString(s))
}

func convertToOriginalKey(k string) ([]byte, error) {
return base32.RawStdEncoding.DecodeString(k)
}
193 changes: 193 additions & 0 deletions non_prov_records.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package dht

import (
"context"
"math/rand"
"strings"
"sync"
"time"

"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
u "github.com/ipfs/go-ipfs-util"
"github.com/jbenet/goprocess"
"github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-kad-dht/providers"
recpb "github.com/libp2p/go-libp2p-record/pb"
)

// vars for cleaning up expired records
var nonProvRecordCleanupInterval = time.Hour

// maxNonProvRecordAge specifies the maximum time that any node will hold onto a record
// from the time its received. This does not apply to any other forms of validity that
// the record may contain.
// For example, a record may contain an ipns entry with an EOL saying its valid
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcasted more frequently than once every 'maxNonProvRecordAge'
var maxNonProvRecordAge = time.Hour * 36

// vars for republishing records
var nonProvRecordRePublishInterval = 1 * time.Hour
var nonProvRecordRePublishAge = 1 * time.Hour
var enableRepublishJitter = true

type NonProvRecordsManager struct {
dht *IpfsDHT
ctx context.Context

proc goprocess.Process
dstore ds.Batching

cleanupInterval time.Duration // scan interval for expiring records

rePublishInterval time.Duration // scan interval for republishing records
}

func NewNonProvRecordsManager(ctx context.Context, dht *IpfsDHT, dstore ds.Batching) *NonProvRecordsManager {
m := new(NonProvRecordsManager)
m.dht = dht
m.ctx = ctx
m.dstore = dstore
m.proc = goprocessctx.WithContext(ctx)

// expire records beyond maxage
m.cleanupInterval = nonProvRecordCleanupInterval
m.proc.Go(m.expire)

// republish records older than prescribed age
m.rePublishInterval = nonProvRecordRePublishInterval
m.proc.Go(m.rePublish)

return m
}

func (m *NonProvRecordsManager) Process() goprocess.Process {
return m.proc
}

func (m *NonProvRecordsManager) rePublish(proc goprocess.Process) {
for {
var d = 0 * time.Minute
// minimizes the probability of all peers re-publishing together
// the first peer that re-publishes resets the receivedAt time on the record
// on all other peers that are among the K closest to the key, thus minimizing the number of republishes by other peers
if enableRepublishJitter {
d = time.Duration(rand.Intn(16)) * time.Minute
}

select {
case <-proc.Closing():
return
case <-time.After(m.rePublishInterval + d):
}

tFnc := func(t time.Time) bool {
return time.Since(t) > nonProvRecordRePublishAge && time.Since(t) < maxNonProvRecordAge
}

res, err := m.dstore.Query(query.Query{Filters: []query.Filter{&nonProvRecordFilter{tFnc}}})
if err != nil {
logger.Errorf("republish records proc: failed to run query against datastore, error is %s", err)
continue
}

var wg sync.WaitGroup
// semaphore to rate-limit number of concurrent PutValue calls
semaphore := make(chan struct{}, 5)
for {
e, ok := res.NextSync()
if !ok {
break
}

semaphore <- struct{}{}
wg.Add(1)
go func(e query.Result) {
defer func() {
<-semaphore
wg.Done()
}()

// unmarshal record
rec := new(recpb.Record)
if err := proto.Unmarshal(e.Value, rec); err != nil {
logger.Debugf("republish records proc: failed to unmarshal DHT record from datastore, error is %s", err)
return
}

// call put value
putCtx, cancel := context.WithTimeout(m.ctx, 2*time.Minute)
defer cancel()

// do not use e.key here as that represents the transformed version of the original key
// rec.GetKey is the original key sent by the peer who put this record to dht
if err := m.dht.PutValue(putCtx, string(rec.GetKey()), rec.Value); err != nil {
logger.Debugf("republish records proc: failed to re-publish to the network, error is %s", err)
}
}(e)
}
wg.Wait()
}
}

func (m *NonProvRecordsManager) expire(proc goprocess.Process) {
for {
select {
case <-proc.Closing():
return
case <-time.After(m.cleanupInterval):
}

tFnc := func(t time.Time) bool {
return time.Since(t) > maxNonProvRecordAge
}

res, err := m.dstore.Query(query.Query{Filters: []query.Filter{&nonProvRecordFilter{tFnc}}})
if err != nil {
logger.Errorf("expire records proc: failed to run query against datastore, error is %s", err)
continue
}

for {
e, ok := res.NextSync()
if !ok {
break
}
if err := m.dstore.Delete(ds.RawKey(e.Key)); err != nil {
logger.Errorf("expire records proc: failed to delete key %s from datastore, error is %s", e.Key, err)
}
}
}
}

type timeFilterFnc = func(t time.Time) bool

type nonProvRecordFilter struct {
tFnc timeFilterFnc
}

func (f *nonProvRecordFilter) Filter(e query.Entry) bool {
// unmarshal record
rec := new(recpb.Record)
if err := proto.Unmarshal(e.Value, rec); err != nil {
logger.Debugf("expire records filter: failed to unmarshal DHT record from datastore, error is %s", err)
return false
}

// should not be a provider record
if strings.HasPrefix(e.Key, providers.ProvidersKeyPrefix) {
return false
}

// parse received time
t, err := u.ParseRFC3339(rec.TimeReceived)
if err != nil {
logger.Debugf("expire records filter: failed to parse time in DHT record, error is %s", err)
return false
}

// apply the time filter fnc to the received time
return f.tFnc(t)
}
Loading

0 comments on commit b75fe58

Please sign in to comment.