diff --git a/elastic/core/src/main/scala/Index.scala b/elastic/core/src/main/scala/Index.scala index 95657a5..2a930b6 100644 --- a/elastic/core/src/main/scala/Index.scala +++ b/elastic/core/src/main/scala/Index.scala @@ -7,6 +7,8 @@ import com.sksamuel.elastic4s._ import com.sksamuel.elastic4s.analysis.Analysis import com.sksamuel.elastic4s.circe._ import com.sksamuel.elastic4s.fields.ElasticField +import com.sksamuel.elastic4s.requests.delete.{DeleteByIdRequest, DeleteByQueryRequest} +import com.sksamuel.elastic4s.requests.indexes.IndexRequest import com.sksamuel.elastic4s.requests.searches.SearchRequest import com.sksamuel.elastic4s.requests.searches.queries.Query import com.sksamuel.elastic4s.requests.update.UpdateRequest @@ -84,51 +86,48 @@ abstract class Index( def encodeId(id: Id) = hitIdFromId(id).noSpacesSortKeys - def index(latest: Latest) = execute(indexRequest(latest)) + def index(latest: Latest, transformRequest: IndexRequest => IndexRequest = identity) = execute(indexRequest(latest, transformRequest)) def bulkIndex(lastests: Latest*) = execute(bulkIndexRequest(lastests)) - def indexRequest(latest: Latest) = { - indexInto(name) id encodeId(latest.id) doc latest refresh indexSetup.refreshPolicy - } + def indexRequest(latest: Latest, transformRequest: IndexRequest => IndexRequest = identity) = + transformRequest(indexInto(name) id encodeId(latest.id) doc latest refresh indexSetup.refreshPolicy) def bulkIndexRequest(latests: Seq[Index.this.Latest]) = - ElasticDsl.bulk(latests.map(indexRequest)) refresh indexSetup.refreshPolicy + ElasticDsl.bulk(latests.map(indexRequest(_))) refresh indexSetup.refreshPolicy def bulkDelete(id: Id*) = execute(bulkDeleteRequest(id)) def bulkDeleteRequest(id: Seq[Id]) = - ElasticDsl.bulk(id.map(deleteRequest)) refresh indexSetup.refreshPolicy + ElasticDsl.bulk(id.map(deleteRequest(_))) refresh indexSetup.refreshPolicy def bulk(bulkDelete: Seq[Id], bulkIndex: Seq[Latest]) = execute(bulkRequest(bulkDelete, bulkIndex)) def bulkRequest(bulkDelete: Seq[Id], bulkIndex: Seq[Latest]) = - ElasticDsl.bulk(bulkDelete.map(deleteRequest) ++ bulkIndex.map(indexRequest)) refresh indexSetup.refreshPolicy + ElasticDsl.bulk(bulkDelete.map(deleteRequest(_)) ++ bulkIndex.map(indexRequest(_))) refresh indexSetup.refreshPolicy def get(id: Id): Future[Option[Latest]] = execute(getRequest(id)).map(_.toOpt[Latest]) private def getRequest(id: Id) = ElasticDsl.get(name, encodeId(id)) - def delete(id: Id) = - execute(deleteRequest(id)) + def delete(id: Id, transformRequest: DeleteByIdRequest => DeleteByIdRequest = identity) = + execute(deleteRequest(id, transformRequest)) def deleteAll() = execute(deleteAllRequest()) - def deleteRequest(id: Id) = deleteById(name, encodeId(id)) refresh indexSetup.refreshPolicy + def deleteRequest(id: Id, transformRequest: DeleteByIdRequest => DeleteByIdRequest = identity) = transformRequest(deleteById(name, encodeId(id)) refresh indexSetup.refreshPolicy) def deleteAllRequest() = deleteByQuery(name, matchAllQuery()) refresh indexSetup.refreshPolicy - def deleteQuery(query: Query) = execute(deleteQueryRequest(query)) + def deleteQuery(query: Query, transformRequest: DeleteByQueryRequest => DeleteByQueryRequest = identity) = execute(deleteQueryRequest(query, transformRequest)) - def deleteQueryRequest(query: Query) = deleteByQuery(name, query) refresh indexSetup.refreshPolicy + def deleteQueryRequest(query: Query, transformRequest: DeleteByQueryRequest => DeleteByQueryRequest = identity) = transformRequest(deleteByQuery(name, query) refresh indexSetup.refreshPolicy) - def update(id: Id, transformUpdateRequest: UpdateRequest => UpdateRequest) = execute( - updateRequest(id, transformUpdateRequest) - ) + def update(id: Id, transformRequest: UpdateRequest => UpdateRequest) = execute(updateRequest(id, transformRequest)) - def updateRequest(id: Id, transformRequest: UpdateRequest => UpdateRequest) = transformRequest(updateById(name, encodeId(id)) refresh indexSetup.refreshPolicy) + def updateRequest(id: Id, transformRequest: UpdateRequest => UpdateRequest = identity) = transformRequest(updateById(name, encodeId(id)) refresh indexSetup.refreshPolicy) def updateField(id: Id, field: Latest => Any, value: Any) = execute(updateFieldRequest(id, field, value))