Guide to custom operations

pld edited this page Mar 4, 2013 · 6 revisions

Cascalog has many flavors of functions, filters, and aggregators. As discussed in How Cascalog executes a query, a Cascalog query is composed of a list of predicates that define and constrain its result variables. There are three categories of predicates (discussed in Nathan's introduction):

Generators: A generator is a source of tuples. The various operations in a query act on the tuples supplied by the query's generators.

Operations: These accept a series of input variables, and act either as functions that bind new output variables, or as filters that capture or release tuples based on the truthiness of their return values. Examples include (split ?two-chars :> ?first-char ?last-char) and (< ?val 5).

Aggregators: While operations act on one tuple at a time, aggregators act on sequences of tuples. Examples include count, sum, min and max.

Let's go into each of these in a bit more depth.

Generators

A Cascalog generator appears within a query as a list containing the generator var, followed by a number of output variables equal to the number of fields in the generator's tuples. A generator with two output fields can be written in two ways, depending on whether you have defined a vector that holds the variables you wish to bind the generated data to:

    (<- [?a ?b]
        (generator :> ?a ?b))

Or (note the use of stringified variable names; the :>> keyword will interpolate them for you):

    (def output-variables ["?a" "?b"])

    (<- output-variables 
        (generator :>> output-variables))

If no :> keyword is specified, the variables are considered input variables for operations and output variables for generators and aggregators. Hence this query is equivalent to the first one above:

    (<- [?a ?b] (generator ?a ?b))

Cascalog allows for three types of generators:

Clojure Sequences: These are the simplest form of generator, and are ideal for testing. Note the use of the ?<- operator, which both defines and runs the query, emitting its output to stdout; if we were only defining the query we would use the <- operator. For example:

    (def generator-seq [["a" 1]
                        ["b" 2]])

    (?<- (stdout) [?a ?b] (generator-seq :> ?a ?b))

Results in:

    ["a" 1]
    ["b" 2]

Existing Queries, defined by <-: Queries are eminently composable; very complex workflows can be decomposed into multiple subqueries, allowing for solid, abstract design. For example, re-using generator-seq from above:

    (let [subquery (<- [?a ?b] (generator-seq ?a ?b))]
      (?<- (stdout) [?also-a ?also-b]
          (subquery ?also-a ?also-b)))

Results in, as before:

    ["a" 1]
    ["b" 2]

Cascading Taps: These taps process data from a wide range of input sources into tuple format. Most production workflows will make heavy use of these taps as the initial data source for low-level queries.

The hfs-textline function, for example, located in cascalog.api, accepts a path to a file or directory containing text files, and returns a Cascading tap that produces a 1-tuple for each line of text. These source text files can be terabytes in size; Cascalog will take care of parallelization for us. All we have to think about are the 1-tuples generated by the tap.

As an example, for a file /some/textfile.txt with the following lines of text:

    Rage -- Goddess, sing the rage of Peleus' son Achilles,
    murderous, doomed, that cost the Achaeans countless losses,
    hurling down to the house of Death so many sturdy souls,

The following query:

    (let [text-tap (hfs-textline "/some/textfile.txt")]
      (?<- (stdout) [?textline]
          (text-tap ?textline)))

Would generate these 1-tuples:

    ["Rage -- Goddess, sing the rage of Peleus' son Achilles,"]
    ["murderous, doomed, that cost the Achaeans countless losses,"]
    ["hurling down to the house of Death so many sturdy souls,"]

For the operation examples below, we'll make use of this "test" dataset of 2-tuples, generated from a Clojure sequence:

    (def test-tap [["a" 1] 
                   ["b" 2]
                   ["a" 3]])

Operations

Custom operations return tuples. A tuple is just a vector of values, so [1 2 3] is a 3-tuple and ["a" 2] is a 2-tuple. A non-seq value is interpreted as a 1-tuple: if Cascalog expects a tuple as the return value of an operation and receives 2, it will translate that to [2] for you.
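The conversion rule above can be modeled in plain Clojure. This is only a sketch of the behavior described here, not Cascalog's actual implementation; `->tuple` is a hypothetical helper introduced for illustration.

```clojure
;; Hypothetical model of how a return value is read as a tuple.
;; Sequential values are taken as-is; anything else becomes a 1-tuple.
(defn ->tuple [v]
  (if (sequential? v)
    (vec v)
    [v]))

(->tuple [1 2 3]) ;=> [1 2 3]
(->tuple 2)       ;=> [2]
(->tuple "a")     ;=> ["a"]
```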

defmapop: Defines a custom operation which adds fields to a tuple. Expects a single tuple to be returned.

e.g.

        (defmapop add-2-fields [x] [1 2]) 

        (?<- (stdout) [?a ?b ?c] (test-tap :> _ ?a) (add-2-fields ?a :> ?b ?c)) 

Results in:

        [1 1 2] 
        [2 1 2] 
        [3 1 2] 

deffilterop: Defines a custom operation which only keeps tuples for which this operation returns true.

        (deffilterop is-2? [x] (= x 2)) 

        (?<- (stdout) [?a] (test-tap :> ?a ?b) (is-2? ?b)) 

Results in:

        ["b"] 

Note that filtering can also be accomplished by placing a constant in the position of an output variable within a query. For example, the above query could be replaced with

        (?<- (stdout) [?a] (test-tap :> ?a 2))

and produce identical results.

defmapcatop: Defines a custom operation which creates multiple tuples for a given input. Return should be a seq of tuples. [ [1 2] ] would be interpreted as having one 2-tuple returned, while [1 2] would be interpreted as being two 1-tuples returned. (One sign that you've forgotten to wrap your tuples in a seq is an error message noting that the operation added the wrong number of fields.)

        (defmapcatop two-more-tuples [x] [ [(inc x)] [(+ 2 x)] ]) 

        (?<- (stdout) [?a ?b ?c] (test-tap :> ?a ?b) (two-more-tuples ?b :> ?c)) 

Results in:

        ["a" 1 2]
        ["a" 1 3]
        ["b" 2 3]
        ["b" 2 4]
        ["a" 3 4]
        ["a" 3 5]
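The difference between a seq of tuples and a single tuple is easier to see outside of a query. The following plain-Clojure sketch mimics what the query above produces; `two-more-tuples-model` and `run-mapcat` are hypothetical names used only for illustration and say nothing about how Cascalog runs the operation internally.

```clojure
(def test-tap [["a" 1]
               ["b" 2]
               ["a" 3]])

;; Same body as two-more-tuples: a seq holding two 1-tuples.
(defn two-more-tuples-model [x]
  [[(inc x)] [(+ 2 x)]])

;; Hypothetical driver: for each input tuple, emit one output tuple
;; per tuple returned by the operation, appending the new field(s).
(defn run-mapcat [op tuples]
  (vec (for [[a b] tuples
             out   (op b)]
         (into [a b] out))))

(run-mapcat two-more-tuples-model test-tap)
;=> [["a" 1 2] ["a" 1 3] ["b" 2 3] ["b" 2 4] ["a" 3 4] ["a" 3 5]]
```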

Vanilla Clojure functions can also be used as operations. When given no output vars they work as filterops, and when given output vars they work as mapops. The drawback of using a regular Clojure function is that they're more verbose to use dynamically. For example:

    (defn mk-query [op]
      (<- [?a ?b]
          (test-tap :> _ ?a)
          (op ?a :> ?b)))

The op passed to mk-query must either be defined using one of Cascalog's def* macros or be a var. This is because Cascalog uses the var name of functions to distribute the operation across the cluster. The def* macros deal with this under the hood, but vanilla Clojure functions must be passed using the #' reader macro, which retrieves a symbol's var. For example:

    ;; With a vanilla clojure function:
    (mk-query #'odd?) ; valid
    (mk-query odd?)   ; will result in error
    
    ;; Or, wrapped in a defmapop:
    (defmapop valid-odd? [x] (odd? x))
    (mk-query valid-odd?) ; valid

Aggregators

defbufferop: Defines an aggregator which receives all the tuples for the group in a single seq. Buffers cannot be used with any other buffers/aggregators in a query. Buffers operate reduce-side. Buffers should return a seq of tuples.

        (defbufferop dosum [tuples] [(reduce + (map first tuples))]) 

        (?<- (stdout) [?a ?sum] (test-tap :> ?a ?b) (dosum ?b :> ?sum)) 

Results in:

        ["a" 4] 
        ["b" 2] 
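To see why the body of dosum calls (map first tuples), it may help to apply the same body as an ordinary function. This sketch assumes the buffer is handed one 1-tuple per ?b value in the group, as described above; `dosum-body` is a hypothetical name.

```clojure
;; The same body as the dosum bufferop, written as a plain function.
(defn dosum-body [tuples]
  [(reduce + (map first tuples))])

;; Group ?a = "a" contributes the 1-tuples [1] and [3]:
(dosum-body [[1] [3]]) ;=> [4]

;; Group ?a = "b" contributes the single 1-tuple [2]:
(dosum-body [[2]])     ;=> [2]
```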

defaggregateop: Defines an aggregator which must be written in a more restricted way. Aggregators can be used with other aggregators in a query (i.e., you can do a count and sum of a group at the same time). Aggregators operate reduce-side. Aggregators consist of code for "initializing", "aggregating", and "extracting a result". Aggregators have the same performance as buffers but are more composable, so aggregators are preferable to buffers when possible. Aggregators return a seq of tuples.

An aggregateop accumulates some state over the course of the aggregation.

The code body with no parameters sets the initial value of the state.

The code body with more than one parameter is the "accumulation" function. It takes a state value and a tuple in the aggregation and returns a new state value. In the dosum example below, it receives the single value of a 1-tuple: the next value in the grouping to add to the sum. If the aggregator took in 2-tuples as input, that piece of code would take 3 parameters (1 for the state, 2 for the tuple).

The code body with 1 parameter is the "return" function. It takes in the totally accumulated state and returns a seq of tuples as output. In this case it just returns the state as-is. [state] is equivalent to saying [ [state] ].

    (defaggregateop dosum
      ([] 0)
      ([state val] (+ state val))
      ([state] [state]))

    (?<- (stdout) [?a ?sum] (test-tap ?a ?b) (dosum ?b :> ?sum))

The results of the aggregateop version of dosum are identical to the bufferop version:

        ["a" 4] 
        ["b" 2]
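The way the three code bodies cooperate can be sketched with an ordinary multi-arity function and reduce. This is a model of the description above rather than Cascalog's implementation; `dosum-model` and `run-aggregate` are hypothetical names.

```clojure
;; The three bodies of dosum as a plain multi-arity function.
(defn dosum-model
  ([] 0)                       ; initial state
  ([state val] (+ state val))  ; fold one value into the state
  ([state] [state]))           ; turn the final state into the output

;; Hypothetical driver: initialize, accumulate over the group, extract.
(defn run-aggregate [agg values]
  (agg (reduce agg (agg) values)))

(run-aggregate dosum-model [1 3]) ;=> [4]
(run-aggregate dosum-model [2])   ;=> [2]
```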

defparallelagg: Defines an even more restricted aggregator that is defined using two functions. These aggregators are more efficient as they make use of map-side combiner optimizations. parallelaggs can be composed with other parallelaggs/regular aggregators. However, when composed with regular aggregators the entire computation is moved reduce-side.

The init function is run once per tuple, and the combine function is run on the results of inits and other combines until only one value is left.

    (defparallelagg dosum :init-var #'identity :combine-var #'+)

    (?<- (stdout) [?a ?sum] (test-tap ?a ?b) (dosum ?b :> ?sum)) 

Again, our results are the same:

        ["a" 4] 
        ["b" 2]
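The init/combine contract can be sketched in plain Clojure. The model below assumes that partial results may be combined separately on each chunk of input before a final merge, which is why the combine function should give the same answer regardless of how values are grouped. `run-parallel-agg` and `run-partitioned` are hypothetical helpers, and the partitions are assumed to be non-empty.

```clojure
;; init runs once per value; combine merges results until one is left.
(defn run-parallel-agg [init combine values]
  (reduce combine (map init values)))

;; Same computation, but with a partial combine per partition first,
;; loosely imitating a map-side combiner followed by a final merge.
(defn run-partitioned [init combine partitions]
  (->> partitions
       (map #(run-parallel-agg init combine %))
       (reduce combine)))

;; dosum: init is identity, combine is +
(run-parallel-agg identity + [1 3])      ;=> 4
(run-partitioned identity + [[1] [3]])   ;=> 4

;; A count would use an init that ignores its input and returns 1.
(run-parallel-agg (constantly 1) + [1 3]) ;=> 2
```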

TO BE CONTINUED

Future revisions will include discussion of:

  • defbufferiterop
  • defmultibufferop
  • defparallelbuf