From 2745279f095cdcaf8ad27d5117a94135a4ffdb3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Kr=C3=BCger?= Date: Fri, 15 Jun 2018 13:32:29 +0200 Subject: [PATCH] feat: Rewrite using exchange --- src/index.js | 170 +++++++++++------------------- src/sig-server/bin.js | 28 ----- src/sig-server/config.js | 21 ---- src/sig-server/index.html | 65 ------------ src/sig-server/index.js | 56 ---------- src/sig-server/routes-ws/index.js | 122 --------------------- src/utils.js | 49 --------- 7 files changed, 59 insertions(+), 452 deletions(-) delete mode 100755 src/sig-server/bin.js delete mode 100644 src/sig-server/config.js delete mode 100644 src/sig-server/index.html delete mode 100644 src/sig-server/index.js delete mode 100644 src/sig-server/routes-ws/index.js delete mode 100644 src/utils.js diff --git a/src/index.js b/src/index.js index e8911fd6..edb6aff8 100644 --- a/src/index.js +++ b/src/index.js @@ -2,52 +2,35 @@ const debug = require('debug') const log = debug('libp2p:webrtc-star') + const multiaddr = require('multiaddr') const mafmt = require('mafmt') +const Id = require('peer-id') + const withIs = require('class-is') -const io = require('socket.io-client') const EE = require('events').EventEmitter +const assert = require('assert') + const SimplePeer = require('simple-peer') -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') +const webrtcSupport = require('webrtcsupport') + const Connection = require('interface-connection').Connection const toPull = require('stream-to-pull-stream') -const once = require('once') -const setImmediate = require('async/setImmediate') -const webrtcSupport = require('webrtcsupport') -const utils = require('./utils') -const cleanUrlSIO = utils.cleanUrlSIO -const cleanMultiaddr = utils.cleanMultiaddr +const setImmediate = require('async/setImmediate') +const once = require('once') const noop = once(() => {}) -const sioOptions = { - transports: ['websocket'], - 'force new connection': true -} - class WebRTCStar { constructor (options) { options = options || {} - this.maSelf = undefined - - this.sioOptions = { - transports: ['websocket'], - 'force new connection': true - } - if (options.wrtc) { this.wrtc = options.wrtc } - this.discovery = new EE() - this.discovery.tag = 'webRTCStar' - this.discovery.start = (callback) => { setImmediate(callback) } - this.discovery.stop = (callback) => { setImmediate(callback) } - - this.listenersRefs = {} - this._peerDiscovered = this._peerDiscovered.bind(this) + assert(options.exchange, 'Exchange missing!') + this.exchange = options.exchange } dial (ma, options, callback) { @@ -58,10 +41,9 @@ class WebRTCStar { callback = callback ? once(callback) : noop - const intentId = (~~(Math.random() * 1e9)).toString(36) + Date.now() + let b58 = ma.toString().split('ipfs/').pop() - const sioClient = this - .listenersRefs[Object.keys(this.listenersRefs)[0]].io + log('dialing %s %s', ma, b58) const spOptions = { initiator: true, trickle: false } @@ -74,43 +56,41 @@ class WebRTCStar { let connected = false channel.on('signal', (signal) => { - sioClient.emit('ss-handshake', { - intentId: intentId, - srcMultiaddr: this.maSelf.toString(), - dstMultiaddr: ma.toString(), - signal: signal - }) - }) - - channel.once('timeout', () => callback(new Error('timeout'))) - - channel.once('error', (err) => { - if (!connected) { callback(err) } - }) + log('dial#%s got signal', ma) + this.exchange.request(Id.createFromB58String(b58), 'webrtc', Buffer.from(JSON.stringify({signal, ma: '/ip4/0.0.0.0/tcp/127.0.0.1'})), (err, result) => { // TODO: fix this + if (err) { + log('dial#%s exchange failed %s', ma, err) + return callback(err) + } - // NOTE: aegir segfaults if we do .once on the socket.io event emitter and we - // are clueless as to why. - sioClient.on('ws-handshake', (offer) => { - if (offer.intentId === intentId && offer.err) { - return callback(new Error(offer.err)) - } + let offer + try { + offer = JSON.parse(String(result)) + } catch (err) { + log('dial#%s malformed response %s', ma, err) + return callback(err) + } - if (offer.intentId !== intentId || !offer.answer) { - return - } + channel.once('connect', () => { + log('dial#%s connected', ma) + connected = true + conn.destroy = channel.destroy.bind(channel) - channel.once('connect', () => { - connected = true - conn.destroy = channel.destroy.bind(channel) + channel.once('close', () => conn.destroy()) - channel.once('close', () => conn.destroy()) + conn.getObservedAddrs = (callback) => callback(null, [ma]) - conn.getObservedAddrs = (callback) => callback(null, [ma]) + callback(null, conn) + }) - callback(null, conn) + channel.signal(offer.signal) }) + }) - channel.signal(offer.signal) + channel.once('timeout', () => callback(new Error('timeout'))) + + channel.once('error', (err) => { + if (!connected) { callback(err) } }) return conn @@ -131,36 +111,19 @@ class WebRTCStar { return setImmediate(() => callback(new Error('no WebRTC support'))) } - this.maSelf = ma + log('listening on %s', ma) - const sioUrl = cleanUrlSIO(ma) + let ns = listener.ns = 'webrtc' // TODO: should this be ma.toString() ? + listener.ma = ma - log('Dialing to Signalling Server on: ' + sioUrl) - - listener.io = io.connect(sioUrl, sioOptions) - - listener.io.once('connect_error', callback) - listener.io.once('error', (err) => { - listener.emit('error', err) - listener.emit('close') - }) + this.exchange.listen(ns, (request, cb) => { + let offer - listener.io.on('ws-handshake', incommingDial) - listener.io.on('ws-peer', this._peerDiscovered) - - listener.io.on('connect', () => { - listener.io.emit('ss-join', ma.toString()) - }) - - listener.io.once('connect', () => { - listener.emit('listening') - callback() - }) - - const self = this - function incommingDial (offer) { - if (offer.answer || offer.err) { - return + try { + offer = JSON.parse(String(request)) + } catch (err) { + log('got malformed offer', err) + return cb(err) } const spOptions = { trickle: false } @@ -173,8 +136,10 @@ class WebRTCStar { const conn = new Connection(toPull.duplex(channel)) channel.once('connect', () => { + log('connected') + conn.getObservedAddrs = (callback) => { - return callback(null, [offer.srcMultiaddr]) + return callback(null, [multiaddr(offer.ma)]) // TODO: this isn't really safe AT ALL... } listener.emit('connection', conn) @@ -182,31 +147,26 @@ class WebRTCStar { }) channel.once('signal', (signal) => { - offer.signal = signal - offer.answer = true - listener.io.emit('ss-handshake', offer) + log('sending back signal') + cb(null, Buffer.from(JSON.stringify({signal, ma: listener.ma.toString()}))) }) channel.signal(offer.signal) - } + }) } listener.close = (callback) => { callback = callback ? once(callback) : noop - listener.io.emit('ss-leave') + this.exchange.unhandle(listener.ns) - setImmediate(() => { - listener.emit('close') - callback() - }) + setImmediate(callback) } listener.getAddrs = (callback) => { - setImmediate(() => callback(null, [this.maSelf])) + setImmediate(() => callback(null, listener.ma ? [listener.ma] : [])) } - this.listenersRefs[multiaddr.toString()] = listener return listener } @@ -223,18 +183,6 @@ class WebRTCStar { return mafmt.WebRTCStar.matches(ma) }) } - - _peerDiscovered (maStr) { - log('Peer Discovered:', maStr) - maStr = cleanMultiaddr(maStr) - - const split = maStr.split('/ipfs/') - const peerIdStr = split[split.length - 1] - const peerId = PeerId.createFromB58String(peerIdStr) - const peerInfo = new PeerInfo(peerId) - peerInfo.multiaddrs.add(multiaddr(maStr)) - this.discovery.emit('peer', peerInfo) - } } module.exports = withIs(WebRTCStar, { className: 'WebRTCStar', symbolName: '@libp2p/js-libp2p-webrtc-star/webrtcstar' }) diff --git a/src/sig-server/bin.js b/src/sig-server/bin.js deleted file mode 100755 index cdf29684..00000000 --- a/src/sig-server/bin.js +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env node - -'use strict' - -const signalling = require('./index') -const argv = require('minimist')(process.argv.slice(2)) - -let server - -signalling.start({ - port: argv.port || argv.p || process.env.PORT || 9090, - host: argv.host || argv.h || process.env.HOST || '0.0.0.0', - metrics: !(argv.disableMetrics || process.env.DISABLE_METRICS) -}, (err, _server) => { - if (err) { - throw err - } - server = _server - - console.log('Listening on:', server.info.uri) -}) - -process.on('SIGINT', () => { - server.stop(() => { - console.log('Signalling server stopped') - process.exit() - }) -}) diff --git a/src/sig-server/config.js b/src/sig-server/config.js deleted file mode 100644 index c8003d29..00000000 --- a/src/sig-server/config.js +++ /dev/null @@ -1,21 +0,0 @@ -'use strict' - -const debug = require('debug') -const log = debug('signalling-server') -log.error = debug('signalling-server:error') - -module.exports = { - log: log, - hapi: { - port: process.env.PORT || 13579, - host: '0.0.0.0', - options: { - connections: { - routes: { - cors: true - } - } - } - }, - refreshPeerListIntervalMS: 10000 -} diff --git a/src/sig-server/index.html b/src/sig-server/index.html deleted file mode 100644 index e927bddf..00000000 --- a/src/sig-server/index.html +++ /dev/null @@ -1,65 +0,0 @@ - - - - - - - Signalling Server - - - - - -
-
- Libp2p Logo -

