Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
chore: use new content and peer routing apis (#181)
Browse files Browse the repository at this point in the history
* chore: use new content and peer routing apis

* chore: address review
  • Loading branch information
vasco-santos authored Apr 24, 2020
1 parent 129566f commit 568c19c
Show file tree
Hide file tree
Showing 42 changed files with 533 additions and 586 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@
"it-pipe": "^1.1.0",
"k-bucket": "^5.0.0",
"libp2p-crypto": "~0.17.1",
"libp2p-interfaces": "^0.2.3",
"libp2p-interfaces": "^0.3.0",
"libp2p-record": "~0.7.0",
"multiaddr": "^7.4.3",
"multihashes": "~0.4.15",
"multihashing-async": "~0.8.0",
"p-filter": "^2.1.0",
Expand All @@ -60,7 +61,6 @@
"p-timeout": "^3.2.0",
"p-times": "^2.1.0",
"peer-id": "~0.13.5",
"peer-info": "~0.17.0",
"promise-to-callback": "^1.0.0",
"protons": "^1.0.1",
"streaming-iterables": "^4.1.1",
Expand Down
2 changes: 1 addition & 1 deletion src/content-fetching/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ module.exports = (dht) => {
if (localRec) {
vals.push({
val: localRec.value,
from: dht.peerInfo.id
from: dht.peerId
})
}

Expand Down
31 changes: 17 additions & 14 deletions src/content-routing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
const errcode = require('err-code')
const pTimeout = require('p-timeout')

const PeerInfo = require('peer-info')

const c = require('../constants')
const LimitedPeerList = require('../peer-list/limited-peer-list')
const Message = require('../message')
Expand Down Expand Up @@ -38,10 +36,12 @@ module.exports = (dht) => {
const errors = []

// Add peer as provider
await dht.providers.addProvider(key, dht.peerInfo.id)
await dht.providers.addProvider(key, dht.peerId)

const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
msg.providerPeers = [dht.peerInfo]
msg.providerPeers = [{
id: dht.peerId
}]

// Notify closest peers
for await (const peer of dht.getClosestPeers(key.buffer)) {
Expand All @@ -58,7 +58,7 @@ module.exports = (dht) => {
// This should be infrequent. This means a peer we previously connected
// to failed to exchange the provide message. If getClosestPeers was an
// iterator, we could continue to pull until we announce to kBucketSize peers.
throw errcode(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`, 'ERR_SOME_PROVIDES_FAILED', { errors })
throw errcode(new Error(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`, 'ERR_SOME_PROVIDES_FAILED'), { errors })
}
},

Expand All @@ -68,7 +68,7 @@ module.exports = (dht) => {
* @param {Object} options - findProviders options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} options.maxNumProviders - maximum number of providers to find
* @returns {AsyncIterable<PeerInfo>}
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findProviders (key, options = {}) {
const providerTimeout = options.timeout || c.minute
Expand All @@ -80,15 +80,18 @@ module.exports = (dht) => {
const provs = await dht.providers.getProviders(key)

provs.forEach((id) => {
const info = dht.peerStore.get(id) || new PeerInfo(id)
out.push(info)
const peerData = dht.peerStore.get(id) || {}
out.push({
id: peerData.id || id,
multiaddrs: (peerData.multiaddrInfos || []).map((mi) => mi.multiaddr)
})
})

// All done
if (out.length >= n) {
// yield values
for (const pInfo of out.toArray()) {
yield pInfo
for (const pData of out.toArray()) {
yield pData
}
return
}
Expand All @@ -105,10 +108,10 @@ module.exports = (dht) => {
return async (peer) => {
const msg = await findProvidersSingle(peer, key)
const provs = msg.providerPeers
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length)
dht._log('(%s) found %s provider entries', dht.peerId.toB58String(), provs.length)

provs.forEach((prov) => {
pathProviders.push(prov)
pathProviders.push({ id: prov.id })
})

// hooray we have all that we want
Expand Down Expand Up @@ -147,8 +150,8 @@ module.exports = (dht) => {
throw errcode(new Error('no providers found'), 'ERR_NOT_FOUND')
}

for (const pInfo of out.toArray()) {
yield pInfo
for (const pData of out.toArray()) {
yield pData
}
}
}
Expand Down
59 changes: 26 additions & 33 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const errcode = require('err-code')

const libp2pRecord = require('libp2p-record')
const { MemoryDatastore } = require('interface-datastore')
const PeerInfo = require('peer-info')

const RoutingTable = require('./routing')
const utils = require('./utils')
Expand Down Expand Up @@ -40,7 +39,7 @@ class KadDHT extends EventEmitter {
* Create a new KadDHT.
* @param {Object} props
* @param {Dialer} props.dialer libp2p dialer instance
* @param {PeerInfo} props.peerInfo peer's peerInfo
* @param {PeerId} props.peerId peer's peerId
* @param {PeerStore} props.peerStore libp2p peerStore
* @param {Object} props.registrar libp2p registrar instance
* @param {function} props.registrar.handle
Expand All @@ -55,7 +54,7 @@ class KadDHT extends EventEmitter {
*/
constructor ({
dialer,
peerInfo,
peerId,
peerStore,
registrar,
datastore = new MemoryDatastore(),
Expand All @@ -78,10 +77,10 @@ class KadDHT extends EventEmitter {
this.dialer = dialer

/**
* Local peer info
* @type {PeerInfo}
* Local peer-id
* @type {PeerId}
*/
this.peerInfo = peerInfo
this.peerId = peerId

/**
* Local PeerStore
Expand Down Expand Up @@ -120,7 +119,7 @@ class KadDHT extends EventEmitter {
*
* @type {RoutingTable}
*/
this.routingTable = new RoutingTable(this.peerInfo.id, this.kBucketSize)
this.routingTable = new RoutingTable(this.peerId, this.kBucketSize)

/**
* Reference to the datastore, uses an in-memory store if none given.
Expand All @@ -134,7 +133,7 @@ class KadDHT extends EventEmitter {
*
* @type {Providers}
*/
this.providers = new Providers(this.datastore, this.peerInfo.id)
this.providers = new Providers(this.datastore, this.peerId)

this.validators = {
pk: libp2pRecord.validator.validators.pk,
Expand All @@ -148,7 +147,7 @@ class KadDHT extends EventEmitter {

this.network = new Network(this)

this._log = utils.logger(this.peerInfo.id)
this._log = utils.logger(this.peerId)

/**
* Random walk management
Expand Down Expand Up @@ -259,11 +258,11 @@ class KadDHT extends EventEmitter {
* @param {Object} options - findProviders options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} options.maxNumProviders - maximum number of providers to find
* @returns {AsyncIterable<PeerInfo>}
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findProviders (key, options = {}) {
for await (const pInfo of this.contentRouting.findProviders(key, options)) {
yield pInfo
for await (const peerData of this.contentRouting.findProviders(key, options)) {
yield peerData
}
}

Expand All @@ -275,7 +274,7 @@ class KadDHT extends EventEmitter {
* @param {PeerId} id
* @param {Object} options - findPeer options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @returns {Promise<PeerInfo>}
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async findPeer (id, options = {}) { // eslint-disable-line require-await
return this.peerRouting.findPeer(id, options)
Expand All @@ -286,7 +285,7 @@ class KadDHT extends EventEmitter {
* @param {Buffer} key
* @param {Object} [options]
* @param {boolean} [options.shallow] shallow query (default: false)
* @returns {AsyncIterable<PeerId>}
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * getClosestPeers (key, options = { shallow: false }) {
for await (const pId of this.peerRouting.getClosestPeers(key, options)) {
Expand Down Expand Up @@ -319,7 +318,7 @@ class KadDHT extends EventEmitter {
* the message.
*
* @param {Message} msg
* @returns {Promise<Array<PeerInfo>>}
* @returns {Promise<Array<{ id: PeerId, multiaddrs: Multiaddr[] }>>}
* @private
*/
async _nearestPeersToQuery (msg) {
Expand All @@ -328,14 +327,11 @@ class KadDHT extends EventEmitter {

return ids.map((p) => {
const peer = this.peerStore.get(p)
const peerInfo = new PeerInfo(p)

if (peer) {
peer.protocols.forEach((p) => peerInfo.protocols.add(p))
peer.multiaddrInfos.forEach((mi) => peerInfo.multiaddrs.add(mi.multiaddr))
return {
id: p,
multiaddrs: peer ? peer.multiaddrInfos.map((mi) => mi.multiaddr) : []
}

return peerInfo
})
}

Expand All @@ -344,12 +340,11 @@ class KadDHT extends EventEmitter {
* than self.
*
* @param {Message} msg
* @param {PeerInfo} peer
* @returns {Promise<Array<PeerInfo>>}
* @param {PeerId} peerId
* @returns {Promise<Array<{ id: PeerId, multiaddrs: Multiaddr[] }>>}
* @private
*/

async _betterPeersToQuery (msg, peer) {
async _betterPeersToQuery (msg, peerId) {
this._log('betterPeersToQuery')
const closer = await this._nearestPeersToQuery(msg)

Expand All @@ -360,7 +355,7 @@ class KadDHT extends EventEmitter {
return false
}

return !closer.id.isEqual(peer.id)
return !closer.id.isEqual(peerId)
})
}

Expand Down Expand Up @@ -411,14 +406,12 @@ class KadDHT extends EventEmitter {

/**
* Add the peer to the routing table and update it in the peerStore.
*
* @param {PeerInfo} peer
* @param {PeerId} peerId
* @returns {Promise<void>}
* @private
*/

async _add (peer) {
await this.routingTable.add(peer.id)
async _add (peerId) {
await this.routingTable.add(peerId)
}

/**
Expand All @@ -445,7 +438,7 @@ class KadDHT extends EventEmitter {
*/

_isSelf (other) {
return other && this.peerInfo.id.id.equals(other.id)
return other && this.peerId.id.equals(other.id)
}

/**
Expand Down Expand Up @@ -478,7 +471,7 @@ class KadDHT extends EventEmitter {
*
* @param {PeerId} peer
* @param {Buffer} key
* @returns {Promise<{Record, Array<PeerInfo}>}
* @returns {Promise<{Record, Array<{ id: PeerId, multiaddrs: Multiaddr[] }}>}
* @private
*/

Expand Down
23 changes: 8 additions & 15 deletions src/message/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const protons = require('protons')
const { Record } = require('libp2p-record')

Expand Down Expand Up @@ -97,25 +97,18 @@ Message.TYPES = MESSAGE_TYPE
Message.CONNECTION_TYPES = CONNECTION_TYPE

function toPbPeer (peer) {
const res = {
return {
id: peer.id.id,
addrs: peer.multiaddrs.toArray().map((m) => m.buffer)
addrs: (peer.multiaddrs || []).map((m) => m.buffer),
connection: CONNECTION_TYPE.CONNECTED
}

if (peer.isConnected()) {
res.connection = CONNECTION_TYPE.CONNECTED
} else {
res.connection = CONNECTION_TYPE.NOT_CONNECTED
}

return res
}

function fromPbPeer (peer) {
const info = new PeerInfo(new PeerId(peer.id))
peer.addrs.forEach((a) => info.multiaddrs.add(a))

return info
return {
id: new PeerId(peer.id),
multiaddrs: peer.addrs.map((a) => multiaddr(a))
}
}

module.exports = Message
10 changes: 5 additions & 5 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Network {
constructor (self) {
this.dht = self
this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT
this._log = utils.logger(this.dht.peerInfo.id, 'net')
this._log = utils.logger(this.dht.peerId, 'net')
this._rpc = rpc(this.dht)
this._onPeerConnected = this._onPeerConnected.bind(this)
this._running = false
Expand Down Expand Up @@ -96,12 +96,12 @@ class Network {
/**
* Registrar notifies a connection successfully with dht protocol.
* @private
* @param {PeerInfo} peerInfo remote peer info
* @param {PeerId} peerId remote peer id
* @returns {Promise<void>}
*/
async _onPeerConnected (peerInfo) {
await this.dht._add(peerInfo)
this._log('added to the routing table: %s', peerInfo.id.toB58String())
async _onPeerConnected (peerId) {
await this.dht._add(peerId)
this._log('added to the routing table: %s', peerId.toB58String())
}

/**
Expand Down
Loading

0 comments on commit 568c19c

Please sign in to comment.