From b75fe587358c918b019e18318fa78b2ef2da1e64 Mon Sep 17 00:00:00 2001
From: Aarsh Shah
Date: Mon, 2 Sep 2019 20:48:14 +0800
Subject: [PATCH] 1. Expire non-provider records older than MaxAge 2. Original
 publisher should republish putvalue records 3. Peers who receive a record
 will republish hourly

---
 dht.go                                |   5 +
 go.mod                                |   1 +
 handlers.go                           |   6 +-
 non_prov_records.go                   | 193 ++++++++++++++++++++++++++
 non_prov_records_test.go              | 166 ++++++++++++++++++++++
 records.go => pk_records.go           |   9 --
 records_test.go => pk_records_test.go |   0
 providers/providers.go                |   7 +-
 providers/providers_test.go           |   2 +-
 routing.go                            |  24 ++++
 10 files changed, 399 insertions(+), 14 deletions(-)
 create mode 100644 non_prov_records.go
 create mode 100644 non_prov_records_test.go
 rename records.go => pk_records.go (87%)
 rename records_test.go => pk_records_test.go (100%)

diff --git a/dht.go b/dht.go
index ce2151dfb..63d3e54b3 100644
--- a/dht.go
+++ b/dht.go
@@ -53,6 +53,8 @@ type IpfsDHT struct {
     routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
     providers    *providers.ProviderManager
 
+    nonProvRecordsManager *NonProvRecordsManager
+
     birth time.Time // When this peer started up
 
     Validator record.Validator
@@ -98,6 +100,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
     })
 
     dht.proc.AddChild(dht.providers.Process())
+    dht.proc.AddChild(dht.nonProvRecordsManager.Process())
     dht.Validator = cfg.Validator
 
     if !cfg.Client {
@@ -105,6 +108,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
             h.SetStreamHandler(p, dht.handleNewStream)
         }
     }
+
     return dht, nil
 }
 
@@ -156,6 +160,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
     }
 
     dht.ctx = dht.newContextWithLocalTags(ctx)
+    dht.nonProvRecordsManager = NewNonProvRecordsManager(ctx, dht, dstore)
 
     return dht
 }
diff --git a/go.mod b/go.mod
index 770abdc53..958eff7c7 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,7 @@ require (
     github.com/mr-tron/base58 v1.1.2
     github.com/multiformats/go-multiaddr v0.0.4
     github.com/multiformats/go-multiaddr-dns v0.0.2
+    github.com/multiformats/go-multihash v0.0.5
     github.com/multiformats/go-multistream v0.1.0
     github.com/stretchr/testify v1.3.0
     github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
diff --git a/handlers.go b/handlers.go
index 95f60e674..a0c730520 100644
--- a/handlers.go
+++ b/handlers.go
@@ -121,7 +121,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
         recordIsBad = true
     }
 
-    if time.Since(recvtime) > MaxRecordAge {
+    if time.Since(recvtime) > maxNonProvRecordAge {
         logger.Debug("old record found, tossing.")
         recordIsBad = true
     }
@@ -396,3 +396,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
 func convertToDsKey(s []byte) ds.Key {
     return ds.NewKey(base32.RawStdEncoding.EncodeToString(s))
 }
+
+func convertToOriginalKey(k string) ([]byte, error) {
+    return base32.RawStdEncoding.DecodeString(k)
+}
diff --git a/non_prov_records.go b/non_prov_records.go
new file mode 100644
index 000000000..b72a0611e
--- /dev/null
+++ b/non_prov_records.go
@@ -0,0 +1,193 @@
+package dht
+
+import (
+    "context"
+    "math/rand"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/gogo/protobuf/proto"
+    ds "github.com/ipfs/go-datastore"
+    "github.com/ipfs/go-datastore/query"
+    u "github.com/ipfs/go-ipfs-util"
+    "github.com/jbenet/goprocess"
+    "github.com/jbenet/goprocess/context"
+    "github.com/libp2p/go-libp2p-kad-dht/providers"
+    recpb "github.com/libp2p/go-libp2p-record/pb"
+)
+
+// vars for cleaning up expired records
+var nonProvRecordCleanupInterval = time.Hour
+
+// maxNonProvRecordAge specifies the maximum time that any node will hold onto a record
+// from the time it's received. This does not apply to any other forms of validity that
+// the record may contain.
+// For example, a record may contain an ipns entry with an EOL saying it's valid
+// until the year 2020 (a great time in the future). For that record to stick around
+// it must be rebroadcast more frequently than once every 'maxNonProvRecordAge'.
+var maxNonProvRecordAge = time.Hour * 36
+
+// vars for republishing records
+var nonProvRecordRePublishInterval = 1 * time.Hour
+var nonProvRecordRePublishAge = 1 * time.Hour
+var enableRepublishJitter = true
+
+type NonProvRecordsManager struct {
+    dht *IpfsDHT
+    ctx context.Context
+
+    proc   goprocess.Process
+    dstore ds.Batching
+
+    cleanupInterval time.Duration // scan interval for expiring records
+
+    rePublishInterval time.Duration // scan interval for republishing records
+}
+
+func NewNonProvRecordsManager(ctx context.Context, dht *IpfsDHT, dstore ds.Batching) *NonProvRecordsManager {
+    m := new(NonProvRecordsManager)
+    m.dht = dht
+    m.ctx = ctx
+    m.dstore = dstore
+    m.proc = goprocessctx.WithContext(ctx)
+
+    // expire records beyond max age
+    m.cleanupInterval = nonProvRecordCleanupInterval
+    m.proc.Go(m.expire)
+
+    // republish records older than the prescribed age
+    m.rePublishInterval = nonProvRecordRePublishInterval
+    m.proc.Go(m.rePublish)
+
+    return m
+}
+
+func (m *NonProvRecordsManager) Process() goprocess.Process {
+    return m.proc
+}
+
+func (m *NonProvRecordsManager) rePublish(proc goprocess.Process) {
+    for {
+        var d = 0 * time.Minute
+        // minimizes the probability of all peers re-publishing together:
+        // the first peer that re-publishes resets the receivedAt time on the record
+        // on all other peers that are among the K closest to the key, thus minimizing
+        // the number of republishes by other peers
+        if enableRepublishJitter {
+            d = time.Duration(rand.Intn(16)) * time.Minute
+        }
+
+        select {
+        case <-proc.Closing():
+            return
+        case <-time.After(m.rePublishInterval + d):
+        }
+
+        tFnc := func(t time.Time) bool {
+            return time.Since(t) > nonProvRecordRePublishAge && time.Since(t) < maxNonProvRecordAge
+        }
+
+        res, err := m.dstore.Query(query.Query{Filters: []query.Filter{&nonProvRecordFilter{tFnc}}})
+        if err != nil {
+            logger.Errorf("republish records proc: failed to run query against datastore, error is %s", err)
+            continue
+        }
+
+        var wg sync.WaitGroup
+        // semaphore to rate-limit the number of concurrent PutValue calls
+        semaphore := make(chan struct{}, 5)
+        for {
+            e, ok := res.NextSync()
+            if !ok {
+                break
+            }
+
+            semaphore <- struct{}{}
+            wg.Add(1)
+            go func(e query.Result) {
+                defer func() {
+                    <-semaphore
+                    wg.Done()
+                }()
+
+                // unmarshal the record
+                rec := new(recpb.Record)
+                if err := proto.Unmarshal(e.Value, rec); err != nil {
+                    logger.Debugf("republish records proc: failed to unmarshal DHT record from datastore, error is %s", err)
+                    return
+                }
+
+                // call PutValue
+                putCtx, cancel := context.WithTimeout(m.ctx, 2*time.Minute)
+                defer cancel()
+
+                // do not use e.Key here as that represents the transformed version of the original key;
+                // rec.GetKey() is the original key sent by the peer who put this record to the DHT
+                if err := m.dht.PutValue(putCtx, string(rec.GetKey()), rec.Value); err != nil {
+                    logger.Debugf("republish records proc: failed to re-publish to the network, error is %s", err)
+                }
+            }(e)
+        }
+        wg.Wait()
+    }
+}
+
+func (m *NonProvRecordsManager) expire(proc goprocess.Process) {
+    for {
+        select {
+        case <-proc.Closing():
+            return
+        case <-time.After(m.cleanupInterval):
+        }
+
+        tFnc := func(t time.Time) bool {
+            return time.Since(t) > maxNonProvRecordAge
+        }
+
+        res, err := m.dstore.Query(query.Query{Filters: []query.Filter{&nonProvRecordFilter{tFnc}}})
+        if err != nil {
+            logger.Errorf("expire records proc: failed to run query against datastore, error is %s", err)
+            continue
+        }
+
+        for {
+            e, ok := res.NextSync()
+            if !ok {
+                break
+            }
+            if err := m.dstore.Delete(ds.RawKey(e.Key)); err != nil {
+                logger.Errorf("expire records proc: failed to delete key %s from datastore, error is %s", e.Key, err)
+            }
+        }
+    }
+}
+
+type timeFilterFnc = func(t time.Time) bool
+
+type nonProvRecordFilter struct {
+    tFnc timeFilterFnc
+}
+
+func (f *nonProvRecordFilter) Filter(e query.Entry) bool {
+    // unmarshal the record
+    rec := new(recpb.Record)
+    if err := proto.Unmarshal(e.Value, rec); err != nil {
+        logger.Debugf("expire records filter: failed to unmarshal DHT record from datastore, error is %s", err)
+        return false
+    }
+
+    // should not be a provider record
+    if strings.HasPrefix(e.Key, providers.ProvidersKeyPrefix) {
+        return false
+    }
+
+    // parse the received time
+    t, err := u.ParseRFC3339(rec.TimeReceived)
+    if err != nil {
+        logger.Debugf("expire records filter: failed to parse time in DHT record, error is %s", err)
+        return false
+    }
+
+    // apply the time filter fnc to the received time
+    return f.tFnc(t)
+}
diff --git a/non_prov_records_test.go b/non_prov_records_test.go
new file mode 100644
index 000000000..c11172239
--- /dev/null
+++ b/non_prov_records_test.go
@@ -0,0 +1,166 @@
+package dht
+
+import (
+    "context"
+    "testing"
+    "time"
+
+    "github.com/gogo/protobuf/proto"
+    "github.com/ipfs/go-cid"
+    ds "github.com/ipfs/go-datastore"
+    pb "github.com/libp2p/go-libp2p-kad-dht/pb"
+    recpb "github.com/libp2p/go-libp2p-record/pb"
+    "github.com/multiformats/go-multihash"
+    "github.com/stretchr/testify/assert"
+)
+
+func TestExpireNonProviderRecords(t *testing.T) {
+    // short sweep duration for testing
+    sVal := nonProvRecordCleanupInterval
+    defer func() { nonProvRecordCleanupInterval = sVal }()
+    nonProvRecordCleanupInterval = 10 * time.Millisecond
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    // TEST expiry does not happen if age(record) < MaxAge
+
+    dhtA := setupDHT(ctx, t, false)
+    dhtB := setupDHT(ctx, t, false)
+    connect(t, ctx, dhtA, dhtB)
+
+    // dhtA puts a non-provider record with the current time, which WILL get stored on B
+    key1 := "/v/key1"
+    value1 := []byte("v1")
+    assert.NoError(t, dhtA.PutValue(ctx, key1, value1))
+
+    // the sweep will not delete it
+    time.Sleep(100 * time.Millisecond)
+
+    // get & verify it's present on B
+    _, err := dhtB.datastore.Get(convertToDsKey([]byte(key1)))
+    assert.NoError(t, err)
+
+    // cleanup
+    dhtA.Close()
+    dhtA.host.Close()
+    dhtB.Close()
+    dhtB.host.Close()
+
+    // TEST expiry happens if age(record) > MaxAge
+
+    mVal := maxNonProvRecordAge
+    maxNonProvRecordAge = 50 * time.Millisecond
+    defer func() { maxNonProvRecordAge = mVal }()
+
+    dhtA = setupDHT(ctx, t, false)
+    dhtB = setupDHT(ctx, t, false)
+    connect(t, ctx, dhtA, dhtB)
+    defer func() {
+        dhtA.Close()
+        dhtA.host.Close()
+        dhtB.Close()
+        dhtB.host.Close()
+    }()
+
+    // dhtA puts a non-provider record with the current time
+    assert.NoError(t, dhtA.PutValue(ctx, key1, value1))
+
+    // dhtA adds a provider record with the current time
+    mh, err := multihash.Sum([]byte("data"), multihash.SHA2_256, -1)
+    assert.NoError(t, err)
+    c := cid.NewCidV0(mh)
+    assert.NoError(t, dhtA.Provide(ctx, c, true))
+
+    // the sweep will remove the non-provider record on B now
+    time.Sleep(1 * time.Second)
+
+    // verify the non-provider record is absent on B
+    _, err = dhtB.datastore.Get(convertToDsKey([]byte(key1)))
+    assert.Equal(t, ds.ErrNotFound, err)
+
+    // but.... the provider record is still available
+    m, err := getTestProvRecord(t, ctx, dhtB, c)
+    assert.NoError(t, err)
+    assert.NotEmpty(t, m.ProviderPeers)
+}
+
+func TestRepublishNonProvRecords(t *testing.T) {
+    // short scan duration for re-publish
+    sVal := nonProvRecordRePublishInterval
+    nonProvRecordRePublishInterval = 10 * time.Millisecond
+    jVal := enableRepublishJitter
+    enableRepublishJitter = false
+    defer func() {
+        enableRepublishJitter = jVal
+        nonProvRecordRePublishInterval = sVal
+    }()
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    // TEST re-publish does not happen if age(record) < republishAge
+
+    dhtA := setupDHT(ctx, t, false)
+    dhtB := setupDHT(ctx, t, false)
+    connect(t, ctx, dhtA, dhtB)
+
+    // dhtA puts a non-provider record with the current time, which WILL get stored on B
+    key1 := "/v/key1"
+    value1 := []byte("v1")
+    assert.NoError(t, dhtA.PutValue(ctx, key1, value1))
+
+    // dhtA DELETES it locally
+    assert.NoError(t, dhtA.datastore.Delete(convertToDsKey([]byte(key1))))
+
+    // it will not be re-published by B
+    time.Sleep(2 * time.Second)
+
+    // get on dhtA & verify it's absent
+    _, err := dhtA.datastore.Get(convertToDsKey([]byte(key1)))
+    assert.Equal(t, ds.ErrNotFound, err)
+
+    // cleanup
+    dhtA.Close()
+    dhtA.host.Close()
+    dhtB.Close()
+    dhtB.host.Close()
+
+    // TEST re-publish happens if age(record) > republishAge
+
+    mVal := nonProvRecordRePublishAge
+    nonProvRecordRePublishAge = 100 * time.Millisecond
+    defer func() { nonProvRecordRePublishAge = mVal }()
+
+    dhtA = setupDHT(ctx, t, false)
+    dhtB = setupDHT(ctx, t, false)
+    connect(t, ctx, dhtA, dhtB)
+    defer func() {
+        dhtA.Close()
+        dhtA.host.Close()
+        dhtB.Close()
+        dhtB.host.Close()
+    }()
+
+    // dhtA puts a non-provider record with the current time
+    assert.NoError(t, dhtA.PutValue(ctx, key1, value1))
+
+    // dhtA DELETES it locally
+    assert.NoError(t, dhtA.datastore.Delete(convertToDsKey([]byte(key1))))
+
+    // it will be re-published by B
+    time.Sleep(2 * time.Second)
+
+    // get on dhtA & verify the key is present (because it SHOULD have been re-published by B)
+    v, err := dhtA.datastore.Get(convertToDsKey([]byte(key1)))
+    assert.NoError(t, err)
+    rec := new(recpb.Record)
+    assert.NoError(t, proto.Unmarshal(v, rec))
+    assert.Equal(t, value1, rec.Value)
+}
+
+func getTestProvRecord(t *testing.T, ctx context.Context, d *IpfsDHT, c cid.Cid) (*pb.Message, error) {
+    pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, c.Bytes(), 0)
+    m, err := d.handleGetProviders(ctx, "test peer", pmes)
+    return m, err
+}
diff --git a/records.go b/pk_records.go
similarity index 87%
rename from records.go
rename to pk_records.go
index 5f641b056..7a1ee0ca0 100644
--- a/records.go
+++ b/pk_records.go
@@ -3,7 +3,6 @@ package dht
 import (
     "context"
     "fmt"
-    "time"
 
     "github.com/libp2p/go-libp2p-core/peer"
     "github.com/libp2p/go-libp2p-core/routing"
@@ -11,14 +10,6 @@ import (
     ci "github.com/libp2p/go-libp2p-core/crypto"
 )
 
-// MaxRecordAge specifies the maximum time that any node will hold onto a record
-// from the time its received. This does not apply to any other forms of validity that
-// the record may contain.
-// For example, a record may contain an ipns entry with an EOL saying its valid
-// until the year 2020 (a great time in the future). For that record to stick around
-// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
-const MaxRecordAge = time.Hour * 36
-
 type pubkrs struct {
     pubk ci.PubKey
     err  error
diff --git a/records_test.go b/pk_records_test.go
similarity index 100%
rename from records_test.go
rename to pk_records_test.go
diff --git a/providers/providers.go b/providers/providers.go
index ec44cc511..c72125152 100644
--- a/providers/providers.go
+++ b/providers/providers.go
@@ -74,10 +74,11 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
     return pm
 }
 
-const providersKeyPrefix = "/providers/"
+// prefix to be used for all provider record keys
+const ProvidersKeyPrefix = "/providers/"
 
 func mkProvKey(k cid.Cid) string {
-    return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
+    return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
 }
 
 func (pm *ProviderManager) Process() goprocess.Process {
@@ -284,7 +285,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
 
             // Now, kick off a GC of the datastore.
             q, err := pm.dstore.Query(dsq.Query{
-                Prefix: providersKeyPrefix,
+                Prefix: ProvidersKeyPrefix,
             })
             if err != nil {
                 log.Error("provider record GC query failed: ", err)
diff --git a/providers/providers_test.go b/providers/providers_test.go
index 58756c55c..08423ddf6 100644
--- a/providers/providers_test.go
+++ b/providers/providers_test.go
@@ -185,7 +185,7 @@ func TestProvidesExpire(t *testing.T) {
         t.Fatal("providers map not cleaned up")
     }
 
-    res, err := ds.Query(dsq.Query{Prefix: providersKeyPrefix})
+    res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix})
     if err != nil {
         t.Fatal(err)
     }
diff --git a/routing.go b/routing.go
index 9435d1d98..f26b8e314 100644
--- a/routing.go
+++ b/routing.go
@@ -27,6 +27,8 @@ import (
 // results will wait for the channel to drain.
 var asyncQueryBuffer = 10
 
+var putValueRepublishInterval = 24 * time.Hour
+
 // This file implements the Routing interface for the IpfsDHT struct.
 
 // Basic Put/Get
@@ -98,6 +100,28 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
         }(p)
     }
     wg.Wait()
+
+    // the original publisher should keep re-publishing the record because the network isn't `steady`/`stable`
+    // and the K closest peers we just published to can become unavailable / no longer be the K closest
+    go func() {
+        for {
+            select {
+            case <-dht.proc.Closing():
+                return
+            case <-time.After(putValueRepublishInterval):
+                // TODO: We cannot re-use the original context here as it may have expired.
+                // But, is it fair to use this one?
+                ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+                if err := dht.PutValue(ctx, key, value, opts...); err != nil {
+                    logger.Errorf("putValue republish proc: failed to republish key %s, error is %+v", key, err)
+                } else {
+                    logger.Debugf("putValue republish proc: successfully republished key %s", key)
+                }
+                cancel()
+            }
+        }
+    }()
+
     return nil
 }
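---

Note (illustrative, not part of the patch): the change introduces several timing knobs whose relationship matters. Peers holding a record rescan roughly every nonProvRecordRePublishInterval (plus up to 15 minutes of jitter) and republish anything older than nonProvRecordRePublishAge, the original publisher re-puts its own records every putValueRepublishInterval, and anything unrefreshed for maxNonProvRecordAge is dropped. The in-package test below is a minimal sketch of that invariant, using only the variables added by this patch; the file and test names are hypothetical.

package dht

import (
    "testing"
    "time"
)

// Sketch: the republish cadences must beat the expiry window, otherwise live
// records could be garbage-collected before anyone refreshes them.
func TestRecordMaintenanceWindowsAreConsistent(t *testing.T) {
    // peers holding a record rescan every nonProvRecordRePublishInterval plus up to
    // 15 minutes of jitter, and refresh anything older than nonProvRecordRePublishAge...
    worstCaseRefresh := nonProvRecordRePublishAge + nonProvRecordRePublishInterval + 15*time.Minute

    // ...so a record that is still wanted should always be refreshed before it expires
    if worstCaseRefresh >= maxNonProvRecordAge {
        t.Fatalf("worst-case refresh %s is not faster than record expiry %s",
            worstCaseRefresh, maxNonProvRecordAge)
    }

    // the original publisher also re-puts its records (routing.go) and must likewise
    // do so before they can expire on the closest peers
    if putValueRepublishInterval >= maxNonProvRecordAge {
        t.Fatalf("publisher republish interval %s is not faster than record expiry %s",
            putValueRepublishInterval, maxNonProvRecordAge)
    }
}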