Skip to content

Commit

Permalink
Reading Mapped groups from ES
Browse files Browse the repository at this point in the history
  • Loading branch information
kwakeroni committed Feb 24, 2017
1 parent 2e28525 commit 256969f
Show file tree
Hide file tree
Showing 50 changed files with 290 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package be.kwakeroni.parameters.backend.es.api;

import org.json.JSONObject;

/**
* (C) 2017 Maarten Van Puymbroeck
*/
public interface ElasticSearchCriteria {

public void addParameterMatch(String parameter, String value);

public JSONObject toJSONObject();

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
*/
public interface ElasticSearchData {

Stream<JSONObject> query(JSONObject query);
Stream<JSONObject> query(ElasticSearchCriteria query, int pageSize);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ default ElasticSearchQuery<T> raw() {
return this;
}

Optional<T> apply(ElasticSearchData data, JSONObject query);
Optional<T> apply(ElasticSearchData data, ElasticSearchCriteria criteria);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package be.kwakeroni.parameters.backend.es.service;

import be.kwakeroni.parameters.backend.es.api.ElasticSearchCriteria;
import org.json.JSONArray;
import org.json.JSONObject;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* (C) 2017 Maarten Van Puymbroeck
*/
class DefaultElasticSearchCriteria implements ElasticSearchCriteria{

private String group;
private List<Match> parameterMatches = Collections.emptyList();

public DefaultElasticSearchCriteria(String group) {
this.group = group;
}

@Override
public void addParameterMatch(String parameter, String value) {
if (this.parameterMatches.isEmpty()){
this.parameterMatches = new ArrayList<>(1);
}
this.parameterMatches.add(new Match(parameter, value));
}

@Override
public JSONObject toJSONObject() {
if (parameterMatches == null || parameterMatches.isEmpty()) {
return Match.match("_type", group);
} else {
JSONArray criteria = new JSONArray();

criteria.put(Match.match("_type", group));

this.parameterMatches.forEach(match -> criteria.put(match.toJSONObject()));


return new JSONObject().put("bool",
new JSONObject().put("must", criteria)
);
}
}

private static final class Match {
public final String parameter;
public final String value;

public Match(String parameter, String value) {
this.parameter = parameter;
this.value = value;
}

public JSONObject toJSONObject() {
return match(this.parameter, this.value);
}

public static JSONObject match(String parameter, String value) {
return new JSONObject().put("match",
new JSONObject().put(parameter, value));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import be.kwakeroni.parameters.backend.api.BackendGroup;
import be.kwakeroni.parameters.backend.api.BusinessParametersBackend;
import be.kwakeroni.parameters.backend.api.query.BackendQuery;
import be.kwakeroni.parameters.backend.es.api.ElasticSearchCriteria;
import be.kwakeroni.parameters.backend.es.api.ElasticSearchData;
import be.kwakeroni.parameters.backend.es.api.ElasticSearchQuery;
import org.json.JSONObject;
Expand Down Expand Up @@ -60,13 +61,11 @@ public Collection<String> getGroupNames() {
public <V> V select(BackendGroup<ElasticSearchQuery<?>, ?, ?> group, BackendQuery<? extends ElasticSearchQuery<?>, V> query) {
ElasticSearchData data = new ElasticSearchData() {
@Override
public Stream<JSONObject> query(JSONObject query) {
return client.query(query);
public Stream<JSONObject> query(ElasticSearchCriteria criteria, int pageSize) {
return client.query(criteria.toJSONObject(), pageSize);
}
};
return ((ElasticSearchQuery<V>) query.raw()).apply(data,
new JSONObject().put("match",
new JSONObject().put("_type", group.getName()))).orElse(null);
return ((ElasticSearchQuery<V>) query.raw()).apply(data, new DefaultElasticSearchCriteria(group.getName())).orElse(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ElasticSearchClient {
private WebResource index;
private WebResource search;

ElasticSearchClient(Configuration configuration){
ElasticSearchClient(Configuration configuration) {
this.client = new Client();

String indexPath = normalizeRelativePath(configuration.getIndexPath());
Expand All @@ -30,13 +30,13 @@ class ElasticSearchClient {
this.search = this.client.resource(url + "/_search");
}

Stream<JSONObject> getAggregation(String name, JSONObject terms){
Stream<JSONObject> getAggregation(String name, JSONObject terms) {
JSONObject request = new JSONObject();
request.put("size", 0);
request.put("aggs",
new JSONObject().put(name,
new JSONObject().put("terms",
terms)));
new JSONObject().put("terms",
terms)));
// request.put("from", page * pageSize);

ClientResponse response = search.post(ClientResponse.class, request.toString());
Expand All @@ -48,38 +48,38 @@ Stream<JSONObject> getAggregation(String name, JSONObject terms){

}

Stream<JSONObject> query(JSONObject query){
JSONObject first = query(query, 0, 10);
Stream<JSONObject> query(JSONObject query, int pageSize) {
JSONObject first = query(query, 0, pageSize);
long totalHits = first.getJSONObject("hits").getLong("total");
// 0->0, 1->1, 10->1, 11->2, 19->, 20->2, 21->3

if (totalHits == 0){
if (totalHits == 0) {
return Stream.empty();
} else {
long pages = 1 + (totalHits - 1) / 10;
long pages = 1 + (totalHits - 1) / pageSize;
if (pages == 1) {
return hits(first);
} else {
return Stream.concat(
hits(first),
LongStream
.range(1, pages)
.mapToObj(i -> query(query, i*10, 10))
.mapToObj(i -> query(query, i * pageSize, pageSize))
.flatMap(this::hits)
);
}
}

}

Stream<JSONObject> hits(JSONObject searchResults){
Stream<JSONObject> hits(JSONObject searchResults) {
JSONArray hits = searchResults.getJSONObject("hits").getJSONArray("hits");
return IntStream.range(0, hits.length())
.mapToObj(hits::getJSONObject)
.map(jo -> jo.getJSONObject("_source"));
}

JSONObject query(JSONObject query, long page, int pageSize){
JSONObject query(JSONObject query, long page, int pageSize) {
JSONObject request = new JSONObject();
request.put("query", query);
request.put("size", pageSize);
Expand All @@ -91,43 +91,43 @@ JSONObject query(JSONObject query, long page, int pageSize){
return new JSONObject(entity);
}

JSONObject _search(String query){
JSONObject _search(String query) {
ClientResponse response = search.post(ClientResponse.class, query);
String entity = extractEntity(response, String.class);
return new JSONObject(entity);
}

// Normalize to form /path
private String normalizeRelativePath(String path){
if (path.endsWith("/")){
path = path.substring(0, path.length()-1);
private String normalizeRelativePath(String path) {
if (path.endsWith("/")) {
path = path.substring(0, path.length() - 1);
}
if (! path.startsWith("/")){
if (!path.startsWith("/")) {
path = "/" + path;
}
return path;
}

private <T> T extractEntity(ClientResponse response, Class<T> type){
if(isError(response)){
private <T> T extractEntity(ClientResponse response, Class<T> type) {
if (isError(response)) {
throw toException(response);
}

return response.getEntity(type);
}

private boolean isError(ClientResponse response){
private boolean isError(ClientResponse response) {
Response.Status.Family family = response.getStatusInfo().getFamily();
return family == Response.Status.Family.CLIENT_ERROR
|| family == Response.Status.Family.SERVER_ERROR;
}

private RuntimeException toException(ClientResponse response){
private RuntimeException toException(ClientResponse response) {
throw new RuntimeException(
String.format("[%s] %s: %s",
response.getStatus(),
response.getStatusInfo().getReasonPhrase(),
response.getEntity(String.class)
));
response.getStatus(),
response.getStatusInfo().getReasonPhrase(),
response.getEntity(String.class)
));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package be.kwakeroni.parameters.basic.backend.query.support;

import be.kwakeroni.parameters.backend.api.BackendGroup;
import be.kwakeroni.parameters.backend.api.query.BackendQuery;
import be.kwakeroni.parameters.backend.api.query.BackendWireFormatterContext;

/**
* (C) 2017 Maarten Van Puymbroeck
*/
public abstract class IntermediaryBackendGroupSupport<Q extends BackendQuery<? extends Q, ?>, S, E> implements BackendGroup<Q, S, E> {

private final BackendGroup<Q, S, E> subGroup;

public IntermediaryBackendGroupSupport(BackendGroup<Q, S, E> subGroup) {
this.subGroup = subGroup;
}

@Override
public BackendQuery<? extends Q, ?> internalize(Object query, BackendWireFormatterContext context) {
return context.internalize(this, query);
}

public BackendGroup<Q, S, E> getSubGroup() {
return this.subGroup;
}


@Override
public String getName() {
return this.subGroup.getName();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package be.kwakeroni.parameters.basic.backend.query.support;

import be.kwakeroni.parameters.backend.api.query.BackendQuery;
import be.kwakeroni.parameters.backend.api.query.BackendWireFormatterContext;

/**
* (C) 2017 Maarten Van Puymbroeck
*/
public abstract class IntermediateBackendQuerySupport<Q extends BackendQuery<Q, V>, V> implements BackendQuery<Q, V> {

private final Q subQuery;

public IntermediateBackendQuerySupport(Q subQuery) {
this.subQuery = subQuery;
}

protected BackendQuery<Q, V> getSubQuery() {
return subQuery;
}

@Override
public Object externalizeValue(V value, BackendWireFormatterContext wireFormatterContext) {
return subQuery.externalizeValue(value, wireFormatterContext);
}

@Override
public V internalizeValue(Object value, BackendWireFormatterContext wireFormatterContext) {
return subQuery.internalizeValue(value, wireFormatterContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package be.kwakeroni.parameters.basic.backend.es;

import be.kwakeroni.parameters.backend.api.BackendGroup;
import be.kwakeroni.parameters.backend.api.query.BackendQuery;
import be.kwakeroni.parameters.backend.api.query.BackendWireFormatterContext;
import be.kwakeroni.parameters.backend.es.api.ElasticSearchCriteria;
import be.kwakeroni.parameters.backend.es.api.ElasticSearchData;
import be.kwakeroni.parameters.backend.es.api.ElasticSearchEntry;
import be.kwakeroni.parameters.backend.es.api.ElasticSearchQuery;
import be.kwakeroni.parameters.basic.backend.query.BasicBackendWireFormatter;
import be.kwakeroni.parameters.basic.backend.query.MappedBackendGroup;
import be.kwakeroni.parameters.basic.backend.query.support.IntermediaryBackendGroupSupport;
import be.kwakeroni.parameters.basic.backend.query.support.IntermediateBackendQuerySupport;
import org.json.JSONObject;

import java.util.Optional;
import java.util.function.BiPredicate;

/**
* (C) 2017 Maarten Van Puymbroeck
*/
public class ElasticSearchMappedGroup
extends IntermediaryBackendGroupSupport<ElasticSearchQuery<?>, Object, ElasticSearchEntry>
implements MappedBackendGroup<ElasticSearchQuery<?>, Object, ElasticSearchEntry> {

private final String keyParameterName;

public ElasticSearchMappedGroup(String keyParameterName, BackendGroup<ElasticSearchQuery<?>, Object, ElasticSearchEntry> subGroup) {
super(subGroup);
this.keyParameterName = keyParameterName;
}

@Override
public ElasticSearchQuery<?> getEntryQuery(String keyValue, ElasticSearchQuery<?> subQuery) {
return new ElasticSearchMappedQuery<>(keyValue, subQuery);
}

@Override
public void validateNewEntry(ElasticSearchEntry entry, Object storage) {
throw new UnsupportedOperationException();
}

private final class ElasticSearchMappedQuery<T>
extends IntermediateBackendQuerySupport<ElasticSearchQuery<T>, T>
implements ElasticSearchQuery<T>{

private final String keyValue;

public ElasticSearchMappedQuery(String keyValue, ElasticSearchQuery<T> subQuery) {
super(subQuery);
this.keyValue = keyValue;
}

@Override
public Optional<T> apply(ElasticSearchData data, ElasticSearchCriteria criteria) {
criteria.addParameterMatch(keyParameterName, this.keyValue);
return getSubQuery().raw().apply(data, criteria);
}

}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package be.kwakeroni.parameters.basic.backend.es;

import be.kwakeroni.parameters.backend.api.query.BackendWireFormatterContext;
import be.kwakeroni.parameters.backend.es.api.ElasticSearchCriteria;
import be.kwakeroni.parameters.backend.es.api.ElasticSearchData;
import be.kwakeroni.parameters.backend.es.api.ElasticSearchQuery;
import be.kwakeroni.parameters.basic.backend.query.BasicBackendWireFormatter;
Expand All @@ -25,8 +26,8 @@ private EntryElasticSearchQuery() {
}

@Override
public Optional<Map<String, String>> apply(ElasticSearchData data, JSONObject query) {
return data.query(query)
public Optional<Map<String, String>> apply(ElasticSearchData data, ElasticSearchCriteria criteria) {
return data.query(criteria, 2)
.reduce(ElasticSearchSimpleGroup.atMostOne())
.map(this::toStringMap);
}
Expand Down
Loading

0 comments on commit 256969f

Please sign in to comment.