Skip to content

Commit

Permalink
Adding ElasticSearch helpers to Klutter
Browse files Browse the repository at this point in the history
  • Loading branch information
apatrida committed Sep 14, 2015
1 parent 1218641 commit 2eadfa7
Show file tree
Hide file tree
Showing 15 changed files with 552 additions and 2 deletions.
1 change: 1 addition & 0 deletions all-jdk7/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ dependencies {
runtime relativeProject(":klutter-core-jodatime-jdk6")
runtime relativeProject(":klutter-json-jackson-jdk6")
runtime relativeProject(":klutter-netflix-graph-jdk6")
runtime relativeProject(":klutter-elasticsearch-jdk7")
}
1 change: 1 addition & 0 deletions all-jdk8/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ dependencies {
runtime relativeProject(":klutter-json-jackson-jdk8")
runtime relativeProject(":klutter-netflix-graph-jdk6")
runtime relativeProject(":klutter-vertx3-jdk8")
runtime relativeProject(":klutter-elasticsearch-jdk7")
}
3 changes: 3 additions & 0 deletions elasticsearch-jdk7/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## klutter/elasticsearch-jdk7

See module documentation in [klutter/elasticsearch](../elasticsearch)
7 changes: 7 additions & 0 deletions elasticsearch-jdk7/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
dependencies {
compile relativeProject(":klutter-json-jackson-jdk6")
compile relativeProject(":klutter-core-jdk7")

compile "org.elasticsearch:elasticsearch:$version_elasticsearch"
compile "nl.komponents.kovenant:kovenant:$version_kovenant"
}
153 changes: 153 additions & 0 deletions elasticsearch-jdk7/src/main/kotlin/uy/klutter/elasticsearch/Client.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package uy.klutter.elasticsearch

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all
import nl.komponents.kovenant.deferred
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.ActionRequestBuilder
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus
import org.elasticsearch.client.Client
import org.elasticsearch.client.ElasticsearchClient
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.node.NodeBuilder
import uy.klutter.core.common.with
import java.nio.file.Path

public object EsConfig {
public volatile var adminActionTimeoutInSeconds: Long = 30
public volatile var indexReplicaCount: Int = 1
public volatile var indexShardCount: Int = 4

public volatile var objectMapper: ObjectMapper = jacksonObjectMapper()
}


public fun esNodeClient(clusterName: String, settings: Map<String, String>): Client {
return esNodeClient(clusterName) {
settings.entrySet().forEach {
put(it.getKey(), it.getValue())
}
}
}

public fun esNodeClient(clusterName: String, init: ImmutableSettings.Builder.()->Unit): Client {
val settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", clusterName)
.put("client.transport.sniff", false)
.put("node.name", "nodeClient-" + System.currentTimeMillis())
.put("http.enabled", false)
.put("node.data", false)
.put("node.master", false)
.with { init() }
.build()
return NodeBuilder.nodeBuilder().settings(settings).node().client()
}

public fun esTransportClient(clusterName: String, nodes: List<TransportAddress>, settings: Map<String, String>): Client {
return esTransportClient(clusterName, nodes) {
settings.entrySet().forEach {
put(it.getKey(), it.getValue())
}
}
}

public fun esTransportClient(clusterName: String, nodes: List<TransportAddress>, init: ImmutableSettings.Builder.()->Unit): Client {
val settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", clusterName)
.put("client.transport.sniff", false)
.with { init() }
.build()
val client = TransportClient(settings)
nodes.forEach {
client.addTransportAddress(it)
}
return NodeBuilder.nodeBuilder().settings(settings).node().client()
}


public fun esEmbeddedClient(clusterName: String, baseDir: Path, settings: Map<String, String>): Promise<Client, Exception> {
return esEmbeddedClient(clusterName, baseDir) {
settings.entrySet().forEach {
put(it.getKey(), it.getValue())
}
}
}

