Skip to content

Commit

Permalink
handle segment granularity changes
Browse files Browse the repository at this point in the history
  • Loading branch information
pjain1 committed Nov 19, 2015
1 parent d709dd9 commit 90bb27d
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 76 deletions.
11 changes: 10 additions & 1 deletion core/src/main/scala/com/metamx/tranquility/beam/Beam.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
package com.metamx.tranquility.beam

import com.twitter.util.Future
import org.joda.time.Interval

/**
* Beams can accept events and forward them along. The propagate method may throw a DefunctBeamException, which means
* the beam should be discarded (after calling close()).
*/
trait Beam[A]
trait Beam[A] extends DiscoverableInterval
{
/**
* Request propagation of events. The operation may fail in various ways, which tend to be specific to
Expand All @@ -40,3 +41,11 @@ class DefunctBeamException(s: String, t: Throwable) extends Exception(s, t)
{
def this(s: String) = this(s, null)
}

trait DiscoverableInterval
{
/**
* Returns the interval handled by the Beam, can return None if there is no associated interval
* */
def getInterval(): Option[Interval]
}
211 changes: 146 additions & 65 deletions core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ class HashPartitionBeam[A](
}

override def toString = "HashPartitionBeam(%s)" format delegates.mkString(", ")

def getInterval() = {
delegates.headOption match {
case Some(x) => x.getInterval()
case None => None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class HttpBeam[A: Timestamper](
emitter: ServiceEmitter
) extends Beam[A] with Logging
{

def getInterval() = None

private[this] implicit val timer: Timer = DefaultTimer.twitter

private[this] val port = if (uri.port > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class MemoryBeam[A](
jsonWriter: JsonWriter[A]
) extends Beam[A]
{
def getInterval() = None

def propagate(events: Seq[A]) = {
events.map(event => Jackson.parse[Dict](jsonWriter.asBytes(event))) foreach {
d =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.metamx.tranquility.beam

import com.twitter.util.Future
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology

class NoopBeam[A] extends Beam[A]
{
Expand All @@ -25,4 +27,6 @@ class NoopBeam[A] extends Beam[A]
def close() = Future.Done

override def toString = "NoopBeam()"

def getInterval() = Some(new Interval(0, 0, ISOChronology.getInstanceUTC))
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,11 @@ class RoundRobinBeam[A](
}

override def toString = "RoundRobinBeam(%s)" format beams.mkString(", ")

def getInterval() = {
beams.headOption match {
case Some(x) => x.getInterval()
case None => None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package com.metamx.tranquility.druid
import com.metamx.common.scala.Logging
import com.metamx.common.scala.Predef._
import com.metamx.emitter.service.ServiceEmitter
import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.beam.DefunctBeamException
import com.metamx.tranquility.beam.{Beam, DefunctBeamException}
import com.metamx.tranquility.finagle._
import com.metamx.tranquility.typeclass.ObjectWriter
import com.twitter.util.Closable
Expand All @@ -44,6 +43,9 @@ class DruidBeam[A](
objectWriter: ObjectWriter[A]
) extends Beam[A] with Logging with Closable
{

def getInterval() = Some(interval)

private[this] val clients = Map(
tasks map {
task =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ class DruidBeamMaker[A: Timestamper](
}

override def newBeam(interval: Interval, partition: Int) = {
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)
val availabilityGroup = DruidBeamMaker.generateBaseFirehoseId(
location.dataSource,
beamTuning.segmentGranularity,
Expand Down Expand Up @@ -190,10 +186,7 @@ class DruidBeamMaker[A: Timestamper](
// Backwards compatibility (see toDict).
beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC))
}
require(
beamTuning.segmentGranularity.widen(interval) == interval,
"Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval)
)

val partition = int(d("partition"))
val tasks = if (d contains "tasks") {
list(d("tasks")).map(dict(_)).map(d => TaskPointer(str(d("id")), str(d("firehoseId"))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ object DruidBeams
def close() = clusteredBeam.close() map (_ => lifecycle.stop())

override def toString = clusteredBeam.toString

def getInterval() = clusteredBeam.getInterval()

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class BeamPacketizerTest extends FunSuite with Logging
}

override def close() = memoryBeam.close()

def getInterval() = None
}

val acked = new AtomicLong()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
def close() = {
beam.close()
}

def getInterval() = None
}

class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString)
Expand All @@ -118,6 +120,8 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
_beams += this
}

def getInterval() = None

def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized {
if (_events.contains(events("defunct"))) {
Future.exception(new DefunctBeamException("Defunct"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class SimpleBeam extends Beam[SimpleEvent]
}

def close() = Future.Done

def getInterval() = None
}

object SimpleBeam
Expand Down

0 comments on commit 90bb27d

Please sign in to comment.