diff --git a/routing.go b/routing.go
index af1710aab..5fabc4b04 100644
--- a/routing.go
+++ b/routing.go
@@ -102,20 +102,16 @@ func (dht *IpfsDHT) PutValueExtended(ctx context.Context, key string, value []by
 
 // PutValue adds value corresponding to given Key.
 // This is the top level "Store" operation of the DHT
-
-func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
-	_, err = dht.PutValueExtended(ctx, key, value, opts...)
+func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error {
+	_, err := dht.PutValueExtended(ctx, key, value, opts...)
 	return err
 }
 
 // RecvdVal stores a value and the peer from which we got the value.
-type RecvdVal struct {
-	Val  []byte
-	From peer.ID
-}
+type RecvdVal dhtrouting.RecvdVal
 
 // GetValue searches for the value corresponding to given Key.
-func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
+func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
 	if !dht.enableValues {
 		return nil, routing.ErrNotSupported
 	}
@@ -152,50 +148,49 @@ func (dht *IpfsDHT) SearchValueExtended(ctx context.Context, key string, opts ..
 		return nil, nil, err
 	}
 
-	responsesNeeded := 0
-	if !cfg.Offline {
-		responsesNeeded = dhtrouting.GetQuorum(&cfg)
-	}
-
-	seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)
-
-	stopCh := make(chan struct{})
-	valCh, lookupRes := dht.getValues(ctx, key, seedPeerOpts, stopCh)
-
-	out := make(chan []byte)
-	peers := make(chan []peer.ID, 1)
-	go func() {
-		defer close(out)
-		defer close(peers)
-		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
-
-		var l *lookupWithFollowupResult
-		select {
-		case l = <-lookupRes:
-		case <-ctx.Done():
-			return
+	processors := dhtrouting.GetProcessors(&cfg)
+	if processors == nil {
+		validation := &dhtrouting.ValidationFilter{
+			Key:       key,
+			Validator: dht.Validator,
 		}
 
-		if l == nil {
-			return
+		quorum := &dhtrouting.CountStopper{
+			MaxCount: dhtrouting.GetQuorum(&cfg),
 		}
 
-		if best == nil || aborted {
-			peers <- l.peers
-			return
+		bestValue := &dhtrouting.BestValueFilterRecorder{
+			Key:           key,
+			Validator:     dht.Validator,
+			PeersWithBest: make(map[peer.ID]struct{}),
 		}
 
-		updatePeers := make([]peer.ID, 0, dht.bucketSize)
-		for _, p := range l.peers {
-			if _, ok := peersWithBest[p]; !ok {
-				updatePeers = append(updatePeers, p)
+		processors = []dhtrouting.Processor{validation, quorum, bestValue}
+	}
+
+	return dhtrouting.SearchValue(ctx, key, dht.getValues, processors,
+		func(ctx context.Context, best []byte, closestPeers []peer.ID) {
+			fixupRec := record.MakePutRecord(key, best)
+			for _, p := range closestPeers {
+				go func(p peer.ID) {
+					//TODO: Is this possible?
+					if p == dht.self {
+						err := dht.putLocal(key, fixupRec)
+						if err != nil {
+							logger.Error("Error correcting local dht entry:", err)
+						}
+						return
+					}
+					ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+					defer cancel()
+					err := dht.protoMessenger.PutValue(ctx, p, fixupRec)
+					if err != nil {
+						logger.Debug("Error correcting DHT entry: ", err)
+					}
+				}(p)
 			}
-		}
-
-		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
-	}()
-
-	return out, peers, nil
+		},
+		&cfg)
 }
 
 // SearchValue searches for the value corresponding to given Key and streams the results.
@@ -204,28 +199,6 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
 	return out, err
 }
 
-func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
-	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
-	numResponses := 0
-	return dht.processValues(ctx, key, valCh,
-		func(ctx context.Context, v RecvdVal, better bool) bool {
-			numResponses++
-			if better {
-				select {
-				case out <- v.Val:
-				case <-ctx.Done():
-					return false
-				}
-			}
-
-			if nvals > 0 && numResponses > nvals {
-				close(stopCh)
-				return true
-			}
-			return false
-		})
-}
-
 // GetValues gets nvals values corresponding to the given key.
 func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
 	if !dht.enableValues {
@@ -234,11 +207,14 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R
 	queryCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	valCh, _ := dht.getValues(queryCtx, key, dhtrouting.SeedPeersOptions{UseRTPeers: true}, nil)
+	valCh, _ := dht.getValues(queryCtx, context.Background(), key, dhtrouting.SeedPeersOptions{
+		SeedPeers:  nil,
+		UseRTPeers: true,
+	})
 
 	out := make([]RecvdVal, 0, nvals)
 	for val := range valCh {
-		out = append(out, val)
+		out = append(out, RecvdVal(val))
 		if len(out) == nvals {
 			cancel()
 		}
@@ -247,80 +223,15 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R
 	return out, ctx.Err()
 }
 
-func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
-	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
-loop:
-	for {
-		if aborted {
-			return
-		}
-
-		select {
-		case v, ok := <-vals:
-			if !ok {
-				break loop
-			}
-
-			// Select best value
-			if best != nil {
-				if bytes.Equal(best, v.Val) {
-					peersWithBest[v.From] = struct{}{}
-					aborted = newVal(ctx, v, false)
-					continue
-				}
-				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
-				if err != nil {
-					logger.Warnw("failed to select best value", "key", internal.LoggableRecordKeyString(key), "error", err)
-					continue
-				}
-				if sel != 1 {
-					aborted = newVal(ctx, v, false)
-					continue
-				}
-			}
-			peersWithBest = make(map[peer.ID]struct{})
-			peersWithBest[v.From] = struct{}{}
-			best = v.Val
-			aborted = newVal(ctx, v, true)
-		case <-ctx.Done():
-			return
-		}
-	}
-
-	return
-}
-
-func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
-	fixupRec := record.MakePutRecord(key, val)
-	for _, p := range peers {
-		go func(p peer.ID) {
-			//TODO: Is this possible?
-			if p == dht.self {
-				err := dht.putLocal(key, fixupRec)
-				if err != nil {
-					logger.Error("Error correcting local dht entry:", err)
-				}
-				return
-			}
-			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
-			defer cancel()
-			err := dht.protoMessenger.PutValue(ctx, p, fixupRec)
-			if err != nil {
-				logger.Debug("Error correcting DHT entry: ", err)
-			}
-		}(p)
-	}
-}
-
-func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtrouting.SeedPeersOptions, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
-	valCh := make(chan RecvdVal, 1)
-	lookupResCh := make(chan *lookupWithFollowupResult, 1)
+func (dht *IpfsDHT) getValues(ctx, queryAbortedCtx context.Context, key string, seedPeerOpts dhtrouting.SeedPeersOptions) (<-chan dhtrouting.RecvdVal, <-chan []peer.ID) {
+	valCh := make(chan dhtrouting.RecvdVal, 1)
+	closestPeersCh := make(chan []peer.ID, 1)
 
 	logger.Debugw("finding value", "key", internal.LoggableRecordKeyString(key))
 
 	if rec, err := dht.getLocal(key); rec != nil && err == nil {
 		select {
-		case valCh <- RecvdVal{
+		case valCh <- dhtrouting.RecvdVal{
 			Val:  rec.GetValue(),
 			From: dht.self,
 		}:
@@ -330,7 +241,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtr
 
 	go func() {
 		defer close(valCh)
-		defer close(lookupResCh)
+		defer close(closestPeersCh)
 		lookupRes, err := dht.runLookupWithFollowup(ctx, key,
 			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
 				// For DHT query command
@@ -359,7 +270,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtr
 				// TODO: What should happen if the record is invalid?
 				// Pre-existing code counted it towards the quorum, but should it?
 				if rec != nil && rec.GetValue() != nil {
-					rv := RecvdVal{
+					rv := dhtrouting.RecvdVal{
 						Val:  rec.GetValue(),
 						From: p,
 					}
@@ -382,7 +293,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtr
 			},
 			func() bool {
 				select {
-				case <-stopQuery:
+				case <-queryAbortedCtx.Done():
 					return true
 				default:
 					return false
@@ -394,14 +305,14 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, seedPeerOpts dhtr
 		if err != nil {
 			return
 		}
-		lookupResCh <- lookupRes
+		closestPeersCh <- lookupRes.peers
 
-		if ctx.Err() == nil {
+		if ctx.Err() == nil && seedPeerOpts.SeedPeers == nil {
 			dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
 		}
 	}()
 
-	return valCh, lookupResCh
+	return valCh, closestPeersCh
 }
 
 func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) {
@@ -496,8 +407,8 @@ func (dht *IpfsDHT) ProvideExtended(ctx context.Context, key cid.Cid, brdcst boo
 }
 
 // Provide makes this node announce that it can provide a value for the given key
-func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
-	_, err = dht.ProvideExtended(ctx, key, brdcst)
+func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) error {
+	_, err := dht.ProvideExtended(ctx, key, brdcst)
 	return err
 }
 
@@ -530,21 +441,23 @@ func (dht *IpfsDHT) FindProvidersAsyncExtended(ctx context.Context, key cid.Cid,
 		return nil, nil, err
 	}
 
-	count := dhtrouting.GetQuorum(&cfg)
-	seedPeerOpts := dhtrouting.GetSeedPeers(&cfg)
+	keyMH := key.Hash()
 
-	chSize := count
-	if count == 0 {
-		chSize = 1
-	}
-	peerOut := make(chan peer.AddrInfo, chSize)
-	closestPeersOut := make(chan []peer.ID, 1)
+	processors := dhtrouting.GetProcessors(&cfg)
+	if processors == nil {
+		newValuesOnly := &dhtrouting.NewPeerIDFilter{
+			Key:   string(keyMH),
+			Peers: peer.NewSet(),
+		}
 
-	keyMH := key.Hash()
+		quorum := &dhtrouting.CountStopper{
+			MaxCount: dhtrouting.GetQuorum(&cfg),
+		}
 
-	logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
-	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, seedPeerOpts, peerOut, closestPeersOut)
-	return peerOut, closestPeersOut, nil
+		processors = []dhtrouting.Processor{newValuesOnly, quorum}
+	}
+
+	return dhtrouting.FindProviders(ctx, keyMH, dht.findProvidersAsyncRoutine, processors, &cfg)
 }
 
 // FindProvidersAsync is the same thing as FindProviders, but returns a channel.
@@ -557,95 +470,92 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
 	return providers
 }
 
-func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, seedPeerOpts dhtrouting.SeedPeersOptions, peerOut chan peer.AddrInfo, closestPeersOut chan []peer.ID) {
-	defer close(peerOut)
-	defer close(closestPeersOut)
+func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx, queryAbortedCtx context.Context, key multihash.Multihash, seedPeerOpts dhtrouting.SeedPeersOptions) (<-chan peer.AddrInfo, <-chan []peer.ID) {
+	logger.Debugw("finding providers", "key", key)
 
-	findAll := count == 0
-	var ps *peer.Set
-	if findAll {
-		ps = peer.NewSet()
-	} else {
-		ps = peer.NewLimitedSet(count)
-	}
+	provsCh := make(chan peer.AddrInfo, 1)
+	closestPeersCh := make(chan []peer.ID, 1)
 
-	provs := dht.ProviderManager.GetProviders(ctx, key)
-	for _, p := range provs {
-		// NOTE: Assuming that this list of peers is unique
-		if ps.TryAdd(p) {
+	go func() {
+		defer close(provsCh)
+		defer close(closestPeersCh)
+
+		provs := dht.ProviderManager.GetProviders(ctx, key)
+		for _, p := range provs {
 			pi := dht.peerstore.PeerInfo(p)
 			select {
-			case peerOut <- pi:
+			case provsCh <- pi:
 			case <-ctx.Done():
 				return
 			}
 		}
 
-		// If we have enough peers locally, don't bother with remote RPC
-		// TODO: is this a DOS vector?
-		if !findAll && ps.Size() >= count {
-			return
-		}
-	}
-
-	lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
-		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
-			// For DHT query command
-			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
-				Type: routing.SendingQuery,
-				ID:   p,
-			})
+		lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
+			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
+				// For DHT query command
+				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
+					Type: routing.SendingQuery,
+					ID:   p,
+				})
 
-			provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
-			if err != nil {
-				return nil, err
-			}
+				provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
+				if err != nil {
+					return nil, err
+				}
 
-			logger.Debugf("%d provider entries", len(provs))
+				logger.Debugf("%d provider entries", len(provs))
 
-			// Add unique providers from request, up to 'count'
-			for _, prov := range provs {
-				dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
-				logger.Debugf("got provider: %s", prov)
-				if ps.TryAdd(prov.ID) {
-					logger.Debugf("using provider: %s", prov)
+				// Add unique providers from request, up to 'count'
+				for _, prov := range provs {
+					dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
+					logger.Debugf("got provider: %s", prov)
 					select {
-					case peerOut <- *prov:
+					case provsCh <- *prov:
 					case <-ctx.Done():
 						logger.Debug("context timed out sending more providers")
 						return nil, ctx.Err()
 					}
 				}
-				if !findAll && ps.Size() >= count {
-					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
-					return nil, nil
-				}
-			}
 
-			// Give closer peers back to the query to be queried
-			logger.Debugf("got closer peers: %d %s", len(closest), closest)
+				// Give closer peers back to the query to be queried
+				logger.Debugf("got closer peers: %d %s", len(closest), closest)
 
-			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
-				Type:      routing.PeerResponse,
-				ID:        p,
-				Responses: closest,
-			})
+				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
+					Type:      routing.PeerResponse,
+					ID:        p,
+					Responses: closest,
+				})
 
-			return closest, nil
-		},
-		func() bool {
-			return !findAll && ps.Size() >= count
-		},
-		seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers,
-	)
+				return closest, nil
+			},
+			func() bool {
+				select {
+				case <-queryAbortedCtx.Done():
+					return true
+				default:
+					return false
+				}
+			},
+			seedPeerOpts.SeedPeers, seedPeerOpts.UseRTPeers,
+		)
 
-	if lookupRes != nil {
-		closestPeersOut <- lookupRes.peers
-	}
+		if err != nil {
+			return
+		}
+		closestPeersCh <- lookupRes.peers
 
-	if err == nil && ctx.Err() == nil {
-		dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
-	}
+		if ctx.Err() == nil && seedPeerOpts.SeedPeers == nil {
+			dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
+		}
+	}()
+
+	return provsCh, closestPeersCh
+}
+
+// FindPeer searches for a peer with given ID.
+func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
+	pid, _, err := dht.FindPeerExtended(ctx, id)
+	return pid, err
 }
 
 // FindPeerExtended searches for a peer with given ID.
@@ -720,9 +630,3 @@ func (dht *IpfsDHT) FindPeerExtended(ctx context.Context, id peer.ID, opts ...ro
 	return peer.AddrInfo{}, lookupRes.peers, routing.ErrNotFound
 }
-
-// FindPeer searches for a peer with given ID.
-func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
-	pid, _, err := dht.FindPeerExtended(ctx, id)
-	return pid, err
-}
diff --git a/routing/options.go b/routing/options.go
index 7f3d1615d..b009e0122 100644
--- a/routing/options.go
+++ b/routing/options.go
@@ -58,3 +58,49 @@ func GetSeedPeers(opts *routing.Options) SeedPeersOptions {
 	}
 	return seedPeersOpts
 }
+
+type updateDuringGetOptionKey struct{}
+
+// UpdateDuringGet is a DHT option that tells the DHT whether it should update peers
+// holding old data while doing a Get.
+//
+// Default: true for Get/SearchValue, and false otherwise
+func UpdateDuringGet(updateDuringGet bool) routing.Option {
+	return func(opts *routing.Options) error {
+		if opts.Other == nil {
+			opts.Other = make(map[interface{}]interface{}, 1)
+		}
+		opts.Other[updateDuringGetOptionKey{}] = updateDuringGet
+		return nil
+	}
+}
+
+func getUpdateDuringGet(opts *routing.Options, defaultValue bool) bool {
+	updateDuringGet, ok := opts.Other[updateDuringGetOptionKey{}].(bool)
+	if !ok {
+		updateDuringGet = defaultValue
+	}
+	return updateDuringGet
+}
+
+type processorsOptionKey struct{}
+
+// WithProcessors is a DHT option that tells the DHT which processors it should use
+// (and the order to apply them) on lookup results.
+func WithProcessors(processors ...Processor) routing.Option {
+	return func(opts *routing.Options) error {
+		if opts.Other == nil {
+			opts.Other = make(map[interface{}]interface{}, 1)
+		}
+		opts.Other[processorsOptionKey{}] = processors
+		return nil
+	}
+}
+
+func GetProcessors(opts *routing.Options) []Processor {
+	processors, ok := opts.Other[processorsOptionKey{}].([]Processor)
+	if !ok {
+		processors = nil
+	}
+	return processors
+}
diff --git a/routing/pipeline.go b/routing/pipeline.go
new file mode 100644
index 000000000..39d800c8a
--- /dev/null
+++ b/routing/pipeline.go
@@ -0,0 +1,93 @@
+package routing
+
+import (
+	"context"
+	logging "github.com/ipfs/go-log"
+	"github.com/libp2p/go-libp2p-core/peer"
+)
+
+var (
+	logger = logging.Logger("dht.routing")
+)
+
+// RecvdVal stores a value and the peer from which we got the value.
+type RecvdVal struct {
+	Val  []byte
+	From peer.ID
+}
+
+func RunRecordPipeline(ctx context.Context, processors []Processor, abortQuery func(), out chan<- []byte, in <-chan RecvdVal) <-chan error {
+	errCh := make(chan error, 1)
+	go func() {
+		defer close(errCh)
+	processInput:
+		for {
+			select {
+			case i, more := <-in:
+				if !more {
+					return
+				}
+				var state interface{} = i
+				var err error
+				for _, t := range processors {
+					state, err = t.Process(state, abortQuery)
+					if err != nil {
+						continue processInput
+
+					}
+				}
+
+				finalState := state.(RecvdVal)
+				select {
+				case out <- finalState.Val:
+				case <-ctx.Done():
+					errCh <- ctx.Err()
+					return
+				}
+
+			case <-ctx.Done():
+				errCh <- ctx.Err()
+				return
+			}
+		}
+	}()
+	return errCh
+}
+
+func RunProvidersPipeline(ctx context.Context, processors []Processor, abortQuery func(), out chan<- peer.AddrInfo, in <-chan peer.AddrInfo) <-chan error {
+	errCh := make(chan error, 1)
+	go func() {
+		defer close(errCh)
+	processInput:
+		for {
+			select {
+			case i, more := <-in:
+				if !more {
+					return
+				}
+				var state interface{} = i
+				var err error
+				for _, t := range processors {
+					state, err = t.Process(state, abortQuery)
+					if err != nil {
+						continue processInput
+
+					}
+				}
+
+				finalState := state.(peer.AddrInfo)
+				select {
+				case out <- finalState:
+				case <-ctx.Done():
+					errCh <- ctx.Err()
+					return
+				}
+
+			case <-ctx.Done():
+				errCh <- ctx.Err()
+				return
+			}
+		}
+	}()
+	return errCh
+}
diff --git a/routing/pipes.go b/routing/pipes.go
new file mode 100644
index 000000000..7377eca47
--- /dev/null
+++ b/routing/pipes.go
@@ -0,0 +1,97 @@
+package routing
+
+import (
+	"bytes"
+	"errors"
+	"github.com/libp2p/go-libp2p-core/peer"
+	record "github.com/libp2p/go-libp2p-record"
+)
+
+type Processor interface {
+	Process(interface{}, func()) (interface{}, error)
+}
+
+var skipErr = errors.New("skip value")
+
+type CountStopper struct {
+	Count    int
+	MaxCount int
+}
+
+func (f *CountStopper) Process(val interface{}, abortQuery func()) (interface{}, error) {
+	f.Count++
+	if f.MaxCount > 0 && f.Count >= f.MaxCount {
+		abortQuery()
+	}
+	return val, nil
+}
+
+type ValidationFilter struct {
+	Key       string
+	Validator record.Validator
+}
+
+func (f *ValidationFilter) Process(val interface{}, _ func()) (interface{}, error) {
+	v := val.(RecvdVal)
+	err := f.Validator.Validate(f.Key, v.Val)
+	if err != nil {
+		return nil, err
+	}
+	return v, nil
+}
+
+type BestValueFilterRecorder struct {
+	Key           string
+	Best          []byte
+	Validator     record.Validator
+	PeersWithBest map[peer.ID]struct{}
+}
+
+func (p *BestValueFilterRecorder) Process(val interface{}, _ func()) (interface{}, error) {
+	v := val.(RecvdVal)
+
+	// Select best value
+	if p.Best != nil {
+		if bytes.Equal(p.Best, v.Val) {
+			p.PeersWithBest[v.From] = struct{}{}
+			return nil, skipErr
+		}
+		newIsBetter, err := p.getBetterRecord(p.Key, p.Best, v.Val)
+		if err != nil {
+			logger.Warnw("failed to select best value", "key", p.Key, "error", err)
+			return nil, skipErr
+		}
+		if !newIsBetter {
+			return nil, skipErr
+		}
+	}
+	p.PeersWithBest = make(map[peer.ID]struct{})
+	p.PeersWithBest[v.From] = struct{}{}
+	p.Best = v.Val
+	return v, nil
+}
+
+func (p *BestValueFilterRecorder) getBetterRecord(key string, current, new []byte) (bool, error) {
+	sel, err := p.Validator.Select(key, [][]byte{current, new})
+	if err != nil {
+		return false, err
+	}
+	return sel == 1, nil
+}
+
+type NewPeerIDFilter struct {
+	Key   string
+	Peers *peer.Set
+}
+
+func (p *NewPeerIDFilter) Process(val interface{}, _ func()) (interface{}, error) {
+	prov := val.(peer.AddrInfo)
+
+	logger.Debugf("got provider: %s", prov)
+	if p.Peers.TryAdd(prov.ID) {
+		logger.Debugf("using provider: %s", prov)
+		return prov, nil
+	}
+
+	return nil, skipErr
+}
diff --git a/routing/wrapper.go b/routing/wrapper.go
new file mode 100644
index 000000000..62e999ea2
--- /dev/null
+++ b/routing/wrapper.go
@@ -0,0 +1,91 @@
+package routing
+
+import (
+	"context"
+	"github.com/multiformats/go-multihash"
+
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/routing"
+)
+
+type getValuesFn func(ctx, queryAbortedCtx context.Context, key string, seedPeerOpts SeedPeersOptions) (<-chan RecvdVal, <-chan []peer.ID)
+type updatePeerValuesFn func(ctx context.Context, best []byte, closestPeers []peer.ID)
+
+// SearchValue searches for the value corresponding to given Key and streams the results.
+func SearchValue(ctx context.Context, key string, getVals getValuesFn, processors []Processor, updatePeerValues updatePeerValuesFn, cfg *routing.Options) (<-chan []byte, <-chan []peer.ID, error) {
+	seedPeerOpts := GetSeedPeers(cfg)
+	updateDuringGet := getUpdateDuringGet(cfg, true)
+	queryAbortCtx, abortQuery := context.WithCancel(context.Background())
+	valCh, closestPeersCh := getVals(ctx, queryAbortCtx, key, seedPeerOpts)
+
+	out := make(chan []byte)
+	returnClosestPeersCh := make(chan []peer.ID, 1)
+
+	var closestPeers []peer.ID
+
+	go func() {
+		defer close(out)
+		defer close(returnClosestPeersCh)
+		defer abortQuery()
+		errCh := RunRecordPipeline(ctx, processors, abortQuery, out, valCh)
+		if err := <-errCh; err != nil {
+			return
+		}
+
+		closestPeers = <-closestPeersCh
+		if updateDuringGet && updatePeerValues != nil {
+			for _, f := range processors {
+				if bv, ok := f.(*BestValueFilterRecorder); ok {
+					if bv.Best == nil {
+						break
+					}
+
+					updatePeers := make([]peer.ID, 0, len(closestPeers))
+					for _, p := range closestPeers {
+						if _, ok := bv.PeersWithBest[p]; !ok {
+							updatePeers = append(updatePeers, p)
+						}
+					}
+					updatePeerValues(ctx, bv.Best, updatePeers)
+					break
+				}
+			}
+		}
+		returnClosestPeersCh <- closestPeers
+	}()
+
+	return out, returnClosestPeersCh, nil
+}
+
+type findProvsFn func(ctx, queryAbortedCtx context.Context, key multihash.Multihash, seedPeerOpts SeedPeersOptions) (<-chan peer.AddrInfo, <-chan []peer.ID)
+
+// FindProviders searches for the providers corresponding to given Key and streams the results.
+func FindProviders(ctx context.Context, key multihash.Multihash, findProvsFn findProvsFn, processors []Processor, cfg *routing.Options) (<-chan peer.AddrInfo, <-chan []peer.ID, error) {
+	seedPeerOpts := GetSeedPeers(cfg)
+	maxRequestedRecords := GetQuorum(cfg)
+
+	queryAbortCtx, abortQuery := context.WithCancel(context.Background())
+	valCh, closestPeersCh := findProvsFn(ctx, queryAbortCtx, key, seedPeerOpts)
+
+	outChSize := maxRequestedRecords
+	if outChSize == 0 {
+		outChSize = 1
+	}
+	out := make(chan peer.AddrInfo, outChSize)
+	returnClosestPeersCh := make(chan []peer.ID, 1)
+
+	go func() {
+		defer close(out)
+		defer close(returnClosestPeersCh)
+		defer abortQuery()
+		errCh := RunProvidersPipeline(ctx, processors, abortQuery, out, valCh)
+		if err := <-errCh; err != nil {
+			return
+		}
+
+		closestPeers := <-closestPeersCh
+		returnClosestPeersCh <- closestPeers
+	}()
+
+	return out, returnClosestPeersCh, nil
+}
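
The Processor pipeline added in routing/pipes.go and routing/wrapper.go is the extension point this change introduces: callers can replace the default validation/quorum/best-value chain through the WithProcessors option. The sketch below is illustrative only and not part of the diff — the import paths, the sizeFilter type, and the searchWithCustomPipeline helper are assumptions — but it only uses identifiers defined above (Processor, RecvdVal, ValidationFilter, CountStopper, WithProcessors) plus the DHT's exported Validator field, and shows how a caller might plug a custom processor into SearchValue.

package dhtexample

import (
	"context"
	"fmt"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	dhtrouting "github.com/libp2p/go-libp2p-kad-dht/routing"
)

// sizeFilter is a hypothetical Processor that drops records larger than Max bytes.
type sizeFilter struct {
	Max int
}

func (f *sizeFilter) Process(val interface{}, _ func()) (interface{}, error) {
	v := val.(dhtrouting.RecvdVal)
	if len(v.Val) > f.Max {
		// Returning an error tells the pipeline to skip this record.
		return nil, fmt.Errorf("record of %d bytes exceeds limit", len(v.Val))
	}
	return v, nil
}

// searchWithCustomPipeline overrides the default processors: validate each
// record, drop oversized ones, and abort the query after three accepted values.
func searchWithCustomPipeline(ctx context.Context, d *dht.IpfsDHT, key string) error {
	processors := []dhtrouting.Processor{
		&dhtrouting.ValidationFilter{Key: key, Validator: d.Validator},
		&sizeFilter{Max: 4096},
		&dhtrouting.CountStopper{MaxCount: 3},
	}

	out, err := d.SearchValue(ctx, key, dhtrouting.WithProcessors(processors...))
	if err != nil {
		return err
	}
	for val := range out {
		fmt.Printf("got %d bytes for %q\n", len(val), key)
	}
	return nil
}

Because RunRecordPipeline treats a non-nil error from Process as "skip this record", a custom filter only has to return an error to drop a value. Note that a chain without a BestValueFilterRecorder streams every record that passes the filters, not just successive improvements, and the updatePeerValues fix-up step is skipped.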