Skip to content

Commit

Permalink
feat: add support for controlling the seed peers used in queries
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Jan 4, 2021
1 parent 6a546cf commit 9b4bfb9
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 11 deletions.
6 changes: 6 additions & 0 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
return dht.GetClosestPeersSeeded(ctx, key, nil, true)
}

// GetClosestPeersSeeded is the Kademlia 'node lookup' operation
func (dht *IpfsDHT) GetClosestPeersSeeded(ctx context.Context, key string, seedPeers []peer.ID, useRTPeers bool) (<-chan peer.ID, error) {
if key == "" {
return nil, fmt.Errorf("can't lookup empty key")
}
Expand Down Expand Up @@ -45,6 +50,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
return peers, err
},
func() bool { return false },
seedPeers, useRTPeers,
)

if err != nil {
Expand Down
21 changes: 17 additions & 4 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ type lookupWithFollowupResult struct {
//
// After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the
// lookup that have not already been successfully queried.
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, seedPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) {
// run the query
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn)
lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn, seedPeers, useRTPeers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -145,10 +145,23 @@ processFollowUp:
return lookupRes, nil
}

func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn, manuallySeededPeers []peer.ID, useRTPeers bool) (*lookupWithFollowupResult, error) {
// pick the K closest peers to the key in our Routing table.
targetKadID := kb.ConvertKey(target)
seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)

seedPeerSet := peer.NewSet()
for _, p := range manuallySeededPeers {
seedPeerSet.Add(p)
}

if manuallySeededPeers == nil || useRTPeers {
RTSeedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize)
for _, p := range RTSeedPeers {
seedPeerSet.Add(p)
}
}

seedPeers := seedPeerSet.Peers()
if len(seedPeers) == 0 {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.QueryError,
Expand Down
38 changes: 31 additions & 7 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []by

logger.Debugw("putting value", "key", internal.LoggableRecordKeyString(key))

var cfg routing.Options
if err := cfg.Apply(opts...); err != nil {
return nil, err
}
seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)

// don't even allow local users to put bad values.
if err := dht.Validator.Validate(key, value); err != nil {
return nil, err
Expand Down Expand Up @@ -64,7 +70,7 @@ func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []by
return nil, err
}

pchan, err := dht.GetClosestPeers(ctx, key)
pchan, err := dht.GetClosestPeersSeeded(ctx, key, seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -151,8 +157,10 @@ func (dht *IpfsDHT) SearchValueExtended(ctx context.Context, key string, opts ..
responsesNeeded = dhtrouting.GetQuorum(&cfg)
}

seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)

stopCh := make(chan struct{})
valCh, lookupRes := dht.getValues(ctx, key, stopCh)
valCh, lookupRes := dht.getValues(ctx, key, seedPeerOpts, stopCh)

out := make(chan []byte)
peers := make(chan []peer.ID, 1)
Expand Down Expand Up @@ -226,7 +234,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R

queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
valCh, _ := dht.getValues(queryCtx, key, nil)
valCh, _ := dht.getValues(queryCtx, key, dhtrouting.SeedPeersOptions{UseRTPeers: true}, nil)

out := make([]RecvdVal, 0, nvals)
for val := range valCh {
Expand Down Expand Up @@ -304,7 +312,7 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte
}
}

func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtrouting.SeedPeersOptions, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
valCh := make(chan RecvdVal, 1)
lookupResCh := make(chan *lookupWithFollowupResult, 1)

Expand Down Expand Up @@ -380,6 +388,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
return false
}
},
seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers,
)

if err != nil {
Expand Down Expand Up @@ -416,6 +425,12 @@ func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst boo
keyMH := key.Hash()
logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))

var cfg routing.Options
if err := cfg.Apply(opts...); err != nil {
return nil, err
}
seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)

// add self locally
dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
if !brdcst {
Expand Down Expand Up @@ -444,7 +459,7 @@ func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst boo
}

var exceededDeadline bool
peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
peers, err := dht.GetClosestPeersSeeded(closerCtx, string(keyMH), seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers)
switch err {
case context.DeadlineExceeded:
// If the _inner_ deadline has been exceeded but the _outer_
Expand Down Expand Up @@ -516,6 +531,7 @@ func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid,
}

count := dhtrouting.GetQuorum(&cfg)
seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)

chSize := count
if count == 0 {
Expand All @@ -527,7 +543,7 @@ func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid,
keyMH := key.Hash()

logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut, closestPeersOut)
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, seedPeerOpts, peerOut, closestPeersOut)
return peerOut, closestPeersOut, nil
}

Expand All @@ -541,7 +557,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
return providers
}

func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo, closestPeersOut chan []peer.ID) {
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, seedPeerOpts dhtrouting.SeedPeersOptions, peerOut chan peer.AddrInfo, closestPeersOut chan []peer.ID) {
defer close(peerOut)
defer close(closestPeersOut)

Expand Down Expand Up @@ -620,6 +636,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
func() bool {
return !findAll && ps.Size() >= count
},
seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers,
)

if lookupRes != nil {
Expand All @@ -639,6 +656,12 @@ func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...ro

logger.Debugw("finding peer", "peer", id)

var cfg routing.Options
if err := cfg.Apply(opts...); err != nil {
return peer.AddrInfo{}, nil, err
}
seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)

// Check if were already connected to them
if pi := dht.FindLocal(id); pi.ID != "" {
return pi, nil, nil
Expand Down Expand Up @@ -670,6 +693,7 @@ func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...ro
func() bool {
return dht.host.Network().Connectedness(id) == network.Connected
},
seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers,
)

if err != nil {
Expand Down

0 comments on commit 9b4bfb9

Please sign in to comment.