Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: FBP Protocol 0.8 compat #116

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
"debug": "^3.0.0",
"json-stringify-safe": "^5.0.1",
"noflo": "^1.0.0",
"temp": "^0.8.3"
"temp": "^0.8.3",
"uuid": "^3.3.2"
},
"nyc": {
"include": [
Expand Down
19 changes: 14 additions & 5 deletions src/Base.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ debugMessagingSendPayload = require('debug') 'noflo-runtime-base:messaging:send:
class BaseTransport
constructor: (@options) ->
@options = {} unless @options
@version = '0.7'
@version = '0.8'
@component = new protocols.Component @
@graph = new protocols.Graph @
@network = new protocols.Network @
Expand Down Expand Up @@ -112,7 +112,10 @@ class BaseTransport
# @param [Object] Message payload
# @param [Object] Message context, dependent on the transport
send: (protocol, topic, payload, context) ->
debugMessagingSend "#{protocol} #{topic}"
if context.responseTo
debugMessagingSend "#{protocol} #{topic} (ID: #{context.responseTo}"
else
debugMessagingSend "#{protocol} #{topic}"
debugMessagingSendPayload payload

# Send a message to *all users* via the transport protocol
Expand All @@ -130,18 +133,24 @@ class BaseTransport
#
# The context is originally received from the transport. This could be
# an iframe origin or a specific WebSocket connection. The context will
# be utilized when sending messages back to the requester.
# be utilized when sending messages back to the requester. The context
# must also contain the secret and requestId the request was made with.
#
# @param [String] Name of the protocol
# @param [String] Topic of the message
# @param [Object] Message payload
# @param [Object] Message context, dependent on the transport
receive: (protocol, topic, payload, context) ->
payload = {} unless payload
debugMessagingReceive "#{protocol} #{topic}"
debugMessagingReceive "#{protocol} #{topic} (ID: #{context.requestId})"
debugMessagingReceivePayload payload

unless @canInput protocol, topic, payload.secret
if payload.secret and not context.secret
# Compatibility with pre-0.8 FBP clients
context.secret = payload.secret
delete payload.secret

unless @canInput protocol, topic, context.secret
@send protocol, 'error', new Error("#{protocol}:#{topic} is not permitted"), context
return

Expand Down
6 changes: 6 additions & 0 deletions src/direct.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ isBrowser = ->

Base = require './Base'
EventEmitter = require('events').EventEmitter
uuid = require 'uuid/v4'

class DirectRuntime extends Base
constructor: (options) ->
Expand Down Expand Up @@ -30,6 +31,10 @@ class DirectRuntime extends Base
protocol: protocol
command: topic
payload: payload

if context.responseTo
m.responseTo = context.responseTo

context.client._receive m

sendAll: (protocol, topic, payload) ->
Expand Down Expand Up @@ -59,6 +64,7 @@ class DirectClient extends EventEmitter
protocol: protocol
command: topic
payload: payload
requestId: uuid()
@emit 'send', m

_receive: (message) ->
Expand Down
10 changes: 9 additions & 1 deletion src/protocol/Component.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ class ComponentProtocol
when 'list' then @listComponents payload, context
when 'getsource' then @getSource payload, context
when 'source' then @setSource payload, context
else @send 'error', new Error("component:#{topic} not supported"), context
else
context.responseTo = context.requestId
@send 'error', new Error("component:#{topic} not supported"), context

getLoader: (baseDir, options = {}) ->
unless @loaders[baseDir]
Expand All @@ -27,6 +29,7 @@ class ComponentProtocol
loader = @getLoader baseDir, @transport.options
loader.listComponents (err, components) =>
if err
context.responseTo = context.requestId
@send 'error', err, context
return
componentNames = Object.keys components
Expand All @@ -35,6 +38,7 @@ class ComponentProtocol
@processComponent loader, component, context, (err) =>
processed++
return if processed < componentNames.length
context.responseTo = context.requestId
@send 'componentsready', processed, context

getSource: (payload, context) ->
Expand All @@ -45,23 +49,27 @@ class ComponentProtocol
# Try one of the registered graphs
graph = @transport.graph.graphs[payload.name]
unless graph?
context.responseTo = context.requestId
@send 'error', err, context
return

nameParts = utils.parseName payload.name
context.responseTo = context.requestId
@send 'source',
name: nameParts.name
library: nameParts.library
code: JSON.stringify graph.toJSON()
language: 'json'
, context
else
context.responseTo = context.requestId
@send 'source', component, context

setSource: (payload, context) ->
baseDir = @transport.options.baseDir
loader = @getLoader baseDir, @transport.options
loader.setSource payload.library, payload.name, payload.code, payload.language, (err) =>
context.responseTo = context.requestId
if err
@send 'error', err, context
return
Expand Down
27 changes: 26 additions & 1 deletion src/protocol/Graph.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ class GraphProtocol
when 'removegroup' then @removeGroup graph, payload, context
when 'renamegroup' then @renameGroup graph, payload, context
when 'changegroup' then @changeGroup graph, payload, context
else @send 'error', new Error("graph:#{topic} not supported"), context
else
context.responseTo = context.requestId
@send 'error', new Error("graph:#{topic} not supported"), context

resolveGraph: (payload, context) ->
unless payload.graph
context.responseTo = context.requestId
@send 'error', new Error('No graph specified'), context
return
unless @graphs[payload.graph]
context.responseTo = context.requestId
@send 'error', new Error('Requested graph not found'), context
return
return @graphs[payload.graph]
Expand All @@ -59,6 +63,7 @@ class GraphProtocol

initGraph: (payload, context) ->
unless payload.id
context.responseTo = context.requestId
@send 'error', new Error('No graph ID provided'), context
return
unless payload.name
Expand Down Expand Up @@ -89,6 +94,7 @@ class GraphProtocol
@transport.component.registerGraph fullName, graph, context

@graphs[payload.id] = graph
context.responseTo = context.requestId
@sendAll 'clear',
id: payload.id
name: payload.name
Expand Down Expand Up @@ -224,30 +230,35 @@ class GraphProtocol

addNode: (graph, node, context) ->
unless node.id or node.component
context.responseTo = context.requestId
@send 'error', new Error('No ID or component supplied'), context
return
graph.addNode node.id, node.component, node.metadata

removeNode: (graph, payload, context) ->
unless payload.id
context.responseTo = context.requestId
@send 'error', new Error('No ID supplied'), context
return
graph.removeNode payload.id

renameNode: (graph, payload, context) ->
unless payload.from or payload.to
context.responseTo = context.requestId
@send 'error', new Error('No from or to supplied'), context
return
graph.renameNode payload.from, payload.to

changeNode: (graph, payload, context) ->
unless payload.id or payload.metadata
context.responseTo = context.requestId
@send 'error', new Error('No id or metadata supplied'), context
return
graph.setNodeMetadata payload.id, payload.metadata

addEdge: (graph, edge, context) ->
unless edge.src or edge.tgt
context.responseTo = context.requestId
@send 'error', new Error('No src or tgt supplied'), context
return
if typeof edge.src.index is 'number' or typeof edge.tgt.index is 'number'
Expand All @@ -258,18 +269,21 @@ class GraphProtocol

removeEdge: (graph, edge, context) ->
unless edge.src or edge.tgt
context.responseTo = context.requestId
@send 'error', new Error('No src or tgt supplied'), context
return
graph.removeEdge edge.src.node, edge.src.port, edge.tgt.node, edge.tgt.port

changeEdge: (graph, edge, context) ->
unless edge.src or edge.tgt
context.responseTo = context.requestId
@send 'error', new Error('No src or tgt supplied'), context
return
graph.setEdgeMetadata edge.src.node, edge.src.port, edge.tgt.node, edge.tgt.port, edge.metadata

addInitial: (graph, payload, context) ->
unless payload.src or payload.tgt
context.responseTo = context.requestId
@send 'error', new Error('No src or tgt supplied'), context
return
if graph.addInitialIndex and typeof payload.tgt.index is 'number'
Expand All @@ -279,66 +293,77 @@ class GraphProtocol

removeInitial: (graph, payload, context) ->
unless payload.tgt
context.responseTo = context.requestId
@send 'error', new Error('No tgt supplied'), context
return
graph.removeInitial payload.tgt.node, payload.tgt.port

addInport: (graph, payload, context) ->
unless payload.public or payload.node or payload.port
context.responseTo = context.requestId
@send 'error', new Error('Missing exported inport information'), context
return
graph.addInport payload.public, payload.node, payload.port, payload.metadata

removeInport: (graph, payload, context) ->
unless payload.public
context.responseTo = context.requestId
@send 'error', new Error('Missing exported inport name'), context
return
graph.removeInport payload.public

renameInport: (graph, payload, context) ->
unless payload.from or payload.to
context.responseTo = context.requestId
@send 'error', new Error('No from or to supplied'), context
return
graph.renameInport payload.from, payload.to

addOutport: (graph, payload, context) ->
unless payload.public or payload.node or payload.port
context.responseTo = context.requestId
@send 'error', new Error('Missing exported outport information'), context
return
graph.addOutport payload.public, payload.node, payload.port, payload.metadata

removeOutport: (graph, payload, context) ->
unless payload.public
context.responseTo = context.requestId
@send 'error', new Error('Missing exported outport name'), context
return
graph.removeOutport payload.public

renameOutport: (graph, payload, context) ->
unless payload.from or payload.to
context.responseTo = context.requestId
@send 'error', new Error('No from or to supplied'), context
return
graph.renameOutport payload.from, payload.to

addGroup: (graph, payload, context) ->
unless payload.name or payload.nodes or payload.metadata
context.responseTo = context.requestId
@send 'error', new Error('No name or nodes or metadata supplied'), context
return
graph.addGroup payload.name, payload.nodes, payload.metadata

removeGroup: (graph, payload, context) ->
unless payload.name
context.responseTo = context.requestId
@send 'error', new Error('No name supplied'), context
return
graph.removeGroup payload.name

renameGroup: (graph, payload, context) ->
unless payload.from or payload.to
context.responseTo = context.requestId
@send 'error', new Error('No from or to supplied'), context
return
graph.renameGroup payload.from, payload.to

changeGroup: (graph, payload, context) ->
unless payload.name or payload.metadata
context.responseTo = context.requestId
@send 'error', new Error('No name or metadata supplied'), context
return
graph.setEdgeMetadata payload.name, payload.metadata
Expand Down
10 changes: 10 additions & 0 deletions src/protocol/Network.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ class NetworkProtocol extends EventEmitter

resolveGraph: (payload, context) ->
unless payload.graph
context.responseTo = context.requestId
@send 'error', new Error('No graph specified'), context
return
unless @transport.graph.graphs[payload.graph]
context.responseTo = context.requestId
@send 'error', new Error('Requested graph not found'), context
return
return @transport.graph.graphs[payload.graph]
Expand Down Expand Up @@ -215,19 +217,23 @@ class NetworkProtocol extends EventEmitter
startNetwork: (graph, payload, context) ->
network = @networks[payload.graph]
@_startNetwork graph, payload.graph, context, (err) =>
context.responseTo = context.requestId
@send 'error', err, context if err
return

stopNetwork: (graph, payload, context) ->
unless @networks[payload.graph]
context.responseTo = context.requestId
@send 'error', new Error("Network #{payload.graph} not found"), context
return
net = @networks[payload.graph].network
unless net
context.responseTo = context.requestId
@send 'error', new Error("Network #{payload.graph} not found"), context
return
if net.isStarted()
@networks[payload.graph].network.stop (err) =>
context.responseTo = context.requestId
if err
@send 'error', err, context
return
Expand All @@ -240,6 +246,7 @@ class NetworkProtocol extends EventEmitter
return
return
# Was already stopped, just send the confirmation
context.responseTo = context.requestId
@send 'stopped',
time: new Date
graph: payload.graph
Expand All @@ -256,11 +263,14 @@ class NetworkProtocol extends EventEmitter
@send 'error', new Error("Network #{payload.graph} not found"), context
return
net.setDebug payload.enable
context.responseTo = context.requestId
@send 'setdebug',
enable: payload.enable
, context
return

getStatus: (graph, payload, context) ->
context.responseTo = context.requestId
unless @networks[payload.graph]
@send 'error', new Error("Network #{payload.graph} not found"), context
return
Expand Down
6 changes: 5 additions & 1 deletion src/protocol/Runtime.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class RuntimeProtocol extends EventEmitter
when 'getruntime' then @getRuntime payload, context
when 'packet'
@sendPacket payload, (err) =>
context.responseTo = context.requestId
if err
@sendError err.message, context
return
Expand All @@ -101,7 +102,9 @@ class RuntimeProtocol extends EventEmitter
payload: payload.payload
, context
return
else @send 'error', new Error("runtime:#{topic} not supported"), context
else
context.responseTo = context.requestId
@send 'error', new Error("runtime:#{topic} not supported"), context

getRuntime: (payload, context) ->
type = @transport.options.type
Expand Down Expand Up @@ -129,6 +132,7 @@ class RuntimeProtocol extends EventEmitter
payload.repository = @transport.options.repository if @transport.options.repository
payload.repositoryVersion = @transport.options.repositoryVersion if @transport.options.repositoryVersion

context.responseTo = context.requestId
@send 'runtime', payload, context
# send port info about currently set up networks
for name, network of @transport.network.networks
Expand Down
Loading