From 90bb27d067565bb63500ca23a8f1481623b94bf8 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Thu, 19 Nov 2015 09:55:08 -0600 Subject: [PATCH] handle segment granularity changes --- .../com/metamx/tranquility/beam/Beam.scala | 11 +- .../tranquility/beam/ClusteredBeam.scala | 211 ++++++++++++------ .../tranquility/beam/HashPartitionBeam.scala | 7 + .../metamx/tranquility/beam/HttpBeam.scala | 3 + .../metamx/tranquility/beam/MemoryBeam.scala | 2 + .../metamx/tranquility/beam/NoopBeam.scala | 4 + .../tranquility/beam/RoundRobinBeam.scala | 7 + .../metamx/tranquility/druid/DruidBeam.scala | 6 +- .../tranquility/druid/DruidBeamMaker.scala | 9 +- .../metamx/tranquility/druid/DruidBeams.scala | 3 + .../tranquility/test/BeamPacketizerTest.scala | 2 + .../tranquility/test/ClusteredBeamTest.scala | 4 + .../tranquility/test/StormBoltTest.scala | 2 + 13 files changed, 195 insertions(+), 76 deletions(-) diff --git a/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala b/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala index 2c9b5ae..f0fec0c 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/Beam.scala @@ -17,12 +17,13 @@ package com.metamx.tranquility.beam import com.twitter.util.Future +import org.joda.time.Interval /** * Beams can accept events and forward them along. The propagate method may throw a DefunctBeamException, which means * the beam should be discarded (after calling close()). */ -trait Beam[A] +trait Beam[A] extends DiscoverableInterval { /** * Request propagation of events. The operation may fail in various ways, which tend to be specific to @@ -40,3 +41,11 @@ class DefunctBeamException(s: String, t: Throwable) extends Exception(s, t) { def this(s: String) = this(s, null) } + +trait DiscoverableInterval +{ + /** + * Returns the interval handled by the Beam, can return None if there is no associated interval + * */ + def getInterval(): Option[Interval] +} diff --git a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala index 192ca78..013dc18 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala @@ -18,9 +18,8 @@ package com.metamx.tranquility.beam import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.util.concurrent.ThreadFactoryBuilder -import com.metamx.common.scala.Logging +import com.metamx.common.scala.{untyped, Logging} import com.metamx.common.scala.Predef._ -import com.metamx.common.scala.collection.mutable.ConcurrentMap import com.metamx.common.scala.event._ import com.metamx.common.scala.event.emit.emitAlert import com.metamx.common.scala.option._ @@ -88,6 +87,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( alertMap: Dict ) extends Beam[EventType] with Logging { + def getInterval() = None + require(tuning.partitions > 0, "tuning.partitions > 0") require(tuning.minSegmentsPerBeam > 0, "tuning.minSegmentsPerBeam > 0") require(tuning.maxSegmentsPerBeam >= tuning.minSegmentsPerBeam, "tuning.maxSegmentsPerBeam >= tuning.minSegmentsPerBeam") @@ -131,8 +132,38 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( private[this] val rand = new Random - // Merged beams we are currently aware of, interval start millis -> merged beam. - private[this] val beams = ConcurrentMap[Long, Beam[EventType]]() + // Reverse sorted list (by interval start time) of Merged beams we are currently aware of + private[this] var beams: List[Beam[EventType]] = + { + try { + val dataPath = zpathWithDefault("data", ClusteredBeamMeta.empty.toBytes(objectMapper)) + curator.sync().forPath(dataPath) + val zkMetaData = ClusteredBeamMeta.fromBytes(objectMapper, curator.getData.forPath(dataPath)).fold( + e => { + emitAlert(e, log, emitter, WARN, "Failed to read beam data from cache: %s" format identifier, alertMap) + throw e + }, + meta => meta + ) + log.info("Synced from ZK, Found Beams - [%s]", zkMetaData) + ( + zkMetaData.beamDictss map { + beamDicts => + beamMergeFn( + beamDicts._2.zipWithIndex map { + case (beamDict, partitionNum) => + val decorate = beamDecorateFn(new Interval(beamDict.get("interval").get), partitionNum) + decorate(beamMaker.fromDict(beamDict)) + } + ) + } + ).toList sortBy (-_.getInterval().get.start.millis) + } + catch { + case e: Throwable => + throw e + } + } // Lock updates to "localLatestCloseTime" and "beams" to prevent races. private[this] val beamWriteMonitor = new AnyRef @@ -178,67 +209,78 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( t => theImplicit(t).withZone(DateTimeZone.UTC) } - private[this] def beam(timestamp: DateTime, now: DateTime): Future[Beam[EventType]] = { - val bucket = tuning.segmentBucket(timestamp) + private[this] def beam(beamPair :(Interval, Option[Beam[EventType]]), now: DateTime): Future[Beam[EventType]] = { val creationInterval = new Interval( - tuning.segmentBucket(now - tuning.windowPeriod).start.millis, - tuning.segmentBucket(Seq(now + tuning.warmingPeriod, now + tuning.windowPeriod).maxBy(_.millis)).end.millis, - ISOChronology.getInstanceUTC - ) - val windowInterval = new Interval( - tuning.segmentBucket(now - tuning.windowPeriod).start.millis, - tuning.segmentBucket(now + tuning.windowPeriod).end.millis, - ISOChronology.getInstanceUTC + tuning.segmentBucket(now - tuning.windowPeriod).start, + tuning.segmentBucket(Seq(now + tuning.warmingPeriod, now + tuning.windowPeriod).maxBy(_.millis)).end ) - val futureBeamOption = beams.get(timestamp.millis) match { + val futureBeamOption = beamPair match { case _ if !open => Future.value(None) - case Some(x) if windowInterval.overlaps(bucket) => Future.value(Some(x)) - case Some(x) => Future.value(None) - case None if timestamp <= localLatestCloseTime => Future.value(None) - case None if !creationInterval.overlaps(bucket) => Future.value(None) - case None => + case (interval, Some(foundBeam)) if foundBeam.getInterval().get.end + tuning.windowPeriod <= now => Future.value(None) + case (interval, Some(foundBeam)) => Future.value(Some(foundBeam)) + case (interval, None) if interval.start <= localLatestCloseTime => Future.value(None) + case (interval, None) if !creationInterval.overlaps(interval) => Future.value(None) + case (interval, None) => // We may want to create new merged beam(s). Acquire the zk mutex and examine the situation. // This could be more efficient, but it's happening infrequently so it's probably not a big deal. data.modify { prev => - val prevBeamDicts = prev.beamDictss.getOrElse(timestamp.millis, Nil) - if (prevBeamDicts.size >= tuning.partitions) { + log.info("Trying to create new beam with interval [%s]", interval) + // We want to create this new beam + // But first let us check in ZK if there is already any beam in ZK covering this interval + + val beamDicts: Seq[untyped.Dict] = prev.beamDictss.collectFirst[Seq[untyped.Dict]]({ + case x if new Interval(x._2.head.get("interval").get).overlaps(interval) => x._2 + }) getOrElse Nil + + if (beamDicts.size >= tuning.partitions) { log.info( - "Merged beam already created for identifier[%s] timestamp[%s], with sufficient partitions (target = %d, actual = %d)", + "Merged beam already created for identifier[%s] interval[%s], with sufficient partitions (target = %d, actual = %d)", identifier, - timestamp, + interval, tuning.partitions, - prevBeamDicts.size + beamDicts.size ) prev - } else if (timestamp <= prev.latestCloseTime) { + } else if (interval.start <= prev.latestCloseTime) { log.info( "Global latestCloseTime[%s] for identifier[%s] has moved past timestamp[%s], not creating merged beam", prev.latestCloseTime, identifier, - timestamp + interval.start ) prev + } else if (beamDicts.nonEmpty){ + throw new IllegalStateException( + "WTF?? Requested to create a beam for interval [%s] which overlaps with existing beam [%s]" format(interval, beamDicts) + ) } else { - assert(prevBeamDicts.size < tuning.partitions) - assert(timestamp > prev.latestCloseTime) + // Create the new beam + assert(beamDicts.size < tuning.partitions) + assert(interval.start > prev.latestCloseTime) - // We might want to cover multiple time segments in advance. val numSegmentsToCover = tuning.minSegmentsPerBeam + rand.nextInt(tuning.maxSegmentsPerBeam - tuning.minSegmentsPerBeam + 1) - val intervalToCover = new Interval( - timestamp.millis, - tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).millis, + var intervalToCover = new Interval( + interval.start.millis, + tuning.segmentGranularity.increment(interval.start, numSegmentsToCover).millis, ISOChronology.getInstanceUTC ) - val timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start) + var timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start) + + // Check if we are trying to create a beam not covering an entire segment bucket + if (!tuning.segmentGranularity.widen(interval).equals(interval)) { + log.warn("Creating partial beam with interval [%s] as beam [%s] already covers some segment portion", beamDicts, interval) + intervalToCover = interval + timestampsToCover = Iterable(intervalToCover.start) + } // OK, create them where needed. val newInnerBeamDictsByPartition = new mutable.HashMap[Int, Dict] val newBeamDictss: Map[Long, Seq[Dict]] = (prev.beamDictss filterNot { case (millis, beam) => // Expire old beamDicts - tuning.segmentGranularity.increment(new DateTime(millis)) + tuning.windowPeriod < now + new Interval(beam.head.get("interval").get).end + tuning.windowPeriod < now }) ++ (for (ts <- timestampsToCover) yield { val tsPrevDicts = prev.beamDictss.getOrElse(ts.millis, Nil) log.info( @@ -250,7 +292,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( ) val tsNewDicts = tsPrevDicts ++ ((tsPrevDicts.size until tuning.partitions) map { partition => - newInnerBeamDictsByPartition.getOrElseUpdate(partition, { + newInnerBeamDictsByPartition.getOrElseUpdate( + partition, { // Create sub-beams and then immediately close them, just so we can get the dict representations. // Close asynchronously, ignore return value. beamMaker.newBeam(intervalToCover, partition).withFinally(_.close()) { @@ -259,7 +302,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( log.info("Created beam: %s", objectMapper.writeValueAsString(beamDict)) beamDict } - }) + } + ) }) (ts.millis, tsNewDicts) }) @@ -267,16 +311,13 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( (Seq(prev.latestCloseTime.millis) ++ (prev.beamDictss.keySet -- newBeamDictss.keySet)).max, ISOChronology.getInstanceUTC ) - ClusteredBeamMeta( - newLatestCloseTime, - newBeamDictss - ) + ClusteredBeamMeta(newLatestCloseTime, newBeamDictss) } } rescue { case e: Throwable => Future.exception( new IllegalStateException( - "Failed to save new beam for identifier[%s] timestamp[%s]" format(identifier, timestamp), e + "Failed to save new beam for identifier[%s] timestamp[%s]" format(identifier, interval.start), e ) ) } map { @@ -284,31 +325,40 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( // Update local stuff with our goodies from zk. beamWriteMonitor.synchronized { localLatestCloseTime = meta.latestCloseTime + val timestamp = interval.start // Only add the beams we actually wanted at this time. This is because there might be other beams in ZK // that we don't want to add just yet, on account of maybe they need their partitions expanded (this only // happens when they are the necessary ones). - if (!beams.contains(timestamp.millis) && meta.beamDictss.contains(timestamp.millis)) { + if (!beams.exists(beam => beam.getInterval().get.start.eq(timestamp)) && + meta.beamDictss.contains(timestamp.millis)) { + val beamDicts = meta.beamDictss(timestamp.millis) log.info("Adding beams for identifier[%s] timestamp[%s]: %s", identifier, timestamp, beamDicts) // Should have better handling of unparseable zk data. Changing BeamMaker implementations currently // just causes exceptions until the old dicts are cleared out. - beams(timestamp.millis) = beamMergeFn( + beams = beamMergeFn( beamDicts.zipWithIndex map { case (beamDict, partitionNum) => val decorate = beamDecorateFn(tuning.segmentBucket(timestamp), partitionNum) decorate(beamMaker.fromDict(beamDict)) } - ) + ) +: beams } // Remove beams that are gone from ZK metadata. They have expired. - for ((timestamp, beam) <- beams -- meta.beamDictss.keys) { - log.info("Removing beams for identifier[%s] timestamp[%s]", identifier, timestamp) + val expiredBeams = beams.filterNot(beam => meta.beamDictss.contains(beam.getInterval().get.start.millis)) + + for (beam <- expiredBeams) { + log.info("Removing beams for identifier[%s] timestamp[%s]", identifier, beam.getInterval().get.start) // Close asynchronously, ignore return value. - beams(timestamp).close() - beams.remove(timestamp) + beam.close() } + beams = beams.diff(expiredBeams) + // This may not be required as we always create and add beams in chronological order + // so in effect the list should always be reverse sorted + beams = beams.sortBy(-_.getInterval().get.start.millis) + // Return requested beam. It may not have actually been created, so it's an Option. - beams.get(timestamp.millis) ifEmpty { + beams.find(beam => beam.getInterval().get.contains(timestamp)) ifEmpty { log.info( "Turns out we decided not to actually make beams for identifier[%s] timestamp[%s]. Returning None.", identifier, @@ -327,16 +377,45 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( beamMergeFn( (0 until tuning.partitions) map { partition => - beamDecorateFn(bucket, partition)(new NoopBeam[EventType]) + beamDecorateFn(beamPair._1, partition)(new NoopBeam[EventType]) } ) ) } } + def groupEvents(event: EventType, intervalBeamPair: mutable.Map[Interval, Option[Beam[EventType]]]): (Interval, Option[Beam[EventType]]) = { + val eventTimestamp = timestamper(event) + // looks in sequential order for the predicate to be true + // Most of the times head beam can handle the event as the beams is reverse sorted list by start time of interval + + intervalBeamPair.find(_._1.contains(eventTimestamp)) match { + case Some(mapEntry) => (mapEntry._1, mapEntry._2) + case None => + val requiredInterval = tuning.segmentBucket(eventTimestamp) + // Check to see if the interval for which we want to create a beam overlaps with interval of any existing beam + // this may happen if segment granularity is changed + val mayBeOverlappingInterval = beams.collectFirst { + case x if x.getInterval().get.overlaps(requiredInterval) => x.getInterval().get + } getOrElse new Interval(0,0) + + ( + new Interval( + Math.max(mayBeOverlappingInterval.end.millis, requiredInterval.start.millis), + Math.max(mayBeOverlappingInterval.end.millis, requiredInterval.end.millis) + ), + None + ) + } + } + def propagate(events: Seq[EventType]) = { val now = timekeeper.now.withZone(DateTimeZone.UTC) - val grouped = events.groupBy(x => tuning.segmentBucket(timestamper(x)).start).toSeq.sortBy(_._1.millis) + val intervalBeamPair = new mutable.HashMap[Interval, Option[Beam[EventType]]]() + beams map { + beam => intervalBeamPair.+=((beam.getInterval().get, Some(beam))) + } + val grouped = events.groupBy(groupEvents(_, intervalBeamPair)).toSeq.sortBy(_._1._1.start.millis) // Possibly warm up future beams def toBeWarmed(dt: DateTime, end: DateTime): List[DateTime] = { if (dt <= end) { @@ -347,14 +426,15 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( } val warmingBeams = Future.collect(for ( latestEvent <- grouped.lastOption.map(_._2.maxBy(timestamper(_).millis)).map(timestamper).toList; - tbwTimestamp <- toBeWarmed(latestEvent, latestEvent + tuning.warmingPeriod) if tbwTimestamp > latestEvent + tbwTimestamp <- toBeWarmed(latestEvent, latestEvent + tuning.warmingPeriod) if tbwTimestamp > latestEvent && + !beams.exists(_.getInterval().get.contains(tbwTimestamp)) ) yield { // Create beam asynchronously - beam(tbwTimestamp, now) + beam((tuning.segmentBucket(tbwTimestamp), None), now) }) // Propagate data - val countFutures = for ((timestamp, eventGroup) <- grouped) yield { - beam(timestamp, now) onFailure { + val countFutures = for ((beamIntervalPair, eventGroup) <- grouped) yield { + beam(beamIntervalPair, now) onFailure { e => emitAlert(e, log, emitter, WARN, "Failed to create merged beam: %s" format identifier, alertMap) } flatMap { @@ -362,6 +442,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( // We expect beams to handle retries, so if we get an exception here let's drop the batch beam.propagate(eventGroup) rescue { case e: DefunctBeamException => + val timestamp = beam.getInterval().get.start // Just drop data until the next segment starts. At some point we should look at doing something // more intelligent. emitAlert( @@ -369,7 +450,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( alertMap ++ Dict( "eventCount" -> eventGroup.size, - "timestamp" -> timestamp.toString(), + "timestamp" -> timestamp.toString, "beam" -> beam.toString ) ) @@ -382,7 +463,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( } onSuccess { meta => beamWriteMonitor.synchronized { - beams.remove(timestamp.millis) + beams = beams diff List(beam) } } map (_ => 0) @@ -392,7 +473,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( alertMap ++ Dict( "eventCount" -> eventGroup.size, - "timestamp" -> timestamp.toString(), + "timestamp" -> beam.getInterval().get.start.toString, "beams" -> beam.toString ) ) @@ -407,8 +488,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]]( def close() = { beamWriteMonitor.synchronized { open = false - val closeFuture = Future.collect(beams.values.toList map (_.close())) map (_ => ()) - beams.clear() + val closeFuture = Future.collect(beams map (_.close())) map (_ => ()) + beams = beams diff beams closeFuture } } @@ -428,11 +509,11 @@ case class ClusteredBeamMeta(latestCloseTime: DateTime, beamDictss: Map[Long, Se Dict( // latestTime is only being written for backwards compatibility "latestTime" -> new DateTime( - (Seq(latestCloseTime.millis) ++ beamDictss.map(_._1)).max, + (Seq(latestCloseTime.millis) ++ beamDictss.keys).max, ISOChronology.getInstanceUTC - ).toString(), - "latestCloseTime" -> latestCloseTime.toString(), - "beams" -> beamDictss.map(kv => (new DateTime(kv._1, ISOChronology.getInstanceUTC).toString(), kv._2)) + ).toString, + "latestCloseTime" -> latestCloseTime.toString, + "beams" -> beamDictss.map(kv => (new DateTime(kv._1, ISOChronology.getInstanceUTC).toString, kv._2)) ) ) } diff --git a/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala index 042064a..06b0c07 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/HashPartitionBeam.scala @@ -41,4 +41,11 @@ class HashPartitionBeam[A]( } override def toString = "HashPartitionBeam(%s)" format delegates.mkString(", ") + + def getInterval() = { + delegates.headOption match { + case Some(x) => x.getInterval() + case None => None + } + } } diff --git a/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala index 358b871..558bf4b 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/HttpBeam.scala @@ -58,6 +58,9 @@ class HttpBeam[A: Timestamper]( emitter: ServiceEmitter ) extends Beam[A] with Logging { + + def getInterval() = None + private[this] implicit val timer: Timer = DefaultTimer.twitter private[this] val port = if (uri.port > 0) { diff --git a/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala index cd713b7..6058458 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/MemoryBeam.scala @@ -28,6 +28,8 @@ class MemoryBeam[A]( jsonWriter: JsonWriter[A] ) extends Beam[A] { + def getInterval() = None + def propagate(events: Seq[A]) = { events.map(event => Jackson.parse[Dict](jsonWriter.asBytes(event))) foreach { d => diff --git a/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala index 52acdbf..b5047c3 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/NoopBeam.scala @@ -17,6 +17,8 @@ package com.metamx.tranquility.beam import com.twitter.util.Future +import org.joda.time.Interval +import org.joda.time.chrono.ISOChronology class NoopBeam[A] extends Beam[A] { @@ -25,4 +27,6 @@ class NoopBeam[A] extends Beam[A] def close() = Future.Done override def toString = "NoopBeam()" + + def getInterval() = Some(new Interval(0, 0, ISOChronology.getInstanceUTC)) } diff --git a/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala b/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala index 3be692c..7c26ffc 100644 --- a/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/beam/RoundRobinBeam.scala @@ -38,4 +38,11 @@ class RoundRobinBeam[A]( } override def toString = "RoundRobinBeam(%s)" format beams.mkString(", ") + + def getInterval() = { + beams.headOption match { + case Some(x) => x.getInterval() + case None => None + } + } } diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala index 04f703e..6eaa394 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeam.scala @@ -19,8 +19,7 @@ package com.metamx.tranquility.druid import com.metamx.common.scala.Logging import com.metamx.common.scala.Predef._ import com.metamx.emitter.service.ServiceEmitter -import com.metamx.tranquility.beam.Beam -import com.metamx.tranquility.beam.DefunctBeamException +import com.metamx.tranquility.beam.{Beam, DefunctBeamException} import com.metamx.tranquility.finagle._ import com.metamx.tranquility.typeclass.ObjectWriter import com.twitter.util.Closable @@ -44,6 +43,9 @@ class DruidBeam[A]( objectWriter: ObjectWriter[A] ) extends Beam[A] with Logging with Closable { + + def getInterval() = Some(interval) + private[this] val clients = Map( tasks map { task => diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala index 3bf84ea..e40da15 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeamMaker.scala @@ -138,10 +138,6 @@ class DruidBeamMaker[A: Timestamper]( } override def newBeam(interval: Interval, partition: Int) = { - require( - beamTuning.segmentGranularity.widen(interval) == interval, - "Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval) - ) val availabilityGroup = DruidBeamMaker.generateBaseFirehoseId( location.dataSource, beamTuning.segmentGranularity, @@ -190,10 +186,7 @@ class DruidBeamMaker[A: Timestamper]( // Backwards compatibility (see toDict). beamTuning.segmentBucket(new DateTime(d("timestamp"), ISOChronology.getInstanceUTC)) } - require( - beamTuning.segmentGranularity.widen(interval) == interval, - "Interval does not match segmentGranularity[%s]: %s" format(beamTuning.segmentGranularity, interval) - ) + val partition = int(d("partition")) val tasks = if (d contains "tasks") { list(d("tasks")).map(dict(_)).map(d => TaskPointer(str(d("id")), str(d("firehoseId")))) diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala index d58e935..9c8185d 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala @@ -194,6 +194,9 @@ object DruidBeams def close() = clusteredBeam.close() map (_ => lifecycle.stop()) override def toString = clusteredBeam.toString + + def getInterval() = clusteredBeam.getInterval() + } } diff --git a/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala b/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala index f9da7e9..6379b1a 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/BeamPacketizerTest.scala @@ -56,6 +56,8 @@ class BeamPacketizerTest extends FunSuite with Logging } override def close() = memoryBeam.close() + + def getInterval() = None } val acked = new AtomicLong() diff --git a/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala b/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala index 5cf51be..23380d1 100644 --- a/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala +++ b/core/src/test/scala/com/metamx/tranquility/test/ClusteredBeamTest.scala @@ -109,6 +109,8 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA def close() = { beam.close() } + + def getInterval() = None } class TestingBeam(val timestamp: DateTime, val partition: Int, val uuid: String = UUID.randomUUID().toString) @@ -118,6 +120,8 @@ class ClusteredBeamTest extends FunSuite with CuratorRequiringSuite with BeforeA _beams += this } + def getInterval() = None + def propagate(_events: Seq[SimpleEvent]) = _lock.synchronized { if (_events.contains(events("defunct"))) { Future.exception(new DefunctBeamException("Defunct")) diff --git a/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala b/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala index 5b79b66..a3e5b71 100644 --- a/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala +++ b/storm/src/test/scala/com/metamx/tranquility/test/StormBoltTest.scala @@ -52,6 +52,8 @@ class SimpleBeam extends Beam[SimpleEvent] } def close() = Future.Done + + def getInterval() = None } object SimpleBeam