Skip to content

Commit

Permalink
Fix bug and add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pjain1 committed Nov 30, 2015
1 parent 90bb27d commit dd9b3bd
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 49 deletions.
74 changes: 39 additions & 35 deletions core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,39 +132,27 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](

private[this] val rand = new Random

// Reverse sorted list (by interval start time) of Merged beams we are currently aware of
private[this] var beams: List[Beam[EventType]] =
{
private[this] var zkMetaCache = {
try {
val dataPath = zpathWithDefault("data", ClusteredBeamMeta.empty.toBytes(objectMapper))
curator.sync().forPath(dataPath)
val zkMetaData = ClusteredBeamMeta.fromBytes(objectMapper, curator.getData.forPath(dataPath)).fold(
ClusteredBeamMeta.fromBytes(objectMapper, curator.getData.forPath(dataPath)).fold(
e => {
emitAlert(e, log, emitter, WARN, "Failed to read beam data from cache: %s" format identifier, alertMap)
throw e
},
meta => meta
)
log.info("Synced from ZK, Found Beams - [%s]", zkMetaData)
(
zkMetaData.beamDictss map {
beamDicts =>
beamMergeFn(
beamDicts._2.zipWithIndex map {
case (beamDict, partitionNum) =>
val decorate = beamDecorateFn(new Interval(beamDict.get("interval").get), partitionNum)
decorate(beamMaker.fromDict(beamDict))
}
)
}
).toList sortBy (-_.getInterval().get.start.millis)
}
catch {
case e: Throwable =>
throw e
}
}

// Reverse sorted list (by interval start time) of Merged beams we are currently aware of
private[this] var beams: List[Beam[EventType]] = Nil

// Lock updates to "localLatestCloseTime" and "beams" to prevent races.
private[this] val beamWriteMonitor = new AnyRef

Expand All @@ -184,6 +172,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
)
val newMeta = f(prevMeta)
if (newMeta != prevMeta) {
zkMetaCache = newMeta
val newMetaBytes = newMeta.toBytes(objectMapper)
log.info("Writing new beam data to[%s]: %s", dataPath, new String(newMetaBytes))
curator.setData().forPath(dataPath, newMetaBytes)
Expand Down Expand Up @@ -219,7 +208,9 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
case (interval, Some(foundBeam)) if foundBeam.getInterval().get.end + tuning.windowPeriod <= now => Future.value(None)
case (interval, Some(foundBeam)) => Future.value(Some(foundBeam))
case (interval, None) if interval.start <= localLatestCloseTime => Future.value(None)
case (interval, None) if interval.end + tuning.windowPeriod <= now => Future.value(None)
case (interval, None) if !creationInterval.overlaps(interval) => Future.value(None)
case (interval, None) if interval.toDurationMillis == 0 => Future.value(None)
case (interval, None) =>
// We may want to create new merged beam(s). Acquire the zk mutex and examine the situation.
// This could be more efficient, but it's happening infrequently so it's probably not a big deal.
Expand All @@ -228,9 +219,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
log.info("Trying to create new beam with interval [%s]", interval)
// We want to create this new beam
// But first, check in ZK whether any existing beam already covers this interval

val beamDicts: Seq[untyped.Dict] = prev.beamDictss.collectFirst[Seq[untyped.Dict]]({
case x if new Interval(x._2.head.get("interval").get).overlaps(interval) => x._2
case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).overlaps(interval) => x._2
}) getOrElse Nil

if (beamDicts.size >= tuning.partitions) {
Expand All @@ -250,7 +240,11 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
interval.start
)
prev
} else if (beamDicts.nonEmpty){
} else if (beamDicts.nonEmpty &&
!beamDicts.exists(
beamDict => new Interval(beamDict.get("interval").get, ISOChronology.getInstanceUTC).contains(interval)
))
{
throw new IllegalStateException(
"WTF?? Requested to create a beam for interval [%s] which overlaps with existing beam [%s]" format(interval, beamDicts)
)
Expand Down Expand Up @@ -280,7 +274,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
val newBeamDictss: Map[Long, Seq[Dict]] = (prev.beamDictss filterNot {
case (millis, beam) =>
// Expire old beamDicts
new Interval(beam.head.get("interval").get).end + tuning.windowPeriod < now
new Interval(beam.head.get("interval").get, ISOChronology.getInstanceUTC).end + tuning.windowPeriod < now
}) ++ (for (ts <- timestampsToCover) yield {
val tsPrevDicts = prev.beamDictss.getOrElse(ts.millis, Nil)
log.info(
Expand Down Expand Up @@ -390,22 +384,32 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
// Most of the time the head beam can handle the event, since the beams list is reverse-sorted by interval start time

intervalBeamPair.find(_._1.contains(eventTimestamp)) match {
case Some(mapEntry) => (mapEntry._1, mapEntry._2)
case Some(x) => (x._1, x._2)
case None =>
val requiredInterval = tuning.segmentBucket(eventTimestamp)
val requiredInterval = tuning.segmentBucket(eventTimestamp).withChronology(ISOChronology.getInstanceUTC)
// Check to see if the interval for which we want to create a beam overlaps with interval of any existing beam
// this may happen if segment granularity is changed
val mayBeOverlappingInterval = beams.collectFirst {
case x if x.getInterval().get.overlaps(requiredInterval) => x.getInterval().get
} getOrElse new Interval(0,0)

(
new Interval(
Math.max(mayBeOverlappingInterval.end.millis, requiredInterval.start.millis),
Math.max(mayBeOverlappingInterval.end.millis, requiredInterval.end.millis)
),
None
)
val actualInterval = zkMetaCache.beamDictss.toSeq.sortBy(-_._1) collectFirst {
case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).contains(eventTimestamp) =>
new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC)
} match {
case Some(x) => x
case None =>
zkMetaCache.beamDictss.toSeq.sortBy(-_._1) collectFirst {
case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).overlaps(requiredInterval) =>
new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC)
} match {
case None => requiredInterval
case Some(x) =>
new Interval(
Math.max(x.end.millis, requiredInterval.start.millis),
Math.max(x.end.millis, requiredInterval.end.millis),
ISOChronology.getInstanceUTC
)
}
}
intervalBeamPair.put(actualInterval, None)
(actualInterval, None)
}
}

