philosophy
Things in (?question blocks?) are unresolved.
- Simple: The thing you would draw at the whiteboard, spoken aloud, is the thing you write.
- Scalable: Scalability is more important than performance. Scalability is important even when there are no performance concerns -- scalable for robots means scalable for people.
- Readable: The plot should be as simple as the story. Orthogonal concerns (schema, transformation, topology, transport, resources, and configuration) should be crisply separated.
Every Hanuman graph can be expressed as:
- Buffered flow -- Flume
- Event-driven flow -- cat, HTTP post, Hadoop
- Picture -- graphviz etc
- Explanation -- each processor appends the plain-language action it takes on the record
Wherever concepts can be paired across the following, they should be, until they prove how they are distinct:
- micro dataflow
- macro dataflow
- workflow
- subroutine
- system diagram
- every edge connects an action to a product or a product to an action
- The product might be implied by the edge, and elided from the declaration.
- Kafka, ZeroMQ, Storm, Esper, Flume, Pusher
- Rake, Chef, Shell scripts, Thor
- Rack
- Hadoop
- stages
  - products
  - actions -- alter, augment or act on data. Some important types of actions:
    - transformers -- actions with exactly one input and one output
    - sources -- actions with zero inputs and (?one or more outputs?). Sources may be different in another way, not sure yet
    - sinks -- actions with (?one or more inputs?) and no output.
    - taps -- actions that make no changes to the data (?can a tap add metadata? can it mutate metadata?)
  - graphs -- contains other stages. A graph is also an action.
- schema
- record
- control path
- configuration
- topic -- a filter. This filter is special because the transport can
- runner -- pairs a graph with concrete executors and products
(?questions exist about the following?)
- partition --
- chain -- (?a group of sequential edges understood to be related in an important way?)
- delivery guarantee --
- contract
Data is primary: data is represented as an "arbitrary record with sideband metadata". That is, rather than an event wrapper with a message body you access, the data is primary and the metadata is accessible through it.
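One way to picture "data is primary" is a record that stays an ordinary object and carries its metadata on the side. The sketch below is only an illustration: the `Sideband` module and the sample values are hypothetical and are not part of Hanuman.

```ruby
# Hypothetical mixin (not part of Hanuman): gives any record a sideband
# metadata hash without wrapping the record itself.
module Sideband
  def metadata
    @metadata ||= {}
  end
end

# .dup guarantees an unfrozen String that can carry the metadata.
line = "GET /index.html 200".dup.extend(Sideband)
line.metadata[:origin] = "access.log"

status = line.split.last          # the record is still an ordinary String
origin = line.metadata[:origin]   # metadata is reached through the record
```

The record needs no unwrapping before use, which is the contrast with an event wrapper whose body you have to dig out.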
Flows are understandable in isolation:
Unless a processor explicitly maintains state or mixes in external entropy, the outcome of a data stream may be completely characterized by
- Processor
- Configuration (static parameters delivered at run-time to the processors on the graph)
- Data record itself
- Metadata attached to that record
The important point is that the outcome of a processor is not affected by changes to
- the graph as a whole, including the stages that it pulls from and writes to
- the machine or machines it runs on
- the transport layer handling the flow
- spatially or temporally where the data originates from or proceeds to.
As long as it is fed the same data and the same metadata with the same configuration, a processor doesn't care what's running it, where it is, who fed it the data or who is consuming it.
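A minimal sketch of that isolation property, assuming a made-up `Tagger` processor (hypothetical, not a Hanuman class) that receives metadata as an explicit argument to keep the example short:

```ruby
# Hypothetical processor: its output is a function of configuration,
# record, and metadata only. It keeps no state between calls.
class Tagger
  def initialize(config)
    @prefix = config.fetch(:prefix)
  end

  def process(record, metadata)
    "#{@prefix}:#{metadata.fetch(:origin)}:#{record}"
  end
end

config   = { prefix: "web" }
metadata = { origin: "stdin" }

# Two independent instances stand in for two different machines or graphs.
on_laptop  = Tagger.new(config).process("hello", metadata)
on_cluster = Tagger.new(config).process("hello", metadata)
```

Both calls yield the same string, because nothing about the runner, the transport, or the neighbouring stages enters the computation.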
- guarantees
  - end-to-end -- ensures acknowledgement by destination
  - next-hop -- guarantees successful handoff to next stage, but nothing more
  - best effort -- fire and forget
- naming stages -- here are some desirable features:
  - unambiguously referenceable
  - name doesn't change if irrelevant changes are made to the graph
  - name does change if relevant changes are made
  - name is predictable (by looking at the graph)
  - name is readable
  - name is unaffected by configuration
- fanout: when several stages consume a given stage's outputs,
  - does the stage have one output (a resource) and the product has many consuming stages
  - or does the stage have multiple outputs, and the emit method needs a re-think?
- messages
  - can stages message each other directly?
  - or do they send events over a sideband that other stages can consume?
  - or are they broadcast to all nodes?
- amelioration: when are
  - mixin of a module (calling super)
  - wrapping a stage (before/after/around filter)
  - flow insertion (put module before / after it in the flow)
- delivery guarantees
- biographers
- choke -- emits all records it receives but not faster than a given averaged rate
- Uniform consistent sampling
- dashpot tap
- channel topic
- aggregates -- sum, avg/stdev, %ile
- graph change stream -- an activity stream of macro graph actions (topology or configuration changes).
- many-to-many -- all input records sent to all output slots. (?input partition is somehow recorded in event metadata?).
- switch -- each record sent to exactly one out slot.
- window --
- cogroup
- sort
- grouping "fences" -- indicate start/end boundaries of a group in the stream (or in metadata) rather than by combining into a rolled-up record
- buffer -- ...
- retry --
- load balance --
- barrier -- execution pauses until some condition met
- schedule -- ...
- counters, timers, gauges, values
- announce / discover
- simulate -- sampled yet illustrative flow goes end-to-end
- explain -- processors explain what they do, appending it to the explanation passed along the graph
- audit -- metadata element showing full provenance of the record
- Hamming's Problem
- Newton's method
- Kafka design
- ZeroMQ design and its pattern language
- http://en.wikipedia.org/wiki/Lucid_(programming_language)#Details
- http://en.wikipedia.org/wiki/Oz_(programming_language)
Front-facing:
- .make(*, {}) -- new stage
- #output(out_name) -- the stage, if any, that the named output slot of this stage feeds
- #outputs -- an ordered list of stages this one feeds
- #owner -- graph containing this stage. Every stage has an owner except Universe
- #into(other, from_slot, into_slot) -- asks its graph to connect self to other; returns other
Framework:
- #set_output
- #set_owner
Inessential (maybe):
- #name -- name of stage; is unique on its graph
- #fullname -- globally unique name of stage, formed as "#{owner.name}.#{self.name}"
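The stage interface above might hang together roughly as follows. This is a sketch under assumptions: the `Stage` and `Graph` classes here are stand-ins rather than the Hanuman classes, the `set_output(slot, stage)` signature is guessed, and the default slot names `:out` and `:in` are invented for the example.

```ruby
# Stand-in graph: only records edges and wires outputs.
class Graph
  attr_reader :name, :edges

  def initialize(name)
    @name  = name
    @edges = []
  end

  def connect(from, into, from_slot, into_slot)
    @edges << [from, into, from_slot, into_slot]
    from.set_output(from_slot, into)
  end
end

# Stand-in stage exposing the front-facing methods listed above.
class Stage
  attr_reader :name, :owner

  def initialize(name, owner)
    @name    = name
    @owner   = owner
    @outputs = {}   # Ruby hashes keep insertion order
  end

  def set_output(slot, stage)   # assumed signature
    @outputs[slot] = stage
  end

  def output(out_name)
    @outputs[out_name]
  end

  def outputs
    @outputs.values
  end

  def fullname
    "#{owner.name}.#{name}"
  end

  # Asks the owning graph to connect self to other; returns other so calls chain.
  def into(other, from_slot = :out, into_slot = :in)
    owner.connect(self, other, from_slot, into_slot)
    other
  end
end

flow  = Graph.new("flow")
parse = Stage.new("parse", flow)
count = Stage.new("count", flow)
sink  = Stage.new("sink", flow)
parse.into(count).into(sink)
```

Because `into` returns `other`, a linear flow reads left to right, and the graph rather than the stage stays responsible for recording edges.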
Decisions to make / Restrictions we are committing to:
- !! each stage is owned by exactly one graph (except Universe)
- !! stages cannot fruitfully exist in the absence of a graph
- names are:
  - ??determined only by graph??
  - ??immutable on object??
  - ??only given in special cases??
Front-facing:
- stages -- list of all stages added to graph
- inputs -- list of named input slots
- drive(source) -- asks the named source to drive. ??run?? ??drive??
- ?? fetch(sink) -- fetches next record from named sink ??
Framework:
- set_stage(name, stage) -- (might need to be (stage, name))
  - adds it to the stages collection. A named stage replaces any existing stage with that key, ??at the same position it held??.
  - note that you cannot unset_stage. Feels dangerous.
- connect(from, into, from_slot, into_slot)
how are names chosen?
- optional arbitrary name supplied at time of storage. Only some things are named.
- optional arbitrary name supplied at time of storage. Otherwise a mangled name is supplied
- calls #name on objects. You have to keep things straight. (This is trouble).
(one could separate naming a stage from adding it to graph. still don't know if names are intrinsic)
Sugar:
- <magic stage method> -- stage types register for a magic method (eg. StdinSource is stdin, MapProcessor is map, RegexpFilter is re). This
  - calls .make on that class
  - adds the stage to itself (which handles owner and naming)
currently, from pry (lightly organized):
Gorillib::Model#methods :
== inspect
read_attribute unset_attribute write_attribute attribute_set?
receive! update_attributes attribute_values attributes compact_attributes
read_unset_attribute handle_extra_attributes
as_json to_json to_tsv to_wire
Gorillib::Builder#methods :
getset getset_member inspect_helper
collection_of get_collection_item getset_collection_item has_collection_item? set_collection_item
Meta::Hanuman::StageType#methods : name name? receive_name owner owner? receive_owner doc receive_doc
Hanuman::Stage#methods : report setup stop fullname --- configure key_method lookup notify to_key
#
Hanuman::Action :
#
Hanuman::IsOwnInputSlot#methods : inputs
Hanuman::IsOwnOutputSlot#methods : outputs
Hanuman::Inlinkable#methods : << from set_input
Hanuman::Outlinkable#methods : > into set_output
#
Wukong::Processor#methods : bad_record emit input input? output output? receive_input receive_output
Wukong::Map#methods : blk process call
Meta::Hanuman::StageType#methods : doc name name? owner owner? receive_doc receive_name receive_owner
Hanuman::Stage#methods : configure fullname key_method notify report to_key
#
Hanuman::Action :
#
Hanuman::Inlinkable#methods : << from
Hanuman::SplatInputs#methods : has_input? inslots set_input
Hanuman::Outlinkable#methods : >
Hanuman::SplatOutputs#methods : into outputs
Hanuman::Slottable#methods : handle_extra_attributes inputs
#
Hanuman::Graph#methods : declare_stage connect action next_name_for product tree
as_is file_sink file_source flatten foreach from_json from_tsv graph integers limit lookup map not_re null re shell stderr stdin stdout to_json to_tsv
Meta::Hanuman::GraphType#methods : stage stages has_stage? receive_stages edges receive_edges input? output? receive_input receive_output
#
Wukong::Dataflow#methods : setup stop drive input output process process_stages reject select set_output sink_stages source_stages
Meta::Wukong::DataflowType#methods : has_outslot? has_splat_inslot? has_splat_outslot? outslot outslots receive_outslots receive_splat_inslots receive_splat_outslots splat_inslot splat_inslots splat_outslot splat_outslots
"code is written thoughtfully but run recklessly"
- interface to receive is (*args, {attrs}):
  - the last element must be a hash of attributes
  - args must win out over attrs
The rule is "last given arg is always the attrs" -- in cases like
def foo(alpha=nil, beta=nil, gamma={}, attrs={}) ; end
input                      alpha  beta  gamma        attrs
-------                    -----  ----  -----        -----
foo                      # nil    nil   {}           {}
foo({})                  # nil    nil   {}           {}
foo 7                    # 7      nil   {}           {}
foo :a => :b             # nil    nil   {}           {:a => :b}
foo 7, 3, :a => :b       # 7      3     {}           {:a => :b}
foo 7, 3, {:a => :b}, {} # 7      3     {:a => :b}   {}
If any positional arg is a hash, you must slap an extra hash on the end. Also, don't use hashes for positional args.
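The "last given arg is always the attrs" rule can be sketched with a small splitter. The helper name `split_args` is hypothetical, and `foo` is rewritten here to take a splat so that the rule, rather than Ruby's default argument binding, decides where each value lands.

```ruby
# Hypothetical helper: peels a trailing Hash off an argument list.
# Returns [positional_args, attrs] without mutating the input.
def split_args(args)
  args  = args.dup
  attrs = args.last.is_a?(Hash) ? args.pop : {}
  [args, attrs]
end

# Same slots as the table: alpha, beta, gamma, attrs.
def foo(*given)
  (alpha, beta, gamma), attrs = split_args(given)
  [alpha, beta, gamma || {}, attrs]
end

bare       = foo
only_hash  = foo({})
one_arg    = foo(7)
only_attrs = foo(:a => :b)
mixed      = foo(7, 3, :a => :b)
hash_arg   = foo(7, 3, { :a => :b }, {})
```

The last call shows why a positional hash needs an extra trailing hash: without it, `{:a => :b}` would be taken as the attrs.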
You cannot have all of the following desirable properties, but you can have the right tasteful subset of them:
- I get to choose any signature for initialize
- The signature for receive is predictable, so I can provide generic advice to an object
- It is predictable whether attributes are receive_xx'd or set directly
- only type-converts once and only evals block once
- objects are fully-formed when initialize is done
- I can initialize an object directly, without passing values through the receive door, if I'm willing to take responsibility that the objects are safe.
Case A: Objects own their own new/initialize
You can initialize an object directly, with no fuckery in between.
- new calls initialize (and in every other way is untouched)
- initialize(*args,{attrs}) is whatever you want, but it must be sensible to call it with no args
- receive(*args,{attrs},&block) calls
  - new -- with no args
  - receive! -- with the args it was given
In this world,
- an object must be comfortable with being incomplete even after initialize finishes
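A rough sketch of Case A, assuming a toy `Point` class (hypothetical) whose `receive!` handles only the trailing attrs hash and sets instance variables directly:

```ruby
class Point
  attr_reader :x, :y

  # Any signature the author likes, as long as calling it bare is sensible.
  def initialize(x = nil, y = nil)
    @x = x
    @y = y
  end

  # Class-level door: build with no args, then hand the args to receive!.
  def self.receive(*args, &block)
    new.receive!(*args, &block)
  end

  # Toy receive!: copies the trailing attrs hash into instance variables.
  def receive!(*args)
    attrs = args.last.is_a?(Hash) ? args.last : {}
    attrs.each { |key, value| instance_variable_set("@#{key}", value) }
    yield self if block_given?
    self
  end
end

direct   = Point.new(3, 4)            # no receive machinery involved
received = Point.receive(x: 1, y: 2)  # new with no args, then receive!
blank    = Point.new                  # legal, and incomplete
```

`blank` is the cost of this world: the object exists before it has any of its values.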
Case B:
- new(*args,{attrs},&block)
  - calls initialize (and in every other way is untouched)
- initialize(*args,{attrs},&block)
  - you can do what you want,
  - ...but you must call super (or otherwise ensure that receive! is called).
- .receive(*args,{attrs},&block) just calls new, which calls initialize, which calls receive!
In this world,
- you can't choose your own initialize signature
- there's no way to construct a model that doesn't incur the receive chain business (apart from adding a _native bailout flag -- I'm thinking ahead to avro)
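Case B might look like the following sketch. `Model` and `Circle` are hypothetical names, and `receive!` is again a toy that only copies the trailing attrs hash into instance variables.

```ruby
class Model
  # .receive just calls new ...
  def self.receive(*args, &block)
    new(*args, &block)
  end

  # ... which calls initialize, which calls receive!.
  def initialize(*args, &block)
    receive!(*args, &block)
  end

  def receive!(*args)
    attrs = args.last.is_a?(Hash) ? args.last : {}
    attrs.each { |key, value| instance_variable_set("@#{key}", value) }
    yield self if block_given?
    self
  end
end

class Circle < Model
  attr_reader :radius, :label

  def initialize(*args, &block)
    super                  # bare super forwards the same args and block
    @label ||= "circle"    # safe: attributes are already received
  end
end

made     = Circle.new(radius: 2)
received = Circle.receive(radius: 3, label: "big")
```

Every construction path runs through `receive!`, so the object is fully formed when `initialize` returns, at the price of a fixed `initialize` signature.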
You cannot have all of the following desirable properties, but you can have the right tasteful subset of them:
- Stages have a terse name that doesn't look like robot spew
- Stages can produce a globally unique name that doesn't look like robot spew.
- I can label a spot on a graph, assign to it directly (replacing whatever's there), and refer to it uniquely (getting back the last stage assigned to that spot).
- If an object has a clear intrinsic name it becomes (or heavily informs) its label on the graph
- Objects are encouraged to have names and then make opinionated assumptions about how to configure themselves based on it. (This is really important, as proven by chef; it consequently means that names don't exist at the graph's primary convenience)
- I can have multiple differently-named instances of a stage template on the same graph
- I can have multiple identically-named instances of a stage template on the same graph, and I don't have to do anything special -- eg several pig('dump_to_s3') stages
Hmm... what if stages weren't named but rather labeled -- that is, the label is just a retrieval key for a stage on a graph.
- in workflow world esp., things are very excited to learn their names/labels, and can do a lot of nice magic with them.
- you can cause trouble with the unique-name-on-graph rule when names become labels always.
- we want to distribute configuration by letting it drape naturally over the graph. (That is: the configuration key crips.snoop.gat becomes available to the snoop node on the crips graph as the gat configuration setting).
  - This argues that as macro flows start to get more complex, you'll want to start labelling things more.
- the number 11 doesn't know that I've labeled it as awesomeness; rather, its binding knows how to retrieve the object labeled awesomeness.
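Draping configuration over the graph could be as simple as matching dotted keys against a stage's full label. In this sketch the helper `settings_for`, the flat string-keyed config hash, and the sample values are all assumptions made for illustration.

```ruby
# Hypothetical helper: picks out the settings whose dotted key begins with
# the stage's full label, and strips that prefix from the key.
def settings_for(config, fullname)
  prefix = "#{fullname}."
  config.each_with_object({}) do |(key, value), found|
    found[key.delete_prefix(prefix).to_sym] = value if key.start_with?(prefix)
  end
end

config = {
  "crips.snoop.gat"   => "value for snoop",
  "crips.other.gat"   => "value for another node",
  "elsewhere.snoop.x" => "different graph",
}

snoop_settings = settings_for(config, "crips.snoop")
```

The stage never sees the full key: the `snoop` node on the `crips` graph just receives a `gat` setting.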
We have lots of examples where we want a collection of objects that meets the following:
- retrieval like a hash: objects retrieved by key, key maps uniquely to object, adding object with same key replaces former contents
- iterates like an array -- each returns the values only
- serializes as an array
- Objects all adhere to similar-enough contract
- receiving a group of objects passes each to a factory for creation
- I don't want the kitchen soup of enumerable methods
- things are stored in order they are added, and retrievable by index
- things can be retrieved by name:
  - how is that name chosen?
    - optional arbitrary name supplied at time of storage
    - collection calls #name on objects
    - collection calls certain method on objects, uses that as retrieval key
  - regardless: if I replace the object in the foo slot, the index does not change. (trying to balance needs of positional slots with keyword slots)
- retrieval should be fairly direct -- would rather not iterate over whole collection each time
How aware of its objects should the collection be?
- if a method is used to assign names, they must all have that method
- Does the container add anything to the attributes hash as it goes by? It does now, is probably a mistake.
- receive! should pass contents to a factory.
  - ... which means find_or_create seems plausible
  - ... create seems a bit less so
- #[] -- gets object with given name
- #[]= -- adds object with given name
- #<< -- adds object
- delete --
- fetch --
- include? -- true if has given key
- to_a -- all the objects
- empty? -- true if has no objects
- blank? -- true if empty?
- present? -- true if not blank?
- receive! -- merges in contents of given enumerable. (
- receive -- nem
inessential, but reasonable:
- each -- iterates over the values
- each_pair -- iterates over key-value pairs
- values -- same as to_a
- to_hash -- hash of key-value pairs, in same order as collection
- keys -- all the keys
- has_key? -- same as include?
- length -- number of items
- size -- same as length
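A minimal sketch of such a collection, built on the fact that a Ruby Hash keeps insertion order and that reassigning an existing key leaves its position unchanged. The class name `KeyedCollection` and the choice to key `<<` on `#name` are assumptions, since the naming question above is still open.

```ruby
# Hypothetical collection: retrieves like a hash, iterates like an array.
class KeyedCollection
  def initialize
    @items = {}   # insertion-ordered; direct lookup by key
  end

  def [](key)
    @items[key]
  end

  # Replacing an existing key keeps the slot's original position.
  def []=(key, obj)
    @items[key] = obj
  end

  # Assumes the object answers #name; returns self so adds can chain.
  def <<(obj)
    self[obj.name] = obj
    self
  end

  def each(&block)
    @items.each_value(&block)   # values only, like an array
  end

  def to_a
    @items.values
  end

  def keys
    @items.keys
  end

  def include?(key)
    @items.key?(key)
  end

  def length
    @items.length
  end

  def empty?
    @items.empty?
  end
end

Item = Struct.new(:name, :version)

coll = KeyedCollection.new
coll << Item.new(:parse, "v1") << Item.new(:count, "v1")
coll[:parse] = Item.new(:parse, "v2")   # replaces in place
```

Deliberately leaving out `Enumerable` keeps the surface small, matching the wish to avoid the full set of enumerable methods.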
- all stages have a source and sink; emit(result) calls sink.process(result) / emit(:spam, result) calls sink(:spam).process(result). This method always refers to a concrete stage, the one that has by whatever action of fate been wired to it.
- input and output refer to the slots of
- The inslot -- holds the schema & the label
- a data source -- for argument's sake, a driving stage (it will call process on the stage it is wired to)
- from inside the graph, the stage that gets data sent through the port
(diagram: src outside the graph boundary, wired through the inslot to a stage inside the graph)
(diagram: src sitting in the inslot itself, wired to a stage inside the graph)
http_source > chain(:stuff){ input > parse > fiddle > output } > hbase_sink
http_source > chain(:stuff){ parse > fiddle } > hbase_sink
chain(:stuff){ http_source > parse > fiddle } > hbase_sink
chain(:ln){
  consumes(lines: String, ints: Integer) ; produces(numbered_lines: String)
  numberer << lines << ints > numbered_lines
}
chain(:ln){
  consumes(lines: String, ints: Integer) ; produces(numbered_lines: String)
  lines > numberer ; ints > numberer ; numberer > numbered_lines
}
chain(:ln){
  consumes(lines: String, ints: Integer) ; produces(numbered_lines: String)
  input(:lines).into numberer, :lines   # connect( ??, ??, numberer, :lines)
  input(:ints).into  numberer, :ints    # connect( ??, ??, numberer, :ints)
  numbered_lines.into output
}
stdin > chain(:ln).lines ; integers > chain(:ln).ints ; chain(:ln) > stdout
chain(:ln) << { lines: stdin, ints: integers } > stdout
chain(:ln).from(lines: stdin, ints: integers).into(stdout)
dataflow(:word_count_map){ input > tokenize > flatten > output }
dataflow(:word_count_red){ input > count > output }
all stages have one source and multiple (anonymous) sinks
def emit(rec) sinks.each{|sink| sink.process(rec) } ; end
To have topological branching, label data with a topic and use a switch:
class Switch
  def emit(rec) sinks.each{|sink| sink.process(rec) if match?(sink, rec._channel) } ; end
end
stages can have named outputs with (optional) splat outputs, or be singular output
# named outputs
def emit_foo(rec) emit(:foo, rec) ; end
# splat outputs (and used by named outputs)
def emit(label, rec) sink(label).process(rec) ; end
# singular output
def emit(rec) sink.process(rec) ; end
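The anonymous-sinks and switch variants can be compared side by side. Everything in this sketch is a stand-in: `Collector`, `Broadcast`, and the choice to carry the topic in a `:channel` hash key (instead of `rec._channel`) are assumptions for illustration.

```ruby
# Hypothetical sink that just remembers what it was handed.
class Collector
  attr_reader :topic, :records

  def initialize(topic = nil)
    @topic   = topic
    @records = []
  end

  def process(rec)
    @records << rec
  end
end

# Anonymous sinks: every sink receives every record.
class Broadcast
  attr_reader :sinks

  def initialize(*sinks)
    @sinks = sinks
  end

  def emit(rec)
    sinks.each { |sink| sink.process(rec) }
  end
end

# Switch: a record goes only to sinks whose topic matches its channel.
class Switch < Broadcast
  def emit(rec)
    sinks.each { |sink| sink.process(rec) if match?(sink, rec[:channel]) }
  end

  def match?(sink, channel)
    sink.topic == channel
  end
end

everything = Collector.new
Broadcast.new(everything).emit({ channel: :even, body: 2 })

evens  = Collector.new(:even)
odds   = Collector.new(:odd)
switch = Switch.new(evens, odds)
[1, 2, 3, 4].each { |n| switch.emit({ channel: n.even? ? :even : :odd, body: n }) }
```

In both variants the stage's `emit` signature stays singular; the branching lives in the topic label and in the switch, not in extra named outputs.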
class Transformer # (singular in, singular out)
consumes String
produces Integer
end
class ProcessorWithMultInMultOut
consumes :lines, String
consumes :numbers, Integer
produces :even_numbered_lines, String
produces :odd_numbered_lines, String
end
class ProcessorWithMultInSplatInSplatOut
consumes :splats, String
end