This is a libp2p-webrtc-star signalling-server

-

Signaling Servers are used in libp2p to allow browsers and clients with restricted port-forwarding
to communicate with other peers in the libp2p network

-
- ยป Learn more -
- - - - - diff --git a/src/sig-server/index.js b/src/sig-server/index.js deleted file mode 100644 index baccf966..00000000 --- a/src/sig-server/index.js +++ /dev/null @@ -1,56 +0,0 @@ -'use strict' - -const Hapi = require('hapi') -const config = require('./config') -const log = config.log -const epimetheus = require('epimetheus') -const path = require('path') - -exports = module.exports - -exports.start = (options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - const port = options.port || config.hapi.port - const host = options.host || config.hapi.host - - const http = new Hapi.Server(config.hapi.options) - - http.connection({ - port: port, - host: host - }) - - http.register({ register: require('inert') }, (err) => { - if (err) { - return callback(err) - } - - http.start((err) => { - if (err) { - return callback(err) - } - - log('signaling server has started on: ' + http.info.uri) - - http.peers = require('./routes-ws')(http, options.metrics).peers - - http.route({ - method: 'GET', - path: '/', - handler: (request, reply) => reply.file(path.join(__dirname, 'index.html'), { - confine: false - }) - }) - - callback(null, http) - }) - - if (options.metrics) { epimetheus.instrument(http) } - }) - - return http -} diff --git a/src/sig-server/routes-ws/index.js b/src/sig-server/routes-ws/index.js deleted file mode 100644 index a3ddadeb..00000000 --- a/src/sig-server/routes-ws/index.js +++ /dev/null @@ -1,122 +0,0 @@ -'use strict' - -const config = require('../config') -const log = config.log -const SocketIO = require('socket.io') -const client = require('prom-client') - -const fake = { - gauge: { - set: () => {} - }, - counter: { - inc: () => {} - } -} - -module.exports = (http, hasMetrics) => { - const io = new SocketIO(http.listener) - io.on('connection', handle) - - const peers = {} - - const peersMetric = hasMetrics ? new client.Gauge({ name: 'signalling_peers', help: 'peers online now' }) : fake.gauge - const dialsSuccessTotal = hasMetrics ? new client.Counter({ name: 'signalling_dials_total_success', help: 'sucessfully completed dials since server started' }) : fake.counter - const dialsFailureTotal = hasMetrics ? new client.Counter({ name: 'signalling_dials_total_failure', help: 'failed dials since server started' }) : fake.counter - const dialsTotal = hasMetrics ? new client.Counter({ name: 'signalling_dials_total', help: 'all dials since server started' }) : fake.counter - const joinsSuccessTotal = hasMetrics ? new client.Counter({ name: 'signalling_joins_total_success', help: 'sucessfully completed joins since server started' }) : fake.counter - const joinsFailureTotal = hasMetrics ? new client.Counter({ name: 'signalling_joins_total_failure', help: 'failed joins since server started' }) : fake.counter - const joinsTotal = hasMetrics ? new client.Counter({ name: 'signalling_joins_total', help: 'all joins since server started' }) : fake.counter - - const refreshMetrics = () => peersMetric.set(Object.keys(peers).length) - - this.peers = () => { - return peers - } - - function safeEmit (addr, event, arg) { - const peer = peers[addr] - if (!peer) { - log('trying to emit %s but peer is gone', event) - return - } - - peer.emit(event, arg) - } - - function handle (socket) { - socket.on('ss-join', join.bind(socket)) - socket.on('ss-leave', leave.bind(socket)) - socket.on('disconnect', disconnect.bind(socket)) // socket.io own event - socket.on('ss-handshake', forwardHandshake) - } - - // join this signaling server network - function join (multiaddr) { - joinsTotal.inc() - if (!multiaddr) { return joinsFailureTotal.inc() } - const socket = peers[multiaddr] = this // socket - let refreshInterval = setInterval(sendPeers, config.refreshPeerListIntervalMS) - - socket.once('ss-leave', stopSendingPeers) - socket.once('disconnect', stopSendingPeers) - - sendPeers() - - function sendPeers () { - Object.keys(peers).forEach((mh) => { - if (mh === multiaddr) { - return - } - safeEmit(mh, 'ws-peer', multiaddr) - }) - } - - function stopSendingPeers () { - if (refreshInterval) { - clearInterval(refreshInterval) - refreshInterval = null - } - } - - joinsSuccessTotal.inc() - refreshMetrics() - } - - function leave (multiaddr) { - if (!multiaddr) { return } - if (peers[multiaddr]) { - delete peers[multiaddr] - refreshMetrics() - } - } - - function disconnect () { - Object.keys(peers).forEach((mh) => { - if (peers[mh].id === this.id) { - delete peers[mh] - } - refreshMetrics() - }) - } - - // forward an WebRTC offer to another peer - function forwardHandshake (offer) { - dialsTotal.inc() - if (offer == null || typeof offer !== 'object' || !offer.srcMultiaddr || !offer.dstMultiaddr) { return dialsFailureTotal.inc() } - if (offer.answer) { - dialsSuccessTotal.inc() - safeEmit(offer.srcMultiaddr, 'ws-handshake', offer) - } else { - if (peers[offer.dstMultiaddr]) { - safeEmit(offer.dstMultiaddr, 'ws-handshake', offer) - } else { - dialsFailureTotal.inc() - offer.err = 'peer is not available' - safeEmit(offer.srcMultiaddr, 'ws-handshake', offer) - } - } - } - - return this -} diff --git a/src/utils.js b/src/utils.js deleted file mode 100644 index 13b39d58..00000000 --- a/src/utils.js +++ /dev/null @@ -1,49 +0,0 @@ -'use strict' - -const multiaddr = require('multiaddr') - -function cleanUrlSIO (ma) { - const maStrSplit = ma.toString().split('/') - const tcpProto = ma.protos()[1].name - const wsProto = ma.protos()[2].name - const tcpPort = ma.stringTuples()[1][1] - - if (tcpProto !== 'tcp' || (wsProto !== 'ws' && wsProto !== 'wss')) { - throw new Error('invalid multiaddr: ' + ma.toString()) - } - - if (!multiaddr.isName(ma)) { - return 'http://' + maStrSplit[2] + ':' + maStrSplit[4] - } - - if (wsProto === 'ws') { - return 'http://' + maStrSplit[2] + (tcpPort === 80 ? '' : ':' + tcpPort) - } - - if (wsProto === 'wss') { - return 'https://' + maStrSplit[2] + (tcpPort === 443 ? '' : ':' + tcpPort) - } -} - -function cleanMultiaddr (maStr) { - const legacy = '/libp2p-webrtc-star' - - if (maStr.indexOf(legacy) !== -1) { - maStr = maStr.substring(legacy.length, maStr.length) - let ma = multiaddr(maStr) - const tuppleIPFS = ma.stringTuples().filter((tupple) => { - return tupple[0] === 421 // ipfs code - })[0] - - ma = ma.decapsulate('ipfs') - ma = ma.encapsulate('/p2p-webrtc-star') - ma = ma.encapsulate(`/ipfs/${tuppleIPFS[1]}`) - maStr = ma.toString() - } - - return maStr -} - -exports = module.exports -exports.cleanUrlSIO = cleanUrlSIO -exports.cleanMultiaddr = cleanMultiaddr