diff --git a/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala b/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala index 2c9b5ae..f0fec0c 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala @@ -17,12 +17,13 @@ package com.metamx.tranquility.beam import com.twitter.util.Future +import org.joda.time.Interval /** * Beams can accept events and forward them along. The propagate method may throw a DefunctBeamException, which means * the beam should be discarded (after calling close()). */ -trait Beam[A] +trait Beam[A] extends DiscoverableInterval { /** * Request propagation of events. The operation may fail in various ways, which tend to be specific to @@ -40,3 +41,11 @@ class DefunctBeamException(s: String, t: Throwable) extends Exception(s, t) { def this(s: String) = this(s, null) } + +trait DiscoverableInterval +{ + /** + * Returns the interval handled by the Beam, can return None if there is no associated interval + * */ + def getInterval(): Option[Interval] +} diff --git a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala index 192ca78..8aca140 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala @@ -18,9 +18,8 @@ package com.metamx.tranquility.beam import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.util.concurrent.ThreadFactoryBuilder -import com.metamx.common.scala.Logging +import com.metamx.common.scala.{untyped, Logging} import com.metamx.common.scala.Predef._ -import com.metamx.common.scala.collection.mutable.ConcurrentMap import com.metamx.common.scala.event._ import com.metamx.common.scala.event.emit.emitAlert import com.metamx.common.scala.option._ @@ -88,6 +87,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( alertMap: Dict ) extends Beam[EventType] with Logging { + def getInterval() = None + require(tuning.partitions > 0, "tuning.partitions > 0") require(tuning.minSegmentsPerBeam > 0, "tuning.minSegmentsPerBeam > 0") require(tuning.maxSegmentsPerBeam >= tuning.minSegmentsPerBeam, "tuning.maxSegmentsPerBeam >= tuning.minSegmentsPerBeam") @@ -131,8 +132,26 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( private[this] val rand = new Random - // Merged beams we are currently aware of, interval start millis -> merged beam. - private[this] val beams = ConcurrentMap[Long, Beam[EventType]]() + private[this] var zkMetaCache = { + try { + val dataPath = zpathWithDefault("data", ClusteredBeamMeta.empty.toBytes(objectMapper)) + curator.sync().forPath(dataPath) + ClusteredBeamMeta.fromBytes(objectMapper, curator.getData.forPath(dataPath)).fold( + e => { + emitAlert(e, log, emitter, WARN, "Failed to read beam data from cache: %s" format identifier, alertMap) + throw e + }, + meta => meta + ) + } + catch { + // Fail if we fail to read from Zookeeper during startup + case e: Throwable => throw e + } + } + + // Reverse sorted list (by interval start time) of Merged beams we are currently aware of + private[this] var beams: List[Beam[EventType]] = Nil // Lock updates to "localLatestCloseTime" and "beams" to prevent races. private[this] val beamWriteMonitor = new AnyRef @@ -153,6 +172,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( ) val newMeta = f(prevMeta) if (newMeta != prevMeta) { + zkMetaCache = newMeta val newMetaBytes = newMeta.toBytes(objectMapper) log.info("Writing new beam data to[%s]: %s", dataPath, new String(newMetaBytes)) curator.setData().forPath(dataPath, newMetaBytes) @@ -178,67 +198,83 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( t => theImplicit(t).withZone(DateTimeZone.UTC) } - private[this] def beam(timestamp: DateTime, now: DateTime): Future[Beam[EventType]] = { - val bucket = tuning.segmentBucket(timestamp) + private[this] def beam(beamPair :(Interval, Option[Beam[EventType]]), now: DateTime): Future[Beam[EventType]] = { val creationInterval = new Interval( - tuning.segmentBucket(now - tuning.windowPeriod).start.millis, - tuning.segmentBucket(Seq(now + tuning.warmingPeriod, now + tuning.windowPeriod).maxBy(_.millis)).end.millis, - ISOChronology.getInstanceUTC - ) - val windowInterval = new Interval( - tuning.segmentBucket(now - tuning.windowPeriod).start.millis, - tuning.segmentBucket(now + tuning.windowPeriod).end.millis, - ISOChronology.getInstanceUTC + tuning.segmentBucket(now - tuning.windowPeriod).start, + tuning.segmentBucket(Seq(now + tuning.warmingPeriod, now + tuning.windowPeriod).maxBy(_.millis)).end ) - val futureBeamOption = beams.get(timestamp.millis) match { + val futureBeamOption = beamPair match { case _ if !open => Future.value(None) - case Some(x) if windowInterval.overlaps(bucket) => Future.value(Some(x)) - case Some(x) => Future.value(None) - case None if timestamp <= localLatestCloseTime => Future.value(None) - case None if !creationInterval.overlaps(bucket) => Future.value(None) - case None => + case (interval, Some(foundBeam)) if foundBeam.getInterval().get.end + tuning.windowPeriod <= now => Future.value(None) + case (interval, Some(foundBeam)) => Future.value(Some(foundBeam)) + case (interval, None) if interval.start <= localLatestCloseTime => Future.value(None) + case (interval, None) if interval.end + tuning.windowPeriod <= now => Future.value(None) + case (interval, None) if !creationInterval.overlaps(interval) => Future.value(None) + case (interval, None) if interval.toDurationMillis == 0 => Future.value(None) + case (interval, None) => // We may want to create new merged beam(s). Acquire the zk mutex and examine the situation. // This could be more efficient, but it's happening infrequently so it's probably not a big deal. data.modify { prev => - val prevBeamDicts = prev.beamDictss.getOrElse(timestamp.millis, Nil) - if (prevBeamDicts.size >= tuning.partitions) { + log.info("Trying to create new beam with interval [%s]", interval) + // We want to create this new beam + // But first let us check in ZK if there is already any beam in ZK covering this interval + val beamDicts: Seq[untyped.Dict] = prev.beamDictss.collectFirst[Seq[untyped.Dict]]({ + case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).overlaps(interval) => x._2 + }) getOrElse Nil + + if (beamDicts.size >= tuning.partitions) { log.info( - "Merged beam already created for identifier[%s] timestamp[%s], with sufficient partitions (target = %d, actual = %d)", + "Merged beam already created for identifier[%s] interval[%s], with sufficient partitions (target = %d, actual = %d)", identifier, - timestamp, + interval, tuning.partitions, - prevBeamDicts.size + beamDicts.size ) prev - } else if (timestamp <= prev.latestCloseTime) { + } else if (interval.start <= prev.latestCloseTime) { log.info( "Global latestCloseTime[%s] for identifier[%s] has moved past timestamp[%s], not creating merged beam", prev.latestCloseTime, identifier, - timestamp + interval.start ) prev + } else if (beamDicts.nonEmpty && + !beamDicts.exists( + beamDict => new Interval(beamDict.get("interval").get, ISOChronology.getInstanceUTC).contains(interval) + )) + { + throw new IllegalStateException( + "WTF?? Requested to create a beam for interval [%s] which overlaps with existing beam [%s]" format(interval, beamDicts) + ) } else { - assert(prevBeamDicts.size < tuning.partitions) - assert(timestamp > prev.latestCloseTime) + // Create the new beam + assert(beamDicts.size < tuning.partitions) + assert(interval.start > prev.latestCloseTime) - // We might want to cover multiple time segments in advance. val numSegmentsToCover = tuning.minSegmentsPerBeam + rand.nextInt(tuning.maxSegmentsPerBeam - tuning.minSegmentsPerBeam + 1) - val intervalToCover = new Interval( - timestamp.millis, - tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).millis, + var intervalToCover = new Interval( + interval.start.millis, + tuning.segmentGranularity.increment(interval.start, numSegmentsToCover).millis, ISOChronology.getInstanceUTC ) - val timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start) + var timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start) + + // Check if we are trying to create a beam not covering an entire segment bucket + if (!tuning.segmentGranularity.widen(interval).equals(interval)) { + log.warn("Creating partial beam with interval [%s] as beam [%s] already covers some segment portion", beamDicts, interval) + intervalToCover = interval + timestampsToCover = Iterable(intervalToCover.start) + } // OK, create them where needed. val newInnerBeamDictsByPartition = new mutable.HashMap[Int, Dict] val newBeamDictss: Map[Long, Seq[Dict]] = (prev.beamDictss filterNot { case (millis, beam) => // Expire old beamDicts - tuning.segmentGranularity.increment(new DateTime(millis)) + tuning.windowPeriod < now + new Interval(beam.head.get("interval").get, ISOChronology.getInstanceUTC).end + tuning.windowPeriod < now }) ++ (for (ts <- timestampsToCover) yield { val tsPrevDicts = prev.beamDictss.getOrElse(ts.millis, Nil) log.info( @@ -250,7 +286,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( ) val tsNewDicts = tsPrevDicts ++ ((tsPrevDicts.size until tuning.partitions) map { partition => - newInnerBeamDictsByPartition.getOrElseUpdate(partition, { + newInnerBeamDictsByPartition.getOrElseUpdate( + partition, { // Create sub-beams and then immediately close them, just so we can get the dict representations. // Close asynchronously, ignore return value. beamMaker.newBeam(intervalToCover, partition).withFinally(_.close()) { @@ -259,7 +296,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( log.info("Created beam: %s", objectMapper.writeValueAsString(beamDict)) beamDict } - }) + } + ) }) (ts.millis, tsNewDicts) }) @@ -267,16 +305,13 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( (Seq(prev.latestCloseTime.millis) ++ (prev.beamDictss.keySet -- newBeamDictss.keySet)).max, ISOChronology.getInstanceUTC ) - ClusteredBeamMeta( - newLatestCloseTime, - newBeamDictss - ) + ClusteredBeamMeta(newLatestCloseTime, newBeamDictss) } } rescue { case e: Throwable => Future.exception( new IllegalStateException( - "Failed to save new beam for identifier[%s] timestamp[%s]" format(identifier, timestamp), e + "Failed to save new beam for identifier[%s] timestamp[%s]" format(identifier, interval.start), e ) ) } map { @@ -284,31 +319,40 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( // Update local stuff with our goodies from zk. beamWriteMonitor.synchronized { localLatestCloseTime = meta.latestCloseTime + val timestamp = interval.start // Only add the beams we actually wanted at this time. This is because there might be other beams in ZK // that we don't want to add just yet, on account of maybe they need their partitions expanded (this only // happens when they are the necessary ones). - if (!beams.contains(timestamp.millis) && meta.beamDictss.contains(timestamp.millis)) { + if (!beams.exists(beam => beam.getInterval().get.start.eq(timestamp)) && + meta.beamDictss.contains(timestamp.millis)) { + val beamDicts = meta.beamDictss(timestamp.millis) log.info("Adding beams for identifier[%s] timestamp[%s]: %s", identifier, timestamp, beamDicts) // Should have better handling of unparseable zk data. Changing BeamMaker implementations currently // just causes exceptions until the old dicts are cleared out. - beams(timestamp.millis) = beamMergeFn( + beams = beamMergeFn( beamDicts.zipWithIndex map { case (beamDict, partitionNum) => val decorate = beamDecorateFn(tuning.segmentBucket(timestamp), partitionNum) decorate(beamMaker.fromDict(beamDict)) } - ) + ) +: beams } // Remove beams that are gone from ZK metadata. They have expired. - for ((timestamp, beam) <- beams -- meta.beamDictss.keys) { - log.info("Removing beams for identifier[%s] timestamp[%s]", identifier, timestamp) + val expiredBeams = beams.filterNot(beam => meta.beamDictss.contains(beam.getInterval().get.start.millis)) + + for (beam <- expiredBeams) { + log.info("Removing beams for identifier[%s] timestamp[%s]", identifier, beam.getInterval().get.start) // Close asynchronously, ignore return value. - beams(timestamp).close() - beams.remove(timestamp) + beam.close() } + beams = beams.diff(expiredBeams) + // This may not be required as we always create and add beams in chronological order + // so in effect the list should always be reverse sorted + beams = beams.sortBy(-_.getInterval().get.start.millis) + // Return requested beam. It may not have actually been created, so it's an Option. - beams.get(timestamp.millis) ifEmpty { + beams.find(beam => beam.getInterval().get.contains(timestamp)) ifEmpty { log.info( "Turns out we decided not to actually make beams for identifier[%s] timestamp[%s]. Returning None.", identifier, @@ -327,16 +371,55 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( beamMergeFn( (0 until tuning.partitions) map { partition => - beamDecorateFn(bucket, partition)(new NoopBeam[EventType]) + beamDecorateFn(beamPair._1, partition)(new NoopBeam[EventType]) } ) ) } } + def groupEvents(event: EventType, intervalBeamPair: mutable.Map[Interval, Option[Beam[EventType]]]): (Interval, Option[Beam[EventType]]) = { + val eventTimestamp = timestamper(event) + // looks in sequential order for the predicate to be true + // Most of the times head beam can handle the event as the beams is reverse sorted list by start time of interval + + intervalBeamPair.find(_._1.contains(eventTimestamp)) match { + case Some(x) => (x._1, x._2) + case None => + val requiredInterval = tuning.segmentBucket(eventTimestamp).withChronology(ISOChronology.getInstanceUTC) + // Check to see if the interval for which we want to create a beam overlaps with interval of any existing beam + // this may happen if segment granularity is changed + val actualInterval = zkMetaCache.beamDictss.toSeq.sortBy(-_._1) collectFirst { + case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).contains(eventTimestamp) => + new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC) + } match { + case Some(x) => x + case None => + zkMetaCache.beamDictss.toSeq.sortBy(-_._1) collectFirst { + case x if new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC).overlaps(requiredInterval) => + new Interval(x._2.head.get("interval").get, ISOChronology.getInstanceUTC) + } match { + case None => requiredInterval + case Some(x) => + new Interval( + Math.max(x.end.millis, requiredInterval.start.millis), + Math.max(x.end.millis, requiredInterval.end.millis), + ISOChronology.getInstanceUTC + ) + } + } + intervalBeamPair.put(actualInterval, None) + (actualInterval, None) + } + } + def propagate(events: Seq[EventType]) = { val now = timekeeper.now.withZone(DateTimeZone.UTC) - val grouped = events.groupBy(x => tuning.segmentBucket(timestamper(x)).start).toSeq.sortBy(_._1.millis) + val intervalBeamPair = new mutable.HashMap[Interval, Option[Beam[EventType]]]() + beams map { + beam => intervalBeamPair.+=((beam.getInterval().get, Some(beam))) + } + val grouped = events.groupBy(groupEvents(_, intervalBeamPair)).toSeq.sortBy(_._1._1.start.millis) // Possibly warm up future beams def toBeWarmed(dt: DateTime, end: DateTime): List[DateTime] = { if (dt <= end) { @@ -347,14 +430,15 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( } val warmingBeams = Future.collect(for ( latestEvent <- grouped.lastOption.map(_._2.maxBy(timestamper(_).millis)).map(timestamper).toList; - tbwTimestamp <- toBeWarmed(latestEvent, latestEvent + tuning.warmingPeriod) if tbwTimestamp > latestEvent + tbwTimestamp <- toBeWarmed(latestEvent, latestEvent + tuning.warmingPeriod) if tbwTimestamp > latestEvent && + !beams.exists(_.getInterval().get.contains(tbwTimestamp)) ) yield { // Create beam asynchronously - beam(tbwTimestamp, now) + beam((tuning.segmentBucket(tbwTimestamp).withChronology(ISOChronology.getInstanceUTC), None), now) }) // Propagate data - val countFutures = for ((timestamp, eventGroup) <- grouped) yield { - beam(timestamp, now) onFailure { + val countFutures = for ((beamIntervalPair, eventGroup) <- grouped) yield { + beam(beamIntervalPair, now) onFailure { e => emitAlert(e, log, emitter, WARN, "Failed to create merged beam: %s" format identifier, alertMap) } flatMap { @@ -362,6 +446,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( // We expect beams to handle retries, so if we get an exception here let's drop the batch beam.propagate(eventGroup) rescue { case e: DefunctBeamException => + val timestamp = beam.getInterval().get.start // Just drop data until the next segment starts. At some point we should look at doing something // more intelligent. emitAlert( @@ -369,7 +454,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( alertMap ++ Dict( "eventCount" -> eventGroup.size, - "timestamp" -> timestamp.toString(), + "timestamp" -> timestamp.toString, "beam" -> beam.toString ) ) @@ -382,7 +467,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( } onSuccess { meta => beamWriteMonitor.synchronized { - beams.remove(timestamp.millis) + beams = beams diff List(beam) } } map (_ => 0) @@ -392,7 +477,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( alertMap ++ Dict( "eventCount" -> eventGroup.size, - "timestamp" -> timestamp.toString(), + "timestamp" -> beam.getInterval().get.start.toString, "beams" -> beam.toString ) ) @@ -407,8 +492,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( def close() = { beamWriteMonitor.synchronized { open = false - val closeFuture = Future.collect(beams.values.toList map (_.close())) map (_ => ()) - beams.clear() + val closeFuture = Future.collect(beams map (_.close())) map (_ => ()) + beams = beams diff beams closeFuture } } @@ -428,11 +513,11 @@ case class ClusteredBeamMeta(latestCloseTime: DateTime, beamDictss: Map[Long, Se Dict( // latestTime is only being written for backwards compatibility "latestTime" -> new DateTime( - (Seq(latestCloseTime.millis) ++ beamDictss.map(_._1)).max, + (Seq(latestCloseTime.millis) ++ beamDictss.keys).max, ISOChronology.getInstanceUTC - ).toString(), - "latestCloseTime" -> latestCloseTime.toString(), - "beams" -> beamDictss.map(kv => (new DateTime(kv._1, ISOChronology.getInstanceUTC).toString(), kv._2)) + ).toString, + "latestCloseTime" -> latestCloseTime.toString, + "beams" -> beamDictss.map(kv => (new DateTime(kv._1, ISOChronology.getInstanceUTC).toString, kv._2)) ) ) } diff --git a/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala index 358b871..558bf4b 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala @@ -58,6 +58,9 @@ class HttpBeam[A: Timestamper]( emitter: ServiceEmitter ) extends Beam[A] with Logging { + + def getInterval() = None + private[this] implicit val timer: Timer = DefaultTimer.twitter private[this] val port = if (uri.port > 0) { diff --git a/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala index cd713b7..6058458 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala @@ -28,6 +28,8 @@ class MemoryBeam[A]( jsonWriter: JsonWriter[A] ) extends Beam[A] { + def getInterval() = None + def propagate(events: Seq[A]) = { events.map(event => Jackson.parse[Dict](jsonWriter.asBytes(event))) foreach { d => diff --git a/core/src/main/scala/com/metamx/tranquility/beam/MergingPartitioningBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/MergingPartitioningBeam.scala index 1e3e885..89b972e 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/MergingPartitioningBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/MergingPartitioningBeam.scala @@ -44,5 +44,12 @@ class MergingPartitioningBeam[A]( Future.collect(beams map (_.close())) map (_ => ()) } + def getInterval() = { + beams.headOption match { + case Some(x) => x.getInterval() + case None => None + } + } + override def toString = s"MergingPartitioningBeam(${beams.mkString(", ")})" } diff --git a/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala index 52acdbf..b5047c3 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala @@ -17,6 +17,8 @@ package com.metamx.tranquility.beam import com.twitter.util.Future +import org.joda.time.Interval +import org.joda.time.chrono.ISOChronology class NoopBeam[A] extends Beam[A] { @@ -25,4 +27,6 @@ class NoopBeam[A] extends Beam[A] def close() = Future.Done override def toString = "NoopBeam()" + + def getInterval() = Some(new Interval(0, 0, ISOChronology.getInstanceUTC)) } diff --git a/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala index 3be692c..7c26ffc 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala @@ -38,4 +38,11 @@ class RoundRobinBeam[A]( } override def toString = "RoundRobinBeam(%s)" format beams.mkString(", ") + + def getInterval() = { + beams.headOption match { + case Some(x) => x.getInterval() + case None => None + } + } } diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala index 04f703e..6eaa394 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala @@ -19,8 +19,7 @@ package com.metamx.tranquility.druid import com.metamx.common.scala.Logging import com.metamx.common.scala.Predef._ import com.metamx.emitter.service.ServiceEmitter -import com.metamx.tranquility.beam.Beam -import com.metamx.tranquility.beam.DefunctBeamException +import com.metamx.tranquility.beam.{Beam, DefunctBeamException} import com.metamx.tranquility.finagle._ import com.metamx.tranquility.typeclass.ObjectWriter import com.twitter.util.Closable @@ -44,6 +43,9 @@ class DruidBeam[A]( objectWriter: ObjectWriter[A] ) extends Beam[A] with Logging with Closable { + + def getInterval() = Some(interval) + private[this] val clients = Map( tasks map { task => diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala index 3bf84ea..e40da15 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala @@ -138,10 +138,6 @@ class DruidBeamMaker[A: Timestamper]( } override def newBeam(interval: Interval, partition: Int) = { - require( - beamTuning.segmentGranularity.widen(interval) == interval, - "Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval) - ) val availabilityGroup = DruidBeamMaker.generateBaseFirehoseId( location.dataSource, beamTuning.segmentGranularity, @@ -190,10 +186,7 @@ class DruidBeamMaker[A: Timestamper]( // Backwards compatibility (see toDict). beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC)) } - require( - beamTuning.segmentGranularity.widen(interval) == interval, - "Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval) - ) + val partition = int(d("partition")) val tasks = if (d contains "tasks") { list(d("tasks")).map(dict(_)).map(d => TaskPointer(str(d("id")), str(d("firehoseId")))) diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala index 31f4faf..a9f1ddf 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala @@ -221,6 +221,9 @@ object DruidBeams def close() = clusteredBeam.close() map (_ => lifecycle.stop()) override def toString = clusteredBeam.toString + + def getInterval() = clusteredBeam.getInterval() + } } diff --git a/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala b/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala index f9da7e9..6379b1a 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala @@ -56,6 +56,8 @@ class BeamPacketizerTest extends FunSuite with Logging } override def close() = memoryBeam.close() + + def getInterval() = None } val acked = new AtomicLong() diff --git a/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala b/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala index 5cf51be..15313c3 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala @@ -44,6 +44,7 @@ import org.apache.curator.framework.CuratorFramework import org.joda.time.DateTimeZone import org.joda.time.DateTime import org.joda.time.Interval +import org.joda.time.chrono.ISOChronology import org.scala_tools.time.Implicits._ import org.scalatest.BeforeAndAfter import org.scalatest.FunSuite @@ -68,7 +69,14 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "e")), SimpleEvent(new DateTime("2012-01-01T01:20Z"), Map("foo" -> "f")), SimpleEvent(new DateTime("2012-01-01T03:05Z"), Map("foo" -> "g")), - SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h")) + SimpleEvent(new DateTime("2012-01-01T03:20Z"), Map("foo" -> "h")), + SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "i")), + SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "j")), + SimpleEvent(new DateTime("2012-01-01T01:07Z"), Map("foo" -> "k")), + SimpleEvent(new DateTime("2012-01-01T01:06Z"), Map("foo" -> "l")), + SimpleEvent(new DateTime("2012-01-01T01:05Z"), Map("foo" -> "m")), + SimpleEvent(new DateTime("2012-01-01T01:09Z"), Map("foo" -> "n")), + SimpleEvent(new DateTime("2012-01-01T01:10Z"), Map("foo" -> "o")) ) map { x => x.fields("foo") -> x }).toMap @@ -79,14 +87,18 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA val localZone = new DateTime().getZone def buffers = _lock.synchronized { - _buffers.values.map(x => (x.timestamp.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet + _buffers.values.map(x => (x.interval.start.withZone(localZone), x.partition, x.open, x.buffer.toSeq)).toSet + } + + def buffersWithInterval = _lock.synchronized { + _buffers.values.map(x => (x.interval, x.partition, x.open, x.buffer.toSeq)).toSet } def beamsList = _lock.synchronized { _beams.toList } - class EventBuffer(val timestamp: DateTime, val partition: Int) + class EventBuffer(val interval: Interval, val partition: Int) { val buffer: mutable.Buffer[SimpleEvent] = mutable.ListBuffer() @volatile var open: Boolean = true @@ -109,20 +121,24 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA def close() = { beam.close() } + + def getInterval() = None } - class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString) + class TestingBeam(val interval: Interval, val partition: Int, val uuid: String = UUID.randomUUID().toString) extends Beam[SimpleEvent] { _lock.synchronized { _beams += this } + def getInterval() = Some(interval) + def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized { if (_events.contains(events("defunct"))) { Future.exception(new DefunctBeamException("Defunct")) } else { - val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition)) + val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition)) buffer.open = true buffer.buffer ++= _events Future.value(_events.size) @@ -131,13 +147,13 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA def close() = _lock.synchronized { _beams -= this - val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(timestamp, partition)) + val buffer = _buffers.getOrElseUpdate(uuid, new EventBuffer(interval, partition)) buffer.open = false Future.Done } def toDict = Dict( - "timestamp" -> timestamp.toString(), + "interval" -> interval.toString, "partition" -> partition, "uuid" -> uuid ) @@ -145,21 +161,21 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA class TestingBeamMaker extends BeamMaker[SimpleEvent, TestingBeam] { - def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval.start, partition) + def newBeam(interval: Interval, partition: Int) = new TestingBeam(interval, partition) def toDict(beam: TestingBeam) = { Dict( - "timestamp" -> beam.timestamp.toString(), + "interval" -> beam.interval.toString, "partition" -> beam.partition, "uuid" -> beam.uuid ) } def fromDict(d: Dict) = { - val timestamp = new DateTime(d("timestamp")) + val interval= new Interval(d("interval")) val partition = int(d("partition")) val uuid = str(d("uuid")) - new TestingBeam(timestamp, partition, uuid) + new TestingBeam(interval, partition, uuid) } } @@ -353,6 +369,72 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA } } + test("IncreaseGranularity") { + withLocalCurator { + curator => + val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.MINUTE, windowPeriod = 1.minute) + val newTuning = oldTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE) + + val beamsA = newBeams(curator, oldTuning) + beamsA.timekeeper.now = start + beamsA.blockagate(Seq("i") map events) + beamsA.blockagate(Seq("i") map events) + beamsA.timekeeper.now = start + 1.minute + beamsA.blockagate(Seq("j") map events) + beamsA.blockagate(Seq("j") map events) + + val beamsB = newBeams(curator, newTuning) + beamsB.timekeeper.now = start + 2.minute + beamsB.blockagate(Seq("k") map events) + beamsB.blockagate(Seq("k") map events) + beamsB.blockagate(Seq("l") map events) + beamsB.blockagate(Seq("l") map events) + beamsB.blockagate(Seq("m") map events) + beamsB.blockagate(Seq("m") map events) + beamsB.blockagate(Seq("n") map events) + beamsB.blockagate(Seq("n") map events) + + Await.result(beamsA.close()) + + assert(buffersWithInterval === Set( + (new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 0, false, Seq("i") map events), + (new Interval("2012-01-01T01:05Z/2012-01-01T01:06Z", ISOChronology.getInstanceUTC), 1, false, Seq("i") map events), + // "j" and "l" are in same partition as diff beams were used to propagate them + (new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 0, false, Seq("j", "l") map events), + (new Interval("2012-01-01T01:06Z/2012-01-01T01:07Z", ISOChronology.getInstanceUTC), 1, false, Seq("j", "l") map events), + (new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, true, Seq("k", "n") map events), + (new Interval("2012-01-01T01:07Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, true, Seq("k", "n") map events) + )) + } + } + + test("DecreaseGranularity") { + withLocalCurator { + curator => + val oldTuning = defaultTuning.copy(segmentGranularity = Granularity.FIVE_MINUTE) + val newTuning = oldTuning.copy(segmentGranularity = Granularity.MINUTE) + + val beamsA = newBeams(curator, oldTuning) + beamsA.timekeeper.now = start + beamsA.blockagate(Seq("i") map events) + + val beamsB = newBeams(curator, newTuning) + beamsB.timekeeper.now = start + 4.minute + beamsB.blockagate(Seq("j") map events) + beamsB.blockagate(Seq("n") map events) + beamsB.blockagate(Seq("o") map events) + beamsB.blockagate(Seq("o") map events) + Await.result(beamsB.close()) + + assert(buffersWithInterval === Set( + (new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 0, false, Seq("i", "j") map events), + (new Interval("2012-01-01T01:05Z/2012-01-01T01:10Z", ISOChronology.getInstanceUTC), 1, false, Seq("n") map events), + (new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 0, false, Seq("o") map events), + (new Interval("2012-01-01T01:10Z/2012-01-01T01:11Z", ISOChronology.getInstanceUTC), 1, false, Seq("o") map events) + )) + } + } + test("DefunctBeam") { withLocalCurator { curator => @@ -385,10 +467,10 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA )) val desired = List("2012-01-01T00Z", "2012-01-01T00Z", "2012-01-01T01Z", "2012-01-01T01Z").map(new DateTime(_)) val startTime = System.currentTimeMillis() - while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.timestamp).sortBy(_.millis) != desired) { + while (System.currentTimeMillis() < startTime + 2000 && beamsList.map(_.interval.start).sortBy(_.millis) != desired) { Thread.sleep(100) } - assert(beamsList.map(_.timestamp).sortBy(_.millis) === desired) + assert(beamsList.map(_.interval.start).sortBy(_.millis) === desired) } } diff --git a/core/src/test/scala/com/metamx/tranquility/test/HashPartitionBeamTest.scala b/core/src/test/scala/com/metamx/tranquility/test/HashPartitionBeamTest.scala index b875695..1634f1e 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/HashPartitionBeamTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/HashPartitionBeamTest.scala @@ -24,6 +24,7 @@ import com.metamx.tranquility.beam.HashPartitionBeam import com.twitter.util.Await import com.twitter.util.Future import java.util.concurrent.CopyOnWriteArrayList +import org.joda.time.Interval import org.scalatest.FunSuite import org.scalatest.Matchers import scala.collection.JavaConverters._ @@ -44,6 +45,10 @@ class HashPartitionBeamTest extends FunSuite with Matchers Future(events.size) } + override def getInterval() = { + None + } + override def close() = Future.Done } diff --git a/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala b/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala index 5b79b66..a3e5b71 100644 --- a/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala +++ b/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala @@ -52,6 +52,8 @@ class SimpleBeam extends Beam[SimpleEvent] } def close() = Future.Done + + def getInterval() = None } object SimpleBeam