
Batch and realtime


Running in a hybrid batch/realtime mode requires a few special concepts and configuration parameters.


TimeExtractor

The supplied Storm and Scalding platforms use instances of the TimeExtractor[T] typeclass to pull timestamps out of instances of T when a source is constructed.

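For example, suppose events enter the job as a case class carrying an epoch-millisecond timestamp. A minimal sketch (the Click type here is hypothetical; TimeExtractor's companion object builds an instance from a T => Long function):

import com.twitter.summingbird.TimeExtractor

// Hypothetical event type; timestampMillis stands in for whatever
// time field your serialized representation carries.
case class Click(userId: Long, url: String, timestampMillis: Long)

// The platforms resolve this implicit when the source is constructed,
// so every Click is tagged with its event time as it enters the job.
implicit val clickTime: TimeExtractor[Click] =
  TimeExtractor(_.timestampMillis)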

Batcher and BatchID

Every store is required to declare a Batcher:

import com.twitter.summingbird.batch.Batcher

// Assign every event to the one-hour batch containing its timestamp.
implicit val batcher = Batcher.ofHours(1)

Each input is tagged with a time, which should be derived from the serialized representation that enters the job. From that time, a Batcher assigns the event to a batch. Batchers bound partial aggregation: internally, Summingbird never partially aggregates two items that fall in different batches. This rule applies everywhere partial aggregation happens, namely at Summer nodes before pushing into stores, and in map-side partial aggregation upstream of the summers.
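For example, a sketch using the summingbird-batch API (assuming Batcher's batchOf method, which maps a Timestamp in epoch milliseconds to a BatchID):

import com.twitter.summingbird.batch.{Batcher, BatchID, Timestamp}

val hourly = Batcher.ofHours(1)

// 2013-10-12 16:00:00 UTC, in epoch milliseconds.
val t = Timestamp(1381593600000L)
val batch: BatchID = hourly.batchOf(t)

// Two events in the same hour land in the same batch and may be
// partially aggregated together; events in different hours never are.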

When finally writing to the store, the Summingbird online (Storm) platform keeps its data partitioned by batch, which prevents an error in one batch from corrupting the next. The offline (Scalding) platform instead aggregates all data BEFORE a given batch; by associativity, that prefix aggregate can be merged with the online platform's sequence of per-batch aggregates.

Batching is the secret sauce that allows Summingbird’s client to merge the output of a Hadoop aggregation and a Storm aggregation. You shouldn’t have to think about batching when writing your Summingbird jobs, but it helps to know what’s going on behind the scenes.

A Hadoop job runs every time a new batch of data becomes available (here, one hour of new data). Each key's stored value carries metadata recording which batches have been aggregated into its partial value: instead of storing (K, V) pairs, Summingbird's Scalding job stores (K, (BatchID, V)), where the paired BatchID is the first batch that Scalding has NOT yet processed.
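Sketched with a hypothetical record type, the offline layout looks like this:

import com.twitter.summingbird.batch.BatchID

// Hypothetical wrapper for the offline store's values: the BatchID marks
// the first batch NOT yet folded into the partial value.
case class OfflineRecord[V](firstUnprocessed: BatchID, partial: V)

// e.g. after processing batches 0..4 for some key k:
// offlineStore(k) == Some(OfflineRecord(BatchID(5), sumOfBatches0Through4))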

The Storm job, on the other hand, breaks down the (K, V) pairs it receives into ((K, BatchID), V). When a new Scalding run completes and absorbs a particular batch, Storm's data store can safely drop all partial aggregations that reference that batch.
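A sketch of that layout and the drop rule (the in-memory store type and helper here are hypothetical stand-ins; the real online stores are key-value stores keyed on (K, BatchID)):

import com.twitter.summingbird.batch.BatchID

// Hypothetical in-memory stand-in for the Storm-built store.
type OnlineStore[K, V] = Map[(K, BatchID), V]

// Once the offline job covers every batch before `firstUncovered`,
// all online entries for earlier batches are redundant and can go.
def dropCovered[K, V](store: OnlineStore[K, V], firstUncovered: BatchID): OnlineStore[K, V] =
  store.filter { case ((_, b), _) => b.id >= firstUncovered.id }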

When a client performs a lookup into a hybrid Summingbird system, it follows this algorithm (sketched in code after the list):

  • Send a K to the key-value store holding Scalding's output and get back (BatchID, V).
  • Use that BatchID and the current batch (derived from the wall clock) to generate the sequence of batches missing from the offline value.
  • For each batch in this sequence, query the store holding Storm's output for (K, BatchID).
  • Sum the offline value with all of the online partial values and return the result to the client.
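A minimal sketch of that algorithm, assuming hypothetical function-shaped interfaces for the two stores and the summingbird-batch Batcher/BatchID/Timestamp API (the real merging client also handles fan-out and failure semantics):

import com.twitter.summingbird.batch.{BatchID, Batcher, Timestamp}

def hybridLookup[K, V](
    offlineGet: K => Option[(BatchID, V)],  // hypothetical: reads the Scalding-built store
    onlineGet: ((K, BatchID)) => Option[V], // hypothetical: reads the Storm-built store
    batcher: Batcher
)(plus: (V, V) => V)(k: K): Option[V] = {
  val current = batcher.batchOf(Timestamp(System.currentTimeMillis))
  offlineGet(k).map { case (firstMissing, offlineV) =>
    // Every batch Scalding has not yet folded in: firstMissing..current.
    val missing = (firstMissing.id to current.id).map(BatchID(_))
    // Fold each available online partial value into the offline value.
    missing.flatMap(b => onlineGet((k, b))).foldLeft(offlineV)(plus)
  }
}

For simplicity this sketch treats a missing online batch as empty; as described below, the real client instead fails the whole lookup if any batch read fails.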

If the offline store returns BatchID(5) and the wall clock says the current batch is BatchID(7), the Summingbird client will fetch batches 5, 6, and 7 from Storm's datastore and merge those three partial values with the value received from the offline store (Manhattan, in Twitter's deployment). If any of these lookups fails, the entire lookup fails, ensuring that the merged client only returns a fully-merged value.