Expand All @@ -430,7 +434,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
!beams.exists(_.getInterval().get.contains(tbwTimestamp))
) yield {
// Create beam asynchronously
beam((tuning.segmentBucket(tbwTimestamp), None), now)
beam((tuning.segmentBucket(tbwTimestamp).withChronology(ISOChronology.getInstanceUTC), None), now)
})
// Propagate data
val countFutures = for ((beamIntervalPair, eventGroup) <- grouped) yield {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.curator.framework.CuratorFramework
import org.joda.time.DateTimeZone
import org.joda.time.DateTime
import org.joda.time.Interval
import org.joda.time.chrono.ISOChronology
import org.scala_tools.time.Implicits._
import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
Expand All @@ -68,7 +69,14 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "e")),
SimpleEvent(new DateTime("2012-01-01T01:20Z"), Map("foo" -> "f")),
SimpleEvent(new DateTime("2012-01-01T03:05Z"), Map("foo" -> "g")),
SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h"))
SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h")),
SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "i")),
SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "j")),
SimpleEvent(new DateTime("2012-01-01T01:07Z"), Map("foo" -> "k")),
SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "l")),
SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "m")),
SimpleEvent(new DateTime("2012-01-01T01:09Z"), Map("foo" -> "n")),
SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "o"))
) map {
x => x.fields("foo") -> x
}).toMap
Expand All @@ -79,14 +87,18 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
val localZone = new DateTime().getZone

def buffers = _lock.synchronized {
_buffers.values.map(x => (x.timestamp.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
_buffers.values.map(x => (x.interval.start.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet
}

def buffersWithInterval = _lock.synchronized {
_buffers.values.map(x => (x.interval, x.partition, x.open, x.buffer.toSeq)).toSet
}

def beamsList = _lock.synchronized {
_beams.toList
}

class EventBuffer(val timestamp: DateTime, val partition: Int)
class EventBuffer(val interval: Interval, val partition: Int)
{
val buffer: mutable.Buffer[SimpleEvent] = mutable.ListBuffer()
@volatile var open: Boolean = true
Expand All @@ -113,20 +125,20 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
def getInterval() = None
}

class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString)
class TestingBeam(val interval: Interval, val partition: Int, val uuid: String = UUID.randomUUID().toString)
extends Beam[SimpleEvent]
{
_lock.synchronized {
_beams += this
}

def getInterval() = None
def getInterval() = Some(interval)

def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized {
if (_events.contains(events("defunct"))) {
Future.exception(new DefunctBeamException("Defunct"))
} else {
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition))
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition))
buffer.open = true
buffer.buffer ++= _events
Future.value(_events.size)
Expand All @@ -135,35 +147,35 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA

def close() = _lock.synchronized {
_beams -= this
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition))
val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition))
buffer.open = false
Future.Done
}

