Skip to content

Commit

Permalink
Provide a default firehoseServicePattern. (fix #46)
Browse files Browse the repository at this point in the history
  • Loading branch information
gianm committed Nov 23, 2015
1 parent d03dfd8 commit 9b5cef2
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 39 deletions.
15 changes: 3 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ You can set up and use a Finagle Service like this:

```java
final String indexService = "overlord"; // Your overlord's service name.
final String firehosePattern = "druid:firehose:%s"; // Make up a service pattern, include %s somewhere in it.
final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path
final String dataSource = "foo";
final List<String> dimensions = ImmutableList.of("bar", "qux");
Expand Down Expand Up @@ -57,13 +56,7 @@ final Service<List<Map<String, Object>>, Integer> druidService = DruidBeams
.builder(timestamper)
.curator(curator)
.discoveryPath(discoveryPath)
.location(
DruidLocation.create(
indexService,
firehosePattern,
dataSource
)
)
.location(DruidLocation.create(indexService, dataSource))
.timestampSpec(timestampSpec)
.rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularity.MINUTE))
.tuning(
Expand Down Expand Up @@ -95,7 +88,6 @@ Or in Scala:

```scala
val indexService = "overlord" // Your overlord's druid.service, with slashes replaced by colons.
val firehosePattern = "druid:firehose:%s" // Make up a service pattern, include %s somewhere in it.
val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path.
val dataSource = "foo"
val dimensions = Seq("bar", "qux")
Expand All @@ -111,7 +103,7 @@ val druidService = DruidBeams
.builder(timestamper)
.curator(curator)
.discoveryPath(discoveryPath)
.location(DruidLocation(indexService, firehosePattern, dataSource))
.location(DruidLocation(indexService, dataSource))
.rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE))
.tuning(
ClusteredBeamTuning(
Expand Down Expand Up @@ -248,7 +240,6 @@ class MyBeamFactory extends BeamFactory[Map[String, Any]]
curator.start()

val indexService = "overlord" // Your overlord's druid.service, with slashes replaced by colons.
val firehosePattern = "druid:firehose:%s" // Make up a service pattern, include %s somewhere in it.
val discoveryPath = "/druid/discovery" // Your overlord's druid.discovery.curator.path.
val dataSource = "foo"
val dimensions = Seq("bar")
Expand All @@ -258,7 +249,7 @@ class MyBeamFactory extends BeamFactory[Map[String, Any]]
.builder((eventMap: Map[String, Any]) => new DateTime(eventMap("timestamp")))
.curator(curator)
.discoveryPath(discoveryPath)
.location(DruidLocation(indexService, firehosePattern, dataSource))
.location(DruidLocation(indexService, dataSource))
.rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularity.MINUTE))
.tuning(
ClusteredBeamTuning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class DruidEnvironment(

override def equals(other: Any) = other match {
case that: DruidEnvironment =>
(indexService, firehoseServicePattern) == (that.indexService, that.firehoseServicePattern)
(indexService, firehoseServicePattern) ==(that.indexService, that.firehoseServicePattern)
case _ => false
}

Expand All @@ -41,21 +41,41 @@ class DruidEnvironment(

object DruidEnvironment
{
def apply(indexServiceMaybeWithSlashes: String): DruidEnvironment = {
new DruidEnvironment(indexServiceMaybeWithSlashes, defaultFirehoseServicePattern(indexServiceMaybeWithSlashes))
}

def apply(indexServiceMaybeWithSlashes: String, firehoseServicePattern: String): DruidEnvironment = {
new DruidEnvironment(indexServiceMaybeWithSlashes, firehoseServicePattern)
}

/**
* Factory method for creating DruidEnvironment objects. DruidEnvironments represent a Druid indexing service
* cluster, locatable through service discovery.
*
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
* colons before searching for this in service discovery, because Druid does the
* same thing before announcing.
* @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for
* internal service-discovery purposes, to help Tranquility find Druid indexing tasks.
*/
* Factory method for creating DruidEnvironment objects. DruidEnvironments represent a Druid indexing service
* cluster, locatable through service discovery.
*
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
* colons before searching for this in service discovery, because Druid does the
* same thing before announcing.
* @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for
* internal service-discovery purposes, to help Tranquility find Druid indexing tasks.
*/
def create(indexServiceMaybeWithSlashes: String, firehoseServicePattern: String): DruidEnvironment = {
new DruidEnvironment(indexServiceMaybeWithSlashes, firehoseServicePattern)
apply(indexServiceMaybeWithSlashes, firehoseServicePattern)
}

/**
* Factory method for creating DruidEnvironment objects. DruidEnvironments represent a Druid indexing service
* cluster, locatable through service discovery.
*
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
* colons before searching for this in service discovery, because Druid does the
* same thing before announcing.
*/
def create(indexServiceMaybeWithSlashes: String): DruidEnvironment = {
apply(indexServiceMaybeWithSlashes)
}

private def defaultFirehoseServicePattern(indexServiceMaybeWithSlashes: String) = {
s"firehose:${indexServiceMaybeWithSlashes.replace('/', ':')}:%s"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,27 @@ object DruidLocation
}

/**
* Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a
* specific Druid indexing service cluster.
*
* @param environment the Druid indexing service
* @param dataSource the Druid dataSource
*/
* Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a
* specific Druid indexing service cluster.
*
* @param environment the Druid indexing service
* @param dataSource the Druid dataSource
*/
def create(environment: DruidEnvironment, dataSource: String): DruidLocation = {
DruidLocation(environment, dataSource)
}

/**
* Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a
* specific Druid indexing service cluster.
*
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
* colons before searching for this in service discovery, because Druid does the
* same thing before announcing.
* @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for
* internal service-discovery purposes, to help Tranquility find Druid indexing tasks.
* @param dataSource the Druid dataSource
*/
* Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a
* specific Druid indexing service cluster.
*
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
* colons before searching for this in service discovery, because Druid does the
* same thing before announcing.
* @param firehoseServicePattern Make up a service pattern, include %s somewhere in it. This will be used for
* internal service-discovery purposes, to help Tranquility find Druid indexing tasks.
* @param dataSource the Druid dataSource
*/
def create(
indexServiceMaybeWithSlashes: String,
firehoseServicePattern: String,
Expand All @@ -62,4 +62,21 @@ object DruidLocation
{
DruidLocation(DruidEnvironment(indexServiceMaybeWithSlashes, firehoseServicePattern), dataSource)
}

/**
* Factory method for creating DruidLocation objects. DruidLocations represent a specific Druid dataSource in a
* specific Druid indexing service cluster.
*
* @param indexServiceMaybeWithSlashes Your overlord's "druid.service" configuration. Slashes will be replaced with
* colons before searching for this in service discovery, because Druid does the
* same thing before announcing.
* @param dataSource the Druid dataSource
*/
def create(
indexServiceMaybeWithSlashes: String,
dataSource: String
): DruidLocation =
{
DruidLocation(DruidEnvironment(indexServiceMaybeWithSlashes), dataSource)
}
}

0 comments on commit 9b5cef2

Please sign in to comment.