public fun esEmbeddedClient(clusterName: String, baseDir: Path, init: ImmutableSettings.Builder.()->Unit): Promise<Client, Exception> {
val deferred = deferred<Client, Exception>()
try {
val esRoot = baseDir.toAbsolutePath()
val settings = ImmutableSettings.settingsBuilder()
.put("path.data", "$esRoot/data")
.put("path.work", "$esRoot/work")
.put("path.logs", "$esRoot/logs")
.put("http.enabled", false)
.put("index.number_of_shards", "2")
.put("index.number_of_replicas", "0")
.put("cluster.routing.schedule", "50ms")
.put("cluster.name", clusterName)
.put("client.transport.sniff", false)
.put("node.name", "nodeEmbedded-" + System.currentTimeMillis())
.put("node.data", true)
.put("cluster.routing.allocation.disk.threshold_enabled", true)
.put("cluster.routing.allocation.disk.watermark.low", "10gb")
.with { init() }
.build()
val tempNode = NodeBuilder.nodeBuilder().local(true).data(true).settings(settings).node()
val tempClient = tempNode.client()
return tempClient.waitForYellowCluster().bind { deferred.promise }
}
catch (ex: Throwable) {
deferred.reject(wrapThrowable(ex))
}
return deferred.promise
}

public fun Client.waitForGreenCluster(): Promise<ClusterHealthStatus, Exception> {
return admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout(TimeValue.timeValueSeconds(EsConfig.adminActionTimeoutInSeconds)).promise { it.getStatus() }
}

public fun Client.waitForYellowCluster(): Promise<ClusterHealthStatus, Exception> {
return admin().cluster().prepareHealth().setWaitForYellowStatus().setTimeout(TimeValue.timeValueSeconds(EsConfig.adminActionTimeoutInSeconds)).promise { it.getStatus() }
}

public fun Client.waitForGreenIndex(vararg indices: String): Promise<ClusterHealthStatus, Exception> {
return admin().cluster().prepareHealth(*indices).setWaitForGreenStatus().setTimeout(TimeValue.timeValueSeconds(EsConfig.adminActionTimeoutInSeconds)).promise { it.getStatus() }
}

public fun Client.waitForYellowIndex(vararg indices: String): Promise<ClusterHealthStatus, Exception> {
return admin().cluster().prepareHealth(*indices).setWaitForYellowStatus().setTimeout(TimeValue.timeValueSeconds(EsConfig.adminActionTimeoutInSeconds)).promise { it.getStatus() }
}

public fun Client.indexExists(vararg indices: String): Promise<Boolean, Exception> {
return admin().indices().prepareExists(*indices).promise { it.isExists() }
}

public fun Client.createIndex(index: String, mappings: List<IndexTypeMapping>, shardCount: Int = EsConfig.indexShardCount, replicaCount: Int = EsConfig.indexReplicaCount, settingsInit: ImmutableSettings.Builder.()->Unit = {}): Promise<Unit, Exception> {
val indexSettings = ImmutableSettings.settingsBuilder()
.put("number_of_shards", shardCount)
.put("number_of_replicas", replicaCount)
.with { settingsInit() }
.build()
return admin().indices().prepareCreate(index).setSettings(indexSettings).with { mappings.forEach { addMapping(it.type, it.json) } }.promiseNothing()
}

