Skip to content

Commit

Permalink
feat: mdns discovery example works
Browse files Browse the repository at this point in the history
  • Loading branch information
SgtPooki committed Dec 13, 2023
1 parent afa522f commit ce4ce5d
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 133 deletions.
33 changes: 33 additions & 0 deletions examples/helia-lan-server-discovery/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
This example shows how you can use mdns to connect two nodes. Either server/client node can be run first.

Both scripts (src/server.js & src/client.js) will create a helia node, and subscribe to a known pubsub topic, and shut each other down (for ease of testing).

Note: No WAN functionality is enabled, so only nodes on your local network can help with peer-discovery, and only nodes on your local network can be discovered as the code currently stands.. If you want to enable connecting to nodes outside of your WAN, you will need to connect to a bootstrap node.

### General flow

When you run these two scripts, the general flow works like this:

1. Each node subscribes to the known pubsub topic.
1. When the client node detects a subscription change on the pubsub topic, it will send a `wut-CID` message to the server node.
1. The server node will respond to the `wut-CID` message with the string representation of a CID the server node is providing.
1. The client node will request the content for that CID via `heliaDagCbor.get(CID.parse(msg))`
1. Once the content is received, the client will publish a `done` message to the pubsub topic.
1. The server node will detect the `done` message, and respond with a `done-ACK` message.
1. The client node will detect the `done-ACK` message, respond with a `done-ACK` message of it's own, and shutdown after a timeout (to allow for the message to be sent)
1. The server node will detect the `done-ACK` message, and shutdown immediately.

### Testing

Both scripts should be able to be run in any order, and the flow should work as expected. You can run `npm run test` to check this. The test will fail if the test runs for more than 10 seconds, or errors, but this failure mode is dependent upon the `timeout` command currently.


### Further exploration of this example

Some things to try:

1. See if you can get ping/pong messages working without the nodes shutting down
1. Run the server node from one computer on your local network, and the client node from another computer on your local network
1. Try removing the shutdown code from the scripts, and see if you can get multiple clients to connect
1. See if you can get the server to respond with a list of CIDs in it's blockstore, and have the client choose which one to request
1. See if you can connect to bootstrap nodes with one of your nodes, and use the other node as a LAN only node.
31 changes: 24 additions & 7 deletions examples/helia-lan-server-discovery/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,35 @@
"type": "module",
"description": "Get two helia nodes to connect on LAN without supplying address",
"scripts": {
"server": "cross-env DEBUG=\"\" node src/server.js",
"client": "cross-env DEBUG=\"\" node src/client.js",
"test": "NODE_OPTIONS=--experimental-vm-modules jest"
"server": "cross-env node src/server.js",
"client": "cross-env node src/client.js",
"test": "npm run test1 && npm run test2",
"test1": "timeout 10 npm run server & npm run client",
"test2": "timeout 10 npm run client & npm run server"
},
"dependencies": {
"@helia/dag-cbor": "^1.0.2",
"helia": "^2.0.1"
"@chainsafe/libp2p-gossipsub": "^11.0.1",
"@chainsafe/libp2p-noise": "^14.1.0",
"@chainsafe/libp2p-yamux": "^6.0.1",
"@helia/dag-cbor": "^1.0.3",
"@libp2p/autonat": "^1.0.5",
"@libp2p/circuit-relay-v2": "^1.0.7",
"@libp2p/identify": "^1.0.6",
"@libp2p/kad-dht": "^11.0.7",
"@libp2p/mdns": "^10.0.7",
"@libp2p/mplex": "^10.0.7",
"@libp2p/ping": "^1.0.6",
"@libp2p/tcp": "^9.0.7",
"@libp2p/webrtc": "^4.0.10",
"@libp2p/websockets": "^8.0.7",
"@libp2p/webtransport": "^4.0.10",
"helia": "^2.1.0",
"libp2p": "^1.0.10"
},
"devDependencies": {
"@types/jest": "^29.5.2",
"@types/jest": "^29.5.11",
"cross-env": "^7.0.3",
"jest": "^29.6.2",
"jest": "^29.7.0",
"ts-jest": "^29.1.1"
}
}
51 changes: 25 additions & 26 deletions examples/helia-lan-server-discovery/src/client.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,33 @@
/* eslint-disable no-console */
// client.js
import { dagCbor } from '@helia/dag-cbor'
import { CID } from 'multiformats/cid'
import { readFile, writeFile } from 'node:fs/promises'
import { comms, connectedPeers, logDiscoveredPeers, getHelia, logConnectedPeers } from './utils.js'
import { comms, getHelia, pubSubTopic } from './utils.js'

const cidString = await readFile('cid.txt', 'utf-8')
const cid = CID.parse(cidString)

const helia = await getHelia()
await writeFile('client-peerId.txt', helia.libp2p.peerId.toString())
await logConnectedPeers('client-connectedPeers.txt', helia)
await logDiscoveredPeers('client-discoveredPeers.txt', helia)
const helia = await getHelia('client')
const heliaDagCbor = dagCbor(helia)
await connectedPeers(helia)

