From 9b5cef2fa054b8bc3429d6332ce02fa119f1d893 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 23 Nov 2015 14:22:13 -0800 Subject: [PATCH] Provide a default firehoseServicePattern. (fix #46) --- README.md | 15 ++---- .../tranquility/druid/DruidEnvironment.scala | 42 +++++++++++----- .../tranquility/druid/DruidLocation.scala | 49 +++++++++++++------ 3 files changed, 67 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 7fb8720..a87b62a 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,6 @@ You can set up and use a Finagle Service like this: ```java final String indexService = "overlord"; // Your overlord's service name. -final String firehosePattern = "druid:firehose:%s"; // Make up a service pattern, include %s somewhere in it. final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path final String dataSource = "foo"; final List dimensions = ImmutableList.of("bar", "qux"); @@ -57,13 +56,7 @@ final Service>, Integer> druidService = DruidBeams .builder(timestamper) .curator(curator) .discoveryPath(discoveryPath) - .location( - DruidLocation.create( - indexService, - firehosePattern, - dataSource - ) - ) + .location(DruidLocation.create(indexService, dataSource)) .timestampSpec(timestampSpec) .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularity.MINUTE)) .tuning( @@ -95,7 +88,6 @@ Or in Scala: ```scala val indexService = "overlord" // Your overlord's druid.service, with slashes replaced by colons. -val firehosePattern = "druid:firehose:%s" // Make up a service pattern, include %s somewhere in it. val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path. val dataSource = "foo" val dimensions = Seq("bar", "qux") @@ -111,7 +103,7 @@ val druidService = DruidBeams .builder(timestamper) .curator(curator) .discoveryPath(discoveryPath) - .location(DruidLocation(indexService, firehosePattern, dataSource)) + .location(DruidLocation(indexService, dataSource)) .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE)) .tuning( ClusteredBeamTuning( @@ -248,7 +240,6 @@ class MyBeamFactory extends BeamFactory[Map[String, Any]] curator.start() val indexService = "overlord" // Your overlord's druid.service, with slashes replaced by colons. - val firehosePattern = "druid:firehose:%s" // Make up a service pattern, include %s somewhere in it. val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path. val dataSource = "foo" val dimensions = Seq("bar") @@ -258,7 +249,7 @@ class MyBeamFactory extends BeamFactory[Map[String, Any]] .builder((eventMap: Map[String, Any]) => new DateTime(eventMap("timestamp"))) .curator(curator) .discoveryPath(discoveryPath) - .location(DruidLocation(indexService, firehosePattern, dataSource)) + .location(DruidLocation(indexService, dataSource)) .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE)) .tuning( ClusteredBeamTuning( diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidEnvironment.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidEnvironment.scala index eecf122..0071afb 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidEnvironment.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidEnvironment.scala @@ -32,7 +32,7 @@ class DruidEnvironment( override def equals(other: Any) = other match { case that: DruidEnvironment => - (indexService, firehoseServicePattern) == (that.indexService, that.firehoseServicePattern) + (indexService, firehoseServicePattern) ==(that.indexService, that.firehoseServicePattern) case _ => false } @@ -41,21 +41,41 @@ class DruidEnvironment( object DruidEnvironment { + def apply(indexServiceMaybeWithSlashes: String): DruidEnvironment = { + new DruidEnvironment(indexServiceMaybeWithSlashes, defaultFirehoseServicePattern(indexServiceMaybeWithSlashes)) + } + def apply(indexServiceMaybeWithSlashes: String, firehoseServicePattern: String): DruidEnvironment = { new DruidEnvironment(indexServiceMaybeWithSlashes, firehoseServicePattern) } /** - * Factory method for creating DruidEnvironment objects. DruidEnvironments represent a Druid indexing service - * cluster, locatable through service discovery. - * - * @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with - * colons before searching for this in service discovery, because Druid does the - * same thing before announcing. - * @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for - * internal service-discovery purposes, to help Tranquility find Druid indexing tasks. - */ + * Factory method for creating DruidEnvironment objects. DruidEnvironments represent a Druid indexing service + * cluster, locatable through service discovery. + * + * @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with + * colons before searching for this in service discovery, because Druid does the + * same thing before announcing. + * @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for + * internal service-discovery purposes, to help Tranquility find Druid indexing tasks. + */ def create(indexServiceMaybeWithSlashes: String, firehoseServicePattern: String): DruidEnvironment = { - new DruidEnvironment(indexServiceMaybeWithSlashes, firehoseServicePattern) + apply(indexServiceMaybeWithSlashes, firehoseServicePattern) + } + + /** + * Factory method for creating DruidEnvironment objects. DruidEnvironments represent a Druid indexing service + * cluster, locatable through service discovery. + * + * @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with + * colons before searching for this in service discovery, because Druid does the + * same thing before announcing. + */ + def create(indexServiceMaybeWithSlashes: String): DruidEnvironment = { + apply(indexServiceMaybeWithSlashes) + } + + private def defaultFirehoseServicePattern(indexServiceMaybeWithSlashes: String) = { + s"firehose:${indexServiceMaybeWithSlashes.replace('/', ':')}:%s" } } diff --git a/core/src/main/scala/com/metamx/tranquility/druid/DruidLocation.scala b/core/src/main/scala/com/metamx/tranquility/druid/DruidLocation.scala index 8628816..a7fe44e 100644 --- a/core/src/main/scala/com/metamx/tranquility/druid/DruidLocation.scala +++ b/core/src/main/scala/com/metamx/tranquility/druid/DruidLocation.scala @@ -33,27 +33,27 @@ object DruidLocation } /** - * Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a - * specific Druid indexing service cluster. - * - * @param environment the Druid indexing service - * @param dataSource the Druid dataSource - */ + * Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a + * specific Druid indexing service cluster. + * + * @param environment the Druid indexing service + * @param dataSource the Druid dataSource + */ def create(environment: DruidEnvironment, dataSource: String): DruidLocation = { DruidLocation(environment, dataSource) } /** - * Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a - * specific Druid indexing service cluster. - * - * @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with - * colons before searching for this in service discovery, because Druid does the - * same thing before announcing. - * @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for - * internal service-discovery purposes, to help Tranquility find Druid indexing tasks. - * @param dataSource the Druid dataSource - */ + * Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a + * specific Druid indexing service cluster. + * + * @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with + * colons before searching for this in service discovery, because Druid does the + * same thing before announcing. + * @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for + * internal service-discovery purposes, to help Tranquility find Druid indexing tasks. + * @param dataSource the Druid dataSource + */ def create( indexServiceMaybeWithSlashes: String, firehoseServicePattern: String, @@ -62,4 +62,21 @@ object DruidLocation { DruidLocation(DruidEnvironment(indexServiceMaybeWithSlashes, firehoseServicePattern), dataSource) } + + /** + * Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a + * specific Druid indexing service cluster. + * + * @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with + * colons before searching for this in service discovery, because Druid does the + * same thing before announcing. + * @param dataSource the Druid dataSource + */ + def create( + indexServiceMaybeWithSlashes: String, + dataSource: String + ): DruidLocation = + { + DruidLocation(DruidEnvironment(indexServiceMaybeWithSlashes), dataSource) + } }