-
Notifications
You must be signed in to change notification settings - Fork 233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Routing Table Refresh manager #601
Conversation
dht.go
Outdated
// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will | ||
// be published soon. | ||
l1 := math.Log(float64(1) / float64(defaultBucketSize)) //(Log(1/K)) | ||
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cfg.bucketSize
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. We also should have a check that alpha < K. If alpha >= K the math here falls apart and so maxLastSuccessfulOutboundThreshold
should just be cfg.routingTable.refreshInterval
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done.
rtrefresh/rt_refresh_manager.go
Outdated
rfnc := func(c int) (err error) { | ||
cpl := uint(c) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rfnc := func(c int) (err error) { | |
cpl := uint(c) | |
rfnc := func(cpl uint) (err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done.
rtrefresh/rt_refresh_manager.go
Outdated
|
||
var err error | ||
rfnc := func(c int) (err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may be worth making forceRefresh / refreshCpls paramters, and making this a method, rather than an anonymous function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that this anon func is ONLY used here and you'd have to pass it multiple params, I'm not sure we need the new method/abstraction. It's just a simple 4 line func.
dht_bootstrap_test.go
Outdated
require.Equal(t, connectedTo.self, d1.routingTable.ListPeers()[0]) | ||
|
||
// now emit the address change event | ||
em, err := d1.host.EventBus().Emitter(&event.EvtLocalAddressesUpdated{}) | ||
require.NoError(t, err) | ||
require.NoError(t, em.Emit(event.EvtLocalAddressesUpdated{})) | ||
waitForWellFormedTables(t, []*IpfsDHT{d1}, 2, 2, 2*time.Second) | ||
require.True(t, waitForWellFormedTables(t, []*IpfsDHT{d1}, 2, 2, 10*time.Second)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this flaky before, or did some change in this PR necessitate switching from 2 to 10 seconds here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aschmahmann This looks like an accidental change. I might have made it to test something. I've reverted it.
@@ -829,6 +827,19 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) { | |||
assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!") | |||
} | |||
|
|||
func TestQueryWithEmptyRTShouldNotPanic(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should add checks for at least SearchValues and GetPublicKey, since they should be easy enough and could end up with slightly different code paths. Could also add in the other functions from the routing interface too (Provide + PutValue).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We now ALSO cover GetValue
, SearchValues
and Provide
.
Looks like GetPublicKey
goes to the other node which involves some wire communication so not testing that for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, it's a bit weird that GetPublicKey
seems difficult to test. However, given that it's basically unused I'm ok not blocking on this for now and coming back to it later.
dht.go
Outdated
// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will | ||
// be published soon. | ||
l1 := math.Log(float64(1) / float64(defaultBucketSize)) //(Log(1/K)) | ||
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. We also should have a check that alpha < K. If alpha >= K the math here falls apart and so maxLastSuccessfulOutboundThreshold
should just be cfg.routingTable.refreshInterval
rtrefresh/rt_refresh_manager.go
Outdated
refreshQueryTimeout time.Duration // timeout for one refresh query | ||
|
||
// interval between two periodic refreshes. | ||
// also, a cpl wont be refreshed it the time since it was last refreshed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// also, a cpl wont be refreshed it the time since it was last refreshed | |
// also, a cpl wont be refreshed if the time since it was last refreshed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done.
rtrefresh/rt_refresh_manager.go
Outdated
|
||
// EXECUTE the refresh | ||
|
||
// ping Routing Table peers that haven't been hear of/from in the interval they should have been. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// ping Routing Table peers that haven't been hear of/from in the interval they should have been. | |
// ping Routing Table peers that haven't been heard of/from in the interval they should have been. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done.
livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout) | ||
if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil { | ||
logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err) | ||
r.rt.RemovePeer(ps.Id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it's worth throwing in a comment here about if they don't support the DHT protocol upon reconnect that it should be caught by the IdentifyCompleted events? Would adding the comment be too confusing and/or prone to becoming outdated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aschmahmann Yes, I don't think that documentation belongs to the refresh manager at all. This guy's job is simply to remove peers that aren't alive irrespective of whether the support the protocol or not.
rtrefresh/rt_refresh_manager.go
Outdated
// had paid more attention in the Math classes at university. | ||
// So, please be patient and a doc explaining it will be published soon. | ||
if r.rt.NPeersForCpl(cpl) == 0 { | ||
lastCpl := min(2*c, len(refreshCpls)-1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's an off by one error here. i.e. lastCpl := min(2*(c+1), len(refreshCpls)-1)
.
@petar @Stebalien can you double check me here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's ok: Substitute "len(refreshCpls)-1" for "lastCpl" in the "<" condition of the for loop. You get:
i < len(refreshCpls)-1+1
That's what you want.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aarshkshah1992 I talked this over with @petar and where we got was that there's still some formalization to be figured out about how we want to deal with gaps. However, using lastCpl := min(2*c, len(refreshCpls)-1)
isn't necessarily great as is because if c=0
then we stop refreshing which seems bad (although it's highly unlikely in a large network).
Given that the pathological case where the first couple buckets are empty should be quite rare, it probably doesn't matter how many more buckets we refresh beyond 2*c (e.g. instead of the 2c in the min statement use 2c+1, 2(c+1), or min(2c, 5)). For now I'd just go with lastCpl := min(2*(c+1), len(refreshCpls)-1)
.
@aschmahmann I believe I've address all your concerns. Please take a look when you can. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few small changes and I think we're good to go!
go.mod
Outdated
@@ -18,7 +18,7 @@ require ( | |||
github.com/libp2p/go-eventbus v0.1.0 | |||
github.com/libp2p/go-libp2p v0.8.2 | |||
github.com/libp2p/go-libp2p-core v0.5.3 | |||
github.com/libp2p/go-libp2p-kbucket v0.4.1 | |||
github.com/libp2p/go-libp2p-kbucket v0.4.2-0.20200511065848-eb2adab8eaba |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason we shouldn't do a kbucket release and upgrade this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done.
var logger = logging.Logger("dht/RtRefreshManager") | ||
|
||
const ( | ||
peerPingTimeout = 10 * time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this a constant instead of a RefresherManager variable? Is it implicitly relying on other constants (e.g. we're using 2x the libp2p/go-tcp-transport's default 5 second timeout), or just a default that seems reasonable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aschmahmann Just the default seems reasonable. This is a simple dial timeout.
rtrefresh/rt_refresh_manager.go
Outdated
// had paid more attention in the Math classes at university. | ||
// So, please be patient and a doc explaining it will be published soon. | ||
if r.rt.NPeersForCpl(cpl) == 0 { | ||
lastCpl := min(2*c, len(refreshCpls)-1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aarshkshah1992 I talked this over with @petar and where we got was that there's still some formalization to be figured out about how we want to deal with gaps. However, using lastCpl := min(2*c, len(refreshCpls)-1)
isn't necessarily great as is because if c=0
then we stop refreshing which seems bad (although it's highly unlikely in a large network).
Given that the pathological case where the first couple buckets are empty should be quite rare, it probably doesn't matter how many more buckets we refresh beyond 2*c (e.g. instead of the 2c in the min statement use 2c+1, 2(c+1), or min(2c, 5)). For now I'd just go with lastCpl := min(2*(c+1), len(refreshCpls)-1)
.
@@ -829,6 +827,19 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) { | |||
assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!") | |||
} | |||
|
|||
func TestQueryWithEmptyRTShouldNotPanic(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, it's a bit weird that GetPublicKey
seems difficult to test. However, given that it's basically unused I'm ok not blocking on this for now and coming back to it later.
@aschmahmann Thanks for the great catch of the pathological condition. Have made the changes . Let's ship it ! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aarshkshah1992 Left one nit that you are free to ignore if you want.
LGTM. Squash + merge when ready.
// FIXME: this can block. Ideally, we'd return a channel without blocking. | ||
// https://github.com/libp2p/go-libp2p-kad-dht/issues/609 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I don't have strong feelings about having FIXMEs like this in the code. However, this issue still remains so I'd leave the comment. Your call though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added this back.
Routing Table PR at libp2p/go-libp2p-kbucket#77.
It's time we did this. Also incorporates changes for #550 and #555 in meta-issue #556.
Once we have the new
SearchByKadId
RPC implemented, switching to it should just be a config change for theRefreshManager
.