Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

chore: use new content and peer routing apis #181

Merged
merged 2 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@
"it-pipe": "^1.1.0",
"k-bucket": "^5.0.0",
"libp2p-crypto": "~0.17.1",
"libp2p-interfaces": "^0.2.3",
"libp2p-interfaces": "^0.3.0",
"libp2p-record": "~0.7.0",
"multiaddr": "^7.4.3",
"multihashes": "~0.4.15",
"multihashing-async": "~0.8.0",
"p-filter": "^2.1.0",
Expand All @@ -60,7 +61,6 @@
"p-timeout": "^3.2.0",
"p-times": "^2.1.0",
"peer-id": "~0.13.5",
"peer-info": "~0.17.0",
"promise-to-callback": "^1.0.0",
"protons": "^1.0.1",
"streaming-iterables": "^4.1.1",
Expand Down
2 changes: 1 addition & 1 deletion src/content-fetching/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ module.exports = (dht) => {
if (localRec) {
vals.push({
val: localRec.value,
from: dht.peerInfo.id
from: dht.peerId
})
}

Expand Down
31 changes: 17 additions & 14 deletions src/content-routing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
const errcode = require('err-code')
const pTimeout = require('p-timeout')

const PeerInfo = require('peer-info')

const c = require('../constants')
const LimitedPeerList = require('../peer-list/limited-peer-list')
const Message = require('../message')
Expand Down Expand Up @@ -38,10 +36,12 @@ module.exports = (dht) => {
const errors = []

// Add peer as provider
await dht.providers.addProvider(key, dht.peerInfo.id)
await dht.providers.addProvider(key, dht.peerId)

const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
msg.providerPeers = [dht.peerInfo]
msg.providerPeers = [{
id: dht.peerId
}]

// Notify closest peers
for await (const peer of dht.getClosestPeers(key.buffer)) {
Expand All @@ -58,7 +58,7 @@ module.exports = (dht) => {
// This should be infrequent. This means a peer we previously connected
// to failed to exchange the provide message. If getClosestPeers was an
// iterator, we could continue to pull until we announce to kBucketSize peers.
throw errcode(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`, 'ERR_SOME_PROVIDES_FAILED', { errors })
throw errcode(new Error(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`, 'ERR_SOME_PROVIDES_FAILED'), { errors })
}
},

Expand All @@ -68,7 +68,7 @@ module.exports = (dht) => {
* @param {Object} options - findProviders options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} options.maxNumProviders - maximum number of providers to find
* @returns {AsyncIterable<PeerInfo>}
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findProviders (key, options = {}) {
const providerTimeout = options.timeout || c.minute
Expand All @@ -80,15 +80,18 @@ module.exports = (dht) => {
const provs = await dht.providers.getProviders(key)

provs.forEach((id) => {
const info = dht.peerStore.get(id) || new PeerInfo(id)
out.push(info)
const peerData = dht.peerStore.get(id) || {}
out.push({
id: peerData.id || id,
multiaddrs: (peerData.multiaddrInfos || []).map((mi) => mi.multiaddr)
})
})

// All done
if (out.length >= n) {
// yield values
for (const pInfo of out.toArray()) {
yield pInfo
for (const pData of out.toArray()) {
yield pData
}
return
}
Expand All @@ -105,10 +108,10 @@ module.exports = (dht) => {
return async (peer) => {
const msg = await findProvidersSingle(peer, key)
const provs = msg.providerPeers
dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length)
dht._log('(%s) found %s provider entries', dht.peerId.toB58String(), provs.length)

provs.forEach((prov) => {
pathProviders.push(prov)
pathProviders.push({ id: prov.id })
})

// hooray we have all that we want
Expand Down Expand Up @@ -147,8 +150,8 @@ module.exports = (dht) => {
throw errcode(new Error('no providers found'), 'ERR_NOT_FOUND')
}

for (const pInfo of out.toArray()) {
yield pInfo
for (const pData of out.toArray()) {
yield pData
}
}
}
Expand Down
59 changes: 26 additions & 33 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const errcode = require('err-code')

const libp2pRecord = require('libp2p-record')
const { MemoryDatastore } = require('interface-datastore')
const PeerInfo = require('peer-info')

const RoutingTable = require('./routing')
const utils = require('./utils')
Expand Down Expand Up @@ -40,7 +39,7 @@ class KadDHT extends EventEmitter {
* Create a new KadDHT.
* @param {Object} props
* @param {Dialer} props.dialer libp2p dialer instance
* @param {PeerInfo} props.peerInfo peer's peerInfo
* @param {PeerId} props.peerId peer's peerId
* @param {PeerStore} props.peerStore libp2p peerStore
* @param {Object} props.registrar libp2p registrar instance
* @param {function} props.registrar.handle
Expand All @@ -55,7 +54,7 @@ class KadDHT extends EventEmitter {
*/
constructor ({
dialer,
peerInfo,
peerId,
peerStore,
registrar,
datastore = new MemoryDatastore(),
Expand All @@ -78,10 +77,10 @@ class KadDHT extends EventEmitter {
this.dialer = dialer

/**
* Local peer info
* @type {PeerInfo}
* Local peer-id
* @type {PeerId}
*/
this.peerInfo = peerInfo
this.peerId = peerId

/**
* Local PeerStore
Expand Down Expand Up @@ -120,7 +119,7 @@ class KadDHT extends EventEmitter {
*
* @type {RoutingTable}
*/
this.routingTable = new RoutingTable(this.peerInfo.id, this.kBucketSize)
this.routingTable = new RoutingTable(this.peerId, this.kBucketSize)

/**
* Reference to the datastore, uses an in-memory store if none given.
Expand All @@ -134,7 +133,7 @@ class KadDHT extends EventEmitter {
*
* @type {Providers}
*/
this.providers = new Providers(this.datastore, this.peerInfo.id)
this.providers = new Providers(this.datastore, this.peerId)

this.validators = {
pk: libp2pRecord.validator.validators.pk,
Expand All @@ -148,7 +147,7 @@ class KadDHT extends EventEmitter {

this.network = new Network(this)

this._log = utils.logger(this.peerInfo.id)
this._log = utils.logger(this.peerId)

/**
* Random walk management
Expand Down Expand Up @@ -259,11 +258,11 @@ class KadDHT extends EventEmitter {
* @param {Object} options - findProviders options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} options.maxNumProviders - maximum number of providers to find
* @returns {AsyncIterable<PeerInfo>}
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findProviders (key, options = {}) {
for await (const pInfo of this.contentRouting.findProviders(key, options)) {
yield pInfo
for await (const peerData of this.contentRouting.findProviders(key, options)) {
yield peerData
}
}

Expand All @@ -275,7 +274,7 @@ class KadDHT extends EventEmitter {
* @param {PeerId} id
* @param {Object} options - findPeer options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @returns {Promise<PeerInfo>}
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async findPeer (id, options = {}) { // eslint-disable-line require-await
return this.peerRouting.findPeer(id, options)
Expand All @@ -286,7 +285,7 @@ class KadDHT extends EventEmitter {
* @param {Buffer} key
* @param {Object} [options]
* @param {boolean} [options.shallow] shallow query (default: false)
* @returns {AsyncIterable<PeerId>}
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * getClosestPeers (key, options = { shallow: false }) {
for await (const pId of this.peerRouting.getClosestPeers(key, options)) {
Expand Down Expand Up @@ -319,7 +318,7 @@ class KadDHT extends EventEmitter {
* the message.
*
* @param {Message} msg
* @returns {Promise<Array<PeerInfo>>}
* @returns {Promise<Array<{ id: PeerId, multiaddrs: Multiaddr[] }>>}
* @private
*/
async _nearestPeersToQuery (msg) {
Expand All @@ -328,14 +327,11 @@ class KadDHT extends EventEmitter {

return ids.map((p) => {
const peer = this.peerStore.get(p)
const peerInfo = new PeerInfo(p)

if (peer) {
peer.protocols.forEach((p) => peerInfo.protocols.add(p))
peer.multiaddrInfos.forEach((mi) => peerInfo.multiaddrs.add(mi.multiaddr))
return {
id: p,
multiaddrs: peer ? peer.multiaddrInfos.map((mi) => mi.multiaddr) : []
}

return peerInfo
})
}

Expand All @@ -344,12 +340,11 @@ class KadDHT extends EventEmitter {
* than self.
*
* @param {Message} msg
* @param {PeerInfo} peer
* @returns {Promise<Array<PeerInfo>>}
* @param {PeerId} peerId
* @returns {Promise<Array<{ id: PeerId, multiaddrs: Multiaddr[] }>>}
* @private
*/

async _betterPeersToQuery (msg, peer) {
async _betterPeersToQuery (msg, peerId) {
this._log('betterPeersToQuery')
const closer = await this._nearestPeersToQuery(msg)

Expand All @@ -360,7 +355,7 @@ class KadDHT extends EventEmitter {
return false
}

return !closer.id.isEqual(peer.id)
return !closer.id.isEqual(peerId)
})
}

Expand Down Expand Up @@ -411,14 +406,12 @@ class KadDHT extends EventEmitter {

/**
* Add the peer to the routing table and update it in the peerStore.
*
* @param {PeerInfo} peer
* @param {PeerId} peerId
* @returns {Promise<void>}
* @private
*/

async _add (peer) {
await this.routingTable.add(peer.id)
async _add (peerId) {
await this.routingTable.add(peerId)
}

/**
Expand All @@ -445,7 +438,7 @@ class KadDHT extends EventEmitter {
*/

_isSelf (other) {
return other && this.peerInfo.id.id.equals(other.id)
return other && this.peerId.id.equals(other.id)
}

/**
Expand Down Expand Up @@ -478,7 +471,7 @@ class KadDHT extends EventEmitter {
*
* @param {PeerId} peer
* @param {Buffer} key
* @returns {Promise<{Record, Array<PeerInfo}>}
* @returns {Promise<{Record, Array<{ id: PeerId, multiaddrs: Multiaddr[] }}>}
* @private
*/

Expand Down
23 changes: 8 additions & 15 deletions src/message/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const protons = require('protons')
const { Record } = require('libp2p-record')

Expand Down Expand Up @@ -97,25 +97,18 @@ Message.TYPES = MESSAGE_TYPE
Message.CONNECTION_TYPES = CONNECTION_TYPE

function toPbPeer (peer) {
const res = {
return {
id: peer.id.id,
addrs: peer.multiaddrs.toArray().map((m) => m.buffer)
addrs: (peer.multiaddrs || []).map((m) => m.buffer),
connection: CONNECTION_TYPE.CONNECTED
}

if (peer.isConnected()) {
res.connection = CONNECTION_TYPE.CONNECTED
} else {
res.connection = CONNECTION_TYPE.NOT_CONNECTED
}

return res
}

function fromPbPeer (peer) {
const info = new PeerInfo(new PeerId(peer.id))
peer.addrs.forEach((a) => info.multiaddrs.add(a))

return info
return {
id: new PeerId(peer.id),
multiaddrs: peer.addrs.map((a) => multiaddr(a))
}
}

module.exports = Message
10 changes: 5 additions & 5 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Network {
constructor (self) {
this.dht = self
this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT
this._log = utils.logger(this.dht.peerInfo.id, 'net')
this._log = utils.logger(this.dht.peerId, 'net')
this._rpc = rpc(this.dht)
this._onPeerConnected = this._onPeerConnected.bind(this)
this._running = false
Expand Down Expand Up @@ -96,12 +96,12 @@ class Network {
/**
* Registrar notifies a connection successfully with dht protocol.
* @private
* @param {PeerInfo} peerInfo remote peer info
* @param {PeerId} peerId remote peer id
* @returns {Promise<void>}
*/
async _onPeerConnected (peerInfo) {
await this.dht._add(peerInfo)
this._log('added to the routing table: %s', peerInfo.id.toB58String())
async _onPeerConnected (peerId) {
await this.dht._add(peerId)
this._log('added to the routing table: %s', peerId.toB58String())
}

/**
Expand Down
Loading