
Scope

The purpose of this guide is to provide a practical introduction to authoring Cloudify policies, which add dynamic, metric-driven behavior to Cloudify blueprints/deployments using native features of the Cloudify manager. It is not an exhaustive guide to the underlying technologies, for which external references are supplied.

Prerequisites

Automated post-deployment dynamism in Cloudify is provided by a combination of TOSCA-inspired configuration (groups and policies), the metrics gathering apparatus (i.e. Diamond and RabbitMQ), and the Riemann real-time event processing engine. For most prospective authors, the most demanding task will be gaining a working understanding of Riemann's implementation and configuration language, Clojure, along with some functional programming concepts. Fortunately, nothing like true mastery of Clojure is required to be productive, but writing policies is, at a minimum, an exercise in coding against the Riemann API, which is incomprehensible without some Clojure background. Some resources:

  • IDE - some recommend Emacs for Clojure, but Emacs can be an obstacle for those used to conventional key bindings and development environments. Our needs are simple. I've found that clooj is handy for trying stuff out and learning Clojure in general.
  • Introduction to the language. A video by the creator of Clojure that gives a nice beginners overview.
  • A short intro targeted at prospective Riemann users: http://riemann.io/clojure.html.
  • Handy reference. Like any language, if you don't use it often you forget. I find this cheatsheet to be a handy reference.
  • Books. There are several, and all are overkill for what you'll need. Clojure for the Brave and True is a good one if you want to dig deep.

Once you feel you grasp the basic syntax, even if you have to refer back to a reference frequently, you can approach Riemann. Riemann defines event processing "streams" as higher-order Clojure functions (functions that return functions). Clojure serves both as the configuration language and the execution language. Some resources to look over for Riemann:

  • A video introduction to concepts.
  • The quickstart. Follow the instructions here explicitly, including running the examples.
  • A conceptual overview
  • Explore the API. For now you can ignore everything except the riemann.streams namespace.

Also review the relevant documentation on configuring Cloudify policies in the Cloudify wiki.

Lastly, a very superficial understanding of Ruby is needed to follow the test drivers, which in this guide are based on the Ruby client described in the Riemann "Quick Start" guide. Feel free to use a different client, however.

How Riemann Fits in Cloudify

When Cloudify creates a deployment, one of its tasks is to start a distinct Riemann core for that deployment. This serves as an impenetrable barrier between event flows in different deployments. Any policies associated with the blueprint/deployment are placed in the Riemann configuration file, wrapped in boilerplate Clojure code that, among other things, connects the Riemann core to the RabbitMQ topic the collectors write to, filters for the specific deployment id, and defines a Clojure function that executes the workflows associated with the relevant policy. Inside your event processing code, you don't have to worry about updating the Riemann index, calling the Cloudify REST API, or other such concerns. Your concern is to evaluate metrics as they arrive and call the "process-policy-triggers" stream when the purpose of the policy is satisfied (e.g. "scale up when the average CPU utilization was greater than 80% over the last 5 minutes"). This will cause the workflow(s) related to the policy to run with the parameters defined in the blueprint.
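As a preview of the shape such a policy takes, the CPU example above might look something like the sketch below. This is conceptual only: the streams used here are introduced in the exercises that follow, the Cloudify boilerplate wrapping is omitted, and it assumes the metric arrives as a percentage.

; Average the metric over a 5 minute (300 second) window and fire the
; policy triggers when the mean exceeds 80.
(moving-time-window 300
  (smap folds/mean
    (over 80
      (process-policy-triggers))))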

Metrics Flow

Metrics originate in the Diamond collectors running on managed hosts, are published to RabbitMQ, and are consumed by the deployment's dedicated Riemann core, where the policy streams evaluate them and, when a policy's condition is met, trigger the associated workflows.

Basic Concepts For Developing Policies with Riemann

  • You must simulate your event flow accurately. Without simulating the event contents, distribution, timing, and possibly even rates (although rarer), there is little hope of success.
  • Riemann streams are higher-order functions. The streams you write or compose are called once at configuration time to construct a graph of functions, which is then passed each event as it arrives.
  • Riemann uses event timestamps. Be careful to have clocks synchronized on participating systems or confusion will reign.
  • Riemann only saves the last sample. Riemann only saves, via the index, the last instance of each event for each unique host/service combination. Any other state (unless kept externally to Riemann) must be stored in variables closed over by your stream functions.

A Simple Policy Authoring Environment

An important principle behind developing policies is to stay as simple as possible for as long as possible, which means avoiding the Cloudify manager itself until you have to involve it. A simple environment with a quick code/test/debug cycle is available with the following components:

  • The Riemann runtime. As instructed in the prerequisites, download and install Riemann.
  • A Ruby interpreter. We'll just expand on the simple example for generating events described in the prerequisites. This requires Ruby to be available on your command line.
  • A Clojure IDE/REPL. As a beginner, this is essential; the more experience you gain, the less critical it becomes. There are several options. Again, this is for trying out Clojure syntax, not for testing your Riemann streams. Include the riemann.jar file (in Riemann's lib directory) in your IDE classpath if you want it to understand Riemann APIs; a quick REPL sketch follows this list.
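Once you have a REPL with riemann.jar on the classpath, you can poke at the streams API directly. A quick sketch, assuming the jar bundles its dependencies (worth verifying for your Riemann version):

(require '[riemann.streams :as streams])

; Build an 'over' stream whose only child prints, then feed it fake events
; (plain maps work, since streams just read keys like :metric).
(def s (streams/over 10 (fn [e] (println "passed:" e))))
(s {:metric 5})   ; below the threshold, nothing printed
(s {:metric 42})  ; printed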

Exercise 1: A Uselessly Simple Policy

To get your feet wet, let's build a very simple policy with no (likely) real-world application. In this policy, we'll fire the policy triggers when a metric crosses a threshold. We'll work directly in the environment you created in the prerequisites section. In the Riemann etc directory lurks the riemann.config file, which is the default configuration for Riemann and where we'll be trying things out. Making a backup of this file is prudent in case you need to start over. Let's look at the bottom of the file:

(let [index (index)]
  ; Inbound events will be passed to these streams:
  (streams
    (default :ttl 60
      ; Index all events immediately.
      index

      ; Log expired events.
      (expired
        (fn [event] (info "expired" event))))))

The let statement at the top defines a new index (by default a hash map) that holds the last sample of each event by host/service pair. Recall that host and service are fundamental keys in the Riemann data model. Rather than take host too literally, think of it more as where the event came from. Likewise, think of service as what the metric is measuring. Ultimately they are just arbitrary strings.

Next is the streams function call. This is what Riemann looks for to pass events to. In the default config, the default stream just sets the default time to live for any events that arrive without ttl fields, and then forwards to its children (in this case index and expired). Note that all streams are implicitly passed the event under consideration, and each event is processed through the stream tree to completion as it arrives. The index stream (from the Riemann API) puts the event in the index, and the expired stream only passes expired events through to its child streams. In this case a custom stream has been created in-line that simply logs the events it receives using the Riemann info function. More on custom streams later. For now, we'll use info quite a bit, so we'll be copying the above definition around frequently to see what's happening in our streams (although the more concise Clojure equivalent will be used):

#(info "message" %)

As a first step, let's create a stream that prints out all events that arrive. There can be multiple streams calls in the config file, so let's add another with our info log statement:

(let [index (index)]
  ; Inbound events will be passed to these streams:
  (streams
    (default :ttl 60
      ; Index all events immediately.
      index

      ; Log expired events.
      (expired
        (fn [event] (info "expired" event)))))

  (streams
     (info "HELLO WORLD" )
  )
)

Assuming you're in Riemann's etc directory, run ../bin/riemann. You'll see output like this:

INFO [2016-03-02 22:26:43,798] main - riemann.bin - PID 1885
INFO [2016-03-02 22:26:43,854] main - riemann.config - HELLO WORLD
INFO [2016-03-02 22:26:43,964] clojure-agent-send-off-pool-3 - riemann.transport.tcp - TCP server 127.0.0.1 5555 online
INFO [2016-03-02 22:26:43,989] clojure-agent-send-off-pool-0 - riemann.transport.udp - UDP server 127.0.0.1 5555 16384 -1 online
INFO [2016-03-02 22:26:43,991] clojure-agent-send-off-pool-1 - riemann.transport.websockets - Websockets server 127.0.0.1 5556 online
INFO [2016-03-02 22:26:43,992] main - riemann.core - Hyperspace core online

There's a problem here. Look at the second line in the listing. Note that HELLO WORLD has been printed even though we know no event has been sent! This illustrates something you always need to remain aware of with Riemann: the difference between streams and regular Clojure functions. What we're witnessing here is Riemann's "config" step, which executes the streams in order to build a function hierarchy. Riemann executes each stream, and each child of each stream, recursively to build a tree of functions. Once that tree is built, every event Riemann receives is fed to it. So you see the function we defined getting called at config time (which is useless), but worse than that, the info function returns nil, which gets stored in the function tree. The first time a real event comes in we'll see a message like this:

java.lang.NullPointerException
        at riemann.core$stream_BANG_$fn__5727.invoke(core.clj:19)
        at riemann.core$stream_BANG_.invoke(core.clj:18)
        at riemann.core$instrumentation_service$measure__5736.invoke(core.clj:57)
        at riemann.service.ThreadService$thread_service_runner__3283$fn__3284.invoke(service.clj:71)
        at riemann.service.ThreadService$thread_service_runner__3283.invoke(service.clj:70)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.lang.Thread.run(Thread.java:745)

The NPE is raised in this case because a regular function is a child of the streams function, which unfortunately isn't smart enough to check whether it's a higher-order function. You can use regular functions, but mainly inside your own streams. Otherwise, make sure to wrap functions that are children of streams in a function, like so:

  (streams
     #(info "HELLO WORLD" %)
  )

which produces much more useful output:

INFO [2016-03-02 22:31:37,550] Thread-7 - riemann.config - HELLO WORLD #riemann.codec.Event{:host vagrant-ubuntu-trusty-64, :service riemann streams rate, :state nil, :description nil, :metric 0.0, :tags [riemann], :time 728478948721/500, :ttl 20}

Back to the business at hand. Since policy writing boils down to identifying critical conditions in the event stream and then running the Cloudify-supplied stream process-policy-triggers, we can simply substitute an info stream with a recognizable message for process-policy-triggers, hit the streams with traffic we believe should breach the desired condition, and observe the results. Only once the stream has been stressed to our satisfaction do we even consider integrating it into a Cloudify blueprint.

In this case we're going to test a metric for the breach of a threshold. Perusing the Riemann API page, under the streams namespace, we find a stream called over. This simple stream is just a filter: it passes on to its children only those events whose metric exceeds the desired value. For now we'll just use a hardcoded threshold of 10. Our streams function looks like this now:

  (streams
    (over 10
     #(info "TRIGGER"  %)
    )
  )

Note I've changed "HELLO WORLD" to "TRIGGER". Once the policy is debugged, we'll replace the info stream with the process-policy-triggers stream. If I send an event with a metric greater than 10, I need to see that message; if not, I had better not see it. That brings us to the event driver. Take the Ruby event driver code from the prerequisites section, modify it a bit, and put it in a file called "send.rb", also in the etc directory:

r = Riemann::Client.new
r << {
  host: "www1",
  service: "http req",
  metric: 2.53,
  state: "normal"
}

Start Riemann, preferably in a terminal session of its own, and run the following command:

irb -r riemann/client send.rb

This will send our event to Riemann. We expect to see nothing, but if you wait long enough, a bunch of output appears, looking something like:

INFO [2016-03-02 23:58:39,495] Thread-7 - riemann.config - TRIGGER #riemann.codec.Event{:host vagrant-ubuntu-trusty-64, :service riemann index size, :state ok, :description nil, :metric 34, :tags nil, :time 1456963119493/1000, :ttl 20}

These events come from Riemann itself, and we've added no filter to hide them. Let's modify and simplify the config to select only events from the service we're testing. Note that this is unnecessary in production policies, since the Cloudify boilerplate that wraps your code does it for you. Now we have this:

; -*- mode: clojure; -*-
; vim: filetype=clojure

(logging/init {:file "riemann.log"})

; Listen on the local interface over TCP (5555), UDP (5555), and websockets
; (5556)
(let [host "127.0.0.1"]
  (tcp-server {:host host})
  (udp-server {:host host})
  (ws-server  {:host host}))

; Expire old events from the index every 5 seconds.
(periodically-expire 5)

(let [index (index)]
  (streams

    index

    (where (service "http req")
      (over 10
       #(info "TRIGGER"  %)
      )
    )
  )
)

Now repeat the test. You should see nothing. Then change the metric value in your send.rb to something over 10 and try again. This time you should get something like:

INFO [2016-03-03 00:15:32,761] defaultEventExecutorGroup-2-1 - riemann.config - TRIGGER #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 12.53, :tags nil, :time 1456964132, :ttl nil}

Now if we remove the event service filter and replace the info stream, we have something that can be put into a Cloudify policy type:

(let [index (index)]
  (streams
    index
    (over 10
      (process-policy-triggers)
    )
  )
)

Inside a blueprint, we can take just the part of the logic that is specific to the policy, namely the over stream and child. If we put that:

    (over 10
      (process-policy-triggers)
    )

in an arbitrarily named file in the blueprint directory (say "policy.clj") we can define a policy type to use it that might look like this:

policy_types:
  my_policy_type:
    source: policy.clj

Note we've just defined a policy type, not an actual policy. While we're in the policy type definition, let's consider how limiting it is to have a threshold-based policy like this with a hard-coded ceiling. I'd rather be able to define the ceiling in the blueprint. Cloudify has a syntax for defining policy properties that get fed to the underlying policy code. Let's define a property for our threshold:

policy_types:
  my_policy_type:
    source: policy.clj
    properties:
      threshold:
        description: the threshold
        default: 10

By defining this in the policy type, I tell Cloudify to examine the policy.clj file and do a basic template substitution, looking for the string "{{threshold}}" (or more generally "{{property-name}}"). So to exploit our new property we make an edit to the policy.clj file:

    (over {{threshold}}
      (process-policy-triggers)
    )

To use the policy type, we have to instantiate it, and this is done in a group definition in the blueprint. The group defines the blueprint nodes that the policy will apply to, and would look something like this:

groups:
  group1:
    members: [node1]
    policies:
      my_policy:
        type: my_policy_type
        properties:
          threshold: 5
        triggers:
          workflow_trigger_1:
            type: cloudify.policies.triggers.execute_workflow
            parameters:
              workflow: some_workflow
              workflow_parameters:
                parm1: val1
                parm2: val2

Now we have a fully functioning, if useless, policy. Note that for integration testing, you'll undoubtedly want to leave log statements (e.g. info streams) in your policies. To see the output on the Cloudify manager (as of 3.3.1), tail the file /var/log/cloudify/riemann/riemann.log.

Enhancing the Policy

Triggering a workflow based on a single sample exceeding a threshold is unlikely to be useful in the real world. More likely, some aggregation over time will be needed. Riemann supplies a number of streams for aggregating collections of events, which are passed en masse to downstream children. One useful example is moving-time-window. moving-time-window accumulates events for a number of seconds (the window) and sends the accumulated collection to downstream children every time an event arrives. Downstream children must be streams that handle collections rather than individual events. First let's modify the event driver to send an event every second:

r = Riemann::Client.new
(0..100).each do
  r << {
    host: "www1",
    service: "http req",
    metric: 12.53,
    state: "normal"
  }
  sleep(1)
end

Then let's modify the Clojure to add a moving window:

(let [index (index)]
  (streams
    (where (service "http req")
      index
      (moving-time-window 10
        #(info "EVENT" %)
        (over 10
          #(info "TRIGGER"  %)
        )
      )
    )
  )
)

If you run this you will note that "TRIGGER" is never output. That's because over works on single events and has been handed a collection/seq of events. Let's average the seq's metric values and send that to over. The "go to" stream in Riemann for combining metrics from seqs of events is smap, which applies a regular function (not a stream) to a collection of events and passes the result to downstream children. The Riemann folds namespace has a number of utility functions for processing seqs of events; spend some time exploring the code. In this case the mean function will do nicely. Recall that in Clojure, explicit namespace references have the syntax namespace/function. So the next version of the policy is:

(let [index (index)]
  (streams
    (where (service "http req")
      index
      (moving-time-window 10
        #(info "EVENT" %)
        (smap folds/mean
          (over 10
            #(info "TRIGGER"  %)
          )
        )
      )
    )
  )
)

To make the test a little more interesting, we'll modify the test driver to output a metric that increases over time:

r = Riemann::Client.new
(0..100).each do |i|
  r << {
    host: "www1",
    service: "http req",
    metric: 5 + i,
    state: "normal"
  }
  sleep(1)
end

Running the test driver then shows something a bit more interesting:

INFO [2016-03-03 06:00:50,970] defaultEventExecutorGroup-2-1 - riemann.config - EVENT #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 15/2, :tags nil, :time 1456984845, :ttl nil}
INFO [2016-03-03 06:00:51,972] defaultEventExecutorGroup-2-1 - riemann.config - EVENT #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 8, :tags nil, :time 1456984845, :ttl nil}
INFO [2016-03-03 06:00:52,976] defaultEventExecutorGroup-2-1 - riemann.config - EVENT #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 17/2, :tags nil, :time 1456984845, :ttl nil}
INFO [2016-03-03 06:00:53,977] defaultEventExecutorGroup-2-1 - riemann.config - EVENT #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 9, :tags nil, :time 1456984845, :ttl nil}
INFO [2016-03-03 06:00:54,977] defaultEventExecutorGroup-2-1 - riemann.config - EVENT #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 19/2, :tags nil, :time 1456984845, :ttl nil}
INFO [2016-03-03 06:00:55,979] defaultEventExecutorGroup-2-1 - riemann.config - EVENT #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 21/2, :tags nil, :time 1456984846, :ttl nil}
INFO [2016-03-03 06:00:55,994] defaultEventExecutorGroup-2-1 - riemann.config - TRIGGER #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 21/2, :tags nil, :time 1456984846, :ttl nil}

Note the metric increasing steadily until the mean exceeds the threshold (10), at which point the "TRIGGER" log statement is printed. We could go on from here, perhaps parameterizing the window size, adding a cooldown period, or parameterizing a regular expression for limiting node types. In any case, the same process applies: make incremental improvements, improve the test driver, and then test with Cloudify itself.
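For example, a parameterized policy.clj might look like the sketch below. Only threshold was defined earlier; window_size and service_regex are hypothetical property names that would need matching entries under the policy type's properties:

(where (service #"{{service_regex}}")
  (moving-time-window {{window_size}}
    (smap folds/mean
      (over {{threshold}}
        (process-policy-triggers)))))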

The Index

In Riemann, the index is the place where events are stored. Only the last event for each host/service pair is saved in the index. Periodically, expired events are removed from the index. By default, the index is implemented as a hash map. As such, it can be used for storing state in the form of events. Riemann provides the index/update function for storing events and index/lookup for querying the index by host and service. Streams are free to store information in the form of events/maps. The index shouldn't be used as a general database for stream state, however; see Creating Your Own Streams below for the proper method.

Since events in the index are expired periodically based on their :ttl field (or an expired :state field), the index is handy for algorithms that require a timeout. Consider the threshold policy above, where a "cooldown" period is desired after invoking the process-policy-triggers stream. One solution is to create a synthetic event with a :ttl value equal to the cooldown time and write it to the index:

(index/update index {:host "phony" :service "cooldown" :ttl 60})

Then I'd modify my process-policy-triggers call to be inside a condition, something like:

(if (nil? (index/lookup index "phony" "cooldown")) (process-policy-triggers))

So now, only if the lookup of the "cooldown" event returns nil (meaning it has been expired by its :ttl and removed from the index) will process-policy-triggers be called again.
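Putting those pieces together with the threshold policy from the exercise, a test-environment sketch might look like the following. The "phony"/"cooldown" names are the arbitrary markers used above; adding a :time field so the ttl can be evaluated, and calling process-policy-triggers directly inside the custom function, are assumptions worth verifying against your Riemann and Cloudify versions.

(let [index (index)]
  (streams
    index
    (over 10
      (fn [e]
        ; only trigger when no cooldown marker is present in the index
        (when (nil? (index/lookup index "phony" "cooldown"))
          (process-policy-triggers)
          ; start a 60 second cooldown window keyed off this event's time
          (index/update index {:host "phony" :service "cooldown"
                               :time (:time e) :ttl 60}))))))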

Creating Your Own Streams

Despite Riemann's rich API, you are likely to need to write your own streams at some point. As mentioned earlier, streams are higher-order functions: streams return a function that processes events; they don't process events themselves. Streams can also be composed by containing downstream streams, as we've seen in the Riemann API and the examples above. To make the most of writing your own streams, you'll need to get a bit beyond the basics of Clojure, and perhaps read up on the software transactional memory and persistent data structure aspects of the language. Let's consider the simplest possible stream:

(fn [e] (info e))

This is the same stream we've been using to log events (#(info "event" %)) without the # syntax shortcut. When I need to compose child streams, the definition is still simple but somewhat different:

(fn [e & children] (info e) (call-rescue e children))

call-rescue is a Riemann macro that calls child streams and handles exceptions. The & children syntax in the function's parameter declaration is Clojure's version of a variable argument list, so this stream can handle any number of children. An equally valid stream, one that takes a boolean predicate for example, might only allow two child streams: one for true and one for false.
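For illustration, here is a minimal sketch of such a stream: the constructor takes a predicate and exactly two children, routing each event to one or the other (split-by is a made-up name, not part of the Riemann API):

(let [split-by (fn [pred true-child false-child]
                 (fn [e]
                   (if (pred e)
                     (call-rescue e [true-child])
                     (call-rescue e [false-child]))))]
  (streams
    ; guard against events with no metric before comparing
    (split-by (fn [e] (and (:metric e) (> (:metric e) 10)))
              #(info "HIGH" %)
              #(info "LOW"  %))))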

Recall that streams (that are children of the Riemann streams stream) are called at startup time in order to configure them and obtain the function references used for processing actual events. Besides enabling one-time startup configuration of event processing functions, higher-order functions also enable the capturing of state. Consider a stream that sums the metrics flowing through it, passing the sum to child streams. To do this we'll close over a Clojure atom that refers to the sum:

(let [sum      (atom 0)
      children [#(info "EVENT" %)]]   ; downstream children; here a single logger
  (fn [e]
    ; update the enclosed sum and forward an event carrying it downstream
    (swap! sum + (:metric e))
    (call-rescue (assoc e :metric @sum) children)))

Since the let is external to the inner function, its bindings are enclosed by the function and can be used to store state, in this case a reference that points to the sum. Recall that since Clojure data structures are immutable, you don't have stateful objects or structures; instead everything is values referenced by symbolic names. In this case, sum is an atom (a kind of simple reference), which the swap! function modifies to point at a new sum as events flow through. This stream, configured with a child that dumps the resulting event (created by the assoc function above), produces output similar to this:

INFO [2016-03-03 17:43:49,679] defaultEventExecutorGroup-2-1 - riemann.config - EVENT #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 5, :tags nil, :time 1457027029, :ttl nil}
INFO [2016-03-03 17:43:50,625] defaultEventExecutorGroup-2-1 - riemann.config - #<Atom@4f2453fa: 5>
INFO [2016-03-03 17:43:50,626] defaultEventExecutorGroup-2-1 - riemann.config - EVENT #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 11, :tags nil, :time 1457027030, :ttl nil}
INFO [2016-03-03 17:43:51,625] defaultEventExecutorGroup-2-1 - riemann.config - #<Atom@4f2453fa: 11>
INFO [2016-03-03 17:43:51,626] defaultEventExecutorGroup-2-1 - riemann.config - EVENT #riemann.codec.Event{:host www1, :service http req, :state normal, :description nil, :metric 18, :tags nil, :time 1457027031, :ttl nil}

Note that the metric value in the events has been replaced by the running sum. A similar approach is used within Riemann to implement the moving-time-window stream we used above: moving-time-window closes around a Clojure vector, adds events as they arrive, and sends the whole vector downstream to children. Once you are committed to writing your own streams, spend some time examining the Riemann source on GitHub, especially the streams.clj code.
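To see the idea in miniature, here is a simplified sketch (not Riemann's actual implementation) of a stream that closes over a vector, keeps the last five events, and passes the whole collection to its children:

(let [n        5
      buffer   (atom [])
      children [#(info "WINDOW" %)]]
  (fn [e]
    ; keep only the most recent n events in the enclosed vector
    (swap! buffer #(vec (take-last n (conj % e))))
    (call-rescue @buffer children)))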

Important Note For Custom Streams in Policies: Cloudify doesn't provide a means for defining functions external to the (streams ...) element in the configuration. This means that, unlike the examples you may see on GitHub, as a policy writer you must define your custom streams in-line in the policy, as in the sketch below.
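A sketch of what such an in-line custom stream could look like inside a policy body follows; the rising-metric check is an arbitrary illustration, and treating (process-policy-triggers) as a child stream follows the earlier examples.

(let [previous (atom nil)
      children [(process-policy-triggers)]]
  (fn [e]
    ; fire the policy triggers only when this metric exceeds the prior sample
    (when (and @previous (:metric e) (> (:metric e) @previous))
      (call-rescue e children))
    (reset! previous (:metric e))))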