Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
feat: Rewrite using exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
mkg20001 committed Jun 15, 2018
1 parent 15efa97 commit 2745279
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 452 deletions.
170 changes: 59 additions & 111 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,35 @@

const debug = require('debug')
const log = debug('libp2p:webrtc-star')

const multiaddr = require('multiaddr')
const mafmt = require('mafmt')
const Id = require('peer-id')

const withIs = require('class-is')
const io = require('socket.io-client')
const EE = require('events').EventEmitter
const assert = require('assert')

const SimplePeer = require('simple-peer')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const webrtcSupport = require('webrtcsupport')

const Connection = require('interface-connection').Connection
const toPull = require('stream-to-pull-stream')
const once = require('once')
const setImmediate = require('async/setImmediate')
const webrtcSupport = require('webrtcsupport')
const utils = require('./utils')
const cleanUrlSIO = utils.cleanUrlSIO
const cleanMultiaddr = utils.cleanMultiaddr

const setImmediate = require('async/setImmediate')
const once = require('once')
const noop = once(() => {})

const sioOptions = {
transports: ['websocket'],
'force new connection': true
}

class WebRTCStar {
constructor (options) {
options = options || {}

this.maSelf = undefined

this.sioOptions = {
transports: ['websocket'],
'force new connection': true
}

if (options.wrtc) {
this.wrtc = options.wrtc
}

this.discovery = new EE()
this.discovery.tag = 'webRTCStar'
this.discovery.start = (callback) => { setImmediate(callback) }
this.discovery.stop = (callback) => { setImmediate(callback) }

this.listenersRefs = {}
this._peerDiscovered = this._peerDiscovered.bind(this)
assert(options.exchange, 'Exchange missing!')
this.exchange = options.exchange
}

dial (ma, options, callback) {
Expand All @@ -58,10 +41,9 @@ class WebRTCStar {

callback = callback ? once(callback) : noop

const intentId = (~~(Math.random() * 1e9)).toString(36) + Date.now()
let b58 = ma.toString().split('ipfs/').pop()

const sioClient = this
.listenersRefs[Object.keys(this.listenersRefs)[0]].io
log('dialing %s %s', ma, b58)

const spOptions = { initiator: true, trickle: false }

Expand All @@ -74,43 +56,41 @@ class WebRTCStar {
let connected = false

channel.on('signal', (signal) => {
sioClient.emit('ss-handshake', {
intentId: intentId,
srcMultiaddr: this.maSelf.toString(),
dstMultiaddr: ma.toString(),
signal: signal
})
})

channel.once('timeout', () => callback(new Error('timeout')))

channel.once('error', (err) => {
if (!connected) { callback(err) }
})
log('dial#%s got signal', ma)
this.exchange.request(Id.createFromB58String(b58), 'webrtc', Buffer.from(JSON.stringify({signal, ma: '/ip4/0.0.0.0/tcp/127.0.0.1'})), (err, result) => { // TODO: fix this
if (err) {
log('dial#%s exchange failed %s', ma, err)
return callback(err)
}

// NOTE: aegir segfaults if we do .once on the socket.io event emitter and we
// are clueless as to why.
sioClient.on('ws-handshake', (offer) => {
if (offer.intentId === intentId && offer.err) {
return callback(new Error(offer.err))
}
let offer
try {
offer = JSON.parse(String(result))
} catch (err) {
log('dial#%s malformed response %s', ma, err)
return callback(err)
}

if (offer.intentId !== intentId || !offer.answer) {
return
}
channel.once('connect', () => {
log('dial#%s connected', ma)
connected = true
conn.destroy = channel.destroy.bind(channel)

channel.once('connect', () => {
connected = true
conn.destroy = channel.destroy.bind(channel)
channel.once('close', () => conn.destroy())

channel.once('close', () => conn.destroy())
conn.getObservedAddrs = (callback) => callback(null, [ma])

conn.getObservedAddrs = (callback) => callback(null, [ma])
callback(null, conn)
})

callback(null, conn)
channel.signal(offer.signal)
})
})

channel.signal(offer.signal)
channel.once('timeout', () => callback(new Error('timeout')))

channel.once('error', (err) => {
if (!connected) { callback(err) }
})

return conn
Expand All @@ -131,36 +111,19 @@ class WebRTCStar {
return setImmediate(() => callback(new Error('no WebRTC support')))
}

this.maSelf = ma
log('listening on %s', ma)

const sioUrl = cleanUrlSIO(ma)
let ns = listener.ns = 'webrtc' // TODO: should this be ma.toString() ?
listener.ma = ma

log('Dialing to Signalling Server on: ' + sioUrl)

listener.io = io.connect(sioUrl, sioOptions)

listener.io.once('connect_error', callback)
listener.io.once('error', (err) => {
listener.emit('error', err)
listener.emit('close')
})
this.exchange.listen(ns, (request, cb) => {
let offer

listener.io.on('ws-handshake', incommingDial)
listener.io.on('ws-peer', this._peerDiscovered)

listener.io.on('connect', () => {
listener.io.emit('ss-join', ma.toString())
})

listener.io.once('connect', () => {
listener.emit('listening')
callback()
})

const self = this
function incommingDial (offer) {
if (offer.answer || offer.err) {
return
try {
offer = JSON.parse(String(request))
} catch (err) {
log('got malformed offer', err)
return cb(err)
}

const spOptions = { trickle: false }
Expand All @@ -173,40 +136,37 @@ class WebRTCStar {
const conn = new Connection(toPull.duplex(channel))

channel.once('connect', () => {
log('connected')

conn.getObservedAddrs = (callback) => {
return callback(null, [offer.srcMultiaddr])
return callback(null, [multiaddr(offer.ma)]) // TODO: this isn't really safe AT ALL...
}

listener.emit('connection', conn)
handler(conn)
})

channel.once('signal', (signal) => {
offer.signal = signal
offer.answer = true
listener.io.emit('ss-handshake', offer)
log('sending back signal')
cb(null, Buffer.from(JSON.stringify({signal, ma: listener.ma.toString()})))
})

channel.signal(offer.signal)
}
})
}

listener.close = (callback) => {
callback = callback ? once(callback) : noop

listener.io.emit('ss-leave')
this.exchange.unhandle(listener.ns)

setImmediate(() => {
listener.emit('close')
callback()
})
setImmediate(callback)
}

listener.getAddrs = (callback) => {
setImmediate(() => callback(null, [this.maSelf]))
setImmediate(() => callback(null, listener.ma ? [listener.ma] : []))
}

this.listenersRefs[multiaddr.toString()] = listener
return listener
}

Expand All @@ -223,18 +183,6 @@ class WebRTCStar {
return mafmt.WebRTCStar.matches(ma)
})
}

_peerDiscovered (maStr) {
log('Peer Discovered:', maStr)
maStr = cleanMultiaddr(maStr)

const split = maStr.split('/ipfs/')
const peerIdStr = split[split.length - 1]
const peerId = PeerId.createFromB58String(peerIdStr)
const peerInfo = new PeerInfo(peerId)
peerInfo.multiaddrs.add(multiaddr(maStr))
this.discovery.emit('peer', peerInfo)
}
}

module.exports = withIs(WebRTCStar, { className: 'WebRTCStar', symbolName: '@libp2p/js-libp2p-webrtc-star/webrtcstar' })
28 changes: 0 additions & 28 deletions src/sig-server/bin.js

This file was deleted.

21 changes: 0 additions & 21 deletions src/sig-server/config.js

This file was deleted.

65 changes: 0 additions & 65 deletions src/sig-server/index.html

This file was deleted.

Loading

0 comments on commit 2745279

Please sign in to comment.