-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
eugene.tolmachev
committed
Mar 24, 2016
1 parent
bb7310a
commit ed53b46
Showing
14 changed files
with
300 additions
and
226 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,115 +1,136 @@ | ||
(*** hide ***) | ||
// This block of code is omitted in the generated HTML documentation. Use | ||
// it to define helpers that you do not want to show in the documentation. | ||
#I "../../bin" | ||
#I "../../build" | ||
#r "FsShelter.dll" | ||
#r "FsJson.dll" | ||
#r "FsLogging.dll" | ||
|
||
open FsJson | ||
open Storm | ||
open FsShelter | ||
open System | ||
open System.Collections.Generic | ||
|
||
(** | ||
Defining reliable spouts | ||
======================== | ||
-------------------- | ||
[Processing guarantees](https://storm.apache.org/documentation/Guaranteeing-message-processing.html) are the biggest selling point of Storm, please see the official docs for the details. | ||
FsShelter implements reliability semantics with "housekeeper" functions: defaultHousekeeper can be used as is for transient sources or as an inspiration for reliability over external/persistent sources. | ||
The spout implementation is fairly similar to the "unreliable" version, with the addition of unique (int64) tuple ID: | ||
The reliable spout implementation for a source like a persistent queue (RabbitMQ, Kafka, etc.) needs to obtain the event id from the source and forward Storm's acks and nacks back to the source. | ||
The obtained Id has to be passed along with the tuple from the spout function: | ||
*) | ||
|
||
// data schema for the topology, every case is a unique stream | ||
// each case carries a single int payload | ||
type Schema = | ||
// produced by the numbers spout | ||
| Original of int | ||
// produced by the addOne bolt | ||
| Even of int | ||
// produced by the addOne bolt | ||
| Odd of int | ||
|
||
// numbers spout - asks the source for the next (id, number) pair and
// wraps the number into the Original stream, keeping the id alongside it
let numbers source =
    async {
        let! (id, n) = source ()
        return (id, Original n) |> Some
    }
|
||
// add 1 bolt - consumes a message from the Original stream and emits the
// incremented number to either the Even or the Odd stream
// input: the incoming message
// emit: function used to emit the resulting message
// fails (failwithf) when the input is not an Original message
let addOne (input, emit) =
    async {
        let output =
            match input with
            | Original x ->
                let next = x + 1
                // classify by the parity of the value actually emitted: `%` keeps the
                // sign of the dividend, so comparing `x % 2` with 1 misses negative odd inputs
                if next % 2 = 0 then Even next else Odd next
            | _ -> failwithf "unexpected input: %A" input
        emit output
    }
|
||
// terminating bolt - consumes Even/Odd messages and writes them out
// through the supplied `info` function; any other input is an error
let logResult (info, input) =
    async {
        match input with
        | Even _ | Odd _ ->
            input |> sprintf "Got: %A" |> info
        | _ -> failwithf "unexpected input: %A" input
    }
|
||
(** | ||
Here we mimic an external source and implement all three possible cases: produce a new message, retry a failed one (indefinitely), and ack a successfully processed one. | ||
*) | ||
let rnd = new System.Random() // used for generating random messages | ||
open FsShelter.Topology | ||
|
||
// commands handled by the mailbox that fakes the external source | ||
type QueueCmd = | ||
// ask for a message; the (id, number) pair is sent back on the reply channel | ||
| Get of AsyncReplyChannel<TupleId*int> | ||
// the message with this id was processed; it is removed from the pending set | ||
| Ack of TupleId | ||
// the message with this id failed; it is queued to be handed out again | ||
| Nack of TupleId | ||
|
||
///cfg: the configuration passed in by storm | ||
///runner: a spout runner function (passed in from topology) | ||
let spout runner (cfg:Configuration) = | ||
// faking an external source here | ||
let source = | ||
let rnd = Random() | ||
let count = ref 0L | ||
//define the "next" function | ||
//emit: a function that emits message to storm with unique ID | ||
let next emit = | ||
fun () -> async { | ||
tuple [ rnd.Next(0, 100) ] | ||
|> emit (Threading.Interlocked.Increment &count.contents) | ||
} | ||
//run the spout | ||
next |> runner | ||
let pending = Dictionary() | ||
|
||
MailboxProcessor.Start (fun inbox -> | ||
// Message loop of the fake source.
// nacked: (id, number) pairs that failed and are waiting to be handed out again
// `pending` holds the messages that were handed out and not yet acked.
let rec loop nacked =
    async {
        let! cmd = inbox.Receive()
        let remaining =
            match cmd, nacked with
            | Get rc, [] ->
                // nothing to replay: produce a fresh number under a new id
                let tupleId, number = string(Threading.Interlocked.Increment &count.contents), rnd.Next(0, 100)
                pending.Add(tupleId, number)
                rc.Reply(tupleId, number)
                []
            | Get rc, (tupleId, number) :: xs ->
                // replay a previously nacked message before producing new ones
                pending.Add(tupleId, number)
                rc.Reply(tupleId, number)
                xs
            | Ack id, _ ->
                pending.Remove id |> ignore
                nacked
            | Nack id, _ ->
                // Move the message out of `pending` while it waits for redelivery:
                // Dictionary.Add throws on a duplicate key, so an entry left behind
                // here would make the replay branch above fail and stop the mailbox.
                // An id that is not pending (already acked, or nacked twice) is
                // ignored rather than raising on the lookup.
                match pending.TryGetValue(id) with
                | true, number ->
                    pending.Remove id |> ignore
                    (id, number) :: nacked
                | _ -> nacked
        return! loop remaining
    }
loop []) | ||
|
||
|
||
(** | ||
Anchoring and named streams | ||
======================== | ||
FsShelter has helper functions to emit to a named stream or to anchor a tuple: | ||
Anchoring | ||
-------------------- | ||
In order to provide processing guarantees Storm needs to construct and track the state of entire "tuple tree", which is built out by emitting "anchored" tuples. | ||
FsShelter implements anchoring statically: instead of ad-hoc, as determined by a component, it is a property of the stream _leading to_ an emit. | ||
Consequently the implementation of emit (anchored/unanchored) is determined by the topology graph and completely transparent to the bolt that processes a tuple that will be used as an anchor. | ||
*) | ||
///cfg: the configuration passed in from Storm
///runner: passed in from topology, receives the consumer function
///emit: passed in from topology, used to send the resulting tuple
let addOneBolt runner emit cfg =
    // consumer: increments the first field of the incoming tuple and re-emits it
    let add (msg : Json) =
        async {
            let x = msg?tuple.[0].ValI + 1
            // the named stream is chosen by the parity of the incremented value
            let stream = if x % 2 = 0 then "even" else "odd"
            // anchored to the original message so the entire tuple tree is
            // processed before the spout is ack'ed
            emit (anchor msg (namedStream stream (tuple [ x ])))
        }
    // hand the consumer over to the runner
    runner add
|
||
(** | ||
Example of parametrization and use of the tuple's origin (component that emitted it and the stream that it arrived on) inspection: | ||
//define the storm topology | ||
open FsShelter.DSL | ||
|
||
*) | ||
///cfg: the configuration passed in by Storm
///runner: passed in from topology, receives the consumer function
///log: log writer, called with the description and the formatted message
let resultBolt runner log (cfg:Configuration) =
    // "desc" is the value passed in with the submitted topology Config
    let desc = cfg.Json?conf?desc.Val
    // consumer: reports where the tuple came from and what it carries
    let logResult (msg : Json) =
        async {
            let origin = msg?comp.Val
            let stream = msg?stream.Val
            let data = msg?tuple.[0].ValI
            log desc (sprintf "origin: %A(%A), data: %A" origin stream data)
        }
    // hand the consumer over to the runner
    runner logResult
#nowarn "25" // for stream matching expressions | ||
// Topology definition: the reliable numbers spout (s1) feeds addOne (b1), | ||
// whose Odd and Even outputs go to two logging bolts (b2 and b3). | ||
let sampleTopology = topology "Guaranteed" { | ||
let s1 = numbers | ||
|> runReliableSpout (fun log cfg () -> source.PostAndAsyncReply Get) // ignoring logging and cfg available | ||
// second argument: builds the (ack, nack) pair, both posting back to the source mailbox | ||
(fun _ -> Ack >> source.Post, Nack >> source.Post) | ||
let b1 = addOne | ||
|> runBolt (fun log cfg tuple emit -> (tuple,emit)) // pass incoming tuple and emit function | ||
|> withParallelism 2 | ||
|
||
let b2 = logResult | ||
|> runBolt (fun log cfg -> | ||
let mylog = Common.Logging.asyncLog ("odd.log") | ||
fun tuple emit -> (mylog,tuple)) | ||
|> withParallelism 1 | ||
|
||
(** | ||
Topology with named streams and config overrides | ||
======================== | ||
let b3 = logResult | ||
|> runBolt (fun log cfg -> | ||
let mylog = Common.Logging.asyncLog ("even.log") | ||
fun tuple emit -> (mylog,tuple)) | ||
|> withParallelism 1 | ||
|
||
*) | ||
yield s1 ==> b1 |> shuffle.on Original // emit from s1 to b1 on Original stream and anchor immediately following emits to this tuple | ||
yield b1 --> b2 |> shuffle.on Odd // anchored emit from b1 to b2 on Odd stream | ||
yield b1 --> b3 |> shuffle.on Even // anchored emit from b1 to b3 on Even stream | ||
} | ||
|
||
(** | ||
Resulting topology graph: | ||
open StormDSL | ||
|
||
// Topology record: one reliable spout, the add-one bolt writing to the named | ||
// streams "even" and "odd", and one result bolt subscribed to each of them. | ||
let topology = | ||
{ TopologyName = "FstGuaranteed" | ||
Spouts = | ||
[ { Id = "ReliableSpout" | ||
Outputs = [ Default [ "number" ] ] | ||
Spout = Local { Func = spout (Storm.reliableSpoutRunner Storm.defaultHousekeeper) } | ||
Config = jval [ "topology.max.spout.pending", jval 123 ] // override "backpressure" | ||
Parallelism = 1 } ] | ||
Bolts = | ||
[ { Id = "AddOneBolt" | ||
Outputs = [ Named("even", [ "number" ]) // named stream "even" | ||
Named("odd", [ "number" ]) ] | ||
Inputs = [ DefaultStream "ReliableSpout", Shuffle ] | ||
Bolt = Local { Func = addOneBolt Storm.autoAckBoltRunner Storm.emit } | ||
Config = JsonNull | ||
Parallelism = 2 } | ||
{ Id = "EvenResultBolt" | ||
Outputs = [] | ||
Inputs = [ Stream("AddOneBolt","even"), Shuffle ] | ||
// NOTE(review): only the runner is passed here, while resultBolt also takes a log argument | ||
// (OddResultBolt passes Logging.log) - confirm whether an argument is missing | ||
Bolt = Local { Func = resultBolt Storm.autoAckBoltRunner } | ||
Config = jval ["desc", "even"] // pass custom config property to the component | ||
Parallelism = 1 } | ||
{ Id = "OddResultBolt" | ||
Outputs = [] | ||
Inputs = [ Stream("AddOneBolt","odd"), Shuffle ] | ||
// logs to custom (FsLogging) pid-based log file | ||
Bolt = Local { Func = resultBolt Storm.autoAckBoltRunner Logging.log } | ||
Config = jval ["desc", "odd"] // pass custom config property to the component | ||
Parallelism = 1 } ] } | ||
![SVG](svg/Guaranteed.svg "Guaranteed (SVG)") | ||
The solid lines represent "anchoring" streams and the dotted lines indicate the outer limits of the processing guarantees: a tuple emitted along a dotted line is only anchored if the line leading to it is solid. | ||
*) |
Oops, something went wrong.