handle segment granularity changes
pjain1 committed Nov 30, 2015
1 parent 2ae0c26 commit dbf3eea
Showing 14 changed files with 295 additions and 89 deletions.
11 changes: 10 additions & 1 deletion core/src/main/scala/com/metamx/tranquility/beam/Beam.scala
@@ -17,12 +17,13 @@
package com.metamx.tranquility.beam

import com.twitter.util.Future
import org.joda.time.Interval

/**
* Beams can accept events and forward them along. The propagate method may throw a DefunctBeamException, which means
* the beam should be discarded (after calling close()).
*/
trait Beam[A]
trait Beam[A] extends DiscoverableInterval
{
/**
* Request propagation of events. The operation may fail in various ways, which tend to be specific to
@@ -40,3 +41,11 @@ class DefunctBeamException(s: String, t: Throwable) extends Exception(s, t)
{
def this(s: String) = this(s, null)
}

trait DiscoverableInterval
{
/**
* Returns the interval handled by the Beam; can return None if there is no associated interval.
*/
def getInterval(): Option[Interval]
}
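
For orientation, the following is a minimal sketch of what a Beam implementation looks like now that the trait extends DiscoverableInterval. It assumes the propagate/close signatures used by the beams elsewhere in this diff; ConstantIntervalBeam is a hypothetical name and is not part of this commit.

import com.metamx.tranquility.beam.Beam
import com.twitter.util.Future
import org.joda.time.Interval

// Hypothetical illustration only: a beam pinned to one fixed interval.
class ConstantIntervalBeam[A](interval: Interval) extends Beam[A]
{
  // New requirement from this commit: report the interval this beam covers, if any.
  def getInterval(): Option[Interval] = Some(interval)

  // Accept events and report how many were sent, as the in-tree beams do.
  def propagate(events: Seq[A]): Future[Int] = Future.value(events.size)

  def close(): Future[Unit] = Future.Done
}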
215 changes: 150 additions & 65 deletions core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala

Large diffs are not rendered by default.
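
The ClusteredBeam.scala changes themselves are not rendered here, but the new IncreaseGranularity and DecreaseGranularity tests added later in this diff pin down the intended behavior: events that still fall within a previously created beam's interval keep going to that beam, while newly created beams use the new segment granularity and receive intervals that do not overlap what existing beams already cover. A rough sketch of the interval arithmetic those expectations imply (an illustration only, not the actual ClusteredBeam code):

import org.joda.time.DateTime
import org.joda.time.Interval

object GranularityChangeSketch
{
  // Illustrative only: given the segment bucket for an event under the new granularity and the
  // latest end instant already covered by previously created beams, the new beam's interval is
  // the bucket truncated so that it does not overlap what is already covered.
  def intervalForNewBeam(bucket: Interval, coveredUpTo: Option[DateTime]): Interval = {
    val start = coveredUpTo.filter(_.isAfter(bucket.getStart)).getOrElse(bucket.getStart)
    new Interval(start, bucket.getEnd)
  }
}

For example, in the IncreaseGranularity test the minute-granularity beams already cover up to 01:07Z, so the five-minute bucket 01:05Z/01:10Z is truncated to 01:07Z/01:10Z, which is exactly the interval the test asserts.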

@@ -58,6 +58,9 @@ class HttpBeam[A: Timestamper](
emitter: ServiceEmitter
) extends Beam[A] with Logging
{

def getInterval() = None

private[this] implicit val timer: Timer = DefaultTimer.twitter

private[this] val port = if (uri.port > 0) {
@@ -28,6 +28,8 @@ class MemoryBeam[A](
jsonWriter: JsonWriter[A]
) extends Beam[A]
{
def getInterval() = None

def propagate(events: Seq[A]) = {
events.map(event => Jackson.parse[Dict](jsonWriter.asBytes(event))) foreach {
d =>
@@ -44,5 +44,12 @@ class MergingPartitioningBeam[A](
Future.collect(beams map (_.close())) map (_ => ())
}

def getInterval() = {
beams.headOption match {
case Some(x) => x.getInterval()
case None => None
}
}

override def toString = s"MergingPartitioningBeam(${beams.mkString(", ")})"
}
@@ -17,6 +17,8 @@
package com.metamx.tranquility.beam

import com.twitter.util.Future
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology

class NoopBeam[A] extends Beam[A]
{
@@ -25,4 +27,6 @@ class NoopBeam[A]
def close() = Future.Done

override def toString = "NoopBeam()"

def getInterval() = Some(new Interval(0, 0, ISOChronology.getInstanceUTC))
}
@@ -38,4 +38,11 @@ class RoundRobinBeam[A](
}

override def toString = "RoundRobinBeam(%s)" format beams.mkString(", ")

def getInterval() = {
beams.headOption match {
case Some(x) => x.getInterval()
case None => None
}
}
}
@@ -19,8 +19,7 @@ package com.metamx.tranquility.druid
import com.metamx.common.scala.Logging
import com.metamx.common.scala.Predef._
import com.metamx.emitter.service.ServiceEmitter
import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.beam.DefunctBeamException
import com.metamx.tranquility.beam.{Beam, DefunctBeamException}
import com.metamx.tranquility.finagle._
import com.metamx.tranquility.typeclass.ObjectWriter
import com.twitter.util.Closable
@@ -44,6 +43,9 @@ class DruidBeam[A](
objectWriter: ObjectWriter[A]
) extends Beam[A] with Logging with Closable
{

def getInterval() = Some(interval)

private[this] val clients = Map(
tasks map {
task =>
@@ -138,10 +138,6 @@ class DruidBeamMaker[A: Timestamper](
}

override def newBeam(interval: Interval, partition: Int) = {
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)
val availabilityGroup = DruidBeamMaker.generateBaseFirehoseId(
location.dataSource,
beamTuning.segmentGranularity,
@@ -190,10 +186,7 @@ class DruidBeamMaker[A: Timestamper](
// Backwards compatibility (see toDict).
beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC))
}
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)

val partition = int(d("partition"))
val tasks = if (d contains "tasks") {
list(d("tasks")).map(dict(_)).map(d => TaskPointer(str(d("id")), str(d("firehoseId"))))
@@ -221,6 +221,9 @@ object DruidBeams
def close() = clusteredBeam.close() map (_ => lifecycle.stop())

override def toString = clusteredBeam.toString

def getInterval() = clusteredBeam.getInterval()

}
}

@@ -56,6 +56,8 @@ class BeamPacketizerTest extends FunSuite with Logging
}

override def close() = memoryBeam.close()

def getInterval() = None
}

val acked = new AtomicLong()
@@ -44,6 +44,7 @@ import org.apache.curator.framework.CuratorFramework
import org.joda.time.DateTimeZone
import org.joda.time.DateTime
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology
import org.scala_tools.time.Implicits._
import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
@@ -68,7 +69,14 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "e")),
SimpleEvent(new DateTime("2012-01-01T01:20Z"), Map("foo" -> "f")),
SimpleEvent(new DateTime("2012-01-01T03:05Z"), Map("foo" -> "g")),
SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h"))
SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h")),
SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "i")),
SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "j")),
SimpleEvent(new DateTime("2012-01-01T01:07Z"), Map("foo" -> "k")),
SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "l")),
SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "m")),
SimpleEvent(new DateTime("2012-01-01T01:09Z"), Map("foo" -> "n")),
SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "o"))
) map {
x => x.fields("foo") -> x
}).toMap
@@ -79,14 +87,18 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
val localZone = new DateTime().getZone

def buffers = _lock.synchronized {
_buffers.values.map(x => (x.timestamp.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
_buffers.values.map(x => (x.interval.start.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
}

def buffersWithInterval = _lock.synchronized {
_buffers.values.map(x => (x.interval, x.partition, x.open, x.buffer.toSeq)).toSet
}

def beamsList = _lock.synchronized {
_beams.toList
}

class EventBuffer(val timestamp: DateTime, val partition: Int)
class EventBuffer(val interval: Interval, val partition: Int)
{
val buffer: mutable.Buffer[SimpleEvent] = mutable.ListBuffer()
@volatile var open: Boolean = true
@@ -109,20 +121,24 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
def close() = {
beam.close()
}

def getInterval() = None
}

class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString)
class TestingBeam(val interval: Interval, val partition: Int, val uuid: String = UUID.randomUUID().toString)
extends Beam[SimpleEvent]
{
_lock.synchronized {
_beams += this
}

def getInterval() = Some(interval)

def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized {
if (_events.contains(events("defunct"))) {
Future.exception(new DefunctBeamException("Defunct"))
} else {
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition))
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition))
buffer.open = true
buffer.buffer ++= _events
Future.value(_events.size)
@@ -131,35 +147,35 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA

def close() = _lock.synchronized {
_beams -= this
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition))
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition))
buffer.open = false
Future.Done
}

def toDict = Dict(
"timestamp" -> timestamp.toString(),
"interval" -> interval.toString,
"partition" -> partition,
"uuid" -> uuid
)
}

class TestingBeamMaker extends BeamMaker[SimpleEvent, TestingBeam]
{
def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval.start, partition)
def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval, partition)

def toDict(beam: TestingBeam) = {
Dict(
"timestamp" -> beam.timestamp.toString(),
"interval" -> beam.interval.toString,
"partition" -> beam.partition,
"uuid" -> beam.uuid
)
}

def fromDict(d: Dict) = {
val timestamp = new DateTime(d("timestamp"))
val interval = new Interval(d("interval"))
val partition = int(d("partition"))
val uuid = str(d("uuid"))
new TestingBeam(timestamp, partition, uuid)
new TestingBeam(interval, partition, uuid)
}
}

@@ -353,6 +369,72 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
}
}

test("IncreaseGranularity") {
withLocalCurator {
curator =>
val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.MINUTE, windowPeriod = 1.minute)
val newTuning = oldTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE)

val beamsA = newBeams(curator, oldTuning)
beamsA.timekeeper.now = start
beamsA.blockagate(Seq("i") map events)
beamsA.blockagate(Seq("i") map events)
beamsA.timekeeper.now = start + 1.minute
beamsA.blockagate(Seq("j") map events)
beamsA.blockagate(Seq("j") map events)

val beamsB = newBeams(curator, newTuning)
beamsB.timekeeper.now = start + 2.minute
beamsB.blockagate(Seq("k") map events)
beamsB.blockagate(Seq("k") map events)
beamsB.blockagate(Seq("l") map events)
beamsB.blockagate(Seq("l") map events)
beamsB.blockagate(Seq("m") map events)
beamsB.blockagate(Seq("m") map events)
beamsB.blockagate(Seq("n") map events)
beamsB.blockagate(Seq("n") map events)

Await.result(beamsA.close())

assert(buffersWithInterval === Set(
(new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 0, false, Seq("i") map events),
(new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 1, false, Seq("i") map events),
// "j" and "l" are in same partition as diff beams were used to propagate them
(new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 0, false, Seq("j", "l") map events),
(new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 1, false, Seq("j", "l") map events),
(new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, true, Seq("k", "n") map events),
(new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, true, Seq("k", "n") map events)
))
}
}

test("DecreaseGranularity") {
withLocalCurator {
curator =>
val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE)
val newTuning = oldTuning.copy(segmentGranularity = Granularity.MINUTE)

val beamsA = newBeams(curator, oldTuning)
beamsA.timekeeper.now = start
beamsA.blockagate(Seq("i") map events)

val beamsB = newBeams(curator, newTuning)
beamsB.timekeeper.now = start + 4.minute
beamsB.blockagate(Seq("j") map events)
beamsB.blockagate(Seq("n") map events)
beamsB.blockagate(Seq("o") map events)
beamsB.blockagate(Seq("o") map events)
Await.result(beamsB.close())

assert(buffersWithInterval === Set(
(new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, false, Seq("i", "j") map events),
(new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, false, Seq("n") map events),
(new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 0, false, Seq("o") map events),
(new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 1, false, Seq("o") map events)
))
}
}

test("DefunctBeam") {
withLocalCurator {
curator =>
@@ -385,10 +467,10 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
))
val desired = List("2012-01-01T00Z", "2012-01-01T00Z", "2012-01-01T01Z", "2012-01-01T01Z").map(new DateTime(_))
val startTime = System.currentTimeMillis()
while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.timestamp).sortBy(_.millis) != desired) {
while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.interval.start).sortBy(_.millis) != desired) {
Thread.sleep(100)
}
assert(beamsList.map(_.timestamp).sortBy(_.millis) === desired)
assert(beamsList.map(_.interval.start).sortBy(_.millis) === desired)
}
}

@@ -24,6 +24,7 @@ import com.metamx.tranquility.beam.HashPartitionBeam
import com.twitter.util.Await
import com.twitter.util.Future
import java.util.concurrent.CopyOnWriteArrayList
import org.joda.time.Interval
import org.scalatest.FunSuite
import org.scalatest.Matchers
import scala.collection.JavaConverters._
@@ -44,6 +45,10 @@ class HashPartitionBeamTest extends FunSuite with Matchers
Future(events.size)
}

override def getInterval() = {
None
}

override def close() = Future.Done
}

@@ -52,6 +52,8 @@ class SimpleBeam extends Beam[SimpleEvent]
}

def close() = Future.Done

def getInterval() = None
}

object SimpleBeam