public fun Client.updateIndexMappings(index: String, mappings: List<IndexTypeMapping>): Promise<List<Boolean>, Exception> {
val actions = linkedListOf<Promise<Boolean, Exception>>()
mappings.forEach {
actions.add(admin().indices().preparePutMapping(index).setType(it.type).setSource(it.json).promise { it.isAcknowledged() })
}
return all<Boolean>(actions)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package uy.klutter.elasticsearch

import nl.komponents.kovenant.Deferred

public class WrappedThrowableException(cause: Throwable): Exception(cause.getMessage(), cause)

public fun wrapThrowable(rawEx: Throwable): Exception = if (rawEx is Exception) rawEx else WrappedThrowableException(rawEx)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package uy.klutter.elasticsearch

import org.elasticsearch.action.index.IndexRequestBuilder


public fun IndexRequestBuilder.setSourceFromObject(pojo: Any) {
setSource(EsConfig.objectMapper.writeValueAsString(pojo))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package uy.klutter.elasticsearch

import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.ActionRequestBuilder
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
import org.elasticsearch.client.ElasticsearchClient

public fun <T: Any> promiseResult(deferred: Deferred<T, Exception>): ActionListener<T> {
return object: ActionListener<T> {
override fun onResponse(response: T) {
deferred.resolve(response)
}

override fun onFailure(e: Throwable) {
deferred.reject(wrapThrowable(e))
}
}
}

public fun <T: Any, O: Any> promiseResult(deferred: Deferred<T, Exception>, map: (O)->T): ActionListener<O> {
return object: ActionListener<O> {
override fun onResponse(response: O) {
deferred.resolve(map(response))
}

override fun onFailure(e: Throwable) {
deferred.reject(wrapThrowable(e))
}
}
}

public fun <Request: ActionRequest<*>, Response: ActionResponse, RequestBuilder: ActionRequestBuilder<*, *, *, *>, Client: ElasticsearchClient<*>>
ActionRequestBuilder<Request, Response, RequestBuilder, Client>.promise(deferred: Deferred<Response, Exception>): Promise<Response, Exception> {
this.execute(promiseResult(deferred))
return deferred.promise
}


public fun <Request: ActionRequest<*>, Response: ActionResponse, RequestBuilder: ActionRequestBuilder<*, *, *, *>, Client: ElasticsearchClient<*>>
ActionRequestBuilder<Request, Response, RequestBuilder, Client>.promise(): Promise<Response, Exception> {
val deferred = deferred<Response, Exception>()
this.execute(promiseResult(deferred))
return deferred.promise
}

public fun <Request: ActionRequest<*>, Response: ActionResponse, RequestBuilder: ActionRequestBuilder<*, *, *, *>, Client: ElasticsearchClient<*>>
ActionRequestBuilder<Request, Response, RequestBuilder, Client>.promiseNothing(): Promise<Unit, Exception> {
val deferred = deferred<Unit, Exception>()
this.execute(promiseResult(deferred, {}))
return deferred.promise
}


public fun <Request: ActionRequest<*>, Response: ActionResponse, RequestBuilder: ActionRequestBuilder<*, *, *, *>, Client: ElasticsearchClient<*>, O: Any>
ActionRequestBuilder<Request, Response, RequestBuilder, Client>.promise(map: (Response)->O): Promise<O, Exception> {
val deferred = deferred<O, Exception>()
this.execute(promiseResult(deferred, map))
return deferred.promise
}

public fun <Request: ActionRequest<*>, Response: ActionResponse, RequestBuilder: ActionRequestBuilder<*, *, *, *>, Client: ElasticsearchClient<*>, O: Any>
ActionRequestBuilder<Request, Response, RequestBuilder, Client>.promise(deferred: Deferred<O, Exception>, map: (Response)->O): Promise<O, Exception> {
this.execute(promiseResult(deferred, map))
return deferred.promise
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package uy.klutter.elasticsearch

import org.elasticsearch.common.xcontent.XContentBuilder


public data class IndexTypeMapping(val type: String, val json: XContentBuilder)

public enum class EsSystemFields {
_uid, _id, _type, _source, _all, _analyzer, _boost, _parent, _field_names, _routing, _index, _size, _timestamp, _ttl
}

public enum class EsStoredField {
STORED, NOT_STORED
}

public enum class EsIndexedField {
NOT_ANALYZED, ANALYZED, NOT_INDEXED
}

public fun mappingsForTypeWithEnum<T: Enum<T>>(type: String, allowDynamic: Boolean = false, initTopLevel: XContentJsonObjectWithFields<EsSystemFields>.()->Unit = {}, initProperties: XContentJsonObjectWithFields<T>.()->Unit): IndexTypeMapping {
val mappings = xsonObject {
ObjectWithFields<EsSystemFields>(type) {
setValue("dynamic", if (allowDynamic) "true" else "strict")
initTopLevel()
ObjectWithFields<T>("properties") {
initProperties()
}
}
}
return IndexTypeMapping(type, mappings)
}

public fun mappingsForType(type: String, allowDynamic: Boolean = false, initTopLevel: XContentJsonObjectWithFields<EsSystemFields>.()->Unit = {}, initProperties: XContentJsonObject.()->Unit): IndexTypeMapping {
val mappings = xsonObject {
ObjectWithFields<EsSystemFields>(type) {
setValue("dyanmic", if (allowDynamic) "true" else "strict")
initTopLevel()
Object("properties") {
initProperties()
}
}
}
return IndexTypeMapping(type, mappings)
}

public fun <T: Enum<T>> XContentJsonObjectWithFields<T>.stringField(field: T, indexed: EsIndexedField = EsIndexedField.NOT_ANALYZED, stored: EsStoredField = EsStoredField.NOT_STORED, init: XContentJsonObject.()->Unit) {
Object(field) {
setValue("type", "string")
setValue("store", stored == EsStoredField.STORED)
setValue("index", when(indexed) {
EsIndexedField.ANALYZED -> "analyzed"
EsIndexedField.NOT_ANALYZED -> "not_analyzed"
else -> "no"
})
init()
}
}

public fun <T: Enum<T>> XContentJsonObjectWithFields<T>.ignoreField(field: T) {
Object(field) {
setValue("type", "string")
setValue("store", false)
setValue("index", "no")
}
}

public fun <T: Enum<T>> XContentJsonObjectWithFields<T>.dateField(field: T, indexed: EsIndexedField = EsIndexedField.NOT_ANALYZED, stored: EsStoredField = EsStoredField.NOT_STORED, init: XContentJsonObject.()->Unit) {
Object(field) {
setValue("type", "date")
setValue("store", stored == EsStoredField.STORED)
if (indexed == EsIndexedField.NOT_INDEXED) {
setValue("index", "no")
}
init()
}
}

public fun <T: Enum<T>> XContentJsonObjectWithFields<T>.booleanField(field: T, indexed: EsIndexedField = EsIndexedField.NOT_ANALYZED, stored: EsStoredField = EsStoredField.NOT_STORED, init: XContentJsonObject.()->Unit) {
Object(field) {
setValue("type", "boolean")
setValue("store", stored == EsStoredField.STORED)
if (indexed == EsIndexedField.NOT_INDEXED) {
setValue("index", "no")
}
init()
}
}

public fun <T: Enum<T>> XContentJsonObjectWithFields<T>.integerField(field: T, indexed: EsIndexedField = EsIndexedField.NOT_ANALYZED, stored: EsStoredField = EsStoredField.NOT_STORED, init: XContentJsonObject.()->Unit) {
Object(field) {
setValue("type", "integer")
setValue("store", stored == EsStoredField.STORED)
if (indexed == EsIndexedField.NOT_INDEXED) {
setValue("index", "no")
}
init()
}
}

public fun <T: Enum<T>> XContentJsonObjectWithFields<T>.longField(field: T, indexed: EsIndexedField = EsIndexedField.NOT_ANALYZED, stored: EsStoredField = EsStoredField.NOT_STORED, init: XContentJsonObject.()->Unit) {
Object(field) {
setValue("type", "long")
setValue("store", stored == EsStoredField.STORED)
if (indexed == EsIndexedField.NOT_INDEXED) {
setValue("index", "no")
}
init()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package uy.klutter.elasticsearch

import com.fasterxml.jackson.core.type.TypeReference
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.search.SearchHits


public inline fun <reified T: Any> SearchResponse.getHitsAsObjects(): Sequence<T> {
return getHits().getHits().asSequence().map { EsConfig.objectMapper.readValue<T>(it.sourceAsString(), object : TypeReference<T>(){}) }
}

public inline fun <reified T: Any> SearchHits.getHitsAsObjects(): Sequence<T> {
return getHits().asSequence().map { EsConfig.objectMapper.readValue<T>(it.sourceAsString(), object : TypeReference<T>(){}) }
}
Loading

0 comments on commit 2eadfa7

Please sign in to comment.