Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor out routing system #655

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
6 changes: 4 additions & 2 deletions dual/dual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
record "github.com/libp2p/go-libp2p-record"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
)

var wancid, lancid cid.Cid
Expand Down Expand Up @@ -263,7 +265,7 @@ func TestSearchValue(t *testing.T) {

_ = wan.PutValue(ctx, "/v/hello", []byte("valid"))

valCh, err := d.SearchValue(ctx, "/v/hello", dht.Quorum(0))
valCh, err := d.SearchValue(ctx, "/v/hello", dhtrouting.Quorum(0))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -291,7 +293,7 @@ func TestSearchValue(t *testing.T) {
t.Error(err)
}

valCh, err = d.SearchValue(ctx, "/v/hello", dht.Quorum(0))
valCh, err = d.SearchValue(ctx, "/v/hello", dhtrouting.Quorum(0))
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 6 additions & 0 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
return dht.GetClosestPeersSeeded(ctx, key, nil, true)
}

// GetClosestPeersSeeded is the Kademlia 'node lookup' operation
func (dht *IpfsDHT) GetClosestPeersSeeded(ctx context.Context, key string, seedPeers []peer.ID, useRTPeers bool) (<-chan peer.ID, error) {
Comment on lines +23 to +24
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the first of these double functions, it's a bit unfortunate that we have to expose 2x as many functions for the same thing. We could move them into a new struct, but I'm not sure if it's worth it. Thoughts?

If we're going to leave these functions on the main struct we should have a consistent naming scheme for these. Extended feeds pretty general and something to do with seeding the query or being a continuable query seems a little specific. I'm up for suggestions, otherwise I'll just use [OriginalName]Exteneded everywhere.

Also, while we're rewriting this it would be great to return []peer.ID instead of chan peer.ID, but some tests may have to be modified.

if key == "" {
return nil, fmt.Errorf("can't lookup empty key")
}
Expand Down Expand Up @@ -45,6 +50,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
return peers, err
},
func() bool { return false },
seedPeers, useRTPeers,
)

if err != nil {
Expand Down
21 changes: 17 additions & 4 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ type lookupWithFollowupResult struct {
//
// After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the
// lookup that have not already been successfully queried.
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, seedPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) {
// run the query
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn)
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn, seedPeers, useRTPeers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -145,10 +145,23 @@ processFollowUp:
return lookupRes, nil
}

func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, manuallySeededPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) {
// pick the K closest peers to the key in our Routing table.
targetKadID := kb.ConvertKey(target)
seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)

seedPeerSet := peer.NewSet()
for _, p := range manuallySeededPeers {
seedPeerSet.Add(p)
}

if manuallySeededPeers == nil || useRTPeers {
RTSeedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)
for _, p := range RTSeedPeers {
seedPeerSet.Add(p)
}
}

seedPeers := seedPeerSet.Peers()
if len(seedPeers) == 0 {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.QueryError,
Expand Down
Loading