Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Routing Table Refresh manager #601

Merged
merged 13 commits into from
May 19, 2020
98 changes: 56 additions & 42 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
"go.uber.org/zap"

"go.opencensus.io/tag"

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
Expand All @@ -33,6 +31,8 @@ import (
goprocessctx "github.com/jbenet/goprocess/context"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
"go.opencensus.io/tag"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -69,6 +69,9 @@ type IpfsDHT struct {
// ProviderManager stores & manages the provider records for this Dht peer.
ProviderManager *providers.ProviderManager

// manages Routing Table refresh
rtRefreshManager *rtrefresh.RtRefreshManager

birth time.Time // When this peer started up

Validator record.Validator
Expand Down Expand Up @@ -101,11 +104,7 @@ type IpfsDHT struct {
queryPeerFilter QueryFilterFunc
routingTablePeerFilter RouteTableFilterFunc

autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshInterval time.Duration
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error
autoRefresh bool

maxRecordAge time.Duration

Expand All @@ -114,11 +113,6 @@ type IpfsDHT struct {
// networks).
enableProviders, enableValues bool

// successfulOutboundQueryGracePeriod is the maximum grace period we will give to a peer
// to between two successful query responses from it, failing which,
// we will ping it to see if it's alive.
successfulOutboundQueryGracePeriod time.Duration

fixLowPeersChan chan struct{}
}

Expand Down Expand Up @@ -148,14 +142,13 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
if err := cfg.validate(); err != nil {
return nil, err
}

dht, err := makeDHT(ctx, h, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
}

dht.autoRefresh = cfg.routingTable.autoRefresh
dht.rtRefreshInterval = cfg.routingTable.refreshInterval
dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout

dht.maxRecordAge = cfg.maxRecordAge
dht.enableProviders = cfg.enableProviders
Expand Down Expand Up @@ -188,8 +181,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())

dht.startSelfLookup()
dht.startRefreshing()
if err := dht.rtRefreshManager.Start(); err != nil {
return nil, err
}

// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
Expand Down Expand Up @@ -257,22 +251,37 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
beta: cfg.resiliency,
triggerRtRefresh: make(chan chan<- error),
triggerSelfLookup: make(chan chan<- error),
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
fixLowPeersChan: make(chan struct{}),
}

// The threshold is calculated based on the expected amount of time that should pass before we
// query a peer as part of our refresh cycle.
// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
// be published soon.
l1 := math.Log(float64(1) / float64(defaultBucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cfg.bucketSize?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. We also should have a check that alpha < K. If alpha >= K the math here falls apart and so maxLastSuccessfulOutboundThreshold should just be cfg.routingTable.refreshInterval

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done.

maxLastSuccessfulOutboundThreshold := time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))

// construct routing table
rt, err := makeRoutingTable(dht, cfg)
rt, err := makeRoutingTable(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
}
dht.routingTable = rt

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
}
dht.rtRefreshManager = rtRefresh

// create a DHT proc with the given context
dht.proc = goprocessctx.WithContext(ctx)
dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
return rtRefresh.Close()
})

// create a tagged context derived from the original context
ctxTags := dht.newContextWithLocalTags(ctx)
Expand All @@ -284,19 +293,32 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return dht, nil
}

func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
// The threshold is calculated based on the expected amount of time that should pass before we
// query a peer as part of our refresh cycle.
// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
// be published soon.
l1 := math.Log(float64(1) / float64(defaultBucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold := time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
return string(p), err
}

queryFnc := func(ctx context.Context, key string) error {
_, err := dht.GetClosestPeers(ctx, key)
return err
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
keyGenFnc,
queryFnc,
cfg.routingTable.refreshQueryTimeout,
cfg.routingTable.refreshInterval,
maxLastSuccessfulOutboundThreshold)

return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
self := kb.ConvertPeerID(dht.host.ID())

rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
dht.successfulOutboundQueryGracePeriod = maxLastSuccessfulOutboundThreshold
cmgr := dht.host.ConnManager()

rt.PeerAdded = func(p peer.ID) {
Expand Down Expand Up @@ -335,10 +357,7 @@ func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
}

if dht.autoRefresh {
select {
case dht.triggerRtRefresh <- nil:
default:
}
dht.rtRefreshManager.RefreshNoWait()
}
}

Expand Down Expand Up @@ -458,8 +477,8 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
// LastUsefulAt=N/A
// If we connect to a peer and exchange a query RPC ->
// LastUsefulAt=0
// If we connect to a peer and then exchange a query RPC ->
// LastQueriedAt=time.Now (same reason as above)
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
Expand All @@ -480,12 +499,7 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
// peer not added.
return
}

// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
// value that must have been set in the Routing Table for this peer when it was first added during a connection.
if newlyAdded && queryPeer {
dht.routingTable.UpdateLastUsefulAt(p, time.Now())
} else if queryPeer {
if !newlyAdded && queryPeer {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
Expand Down
Loading