// await comms(helia, cidString)

// while(helia.libp2p.services.pubsub.getSubscribers(cidString).length === 0) {
// // console.log(`pubsubPeers: `, helia.libp2p.services.pubsub.getSubscribers(cidString))
// await new Promise((resolve) => setTimeout(resolve, 1000))
// }
// console.log(`pubsubPeers: `, helia.libp2p.services.pubsub.getSubscribers(cidString))

// await helia.libp2p.services.pubsub.publish(cidString, 'hello world')

console.log('requesting CID: %s', cidString)
const data = await heliaDagCbor.get(cid, {
// onProgress: (progress) => {
// console.log('progress:', progress)
// }
helia.libp2p.services.pubsub.addEventListener('subscription-change', (evt) => {
if (helia.libp2p.services.pubsub.getSubscribers(pubSubTopic).length !== 0) {
// we're subscribed, and so is another node, so request the CID
helia.libp2p.services.pubsub?.publish(pubSubTopic, new TextEncoder().encode('wut-CID'))
}
})

console.log(data)
await comms(helia, pubSubTopic, 'client', async (msg) => {
if (msg === 'pong') {
// helia.libp2p.services.pubsub?.publish(pubSubTopic, new TextEncoder().encode('done'))
} else if (msg === 'done-ACK') {
helia.libp2p.services.pubsub?.publish(pubSubTopic, new TextEncoder().encode('done-ACK'))
setTimeout(async () => {
await helia.stop()
process.exit(0)
}, 500)
} else {
// msg is a CID (response to wut-CID request), so fetch it
console.log('requesting CID: %s', msg)
const data = await heliaDagCbor.get(CID.parse(msg))
console.log('got CID data: ', data)
helia.libp2p.services.pubsub?.publish(pubSubTopic, new TextEncoder().encode('done'))
}
})
37 changes: 20 additions & 17 deletions examples/helia-lan-server-discovery/src/server.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
/* eslint-disable no-console */
// server.js
import { randomUUID } from 'node:crypto'
import { dagCbor } from '@helia/dag-cbor'
import { writeFile } from 'node:fs/promises'
import { comms, connectedPeers, logDiscoveredPeers, getHelia, logConnectedPeers } from './utils.js'
import { comms, getHelia, pubSubTopic } from './utils.js'

const helia = await getHelia('server')

const helia = await getHelia()
await writeFile('server-peerId.txt', helia.libp2p.peerId.toString())
logConnectedPeers('server-connectedPeers.txt', helia)
logDiscoveredPeers('server-discoveredPeers.txt', helia)
const heliaDagCbor = dagCbor(helia)
const cid = await heliaDagCbor.add(randomUUID())
await writeFile('cid.txt', cid.toString())
const uuid = `${new Date().toLocaleString()}: My test string that you only know if you're in the same pubsub channel as me and request my CID`
const cid = await heliaDagCbor.add(uuid)
const cidString = cid.toString()

console.log('CID: %s', cid.toString())


await connectedPeers(helia)
// for await (const event of helia.libp2p.services.dht.provide(cid)) {
// console.log('event', event)
// }
// await comms(helia, cid.toString())
console.log('CID: %s', cidString)

await comms(helia, pubSubTopic, 'server', async (msg) => {
if (msg === 'done') {
helia.libp2p.services.pubsub?.publish(pubSubTopic, new TextEncoder().encode('done-ACK'))
} else if (msg === 'ping') {
helia.libp2p.services.pubsub?.publish(pubSubTopic, new TextEncoder().encode('pong'))
} else if (msg === 'done-ACK') {
// other node sent done-ack and should have shut down, now it's our turn to shut down.
await helia.stop()
process.exit(0)
} else if (msg === 'wut-CID') {
helia.libp2p.services.pubsub?.publish(pubSubTopic, new TextEncoder().encode(cidString))
}
})
136 changes: 53 additions & 83 deletions examples/helia-lan-server-discovery/src/utils.js
Original file line number Diff line number Diff line change
@@ -1,137 +1,107 @@
import { writeFile } from 'node:fs/promises'
import { createHelia } from 'helia'
/* eslint-disable no-console */
import { gossipsub } from '@chainsafe/libp2p-gossipsub'
import { createLibp2p } from 'libp2p'
import { tcp } from '@libp2p/tcp'
import { mplex } from '@libp2p/mplex'
import { identifyService } from 'libp2p/identify'
import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'
import { webSockets } from '@libp2p/websockets'
import { bootstrap } from '@libp2p/bootstrap'
import { MemoryDatastore } from 'datastore-core'
import { pingService } from 'libp2p/ping'
import { autoNAT as autoNATService } from '@libp2p/autonat'
import { identify as identifyService } from '@libp2p/identify'
import { kadDHT } from '@libp2p/kad-dht'
import { autoNATService } from 'libp2p/autonat'
// import { uPnPNATService } from 'libp2p/upnp-nat'
import { webTransport } from '@libp2p/webtransport'
import { circuitRelayTransport } from 'libp2p/circuit-relay'
import { webRTC, webRTCDirect } from '@libp2p/webrtc'
import { mdns } from '@libp2p/mdns'
import { ping as pingService } from '@libp2p/ping'
import { tcp } from '@libp2p/tcp'
import { webRTC, webRTCDirect } from '@libp2p/webrtc'
import { webSockets } from '@libp2p/websockets'
import { MemoryDatastore } from 'datastore-core'
import { createHelia } from 'helia'
import { bitswap } from 'helia/block-brokers'
import { createLibp2p } from 'libp2p'

// @ts-check

export async function getHelia() {
export async function getHelia (clientName) {
const datastore = new MemoryDatastore()

const libp2p = await createLibp2p({
datastore,
connectionManager: {
minConnections: 1
},
addresses: {
/**
* you have to make sure that listening multiaddrs are announced for mdns to work
*
* @see https://github.com/libp2p/js-libp2p/blob/742915567749072aa784cf179ce9810f66ac6c6e/packages/peer-discovery-mdns/src/query.ts#L87-L89
* @see https://github.com/libp2p/js-libp2p/blob/742915567749072aa784cf179ce9810f66ac6c6e/packages/peer-discovery-mdns/src/mdns.ts#L92-L101
*/
listen: [
'/webrtc',
'/tcp/0',
'/wss',
'/ip4/0.0.0.0/webrtc',
'/ip4/0.0.0.0/ws',
'/ip4/0.0.0.0/tcp/0'
]
},
// addresses: {
// listen: ['/ip4/0.0.0.0/tcp/0']
// },
transports: [
webRTC(),
webRTCDirect(),
webTransport(),
webSockets(),
circuitRelayTransport({
discoverRelays: 1
}),
tcp(),
tcp()
],
connectionEncryption: [
noise(),
noise()
],
streamMuxers: [
mplex(),
// mplex(),
yamux()
],
peerDiscovery: [
mdns(),
bootstrap({
list: [
"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt"
]
mdns({
// broadcast: mdnsBroadcast
})
],
services: {
identify: identifyService(),
dht: kadDHT(),
ping: pingService({
protocolPrefix: 'ipfs'
}),
dht: kadDHT(),
pubsub: gossipsub(),
nat: autoNATService({
enabled: true,
enabled: true
})
}
})

const helia = await createHelia({
blockBrokers: [
bitswap()
],
datastore,
libp2p
})

const interval = setInterval(() => {
const multiaddrs = helia.libp2p.getMultiaddrs()
if (multiaddrs.length > 0) {
console.log('node multiaddrs:', multiaddrs)
clearInterval(interval)
}
}, 5000)
return helia
}

export async function logConnectedPeers(filename, helia) {
// clear out peers file
await writeFile(filename, '', { flag: 'w' })

helia.libp2p.addEventListener('peer:connect', async (connection) => {
const peer = connection.detail
await writeFile(filename, `${peer.toString()}\n`, { flag: 'a' })
})
}

export async function logDiscoveredPeers(filename, helia) {
// clear out peers file
await writeFile(filename, '', { flag: 'w' })

helia.libp2p.addEventListener('peer:discovery', async (connection) => {
const peer = connection.detail.id
await writeFile(filename, `${peer.toString()}\n`, { flag: 'a' })
console.log('%s discovered peer: ', clientName, peer.toString())
})
helia.libp2p.addEventListener('peer:connect', async (connection) => {
console.log('%s connected to peer: ', clientName, connection.detail.toString())
})
return helia
}

export async function connectedPeers(helia) {
// while we are not connected to any peers, wait
let attempt = 1

// console.log(`helia.libp2p.getPeers(): `, helia.libp2p.getPeers());
while (helia.libp2p.getPeers().length === 0) {
await new Promise((resolve) => setTimeout(resolve, attempt++ * 1000))
export async function comms (helia, topic, prefix, onMessage) {
if (helia.libp2p.services.pubsub == null) {
return
}
}

export async function comms(helia, topic) {
console.log('helia.libp2p.peerId %s subscribing to topic %s', helia.libp2p.peerId.toString(), topic)
if (onMessage == null) {
throw new Error('onMessage is required')
}
console.log('%s helia.libp2p.peerId %s subscribing to topic %s', prefix, helia.libp2p.peerId.toString(), topic)

helia.libp2p.services.pubsub.subscribe(topic)
await helia.libp2p.services.pubsub.addEventListener('message', (evt) => {
console.log(`evt: `, evt);
console.log(`evt.detail.topic: `, evt.detail.topic);
// if (evt.detail.topic === 'topic') {
// handle message
console.log('pubsub message received:', evt.detail.data.toString())
// }
helia.libp2p.services.pubsub?.subscribe(topic)
await helia.libp2p.services.pubsub?.addEventListener('message', async (evt) => {
const messageString = new TextDecoder().decode(evt.detail.data)
console.log('%s gossipsub:message received: %s', prefix, messageString)
await onMessage(messageString)
})
}

export const pubSubTopic = 'helia-lan-discovery'

0 comments on commit ce4ce5d

Please sign in to comment.