From 6e35d77c26699542a1a23b856e17b910b201e888 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Fri, 6 Nov 2015 16:58:45 -0800
Subject: [PATCH] Rollup-aware partitioning by default for Maps, and a Partitioner interface for other types.

The default partitioning only works properly for Maps because other types are opaque to tranquility (it doesn't know how the ObjectWriter is going to serialize them). But you can provide your own Partitioner now (or an entire beamMergeFn, which you could already do).

Fixes #6.
---
 .../tranquility/beam/HashPartitionBeam.scala  |  53 +++---
 .../beam/MergingPartitioningBeam.scala        |  48 ++++++
 .../metamx/tranquility/druid/DruidBeams.scala | 137 +++++++++------
 .../tranquility/druid/DruidRollup.scala       | 112 +++++++-----
 .../GenericTimeAndDimsPartitioner.scala       |  95 +++++++++++
 .../partition/HashCodePartitioner.scala       |  33 ++++
 .../tranquility/partition/Partitioner.scala   |  82 +++++++++
 .../tranquility/typeclass/Timestamper.scala   |   2 +-
 .../tranquility/javatests/JavaApiTest.java    |   8 +-
 .../tranquility/test/DruidRollupTest.scala    |  70 ++++++--
 .../GenericTimeAndDimsPartitionerTest.scala   | 159 ++++++++++++++++++
 .../test/HashPartitionBeamTest.scala          |  69 ++++++++
 12 files changed, 729 insertions(+), 139 deletions(-)
 create mode 100644 core/src/main/scala/com/metamx/tranquility/beam/MergingPartitioningBeam.scala
 create mode 100644 core/src/main/scala/com/metamx/tranquility/partition/GenericTimeAndDimsPartitioner.scala
 create mode 100644 core/src/main/scala/com/metamx/tranquility/partition/HashCodePartitioner.scala
 create mode 100644 core/src/main/scala/com/metamx/tranquility/partition/Partitioner.scala
 create mode 100644 core/src/test/scala/com/metamx/tranquility/test/GenericTimeAndDimsPartitionerTest.scala
 create mode 100644 core/src/test/scala/com/metamx/tranquility/test/HashPartitionBeamTest.scala

diff --git a/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala
index 042064a..7d2a462 100644
--- a/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala
+++ b/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala
@@ -1,44 +1,29 @@
 /*
- * Tranquility.
- * Copyright 2013, 2014, 2015 Metamarkets Group, Inc.
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
 *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
 *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + package com.metamx.tranquility.beam -import com.google.common.hash.Hashing -import com.metamx.common.scala.Logging -import com.twitter.util.Future +import com.metamx.tranquility.partition.HashCodePartitioner -/** - * Partitions events based on their hashCode modulo the number of delegate beams, and propagates the partitioned events - * via the appropriate beam. - */ class HashPartitionBeam[A]( - val delegates: IndexedSeq[Beam[A]] -) extends Beam[A] with Logging + beams: IndexedSeq[Beam[A]] +) extends MergingPartitioningBeam[A](new HashCodePartitioner[A], beams) { - def propagate(events: Seq[A]) = { - val futures = events.groupBy(event => Hashing.consistentHash(event.hashCode, delegates.size)) map { - case (i, group) => - delegates(i).propagate(group) - } - Future.collect(futures.toList).map(_.sum) - } - - def close() = { - Future.collect(delegates map (_.close())) map (_ => ()) - } - - override def toString = "HashPartitionBeam(%s)" format delegates.mkString(", ") + override def toString = s"HashPartitionBeam(${beams.mkString(", ")})" } diff --git a/core/src/main/scala/com/metamx/tranquility/beam/MergingPartitioningBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/MergingPartitioningBeam.scala new file mode 100644 index 0000000..1e3e885 --- /dev/null +++ b/core/src/main/scala/com/metamx/tranquility/beam/MergingPartitioningBeam.scala @@ -0,0 +1,48 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.beam + +import com.metamx.common.scala.Logging +import com.metamx.tranquility.partition.Partitioner +import com.twitter.util.Future + +/** + * Partitions events based on the output of a Partitioner, and propagates the partitioned events via the + * appropriate underlying beams. 
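+ *
+ * A hypothetical sketch of direct usage ({{{partitioner}}}, {{{beamA}}}, and {{{beamB}}} are illustrative
+ * placeholders, not part of this patch):
+ *
+ * {{{
+ * val beam = new MergingPartitioningBeam[MyEvent](partitioner, IndexedSeq(beamA, beamB))
+ * beam.propagate(events) // each event goes to beams(partitioner.partition(event, beams.size))
+ * }}}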
+ */ +class MergingPartitioningBeam[A]( + val partitioner: Partitioner[A], + val beams: IndexedSeq[Beam[A]] +) extends Beam[A] with Logging +{ + def propagate(events: Seq[A]) = { + val futures = events.groupBy(partitioner.partition(_, beams.size)) map { + case (i, group) => + beams(i).propagate(group) + } + Future.collect(futures.toList).map(_.sum) + } + + def close() = { + Future.collect(beams map (_.close())) map (_ => ()) + } + + override def toString = s"MergingPartitioningBeam(${beams.mkString(", ")})" +} diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala index d58e935..31f4faf 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala @@ -30,10 +30,12 @@ import com.metamx.emitter.service.ServiceEmitter import com.metamx.tranquility.beam.Beam import com.metamx.tranquility.beam.ClusteredBeam import com.metamx.tranquility.beam.ClusteredBeamTuning -import com.metamx.tranquility.beam.HashPartitionBeam +import com.metamx.tranquility.beam.MergingPartitioningBeam import com.metamx.tranquility.finagle.BeamService import com.metamx.tranquility.finagle.FinagleRegistry import com.metamx.tranquility.finagle.FinagleRegistryConfig +import com.metamx.tranquility.partition.GenericTimeAndDimsPartitioner +import com.metamx.tranquility.partition.Partitioner import com.metamx.tranquility.typeclass.JavaObjectWriter import com.metamx.tranquility.typeclass.JsonWriter import com.metamx.tranquility.typeclass.ObjectWriter @@ -50,29 +52,29 @@ import scala.collection.JavaConverters._ import scala.language.reflectiveCalls /** - * Builds Beams or Finagle services that send events to the Druid indexing service. - * - * {{{ - * val curator = CuratorFrameworkFactory.newClient("localhost:2181", new BoundedExponentialBackoffRetry(100, 30000, 30)) - * curator.start() - * val dataSource = "foo" - * val dimensions = Seq("bar") - * val aggregators = Seq(new LongSumAggregatorFactory("baz", "baz")) - * val service = DruidBeams - * .builder[Map[String, Any]](eventMap => new DateTime(eventMap("timestamp"))) - * .curator(curator) - * .discoveryPath("/test/discovery") - * .location(DruidLocation(new DruidEnvironment("druid:local:indexer", "druid:local:firehose:%s"), dataSource)) - * .rollup(DruidRollup(dimensions, aggregators, QueryGranularity.MINUTE)) - * .tuning(new ClusteredBeamTuning(Granularity.HOUR, 10.minutes, 1, 1)) - * .buildService() - * val future = service(Seq(Map("timestamp" -> "2010-01-02T03:04:05.678Z", "bar" -> "hey", "baz" -> 3))) - * println("result = %s" format Await.result(future)) - * }}} - * - * Your event type (in this case, {{{Map[String, Any]}}} must be serializable via Jackson to JSON that Druid can - * understand. If Jackson is not an appropriate choice, you can provide an ObjectWriter via {{{.objectWriter(...)}}}. - */ + * Builds Beams or Finagle services that send events to the Druid indexing service. 
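+ *
+ * By default, Map-like events are partitioned among the underlying beams by their Druid grouping key (truncated
+ * timestamp and dimensions), so events that would roll up together are sent to the same partition. For other
+ * event types, provide a custom Partitioner via {{{.partitioner(...)}}}, or supply an entire merge function via
+ * {{{.beamMergeFn(...)}}}; the two options are mutually exclusive.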
+ *
+ * {{{
+ * val curator = CuratorFrameworkFactory.newClient("localhost:2181", new BoundedExponentialBackoffRetry(100, 30000, 30))
+ * curator.start()
+ * val dataSource = "foo"
+ * val dimensions = Seq("bar")
+ * val aggregators = Seq(new LongSumAggregatorFactory("baz", "baz"))
+ * val service = DruidBeams
+ *   .builder[Map[String, Any]](eventMap => new DateTime(eventMap("timestamp")))
+ *   .curator(curator)
+ *   .discoveryPath("/test/discovery")
+ *   .location(DruidLocation(new DruidEnvironment("druid:local:indexer", "druid:local:firehose:%s"), dataSource))
+ *   .rollup(DruidRollup(dimensions, aggregators, QueryGranularity.MINUTE))
+ *   .tuning(new ClusteredBeamTuning(Granularity.HOUR, 10.minutes, 1, 1))
+ *   .buildService()
+ * val future = service(Seq(Map("timestamp" -> "2010-01-02T03:04:05.678Z", "bar" -> "hey", "baz" -> 3)))
+ * println("result = %s" format Await.result(future))
+ * }}}
+ *
+ * Your event type (in this case, {{{Map[String, Any]}}}) must be serializable via Jackson to JSON that Druid can
+ * understand. If Jackson is not an appropriate choice, you can provide an ObjectWriter via {{{.objectWriter(...)}}}.
+ */
 object DruidBeams
 {
   val DefaultTimestampSpec = new TimestampSpec("timestamp", "iso", null)
@@ -108,52 +110,77 @@ object DruidBeams

     def rollup(rollup: DruidRollup) = new Builder[EventType](config.copy(_rollup = Some(rollup)))

-    def timestampSpec(timestampSpec: TimestampSpec) = new Builder[EventType](config.copy(_timestampSpec = Some(timestampSpec)))
+    def timestampSpec(timestampSpec: TimestampSpec) = {
+      new Builder[EventType](config.copy(_timestampSpec = Some(timestampSpec)))
+    }

-    def clusteredBeamZkBasePath(path: String) = new Builder[EventType](config.copy(_clusteredBeamZkBasePath = Some(path)))
+    def clusteredBeamZkBasePath(path: String) = {
+      new Builder[EventType](config.copy(_clusteredBeamZkBasePath = Some(path)))
+    }

     def clusteredBeamIdent(ident: String) = new Builder[EventType](config.copy(_clusteredBeamIdent = Some(ident)))

-    def druidBeamConfig(beamConfig: DruidBeamConfig) = new Builder[EventType](config.copy(_druidBeamConfig = Some(beamConfig)))
+    def druidBeamConfig(beamConfig: DruidBeamConfig) = {
+      new Builder[EventType](config.copy(_druidBeamConfig = Some(beamConfig)))
+    }

     def emitter(emitter: ServiceEmitter) = new Builder[EventType](config.copy(_emitter = Some(emitter)))

-    def finagleRegistry(registry: FinagleRegistry) = new Builder[EventType](config.copy(_finagleRegistry = Some(registry)))
+    def finagleRegistry(registry: FinagleRegistry) = {
+      new Builder[EventType](config.copy(_finagleRegistry = Some(registry)))
+    }

     def timekeeper(timekeeper: Timekeeper) = new Builder[EventType](config.copy(_timekeeper = Some(timekeeper)))

-    def beamDecorateFn(f: (Interval, Int) => Beam[EventType] => Beam[EventType]) = new
-        Builder(config.copy(_beamDecorateFn = Some(f)))
+    def beamDecorateFn(f: (Interval, Int) => Beam[EventType] => Beam[EventType]) = {
+      new Builder(config.copy(_beamDecorateFn = Some(f)))
+    }
+
+    def beamMergeFn(f: Seq[Beam[EventType]] => Beam[EventType]) = {
+      if (config._partitioner.nonEmpty) {
+        throw new IllegalStateException("Cannot set both 'beamMergeFn' and 'partitioner'")
+      }
+      new Builder[EventType](config.copy(_beamMergeFn = Some(f)))
+    }

-    def beamMergeFn(f: Seq[Beam[EventType]] => Beam[EventType]) = new Builder[EventType](config.copy(_beamMergeFn = Some(f)))
+    def partitioner(partitioner: Partitioner[EventType]) = {
+      if (config._beamMergeFn.nonEmpty) {
+        throw new IllegalStateException("Cannot set both 'beamMergeFn' and 'partitioner'")
+ } + new Builder[EventType](config.copy(_partitioner = Some(partitioner))) + } def alertMap(d: Dict) = new Builder[EventType](config.copy(_alertMap = Some(d))) @deprecated("use .objectWriter(...)", "0.2.21") - def eventWriter(writer: ObjectWriter[EventType]) = new Builder[EventType](config.copy(_objectWriter = Some(writer))) + def eventWriter(writer: ObjectWriter[EventType]) = { + new Builder[EventType](config.copy(_objectWriter = Some(writer))) + } - def objectWriter(writer: ObjectWriter[EventType]) = new Builder[EventType](config.copy(_objectWriter = Some(writer))) + def objectWriter(writer: ObjectWriter[EventType]) = { + new Builder[EventType](config.copy(_objectWriter = Some(writer))) + } def objectWriter(writer: JavaObjectWriter[EventType]) = { new Builder[EventType](config.copy(_objectWriter = Some(ObjectWriter.wrap(writer)))) } - def eventTimestamped(timeFn: EventType => DateTime) = new Builder[EventType]( - config.copy( - _timestamper = Some( - new Timestamper[EventType] - { - def timestamp(a: EventType) = timeFn(a) - } + def eventTimestamped(timeFn: EventType => DateTime) = { + new Builder[EventType]( + config.copy( + _timestamper = Some( + new Timestamper[EventType] + { + def timestamp(a: EventType) = timeFn(a) + } + ) ) ) - ) + } def buildBeam(): Beam[EventType] = { val things = config.buildAll() - implicit val eventTimestamped = things.timestamper getOrElse { - throw new IllegalArgumentException("WTF?! Should have had a Timestamperable event...") - } + implicit val eventTimestamped = things.timestamper val lifecycle = new Lifecycle val indexService = new IndexService( things.location.environment, @@ -222,6 +249,7 @@ object DruidBeams _finagleRegistry: Option[FinagleRegistry] = None, _timekeeper: Option[Timekeeper] = None, _beamDecorateFn: Option[(Interval, Int) => Beam[EventType] => Beam[EventType]] = None, + _partitioner: Option[Partitioner[EventType]] = None, _beamMergeFn: Option[Seq[Beam[EventType]] => Beam[EventType]] = None, _alertMap: Option[Dict] = None, _objectWriter: Option[ObjectWriter[EventType]] = None, @@ -280,19 +308,30 @@ object DruidBeams val beamDecorateFn = _beamDecorateFn getOrElse { (interval: Interval, partition: Int) => (beam: Beam[EventType]) => beam } - val beamMergeFn = _beamMergeFn getOrElse { - (beams: Seq[Beam[EventType]]) => new HashPartitionBeam[EventType](beams.toIndexedSeq) - } val alertMap = _alertMap getOrElse Map.empty val objectWriter = _objectWriter getOrElse { new JsonWriter[EventType] { - protected def viaJsonGenerator(a: EventType, jg: JsonGenerator) { + override protected def viaJsonGenerator(a: EventType, jg: JsonGenerator): Unit = { scalaObjectMapper.writeValue(jg, a) } } } - val timestamper = _timestamper + val timestamper = _timestamper getOrElse { + throw new IllegalArgumentException("WTF?! 
Should have had a Timestamperable event...")
+    }
+    val beamMergeFn = _beamMergeFn getOrElse {
+      val partitioner = _partitioner getOrElse {
+        GenericTimeAndDimsPartitioner.create(
+          timestamper,
+          timestampSpec,
+          rollup
+        )
+      }
+      (beams: Seq[Beam[EventType]]) => {
+        new MergingPartitioningBeam[EventType](partitioner, beams.toIndexedSeq)
+      }
+    }
   }
 }
diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidRollup.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidRollup.scala
index 7ac18e2..453e21c 100644
--- a/core/src/main/scala/com/metamx/tranquility/druid/DruidRollup.scala
+++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidRollup.scala
@@ -16,7 +16,9 @@
  */
 package com.metamx.tranquility.druid

-import io.druid.data.input.impl.{DimensionsSpec, SpatialDimensionSchema}
+import io.druid.data.input.impl.TimestampSpec
+import io.druid.data.input.impl.DimensionsSpec
+import io.druid.data.input.impl.SpatialDimensionSchema
 import io.druid.granularity.QueryGranularity
 import io.druid.query.aggregation.AggregatorFactory
 import scala.collection.JavaConverters._
@@ -28,6 +30,11 @@ class DruidRollup(
   val indexGranularity: QueryGranularity
 )
 {
+  private val additionalExclusions: Set[String] = {
+    (aggregators.flatMap(_.requiredFields().asScala) ++
+      aggregators.map(_.getName)).toSet
+  }
+
   validate()

   def validate() {
@@ -46,13 +53,23 @@ class DruidRollup(
       throw new IllegalArgumentException("Duplicate columns: %s" format duplicateColumns.mkString(", "))
     }
   }
+
+  /**
+   * Whether the given field would be treated as a String dimension by this rollup. For specific dimensions,
+   * true iff the field is one of the declared dimensions; for schemaless dimensions, true unless the field is
+   * the timestamp column, an aggregator output name or input field, or an explicit exclusion.
+   */
+  def isStringDimension(timestampSpec: TimestampSpec, fieldName: String) = {
+    dimensions match {
+      case dims: SpecificDruidDimensions => dims.dimensionsSet.contains(fieldName)
+      case SchemalessDruidDimensions(exclusions, _) =>
+        fieldName != timestampSpec.getTimestampColumn &&
+          !additionalExclusions.contains(fieldName) &&
+          !exclusions.contains(fieldName)
+    }
+  }
 }

 sealed abstract class DruidDimensions
 {
   def spec: DimensionsSpec

-  def spatialDimensions: IndexedSeq[DruidSpatialDimension]
+  def spatialDimensions: Seq[DruidSpatialDimension]

   def withSpatialDimensions(xs: java.util.List[DruidSpatialDimension]): DruidDimensions
 }
@@ -73,47 +90,48 @@ case class MultipleFieldDruidSpatialDimension(name: String, fieldNames: Seq[Stri
 }

 case class SpecificDruidDimensions(
-  dimensions: IndexedSeq[String],
-  spatialDimensions: IndexedSeq[DruidSpatialDimension] = Vector.empty
+  dimensions: Seq[String],
+  spatialDimensions: Seq[DruidSpatialDimension] = Nil
 ) extends DruidDimensions
 {
-  override def spec = {
-    // Sort dimenions as a workaround for https://github.com/metamx/druid/issues/658
+  val dimensionsSet = dimensions.toSet
+
+  @transient lazy val spec = {
+    // Sort dimensions as a workaround for https://github.com/druid-io/druid/issues/658
+    // Should preserve the originally-provided order once this is fixed.
     // (Indexer does not merge properly when dimensions are provided in non-lexicographic order.)
     new DimensionsSpec(
-      dimensions.sorted.asJava,
+      dimensions.toIndexedSeq.sorted.asJava,
       null,
       spatialDimensions.map(_.schema).asJava
     )
   }

   /**
-   * Convenience method for Java users. Scala users should use "copy".
-   */
+   * Convenience method for Java users. Scala users should use "copy".
+ */ override def withSpatialDimensions(xs: java.util.List[DruidSpatialDimension]) = copy( - spatialDimensions = xs - .asScala - .toIndexedSeq + spatialDimensions = xs.asScala.toIndexedSeq ) } case class SchemalessDruidDimensions( - dimensionExclusions: IndexedSeq[String], - spatialDimensions: IndexedSeq[DruidSpatialDimension] = Vector.empty + dimensionExclusions: Set[String], + spatialDimensions: Seq[DruidSpatialDimension] = Nil ) extends DruidDimensions { override def spec = { // Null dimensions causes the Druid parser to go schemaless. new DimensionsSpec( null, - dimensionExclusions.asJava, + dimensionExclusions.toSeq.asJava, spatialDimensions.map(_.schema).asJava ) } /** - * Convenience method for Java users. Scala users should use "copy". - */ + * Convenience method for Java users. Scala users should use "copy". + */ override def withSpatialDimensions(xs: java.util.List[DruidSpatialDimension]) = copy( spatialDimensions = xs .asScala @@ -121,14 +139,32 @@ case class SchemalessDruidDimensions( ) } +object SchemalessDruidDimensions +{ + def apply( + dimensionExclusions: Seq[String] + ): SchemalessDruidDimensions = + { + SchemalessDruidDimensions(dimensionExclusions.toSet, Vector.empty) + } + + def apply( + dimensionExclusions: Seq[String], + spatialDimensions: IndexedSeq[DruidSpatialDimension] + ): SchemalessDruidDimensions = + { + SchemalessDruidDimensions(dimensionExclusions.toSet, spatialDimensions) + } +} + object DruidRollup { private val InternalTimeColumnName = "__time" /** - * Builder for Scala users. Accepts a druid dimensions object and can be used to build rollups based on specific - * or schemaless dimensions. - */ + * Builder for Scala users. Accepts a druid dimensions object and can be used to build rollups based on specific + * or schemaless dimensions. + */ def apply( dimensions: DruidDimensions, aggregators: Seq[AggregatorFactory], @@ -139,9 +175,9 @@ object DruidRollup } /** - * Builder for Java users. Accepts a druid dimensions object and can be used to build rollups based on specific - * or schemaless dimensions. - */ + * Builder for Java users. Accepts a druid dimensions object and can be used to build rollups based on specific + * or schemaless dimensions. + */ def create( dimensions: DruidDimensions, aggregators: java.util.List[AggregatorFactory], @@ -156,8 +192,8 @@ object DruidRollup } /** - * Builder for Java users. Accepts dimensions as strings, and creates a rollup with those specific dimensions. - */ + * Builder for Java users. Accepts dimensions as strings, and creates a rollup with those specific dimensions. + */ def create( dimensions: java.util.List[String], aggregators: java.util.List[AggregatorFactory], @@ -165,7 +201,7 @@ object DruidRollup ) = { new DruidRollup( - SpecificDruidDimensions(dimensions.asScala.toIndexedSeq, Vector.empty), + SpecificDruidDimensions(dimensions.asScala, Vector.empty), aggregators.asScala.toIndexedSeq, indexGranularity ) @@ -175,39 +211,39 @@ object DruidRollup object DruidDimensions { /** - * Builder for Java users. - */ + * Builder for Java users. + */ def specific(dimensions: java.util.List[String]): DruidDimensions = { - SpecificDruidDimensions(dimensions.asScala.toIndexedSeq, Vector.empty) + SpecificDruidDimensions(dimensions.asScala, Vector.empty) } /** - * Builder for Java users. - */ + * Builder for Java users. + */ def schemaless(): DruidDimensions = { SchemalessDruidDimensions(Vector.empty, Vector.empty) } /** - * Builder for Java users. - */ + * Builder for Java users. 
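+   *
+   * For example (exclusion names are illustrative):
+   * {{{ DruidDimensions.schemalessWithExclusions(Arrays.asList("userId", "sessionId")) }}}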
+ */ def schemalessWithExclusions(dimensionExclusions: java.util.List[String]): DruidDimensions = { - SchemalessDruidDimensions(dimensionExclusions.asScala.toIndexedSeq, Vector.empty) + SchemalessDruidDimensions(dimensionExclusions.asScala.toSet, Vector.empty) } } object DruidSpatialDimension { /** - * Builder for Java users. - */ + * Builder for Java users. + */ def singleField(name: String): DruidSpatialDimension = { new SingleFieldDruidSpatialDimension(name) } /** - * Builder for Java users. - */ + * Builder for Java users. + */ def multipleField(name: String, fieldNames: java.util.List[String]): DruidSpatialDimension = { new MultipleFieldDruidSpatialDimension(name, fieldNames.asScala) } diff --git a/core/src/main/scala/com/metamx/tranquility/partition/GenericTimeAndDimsPartitioner.scala b/core/src/main/scala/com/metamx/tranquility/partition/GenericTimeAndDimsPartitioner.scala new file mode 100644 index 0000000..4cbbb40 --- /dev/null +++ b/core/src/main/scala/com/metamx/tranquility/partition/GenericTimeAndDimsPartitioner.scala @@ -0,0 +1,95 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.partition + +import com.google.common.hash.Hashing +import com.metamx.common.scala.Logging +import com.metamx.tranquility.druid.DruidRollup +import com.metamx.tranquility.typeclass.Timestamper +import io.druid.data.input.impl.TimestampSpec +import java.util.concurrent.atomic.AtomicBoolean +import java.{util => ju} +import org.scala_tools.time.Imports._ +import scala.collection.JavaConverters._ + +object GenericTimeAndDimsPartitioner +{ + private val StringSeparator: Byte = 0xff.toByte + + /** + * Create a Partitioner that can partition Scala and Java Maps according to their Druid grouping key (truncated + * timestamp and dimensions). For other types, you should implement your own Partitioner that accesses your + * object's fields directly. 
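+   *
+   * A sketch of such a hand-written Partitioner ({{{MyEvent}}} and its fields are illustrative, and MINUTE is
+   * assumed to match the rollup's indexGranularity):
+   *
+   * {{{
+   * class MyEventPartitioner extends Partitioner[MyEvent] {
+   *   override def partition(event: MyEvent, numPartitions: Int): Int = {
+   *     val truncatedTimestamp = QueryGranularity.MINUTE.truncate(event.timestamp.getMillis)
+   *     val hashCode = Partitioner.timeAndDimsHashCode(truncatedTimestamp, Seq("country" -> event.country))
+   *     Hashing.consistentHash(hashCode, numPartitions)
+   *   }
+   * }
+   * }}}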
+   */
+  def create[A](
+      timestamper: Timestamper[A],
+      timestampSpec: TimestampSpec,
+      rollup: DruidRollup
+  ): Partitioner[A] =
+  {
+    new GenericTimeAndDimsPartitioner[A](
+      timestamper,
+      timestampSpec,
+      rollup
+    )
+  }
+}
+
+class GenericTimeAndDimsPartitioner[A](
+  timestamper: Timestamper[A],
+  timestampSpec: TimestampSpec,
+  rollup: DruidRollup
+) extends Partitioner[A] with Logging
+{
+  // Lazy so the flag is re-created after deserialization; a plain @transient val would come back null.
+  @transient private lazy val didWarn = new AtomicBoolean
+
+  override def partition(thing: A, numPartitions: Int): Int = {
+    val dimensions = thing match {
+      case scalaMap: collection.Map[_, _] =>
+        for ((k: String, v) <- scalaMap if rollup.isStringDimension(timestampSpec, k)) yield {
+          (k, v)
+        }
+
+      case javaMap: ju.Map[_, _] =>
+        for ((k: String, v) <- javaMap.asScala if rollup.isStringDimension(timestampSpec, k)) yield {
+          (k, v)
+        }
+
+      case obj =>
+        // Oops, we don't really know how to do things other than Maps...
+        if (didWarn.compareAndSet(false, true)) {
+          log.warn(
+            "Cannot partition object of class[%s] by time and dimensions. Consider implementing a Partitioner.",
+            obj.getClass
+          )
+        }
+        Nil
+    }
+
+    val partitionHashCode = if (dimensions.nonEmpty) {
+      val truncatedTimestamp = rollup.indexGranularity.truncate(timestamper.timestamp(thing).millis)
+      Partitioner.timeAndDimsHashCode(truncatedTimestamp, dimensions)
+    } else {
+      thing.hashCode()
+    }
+
+    Hashing.consistentHash(partitionHashCode, numPartitions)
+  }
+}
diff --git a/core/src/main/scala/com/metamx/tranquility/partition/HashCodePartitioner.scala b/core/src/main/scala/com/metamx/tranquility/partition/HashCodePartitioner.scala
new file mode 100644
index 0000000..0ed7e36
--- /dev/null
+++ b/core/src/main/scala/com/metamx/tranquility/partition/HashCodePartitioner.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.metamx.tranquility.partition
+
+import com.google.common.hash.Hashing
+
+/**
+ * Partitioner that uses the hashCode of the underlying object. This is rarely going to be what you want, but at
+ * least it's simple.
+ */
+class HashCodePartitioner[A] extends Partitioner[A]
+{
+  override def partition(a: A, numPartitions: Int): Int = {
+    Hashing.consistentHash(a.hashCode(), numPartitions)
+  }
+}
diff --git a/core/src/main/scala/com/metamx/tranquility/partition/Partitioner.scala b/core/src/main/scala/com/metamx/tranquility/partition/Partitioner.scala
new file mode 100644
index 0000000..4822182
--- /dev/null
+++ b/core/src/main/scala/com/metamx/tranquility/partition/Partitioner.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.metamx.tranquility.partition
+
+import com.google.common.base.Charsets
+import com.google.common.hash.Hashing
+import java.{util => ju}
+import scala.collection.JavaConverters._
+import scala.collection.immutable.SortedMap
+import scala.collection.immutable.SortedSet
+
+/**
+ * Class that knows how to partition objects of a certain type into an arbitrary number of buckets. This is
+ * generally used along with a MergingPartitioningBeam to create a beamMergeFn.
+ */
+trait Partitioner[-A] extends Serializable
+{
+  def partition(a: A, numPartitions: Int): Int
+}
+
+object Partitioner
+{
+  /**
+   * Helper function for use by Partitioner implementations. Computes a hash code derived from a message's
+   * truncated timestamp and its dimensions.
+   *
+   * @param truncatedTimestamp timestamp, truncated to indexGranularity
+   * @param dimensions         iterable of (dim name, dim value) tuples. The dim value can be a String, a Number,
+   *                           or a Java or Scala sequence of values (each rendered with String.valueOf); nulls,
+   *                           empty strings, and dim values of any other type are ignored.
+   * @return a hash code derived from the truncated timestamp and the normalized dimension values
+   */
+  def timeAndDimsHashCode(truncatedTimestamp: Long, dimensions: Iterable[(String, Any)]): Int = {
+    val hasher = Hashing.murmur3_32().newHasher()
+    hasher.putLong(truncatedTimestamp)
+
+    for ((dimName, dimValue) <- SortedMap(dimensions.toSeq: _*)) {
+      val dimValueAsStrings: Set[String] = dimValue match {
+        case x: String if x.nonEmpty =>
+          Set(x)
+
+        case x: Number =>
+          Set(String.valueOf(x))
+
+        case xs: ju.List[_] =>
+          SortedSet(xs.asScala.filterNot(s => s == null).map(String.valueOf): _*).filterNot(_.isEmpty)
+
+        case xs: Seq[_] =>
+          SortedSet(xs.filterNot(s => s == null).map(String.valueOf): _*).filterNot(_.isEmpty)
+
+        case _ =>
+          Set.empty
+      }
+
+      // This is not a one-to-one mapping, but that's okay, since we're only using it for hashing and
+      // not for an equality check. Some moderate collisions are fine.
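+      // (For example, the Number 1, the String "1", and Seq("1") all normalize to the same hash input.)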
+ if (dimValueAsStrings.nonEmpty) { + hasher.putBytes(dimName.getBytes(Charsets.UTF_8)) + for (s <- dimValueAsStrings) { + hasher.putBytes(s.getBytes(Charsets.UTF_8)) + } + } + } + + hasher.hash().asInt() + } +} diff --git a/core/src/main/scala/com/metamx/tranquility/typeclass/Timestamper.scala b/core/src/main/scala/com/metamx/tranquility/typeclass/Timestamper.scala index 06b5baf..f7bd6e8 100644 --- a/core/src/main/scala/com/metamx/tranquility/typeclass/Timestamper.scala +++ b/core/src/main/scala/com/metamx/tranquility/typeclass/Timestamper.scala @@ -18,7 +18,7 @@ package com.metamx.tranquility.typeclass import org.joda.time.DateTime -trait Timestamper[A] extends Serializable +trait Timestamper[-A] extends Serializable { def timestamp(a: A): DateTime } diff --git a/core/src/test/java/com/metamx/tranquility/javatests/JavaApiTest.java b/core/src/test/java/com/metamx/tranquility/javatests/JavaApiTest.java index e3b091d..7177afe 100644 --- a/core/src/test/java/com/metamx/tranquility/javatests/JavaApiTest.java +++ b/core/src/test/java/com/metamx/tranquility/javatests/JavaApiTest.java @@ -53,7 +53,7 @@ public void testSpecificDimensionsRollupConfiguration() throws Exception QueryGranularity.MINUTE ); Assert.assertTrue(rollup.dimensions() instanceof SpecificDruidDimensions); - Assert.assertEquals("column", ((SpecificDruidDimensions) rollup.dimensions()).dimensions().apply(0)); + Assert.assertEquals("column", ((SpecificDruidDimensions) rollup.dimensions()).dimensions().iterator().next()); } @Test @@ -77,7 +77,7 @@ public void testSchemalessDimensionsWithExclusionsRollupConfiguration() throws E QueryGranularity.MINUTE ); Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions); - Assert.assertEquals("column", ((SchemalessDruidDimensions) rollup.dimensions()).dimensionExclusions().apply(0)); + Assert.assertEquals("column", ((SchemalessDruidDimensions) rollup.dimensions()).dimensionExclusions().iterator().next()); } @Test @@ -97,8 +97,8 @@ public void testSchemalessDimensionsWithExclusionsAndSpatialDimensionsRollupConf QueryGranularity.MINUTE ); Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions); - Assert.assertEquals("column", ((SchemalessDruidDimensions) rollup.dimensions()).dimensionExclusions().apply(0)); - Assert.assertEquals("coord.geo", rollup.dimensions().spatialDimensions().apply(0).schema().getDimName()); + Assert.assertEquals("column", ((SchemalessDruidDimensions) rollup.dimensions()).dimensionExclusions().iterator().next()); + Assert.assertEquals("coord.geo", rollup.dimensions().spatialDimensions().iterator().next().schema().getDimName()); } @Test diff --git a/core/src/test/scala/com/metamx/tranquility/test/DruidRollupTest.scala b/core/src/test/scala/com/metamx/tranquility/test/DruidRollupTest.scala index b22f262..d9b8747 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/DruidRollupTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/DruidRollupTest.scala @@ -19,14 +19,17 @@ package com.metamx.tranquility.test import com.metamx.common.parsers.ParseException import com.metamx.tranquility.druid.DruidRollup +import com.metamx.tranquility.druid.SchemalessDruidDimensions import com.metamx.tranquility.druid.SpecificDruidDimensions +import io.druid.data.input.impl.TimestampSpec import io.druid.granularity.QueryGranularity import io.druid.query.aggregation.CountAggregatorFactory import io.druid.query.aggregation.LongSumAggregatorFactory import org.scalatest.FunSuite -import org.scalatest.MustMatchers +import org.scalatest.Matchers 
+import scala.collection.JavaConverters._ -class DruidRollupTest extends FunSuite with MustMatchers +class DruidRollupTest extends FunSuite with Matchers { test("Validations: Passing") { val rollup = DruidRollup( @@ -45,28 +48,69 @@ class DruidRollupTest extends FunSuite with MustMatchers QueryGranularity.NONE ) } - e.getMessage must be("Duplicate columns: hey") + e.getMessage should be("Duplicate columns: hey") } - test("Validations: Two dimensions with the same name") { - val e = the[ParseException] thrownBy { + test("Validations: Two metrics with the same name") { + val e = the[IllegalArgumentException] thrownBy { DruidRollup( - SpecificDruidDimensions(Vector("what", "what"), Vector.empty), - Seq(new CountAggregatorFactory("hey")), + SpecificDruidDimensions(Vector("what"), Vector.empty), + Seq(new CountAggregatorFactory("hey"), new LongSumAggregatorFactory("hey", "blah")), QueryGranularity.NONE ) } - e.getMessage must be("Duplicate column entries found : [what]") + e.getMessage should be("Duplicate columns: hey") } - test("Validations: Two metrics with the same name") { - val e = the[IllegalArgumentException] thrownBy { + test("Validations: Two dimensions with the same name") { + val e = the[ParseException] thrownBy { DruidRollup( - SpecificDruidDimensions(Vector("what"), Vector.empty), - Seq(new CountAggregatorFactory("hey"), new LongSumAggregatorFactory("hey", "blah")), + SpecificDruidDimensions(Vector("what", "what"), Vector.empty), + Seq(new CountAggregatorFactory("hey")), QueryGranularity.NONE ) } - e.getMessage must be("Duplicate columns: hey") + e.getMessage should be("Duplicate column entries found : [what]") + } + + test("Dimensions are sorted") { + val rollup = DruidRollup( + SpecificDruidDimensions(Vector("e", "f", "a", "b", "z", "t"), Vector.empty), + Seq(new CountAggregatorFactory("hey")), + QueryGranularity.NONE + ) + rollup.dimensions.spec.getDimensions.asScala should be(Seq("a", "b", "e", "f", "t", "z")) + } + + test("isStringDimension: Specific") { + val rollup = DruidRollup( + SpecificDruidDimensions(Seq("foo", "bar")), + Seq(new LongSumAggregatorFactory("hey", "there")), + QueryGranularity.NONE + ) + val timestampSpec = new TimestampSpec("t", "auto", null) + rollup.isStringDimension(timestampSpec, "t") should be(false) + rollup.isStringDimension(timestampSpec, "hey") should be(false) + rollup.isStringDimension(timestampSpec, "there") should be(false) + rollup.isStringDimension(timestampSpec, "foo") should be(true) + rollup.isStringDimension(timestampSpec, "bar") should be(true) + rollup.isStringDimension(timestampSpec, "baz") should be(false) + rollup.isStringDimension(timestampSpec, "qux") should be(false) + } + + test("isStringDimension: Schemaless") { + val rollup = DruidRollup( + SchemalessDruidDimensions(Set("qux")), + Seq(new LongSumAggregatorFactory("hey", "there")), + QueryGranularity.NONE + ) + val timestampSpec = new TimestampSpec("t", "auto", null) + rollup.isStringDimension(timestampSpec, "t") should be(false) + rollup.isStringDimension(timestampSpec, "hey") should be(false) + rollup.isStringDimension(timestampSpec, "there") should be(false) + rollup.isStringDimension(timestampSpec, "foo") should be(true) + rollup.isStringDimension(timestampSpec, "bar") should be(true) + rollup.isStringDimension(timestampSpec, "baz") should be(true) + rollup.isStringDimension(timestampSpec, "qux") should be(false) } } diff --git a/core/src/test/scala/com/metamx/tranquility/test/GenericTimeAndDimsPartitionerTest.scala 
b/core/src/test/scala/com/metamx/tranquility/test/GenericTimeAndDimsPartitionerTest.scala new file mode 100644 index 0000000..35fe559 --- /dev/null +++ b/core/src/test/scala/com/metamx/tranquility/test/GenericTimeAndDimsPartitionerTest.scala @@ -0,0 +1,159 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.test + +import com.fasterxml.jackson.databind.ObjectMapper +import com.google.common.hash.Hashing +import com.metamx.common.scala.Jackson +import com.metamx.common.scala.untyped.Dict +import com.metamx.tranquility.druid.DruidRollup +import com.metamx.tranquility.druid.SpecificDruidDimensions +import com.metamx.tranquility.partition.GenericTimeAndDimsPartitioner +import com.metamx.tranquility.typeclass.Timestamper +import io.druid.data.input.impl.TimestampSpec +import io.druid.granularity.QueryGranularity +import io.druid.query.aggregation.DoubleSumAggregatorFactory +import java.{util => ju} +import org.joda.time.DateTime +import org.scalatest.FunSuite +import org.scalatest.Matchers + +class GenericTimeAndDimsPartitionerTest extends FunSuite with Matchers +{ + val same = Seq( + Dict("t" -> new DateTime("2000T00:00:03"), "foo" -> 1, "bar" -> Seq("y", "z")), + Dict("t" -> new DateTime("2000T00:00:03"), "foo" -> 1, "bar" -> Seq("z", "y", "")), + Dict("t" -> new DateTime("2000T00:00:04"), "foo" -> Seq(1), "bar" -> Seq("z", "y"), "baz" -> Nil), + Dict("t" -> new DateTime("2000T00:00:05"), "foo" -> 1, "bar" -> Seq("y", "z"), "baz" -> ""), + Dict("t" -> new DateTime("2000T00:00:06"), "foo" -> 1, "bar" -> Seq("y", "z"), "baz" -> null), + Dict("t" -> new DateTime("2000T00:00:06"), "foo" -> "1", "bar" -> Seq("y", "z"), "baz" -> null), + Dict("t" -> new DateTime("2000T00:00:06"), "foo" -> Seq("1"), "bar" -> Seq("z", "y")) + ) + + val different = Seq( + Dict("t" -> new DateTime("2000T00:00:03"), "foo" -> 1, "bar" -> Seq("y", "z"), "baz" -> "buh"), + Dict("t" -> new DateTime("2000T00:01:03"), "foo" -> 1, "bar" -> Seq("y", "z")), + Dict("t" -> new DateTime("2000T00:00:03"), "bar" -> Seq("y", "z")), + Dict("t" -> new DateTime("2000T00:00:03"), "foo" -> 2, "bar" -> Seq("y", "z")), + Dict("t" -> new DateTime("2000T00:00:03"), "foo" -> "3", "bar" -> Seq("y", "z")), + Dict("t" -> new DateTime("2000T00:00:06"), "foo" -> Seq("1"), "bar" -> "y"), + Dict("t" -> new DateTime("2000T00:00:06"), "foo" -> Seq("1"), "bar" -> "z"), + Dict("t" -> new DateTime("2000T00:00:06"), "foo" -> Seq("1"), "bar" -> Seq("y", "z", "x")), + Dict("t" -> new DateTime("2000T00:00:06"), "foo" -> Seq("1"), "bar" -> Seq("z", "x")) + ) + + test("Scala Map") { + val timestamper = new Timestamper[Dict] { + override def timestamp(a: Dict): DateTime = new DateTime(a("t")) + } + val partitioner = GenericTimeAndDimsPartitioner.create( + timestamper, + 
new TimestampSpec("t", "auto", null), + DruidRollup( + SpecificDruidDimensions(Seq("foo", "bar", "baz")), + Seq(new DoubleSumAggregatorFactory("x", "xSum")), + QueryGranularity.MINUTE + ) + ) + + for (x <- same; y <- same) { + val xPartition = partitioner.partition(x, Int.MaxValue) + val yPartition = partitioner.partition(y, Int.MaxValue) + xPartition should be(yPartition) + } + + for (x <- same; y <- different) { + val xPartition = partitioner.partition(x, Int.MaxValue) + val yPartition = partitioner.partition(y, Int.MaxValue) + + // Could have false positives, but it's not likely + xPartition shouldNot be(yPartition) + } + + for ((x, i) <- different.zipWithIndex; y <- different.drop(i + 1)) { + val xPartition = partitioner.partition(x, Int.MaxValue) + val yPartition = partitioner.partition(y, Int.MaxValue) + + // Could have false positives, but it's not likely + xPartition shouldNot be(yPartition) + } + } + + test("Java Map") { + val timestamper = new Timestamper[ju.Map[String, Any]] { + override def timestamp(a: ju.Map[String, Any]): DateTime = new DateTime(a.get("t")) + } + val partitioner = GenericTimeAndDimsPartitioner.create( + timestamper, + new TimestampSpec("t", "auto", null), + DruidRollup( + SpecificDruidDimensions(Seq("foo", "bar", "baz")), + Seq(new DoubleSumAggregatorFactory("x", "xSum")), + QueryGranularity.MINUTE + ) + ) + + val sameJava: Seq[ju.Map[String, Any]] = same map javaCopy + val differentJava: Seq[ju.Map[String, Any]] = different map javaCopy + + for (x <- sameJava; y <- sameJava) { + val xPartition = partitioner.partition(x, Int.MaxValue) + val yPartition = partitioner.partition(y, Int.MaxValue) + xPartition should be(yPartition) + } + + for (x <- sameJava; y <- differentJava) { + val xPartition = partitioner.partition(x, Int.MaxValue) + val yPartition = partitioner.partition(y, Int.MaxValue) + + // Could have false positives, but it's not likely + xPartition shouldNot be(yPartition) + } + + for ((x, i) <- differentJava.zipWithIndex; y <- differentJava.drop(i + 1)) { + val xPartition = partitioner.partition(x, Int.MaxValue) + val yPartition = partitioner.partition(y, Int.MaxValue) + + // Could have false positives, but it's not likely + xPartition shouldNot be(yPartition) + } + } + + test("String") { + val timestamper = new Timestamper[String] { + override def timestamp(a: String): DateTime = new DateTime(1000) + } + val partitioner = GenericTimeAndDimsPartitioner.create( + timestamper, + new TimestampSpec("t", "auto", null), + DruidRollup( + SpecificDruidDimensions(Seq("foo", "bar", "baz")), + Seq(new DoubleSumAggregatorFactory("x", "xSum")), + QueryGranularity.MINUTE + ) + ) + + partitioner.partition("foo", Int.MaxValue) should be(Hashing.consistentHash("foo".hashCode, Int.MaxValue)) + } + + private def javaCopy(d: Dict): ju.Map[String, Any] = { + new ObjectMapper().readValue(Jackson.bytes(d), classOf[ju.Map[String, Any]]) + } +} diff --git a/core/src/test/scala/com/metamx/tranquility/test/HashPartitionBeamTest.scala b/core/src/test/scala/com/metamx/tranquility/test/HashPartitionBeamTest.scala new file mode 100644 index 0000000..b875695 --- /dev/null +++ b/core/src/test/scala/com/metamx/tranquility/test/HashPartitionBeamTest.scala @@ -0,0 +1,69 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.test + +import com.metamx.tranquility.beam.Beam +import com.metamx.tranquility.beam.HashPartitionBeam +import com.twitter.util.Await +import com.twitter.util.Future +import java.util.concurrent.CopyOnWriteArrayList +import org.scalatest.FunSuite +import org.scalatest.Matchers +import scala.collection.JavaConverters._ +import scala.collection.mutable + +class HashPartitionBeamTest extends FunSuite with Matchers +{ + + class TestObject(val s: String) + { + override def hashCode(): Int = s(0).toInt + } + + class TestBeam(callback: TestObject => Unit) extends Beam[TestObject] + { + override def propagate(events: Seq[TestObject]): Future[Int] = { + events foreach callback + Future(events.size) + } + + override def close() = Future.Done + } + + test("Partitioning") { + val bufferOne: mutable.Buffer[TestObject] = new CopyOnWriteArrayList[TestObject]().asScala + val bufferTwo: mutable.Buffer[TestObject] = new CopyOnWriteArrayList[TestObject]().asScala + + val one = new TestBeam(bufferOne += _) + val two = new TestBeam(bufferTwo += _) + val beam = new HashPartitionBeam(Vector(one, two)) + Await.result( + beam.propagate( + Seq( + new TestObject("foo"), + new TestObject("bar"), + new TestObject("foo") + ) + ) + ) should be(3) + bufferOne.map(_.s) should be(Seq("bar")) + bufferTwo.map(_.s) should be(Seq("foo", "foo")) + } +}