Commit

Merge pull request #49 from gianm/rowkey-hashing

Rollup-aware partitioning by default for Maps, and a Partitioner interface for other types.

fjy committed Nov 21, 2015
2 parents d709dd9 + 6e35d77 commit d03dfd8
Showing 12 changed files with 729 additions and 139 deletions.
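The idea behind "rollup-aware" partitioning, sketched below: rows that Druid would roll up into one aggregate (same query-granularity-truncated timestamp and the same dimension values) should be routed to the same partition, so a single indexing task sees all of them. This is only an illustration of the concept; the GenericTimeAndDimsPartitioner added by this commit lives in one of the files not expanded on this page, and every name in the sketch (RollupAwarePartitionSketch, rollupAwarePartition) is hypothetical.

object RollupAwarePartitionSketch {
  // Hypothetical helper: events with an equal (truncated timestamp, sorted dimensions) key
  // hash to the same partition, so rows that would roll up together are indexed together.
  def rollupAwarePartition(
    truncatedTimestamp: Long,          // event time already truncated to the query granularity
    dimensions: Map[String, String],   // dimension name -> value
    numPartitions: Int
  ): Int = {
    val key = (truncatedTimestamp, dimensions.toSeq.sortBy(_._1))
    Math.floorMod(key.hashCode, numPartitions)
  }
}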
core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala
@@ -1,44 +1,29 @@
/*
- * Tranquility.
- * Copyright 2013, 2014, 2015 Metamarkets Group, Inc.
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/

package com.metamx.tranquility.beam

-import com.google.common.hash.Hashing
-import com.metamx.common.scala.Logging
-import com.twitter.util.Future
+import com.metamx.tranquility.partition.HashCodePartitioner

/**
* Partitions events based on their hashCode modulo the number of delegate beams, and propagates the partitioned events
* via the appropriate beam.
*/
class HashPartitionBeam[A](
-  val delegates: IndexedSeq[Beam[A]]
-) extends Beam[A] with Logging
+  beams: IndexedSeq[Beam[A]]
+) extends MergingPartitioningBeam[A](new HashCodePartitioner[A], beams)
{
-  def propagate(events: Seq[A]) = {
-    val futures = events.groupBy(event => Hashing.consistentHash(event.hashCode, delegates.size)) map {
-      case (i, group) =>
-        delegates(i).propagate(group)
-    }
-    Future.collect(futures.toList).map(_.sum)
-  }
-
-  def close() = {
-    Future.collect(delegates map (_.close())) map (_ => ())
-  }
-
-  override def toString = "HashPartitionBeam(%s)" format delegates.mkString(", ")
+  override def toString = s"HashPartitionBeam(${beams.mkString(", ")})"
}
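The HashCodePartitioner that the slimmed-down HashPartitionBeam now delegates to is defined in a file not shown on this page. Judging from the removed propagate() above, it presumably buckets an event's hashCode into [0, numPartitions); a minimal sketch under that assumption (the class name HashCodePartitionerSketch is made up):

import com.google.common.hash.Hashing

// Illustrative stand-in for com.metamx.tranquility.partition.HashCodePartitioner:
// consistent-hash the event's hashCode over the available beams, exactly as the
// removed HashPartitionBeam.propagate did inline.
class HashCodePartitionerSketch[A]
{
  def partition(a: A, numPartitions: Int): Int = {
    Hashing.consistentHash(a.hashCode, numPartitions)
  }
}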
core/src/main/scala/com/metamx/tranquility/beam/MergingPartitioningBeam.scala
@@ -0,0 +1,48 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.metamx.tranquility.beam

import com.metamx.common.scala.Logging
import com.metamx.tranquility.partition.Partitioner
import com.twitter.util.Future

/**
* Partitions events based on the output of a Partitioner, and propagates the partitioned events via the
* appropriate underlying beams.
*/
class MergingPartitioningBeam[A](
  val partitioner: Partitioner[A],
  val beams: IndexedSeq[Beam[A]]
) extends Beam[A] with Logging
{
  def propagate(events: Seq[A]) = {
    val futures = events.groupBy(partitioner.partition(_, beams.size)) map {
      case (i, group) =>
        beams(i).propagate(group)
    }
    Future.collect(futures.toList).map(_.sum)
  }

  def close() = {
    Future.collect(beams map (_.close())) map (_ => ())
  }

  override def toString = s"MergingPartitioningBeam(${beams.mkString(", ")})"
}
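The Partitioner trait itself is not part of the rendered diff; its shape is inferred from the call above (partitioner.partition(_, beams.size) must return a beam index). A hedged sketch of a custom implementation plugged into MergingPartitioningBeam; the UserIdPartitioner class and its "userId" field are invented for illustration:

import com.metamx.tranquility.beam.{Beam, MergingPartitioningBeam}
import com.metamx.tranquility.partition.Partitioner

// Route every event with the same (hypothetical) "userId" value to the same beam,
// assuming Partitioner[A] exposes partition(a: A, numPartitions: Int): Int as used above.
class UserIdPartitioner extends Partitioner[Map[String, Any]]
{
  def partition(event: Map[String, Any], numPartitions: Int): Int = {
    Math.floorMod(event.getOrElse("userId", "").hashCode, numPartitions)
  }
}

// Usage: combine it with a set of delegate beams.
def merged(beams: IndexedSeq[Beam[Map[String, Any]]]): Beam[Map[String, Any]] =
  new MergingPartitioningBeam(new UserIdPartitioner, beams)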
137 changes: 88 additions & 49 deletions core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala
@@ -30,10 +30,12 @@ import com.metamx.emitter.service.ServiceEmitter
import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.beam.ClusteredBeam
import com.metamx.tranquility.beam.ClusteredBeamTuning
-import com.metamx.tranquility.beam.HashPartitionBeam
+import com.metamx.tranquility.beam.MergingPartitioningBeam
import com.metamx.tranquility.finagle.BeamService
import com.metamx.tranquility.finagle.FinagleRegistry
import com.metamx.tranquility.finagle.FinagleRegistryConfig
+import com.metamx.tranquility.partition.GenericTimeAndDimsPartitioner
+import com.metamx.tranquility.partition.Partitioner
import com.metamx.tranquility.typeclass.JavaObjectWriter
import com.metamx.tranquility.typeclass.JsonWriter
import com.metamx.tranquility.typeclass.ObjectWriter
@@ -50,29 +52,29 @@ import scala.collection.JavaConverters._
import scala.language.reflectiveCalls

/**
* Builds Beams or Finagle services that send events to the Druid indexing service.
*
* {{{
* val curator = CuratorFrameworkFactory.newClient("localhost:2181", new BoundedExponentialBackoffRetry(100, 30000, 30))
* curator.start()
* val dataSource = "foo"
* val dimensions = Seq("bar")
* val aggregators = Seq(new LongSumAggregatorFactory("baz", "baz"))
* val service = DruidBeams
* .builder[Map[String, Any]](eventMap => new DateTime(eventMap("timestamp")))
* .curator(curator)
* .discoveryPath("/test/discovery")
* .location(DruidLocation(new DruidEnvironment("druid:local:indexer", "druid:local:firehose:%s"), dataSource))
* .rollup(DruidRollup(dimensions, aggregators, QueryGranularity.MINUTE))
* .tuning(new ClusteredBeamTuning(Granularity.HOUR, 10.minutes, 1, 1))
* .buildService()
* val future = service(Seq(Map("timestamp" -> "2010-01-02T03:04:05.678Z", "bar" -> "hey", "baz" -> 3)))
* println("result = %s" format Await.result(future))
* }}}
*
* Your event type (in this case, {{{Map[String, Any]}}} must be serializable via Jackson to JSON that Druid can
* understand. If Jackson is not an appropriate choice, you can provide an ObjectWriter via {{{.objectWriter(...)}}}.
*/
object DruidBeams
{
val DefaultTimestampSpec = new TimestampSpec("timestamp", "iso", null)
@@ -108,52 +110,77 @@ object DruidBeams

def rollup(rollup: DruidRollup) = new Builder[EventType](config.copy(_rollup = Some(rollup)))

-  def timestampSpec(timestampSpec: TimestampSpec) = new Builder[EventType](config.copy(_timestampSpec = Some(timestampSpec)))
+  def timestampSpec(timestampSpec: TimestampSpec) = {
+    new Builder[EventType](config.copy(_timestampSpec = Some(timestampSpec)))
+  }

-  def clusteredBeamZkBasePath(path: String) = new Builder[EventType](config.copy(_clusteredBeamZkBasePath = Some(path)))
+  def clusteredBeamZkBasePath(path: String) = {
+    new Builder[EventType](config.copy(_clusteredBeamZkBasePath = Some(path)))
+  }

def clusteredBeamIdent(ident: String) = new Builder[EventType](config.copy(_clusteredBeamIdent = Some(ident)))

-  def druidBeamConfig(beamConfig: DruidBeamConfig) = new Builder[EventType](config.copy(_druidBeamConfig = Some(beamConfig)))
+  def druidBeamConfig(beamConfig: DruidBeamConfig) = {
+    new Builder[EventType](config.copy(_druidBeamConfig = Some(beamConfig)))
+  }

def emitter(emitter: ServiceEmitter) = new Builder[EventType](config.copy(_emitter = Some(emitter)))

-  def finagleRegistry(registry: FinagleRegistry) = new Builder[EventType](config.copy(_finagleRegistry = Some(registry)))
+  def finagleRegistry(registry: FinagleRegistry) = {
+    new Builder[EventType](config.copy(_finagleRegistry = Some(registry)))
+  }

def timekeeper(timekeeper: Timekeeper) = new Builder[EventType](config.copy(_timekeeper = Some(timekeeper)))

-  def beamDecorateFn(f: (Interval, Int) => Beam[EventType] => Beam[EventType]) = new
-    Builder(config.copy(_beamDecorateFn = Some(f)))
+  def beamDecorateFn(f: (Interval, Int) => Beam[EventType] => Beam[EventType]) = {
+    new Builder(config.copy(_beamDecorateFn = Some(f)))
+  }

+  def beamMergeFn(f: Seq[Beam[EventType]] => Beam[EventType]) = {
+    if (config._partitioner.nonEmpty) {
+      throw new IllegalStateException("Cannot set both 'beamMergeFn' and 'partitioner'")
+    }
+    new Builder[EventType](config.copy(_beamMergeFn = Some(f)))
+  }

-  def beamMergeFn(f: Seq[Beam[EventType]] => Beam[EventType]) = new Builder[EventType](config.copy(_beamMergeFn = Some(f)))
+  def partitioner(partitioner: Partitioner[EventType]) = {
+    if (config._beamMergeFn.nonEmpty) {
+      throw new IllegalStateException("Cannot set both 'beamMergeFn' and 'partitioner'")
+    }
+    new Builder[EventType](config.copy(_partitioner = Some(partitioner)))
+  }

def alertMap(d: Dict) = new Builder[EventType](config.copy(_alertMap = Some(d)))

@deprecated("use .objectWriter(...)", "0.2.21")
-  def eventWriter(writer: ObjectWriter[EventType]) = new Builder[EventType](config.copy(_objectWriter = Some(writer)))
+  def eventWriter(writer: ObjectWriter[EventType]) = {
+    new Builder[EventType](config.copy(_objectWriter = Some(writer)))
+  }

-  def objectWriter(writer: ObjectWriter[EventType]) = new Builder[EventType](config.copy(_objectWriter = Some(writer)))
+  def objectWriter(writer: ObjectWriter[EventType]) = {
+    new Builder[EventType](config.copy(_objectWriter = Some(writer)))
+  }

def objectWriter(writer: JavaObjectWriter[EventType]) = {
new Builder[EventType](config.copy(_objectWriter = Some(ObjectWriter.wrap(writer))))
}

-  def eventTimestamped(timeFn: EventType => DateTime) = new Builder[EventType](
-    config.copy(
-      _timestamper = Some(
-        new Timestamper[EventType]
-        {
-          def timestamp(a: EventType) = timeFn(a)
-        }
+  def eventTimestamped(timeFn: EventType => DateTime) = {
+    new Builder[EventType](
+      config.copy(
+        _timestamper = Some(
+          new Timestamper[EventType]
+          {
+            def timestamp(a: EventType) = timeFn(a)
+          }
+        )
      )
    )
-)
+  }

def buildBeam(): Beam[EventType] = {
val things = config.buildAll()
-    implicit val eventTimestamped = things.timestamper getOrElse {
-      throw new IllegalArgumentException("WTF?! Should have had a Timestamperable event...")
-    }
+    implicit val eventTimestamped = things.timestamper
val lifecycle = new Lifecycle
val indexService = new IndexService(
things.location.environment,
@@ -222,6 +249,7 @@ object DruidBeams
_finagleRegistry: Option[FinagleRegistry] = None,
_timekeeper: Option[Timekeeper] = None,
_beamDecorateFn: Option[(Interval, Int) => Beam[EventType] => Beam[EventType]] = None,
+    _partitioner: Option[Partitioner[EventType]] = None,
_beamMergeFn: Option[Seq[Beam[EventType]] => Beam[EventType]] = None,
_alertMap: Option[Dict] = None,
_objectWriter: Option[ObjectWriter[EventType]] = None,
@@ -280,19 +308,30 @@ object DruidBeams
val beamDecorateFn = _beamDecorateFn getOrElse {
(interval: Interval, partition: Int) => (beam: Beam[EventType]) => beam
}
-      val beamMergeFn = _beamMergeFn getOrElse {
-        (beams: Seq[Beam[EventType]]) => new HashPartitionBeam[EventType](beams.toIndexedSeq)
-      }
val alertMap = _alertMap getOrElse Map.empty
val objectWriter = _objectWriter getOrElse {
new JsonWriter[EventType]
{
-          protected def viaJsonGenerator(a: EventType, jg: JsonGenerator) {
+          override protected def viaJsonGenerator(a: EventType, jg: JsonGenerator): Unit = {
scalaObjectMapper.writeValue(jg, a)
}
}
}
-      val timestamper = _timestamper
+      val timestamper = _timestamper getOrElse {
+        throw new IllegalArgumentException("WTF?! Should have had a Timestamperable event...")
+      }
+      val beamMergeFn = _beamMergeFn getOrElse {
+        val partitioner = _partitioner getOrElse {
+          GenericTimeAndDimsPartitioner.create(
+            timestamper,
+            timestampSpec,
+            rollup
+          )
+        }
+        (beams: Seq[Beam[EventType]]) => {
+          new MergingPartitioningBeam[EventType](partitioner, beams.toIndexedSeq)
+        }
+      }
}
}
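Putting the new pieces together: a caller can now hand the builder a Partitioner instead of a full beamMergeFn, and a caller who sets neither gets the rollup-aware default built just above. A hedged usage sketch that adapts the scaladoc example earlier in this file (curator, dataSource, dimensions and aggregators as defined there; UserIdPartitioner is the invented partitioner from the sketch above):

val service = DruidBeams
  .builder[Map[String, Any]](eventMap => new DateTime(eventMap("timestamp")))
  .curator(curator)
  .discoveryPath("/test/discovery")
  .location(DruidLocation(new DruidEnvironment("druid:local:indexer", "druid:local:firehose:%s"), dataSource))
  .rollup(DruidRollup(dimensions, aggregators, QueryGranularity.MINUTE))
  .tuning(new ClusteredBeamTuning(Granularity.HOUR, 10.minutes, 1, 1))
  .partitioner(new UserIdPartitioner)  // custom routing; omit to keep the rollup-aware default
  .buildService()

Setting both partitioner(...) and beamMergeFn(...) on the same builder now fails fast with the IllegalStateException added above.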

(The remaining 9 changed files in this commit are not rendered here.)
