From dd9b3bdade1792058e7c6f1b5b6e98c5de3c9c99 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Mon, 30 Nov 2015 14:09:34 -0600 Subject: [PATCH] fix bug and added unit tests --- .../tranquility/beam/ClusteredBeam.scala | 74 ++++++------ .../tranquility/test/ClusteredBeamTest.scala | 106 +++++++++++++++--- 2 files changed, 131 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala index 013dc18..a656dca 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala @@ -132,32 +132,17 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( private[this] val rand = new Random - // Reverse sorted list (by interval start time) of Merged beams we are currently aware of - private[this] var beams: List[Beam[EventType]] = - { + private[this] var zkMetaCache = { try { val dataPath = zpathWithDefault("data", ClusteredBeamMeta.empty.toBytes(objectMapper)) curator.sync().forPath(dataPath) - val zkMetaData = ClusteredBeamMeta.fromBytes(objectMapper, curator.getData.forPath(dataPath)).fold( + ClusteredBeamMeta.fromBytes(objectMapper, curator.getData.forPath(dataPath)).fold( e => { emitAlert(e, log, emitter, WARN, "Failed to read beam data from cache: %s" format identifier, alertMap) throw e }, meta => meta ) - log.info("Synced from ZK, Found Beams - [%s]", zkMetaData) - ( - zkMetaData.beamDictss map { - beamDicts => - beamMergeFn( - beamDicts._2.zipWithIndex map { - case (beamDict, partitionNum) => - val decorate = beamDecorateFn(new Interval(beamDict.get("interval").get), partitionNum) - decorate(beamMaker.fromDict(beamDict)) - } - ) - } - ).toList sortBy (-_.getInterval().get.start.millis) } catch { case e: Throwable => @@ -165,6 +150,9 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( } } + // Reverse sorted list (by interval start time) of Merged beams we are currently aware of + private[this] var beams: List[Beam[EventType]] = Nil + // Lock updates to "localLatestCloseTime" and "beams" to prevent races. private[this] val beamWriteMonitor = new AnyRef @@ -184,6 +172,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( ) val newMeta = f(prevMeta) if (newMeta != prevMeta) { + zkMetaCache = newMeta val newMetaBytes = newMeta.toBytes(objectMapper) log.info("Writing new beam data to[%s]: %s", dataPath, new String(newMetaBytes)) curator.setData().forPath(dataPath, newMetaBytes) @@ -219,7 +208,9 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( case (interval, Some(foundBeam)) if foundBeam.getInterval().get.end + tuning.windowPeriod <= now => Future.value(None) case (interval, Some(foundBeam)) => Future.value(Some(foundBeam)) case (interval, None) if interval.start <= localLatestCloseTime => Future.value(None) + case (interval, None) if interval.end + tuning.windowPeriod <= now => Future.value(None) case (interval, None) if !creationInterval.overlaps(interval) => Future.value(None) + case (interval, None) if interval.toDurationMillis == 0 => Future.value(None) case (interval, None) => // We may want to create new merged beam(s). Acquire the zk mutex and examine the situation. // This could be more efficient, but it's happening infrequently so it's probably not a big deal. @@ -228,9 +219,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( log.info("Trying to create new beam with interval [%s]", interval) // We want to create this new beam // But first let us check in ZK if there is already any beam in ZK covering this interval - val beamDicts: Seq[untyped.Dict] = prev.beamDictss.collectFirst[Seq[untyped.Dict]]({ - case x if new Interval(x._2.head.get("interval").get).overlaps(interval) => x._2 + case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).overlaps(interval) => x._2 }) getOrElse Nil if (beamDicts.size >= tuning.partitions) { @@ -250,7 +240,11 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( interval.start ) prev - } else if (beamDicts.nonEmpty){ + } else if (beamDicts.nonEmpty && + !beamDicts.exists( + beamDict => new Interval(beamDict.get("interval").get, ISOChronology.getInstanceUTC).contains(interval) + )) + { throw new IllegalStateException( "WTF?? Requested to create a beam for interval [%s] which overlaps with existing beam [%s]" format(interval, beamDicts) ) @@ -280,7 +274,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( val newBeamDictss: Map[Long, Seq[Dict]] = (prev.beamDictss filterNot { case (millis, beam) => // Expire old beamDicts - new Interval(beam.head.get("interval").get).end + tuning.windowPeriod < now + new Interval(beam.head.get("interval").get, ISOChronology.getInstanceUTC).end + tuning.windowPeriod < now }) ++ (for (ts <- timestampsToCover) yield { val tsPrevDicts = prev.beamDictss.getOrElse(ts.millis, Nil) log.info( @@ -390,22 +384,32 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( // Most of the times head beam can handle the event as the beams is reverse sorted list by start time of interval intervalBeamPair.find(_._1.contains(eventTimestamp)) match { - case Some(mapEntry) => (mapEntry._1, mapEntry._2) + case Some(x) => (x._1, x._2) case None => - val requiredInterval = tuning.segmentBucket(eventTimestamp) + val requiredInterval = tuning.segmentBucket(eventTimestamp).withChronology(ISOChronology.getInstanceUTC) // Check to see if the interval for which we want to create a beam overlaps with interval of any existing beam // this may happen if segment granularity is changed - val mayBeOverlappingInterval = beams.collectFirst { - case x if x.getInterval().get.overlaps(requiredInterval) => x.getInterval().get - } getOrElse new Interval(0,0) - - ( - new Interval( - Math.max(mayBeOverlappingInterval.end.millis, requiredInterval.start.millis), - Math.max(mayBeOverlappingInterval.end.millis, requiredInterval.end.millis) - ), - None - ) + val actualInterval = zkMetaCache.beamDictss.toSeq.sortBy(-_._1) collectFirst { + case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).contains(eventTimestamp) => + new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC) + } match { + case Some(x) => x + case None => + zkMetaCache.beamDictss.toSeq.sortBy(-_._1) collectFirst { + case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).overlaps(requiredInterval) => + new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC) + } match { + case None => requiredInterval + case Some(x) => + new Interval( + Math.max(x.end.millis, requiredInterval.start.millis), + Math.max(x.end.millis, requiredInterval.end.millis), + ISOChronology.getInstanceUTC + ) + } + } + intervalBeamPair.put(actualInterval, None) + (actualInterval, None) } } @@ -430,7 +434,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( !beams.exists(_.getInterval().get.contains(tbwTimestamp)) ) yield { // Create beam asynchronously - beam((tuning.segmentBucket(tbwTimestamp), None), now) + beam((tuning.segmentBucket(tbwTimestamp).withChronology(ISOChronology.getInstanceUTC), None), now) }) // Propagate data val countFutures = for ((beamIntervalPair, eventGroup) <- grouped) yield { diff --git a/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala b/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala index 23380d1..15313c3 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala @@ -44,6 +44,7 @@ import org.apache.curator.framework.CuratorFramework import org.joda.time.DateTimeZone import org.joda.time.DateTime import org.joda.time.Interval +import org.joda.time.chrono.ISOChronology import org.scala_tools.time.Implicits._ import org.scalatest.BeforeAndAfter import org.scalatest.FunSuite @@ -68,7 +69,14 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "e")), SimpleEvent(new DateTime("2012-01-01T01:20Z"), Map("foo" -> "f")), SimpleEvent(new DateTime("2012-01-01T03:05Z"), Map("foo" -> "g")), - SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h")) + SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h")), + SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "i")), + SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "j")), + SimpleEvent(new DateTime("2012-01-01T01:07Z"), Map("foo" -> "k")), + SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "l")), + SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "m")), + SimpleEvent(new DateTime("2012-01-01T01:09Z"), Map("foo" -> "n")), + SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "o")) ) map { x => x.fields("foo") -> x }).toMap @@ -79,14 +87,18 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA val localZone = new DateTime().getZone def buffers = _lock.synchronized { - _buffers.values.map(x => (x.timestamp.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet + _buffers.values.map(x => (x.interval.start.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet + } + + def buffersWithInterval = _lock.synchronized { + _buffers.values.map(x => (x.interval, x.partition, x.open, x.buffer.toSeq)).toSet } def beamsList = _lock.synchronized { _beams.toList } - class EventBuffer(val timestamp: DateTime, val partition: Int) + class EventBuffer(val interval: Interval, val partition: Int) { val buffer: mutable.Buffer[SimpleEvent] = mutable.ListBuffer() @volatile var open: Boolean = true @@ -113,20 +125,20 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA def getInterval() = None } - class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString) + class TestingBeam(val interval: Interval, val partition: Int, val uuid: String = UUID.randomUUID().toString) extends Beam[SimpleEvent] { _lock.synchronized { _beams += this } - def getInterval() = None + def getInterval() = Some(interval) def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized { if (_events.contains(events("defunct"))) { Future.exception(new DefunctBeamException("Defunct")) } else { - val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition)) + val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition)) buffer.open = true buffer.buffer ++= _events Future.value(_events.size) @@ -135,13 +147,13 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA def close() = _lock.synchronized { _beams -= this - val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition)) + val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition)) buffer.open = false Future.Done } def toDict = Dict( - "timestamp" -> timestamp.toString(), + "interval" -> interval.toString, "partition" -> partition, "uuid" -> uuid ) @@ -149,21 +161,21 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA class TestingBeamMaker extends BeamMaker[SimpleEvent, TestingBeam] { - def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval.start, partition) + def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval, partition) def toDict(beam: TestingBeam) = { Dict( - "timestamp" -> beam.timestamp.toString(), + "interval" -> beam.interval.toString, "partition" -> beam.partition, "uuid" -> beam.uuid ) } def fromDict(d: Dict) = { - val timestamp = new DateTime(d("timestamp")) + val interval= new Interval(d("interval")) val partition = int(d("partition")) val uuid = str(d("uuid")) - new TestingBeam(timestamp, partition, uuid) + new TestingBeam(interval, partition, uuid) } } @@ -357,6 +369,72 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA } } + test("IncreaseGranularity") { + withLocalCurator { + curator => + val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.MINUTE, windowPeriod = 1.minute) + val newTuning = oldTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE) + + val beamsA = newBeams(curator, oldTuning) + beamsA.timekeeper.now = start + beamsA.blockagate(Seq("i") map events) + beamsA.blockagate(Seq("i") map events) + beamsA.timekeeper.now = start + 1.minute + beamsA.blockagate(Seq("j") map events) + beamsA.blockagate(Seq("j") map events) + + val beamsB = newBeams(curator, newTuning) + beamsB.timekeeper.now = start + 2.minute + beamsB.blockagate(Seq("k") map events) + beamsB.blockagate(Seq("k") map events) + beamsB.blockagate(Seq("l") map events) + beamsB.blockagate(Seq("l") map events) + beamsB.blockagate(Seq("m") map events) + beamsB.blockagate(Seq("m") map events) + beamsB.blockagate(Seq("n") map events) + beamsB.blockagate(Seq("n") map events) + + Await.result(beamsA.close()) + + assert(buffersWithInterval === Set( + (new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 0, false, Seq("i") map events), + (new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 1, false, Seq("i") map events), + // "j" and "l" are in same partition as diff beams were used to propagate them + (new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 0, false, Seq("j", "l") map events), + (new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 1, false, Seq("j", "l") map events), + (new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, true, Seq("k", "n") map events), + (new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, true, Seq("k", "n") map events) + )) + } + } + + test("DecreaseGranularity") { + withLocalCurator { + curator => + val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE) + val newTuning = oldTuning.copy(segmentGranularity = Granularity.MINUTE) + + val beamsA = newBeams(curator, oldTuning) + beamsA.timekeeper.now = start + beamsA.blockagate(Seq("i") map events) + + val beamsB = newBeams(curator, newTuning) + beamsB.timekeeper.now = start + 4.minute + beamsB.blockagate(Seq("j") map events) + beamsB.blockagate(Seq("n") map events) + beamsB.blockagate(Seq("o") map events) + beamsB.blockagate(Seq("o") map events) + Await.result(beamsB.close()) + + assert(buffersWithInterval === Set( + (new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, false, Seq("i", "j") map events), + (new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, false, Seq("n") map events), + (new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 0, false, Seq("o") map events), + (new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 1, false, Seq("o") map events) + )) + } + } + test("DefunctBeam") { withLocalCurator { curator => @@ -389,10 +467,10 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA )) val desired = List("2012-01-01T00Z", "2012-01-01T00Z", "2012-01-01T01Z", "2012-01-01T01Z").map(new DateTime(_)) val startTime = System.currentTimeMillis() - while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.timestamp).sortBy(_.millis) != desired) { + while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.interval.start).sortBy(_.millis) != desired) { Thread.sleep(100) } - assert(beamsList.map(_.timestamp).sortBy(_.millis) === desired) + assert(beamsList.map(_.interval.start).sortBy(_.millis) === desired) } }