def toDict = Dict(
"timestamp" -> timestamp.toString(),
"interval" -> interval.toString,
"partition" -> partition,
"uuid" -> uuid
)
}

class TestingBeamMaker extends BeamMaker[SimpleEvent, TestingBeam]
{
def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval.start, partition)
def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval, partition)

def toDict(beam: TestingBeam) = {
Dict(
"timestamp" -> beam.timestamp.toString(),
"interval" -> beam.interval.toString,
"partition" -> beam.partition,
"uuid" -> beam.uuid
)
}

def fromDict(d: Dict) = {
val timestamp = new DateTime(d("timestamp"))
val interval= new Interval(d("interval"))
val partition = int(d("partition"))
val uuid = str(d("uuid"))
new TestingBeam(timestamp, partition, uuid)
new TestingBeam(interval, partition, uuid)
}
}

Expand Down Expand Up @@ -357,6 +369,72 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
}
}

test("IncreaseGranularity") {
withLocalCurator {
curator =>
val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.MINUTE, windowPeriod = 1.minute)
val newTuning = oldTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE)

val beamsA = newBeams(curator, oldTuning)
beamsA.timekeeper.now = start
beamsA.blockagate(Seq("i") map events)
beamsA.blockagate(Seq("i") map events)
beamsA.timekeeper.now = start + 1.minute
beamsA.blockagate(Seq("j") map events)
beamsA.blockagate(Seq("j") map events)

val beamsB = newBeams(curator, newTuning)
beamsB.timekeeper.now = start + 2.minute
beamsB.blockagate(Seq("k") map events)
beamsB.blockagate(Seq("k") map events)
beamsB.blockagate(Seq("l") map events)
beamsB.blockagate(Seq("l") map events)
beamsB.blockagate(Seq("m") map events)
beamsB.blockagate(Seq("m") map events)
beamsB.blockagate(Seq("n") map events)
beamsB.blockagate(Seq("n") map events)

Await.result(beamsA.close())

assert(buffersWithInterval === Set(
(new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 0, false, Seq("i") map events),
(new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 1, false, Seq("i") map events),
// "j" and "l" end up in the same partition because different beams were used to propagate them
(new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 0, false, Seq("j", "l") map events),
(new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 1, false, Seq("j", "l") map events),
(new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, true, Seq("k", "n") map events),
(new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, true, Seq("k", "n") map events)
))
}
}

test("DecreaseGranularity") {
withLocalCurator {
curator =>
val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE)
val newTuning = oldTuning.copy(segmentGranularity = Granularity.MINUTE)

val beamsA = newBeams(curator, oldTuning)
beamsA.timekeeper.now = start
beamsA.blockagate(Seq("i") map events)

val beamsB = newBeams(curator, newTuning)
beamsB.timekeeper.now = start + 4.minute
beamsB.blockagate(Seq("j") map events)
beamsB.blockagate(Seq("n") map events)
beamsB.blockagate(Seq("o") map events)
beamsB.blockagate(Seq("o") map events)
Await.result(beamsB.close())

assert(buffersWithInterval === Set(
(new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, false, Seq("i", "j") map events),
(new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, false, Seq("n") map events),
(new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 0, false, Seq("o") map events),
(new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 1, false, Seq("o") map events)
))
}
}

test("DefunctBeam") {
withLocalCurator {
curator =>
Expand Down Expand Up @@ -389,10 +467,10 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA
))
val desired = List("2012-01-01T00Z", "2012-01-01T00Z", "2012-01-01T01Z", "2012-01-01T01Z").map(new DateTime(_))
val startTime = System.currentTimeMillis()
while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.timestamp).sortBy(_.millis) != desired) {
while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.interval.start).sortBy(_.millis) != desired) {
Thread.sleep(100)
}
assert(beamsList.map(_.timestamp).sortBy(_.millis) === desired)
assert(beamsList.map(_.interval.start).sortBy(_.millis) === desired)
}
}

Expand Down

0 comments on commit dd9b3bd

Please sign in to comment.