Batch and realtime
Running in a hybrid batch/realtime mode requires a few special concepts and configuration parameters.
TODO: This section requires some cleanup to mesh with the new API.
The supplied Storm and Scalding platforms use instances of the `TimeExtractor[T]` typeclass to pull timestamps out of instances of `T` on source construction.
TODO: Examples and information on how this mechanism works.
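In the meantime, here is a minimal sketch, assuming `TimeExtractor` wraps a `T => Long` function returning epoch milliseconds and using a hypothetical event type. Declaring an implicit instance is all a job needs:

```scala
import com.twitter.summingbird.TimeExtractor

// Hypothetical event type; the timestamp travels with the serialized
// representation that enters the job.
case class Click(userId: Long, timestampMillis: Long)

// The platforms look up this implicit at source construction to tag
// each event with its time.
implicit val clickTime: TimeExtractor[Click] =
  TimeExtractor(_.timestampMillis)
```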
Every store is required to declare a `Batcher`:

```scala
import com.twitter.summingbird.batch.Batcher

implicit val batcher = Batcher.ofHours(1)
```
Each input is tagged with a time, which should be derived from the serialized representation that enters the job. From that time, a `Batcher` assigns the event to a batch. Batchers bound partial aggregation: internally, Summingbird never partially aggregates two items that are not in the same batch. This matters everywhere partial aggregation happens, namely at `Summer` nodes before pushing into stores, and in map-side partial aggregation before summers.
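For illustration, here is a sketch of how a one-hour batcher maps times to batches, assuming `Batcher.batchOf` takes a `Timestamp` in epoch milliseconds and that `BatchID.next` advances to the following batch:

```scala
import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp }

val batcher = Batcher.ofHours(1)
val hourMs = 3600L * 1000

// Events within the same hour land in the same batch; an event in the
// next hour lands in the next batch.
val b0: BatchID = batcher.batchOf(Timestamp(100 * hourMs))
assert(batcher.batchOf(Timestamp(100 * hourMs + 10 * 60 * 1000)) == b0)
assert(batcher.batchOf(Timestamp(101 * hourMs)) == b0.next)
```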
When we finally write to the store, Summingbird's online platform keeps track of data partitioned into batches. This prevents an error in one batch from corrupting the next. In offline mode, we aggregate all data BEFORE a given batch, which, due to associativity, allows us to merge that result with an online sequence of batches.
Batching is the secret sauce that allows Summingbird’s client to merge the output of a Hadoop aggregation and a Storm aggregation. You shouldn’t have to think about batching when writing your Summingbird jobs, but it helps to know what’s going on behind the scenes.
A Hadoop job will run every time a new batch of data becomes available (in this case, one hour of new data). Every key's stored value includes metadata about which batches have been aggregated into its partial value. So, instead of storing `(K, V)` pairs, Summingbird's Scalding job stores `(K, (BatchID, V))` pairs. The paired `BatchID` is the first batch that Scalding has NOT yet processed.
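To make this layout concrete, here is a stand-in for the offline store using a plain Scala map (the key and value types, and the sample data, are hypothetical):

```scala
import com.twitter.summingbird.batch.BatchID

type K = String
type V = Long

// Offline (Scalding) store: one value per key, tagged with the first
// batch NOT yet folded into that value.
val offlineStore: Map[K, (BatchID, V)] =
  Map("user123" -> (BatchID(5), 42L))
```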
The Storm job, on the other hand, breaks down the `(K, V)` pairs it receives into `((K, BatchID), V)` pairs. When a new Scalding run completes and covers a particular batch, Storm's data store can safely drop all partial aggregations that reference that batch.
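The online store, in the same hypothetical style, keys each partial value by `(K, BatchID)`, which is what makes dropping a whole batch a cheap operation:

```scala
// Online (Storm) store: one partial value per (key, batch) pair,
// reusing the K and V aliases from the offline sketch above.
val onlineStore: Map[(K, BatchID), V] =
  Map(
    ("user123", BatchID(5)) -> 3L,
    ("user123", BatchID(6)) -> 1L,
    ("user123", BatchID(7)) -> 2L)
```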
When a client performs a lookup into a hybrid Summingbird system, it follows this algorithm (sketched in code after the worked example below):

- Send a `K` to the key-value store holding Scalding data and get back `(BatchID, V)`.
- Use this batch and the current batch (based on the wall clock) to generate a sequence of missing batches.
- For each batch in this sequence, query the Storm store for `(K, BatchID)`.
- Sum all partial values and return the final value to the client.
If Scalding returns `BatchID(5)` and the wall clock states that the current batch is `BatchID(7)`, the Summingbird client will fetch batches 5, 6, and 7 from Storm's datastore and merge the three resulting partial values with the original value received from Manhattan. If any of these lookups fails, the entire lookup fails, ensuring that the merged client only returns a fully-merged value.
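A minimal sketch of that client-side merge, reusing the hypothetical `offlineStore` and `onlineStore` from above and assuming `BatchID` exposes its underlying `id: Long` (a real client issues remote lookups and propagates failures rather than throwing):

```scala
import com.twitter.summingbird.batch.BatchID

def hybridLookup(
  key: K,
  currentBatch: BatchID,
  offline: Map[K, (BatchID, V)],
  online: Map[(K, BatchID), V]): V = {
  val (firstUnprocessed, offlineValue) = offline(key)
  // Missing batches: 5, 6 and 7 in the worked example above.
  val missing = (firstUnprocessed.id to currentBatch.id).map(BatchID(_))
  // Map#apply throws if any online partial is absent, mirroring the
  // rule that one failed lookup fails the whole request.
  val partials = missing.map(b => online((key, b)))
  partials.foldLeft(offlineValue)(_ + _)
}

// 42L (offline) + 3L + 1L + 2L (online partials) == 48L
val total: V = hybridLookup("user123", BatchID(7), offlineStore, onlineStore)
```

Because summing is associative, folding the online partials into the offline value in batch order gives the same answer as one global aggregation over all the raw events.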