Merge pull request #12 from alkemics/search
Aggregations tests + misc
alk-lbinet authored May 8, 2020
2 parents 52d5a93 + 46b2b4d commit 5a5619e
Showing 30 changed files with 569 additions and 480 deletions.
7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.agg.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.client.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.interactive.abstract.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.interactive.client.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.interactive.index.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.node.agg.abstract.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.node.agg.bucket.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.node.agg.deserializer.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.node.agg.metric.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.node.agg.pipeline.rst

This file was deleted.

21 changes: 0 additions & 21 deletions docs/source/reference/pandagg.node.agg.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.node.mapping.deserializer.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.node.mixins.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.node.query.deserializer.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/source/reference/pandagg.tree.agg.rst

This file was deleted.

79 changes: 44 additions & 35 deletions examples/imdb/load.py
@@ -2,43 +2,52 @@
 from os.path import join
 from elasticsearch import Elasticsearch, helpers
 from examples.imdb.conf import ES_HOST, DATA_DIR
+from pandagg.node.mapping.field_datatypes import (
+    Keyword,
+    Text,
+    Date,
+    Float,
+    Nested,
+    Integer,
+)
+from pandagg.tree.mapping import Mapping
 
 index_name = "movies"
-mapping = {
-    "properties": {
-        "movie_id": {"type": "integer"},
-        "name": {"type": "text", "fields": {"raw": {"type": "keyword"}}},
-        "year": {"type": "date", "format": "yyyy"},
-        "rank": {"type": "float"},
-        # array
-        "genres": {"type": "keyword"},
-        # nested
-        "roles": {
-            "type": "nested",
-            "properties": {
-                "role": {"type": "keyword"},
-                "actor_id": {"type": "integer"},
-                "gender": {"type": "keyword"},
-                "first_name": {"type": "text", "fields": {"raw": {"type": "keyword"}}},
-                "last_name": {"type": "text", "fields": {"raw": {"type": "keyword"}}},
-                "full_name": {"type": "text", "fields": {"raw": {"type": "keyword"}}},
-            },
-        },
-        # nested
-        "directors": {
-            "type": "nested",
-            "properties": {
-                "director_id": {"type": "integer"},
-                "first_name": {"type": "text", "fields": {"raw": {"type": "keyword"}}},
-                "last_name": {"type": "text", "fields": {"raw": {"type": "keyword"}}},
-                "full_name": {"type": "text", "fields": {"raw": {"type": "keyword"}}},
-                "genres": {"type": "keyword"},
-            },
-        },
-        "nb_directors": {"type": "integer"},
-        "nb_roles": {"type": "integer"},
-    }
-}
+mapping = Mapping(
+    properties=[
+        Keyword("movie_id"),
+        Text("name", fields=Keyword("raw")),
+        Date("year"),
+        Float("rank"),
+        Keyword("genres"),
+        Nested(
+            "roles",
+            properties=[
+                Keyword("role"),
+                Keyword("actor_id"),
+                Keyword("gender"),
+                Text("first_name", copy_to="roles.full_name", fields=Keyword("raw")),
+                Text("last_name", copy_to="roles.full_name", fields=Keyword("raw")),
+                Text("full_name"),
+            ],
+        ),
+        Nested(
+            "directors",
+            properties=[
+                Keyword("role"),
+                Keyword("director_id"),
+                Keyword("gender"),
+                Text(
+                    "first_name", copy_to="directors.full_name", fields=Keyword("raw")
+                ),
+                Text("last_name", copy_to="directors.full_name", fields=Keyword("raw")),
+                Text("full_name"),
+            ],
+        ),
+        Integer("nb_directors"),
+        Integer("nb_roles"),
+    ]
+).to_dict()
 
 
 def bulk_index(client, docs):
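For reference, a minimal sketch of how the generated mapping dict might be applied when creating the index — assuming an Elasticsearch 7.x client; index_name and mapping are the module-level values defined in the hunk above:

from elasticsearch import Elasticsearch

from examples.imdb.conf import ES_HOST

client = Elasticsearch(hosts=[ES_HOST])
# In 7.x the field definitions go under the "mappings" key of the creation body.
client.indices.create(index=index_name, body={"mappings": mapping})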
2 changes: 1 addition & 1 deletion pandagg/discovery.py
@@ -37,7 +37,7 @@ def __init__(self, name, settings, mapping, aliases, client=None):
         self.name = name
         self.settings = settings
         self._mapping = mapping
-        self.mapping = IMapping(mapping, client=client, index_name=name)
+        self.mapping = IMapping(mapping, client=client, index=name)
         self.aliases = aliases
 
     def search(self):
50 changes: 23 additions & 27 deletions pandagg/interactive/_field_agg_factory.py
@@ -24,50 +24,46 @@ def list_available_aggs_on_field(field_type):
     ]
 
 
-def field_klass_init(self, mapping_tree, client, field, index_name):
+def field_klass_init(self, mapping_tree, client, field, index):
     self._mapping_tree = mapping_tree
     self._client = client
     self._field = field
-    self._index_name = index_name
+    self._index = index
 
 
 def aggregator_factory(agg_klass):
-    def aggregator(
-        self, index=None, execute=True, raw_output=False, query=None, **kwargs
-    ):
+    def aggregator(self, index=None, raw_output=False, query=None, **kwargs):
         node = agg_klass(name="%s_agg" % agg_klass.KEY, field=self._field, **kwargs)
-        return self._operate(node, index, execute, raw_output, query)
+        return self._operate(node, index, raw_output, query)
 
     aggregator.__doc__ = agg_klass.__init__.__doc__ or agg_klass.__doc__
     return aggregator
 
 
-def _operate(self, agg_node, index, execute, raw_output, query):
-    index = index or self._index_name
+def _operate(self, agg_node, index, raw_output, query):
+    index = index or self._index
     aggregation = {agg_node.name: agg_node.to_dict()}
     nesteds = self._mapping_tree.list_nesteds_at_field(self._field) or []
     for nested in nesteds:
         aggregation = {nested: {"nested": {"path": nested}, "aggs": aggregation}}
 
-    if self._client is not None and execute:
-        body = {"aggs": aggregation, "size": 0}
-        if query is not None:
-            body["query"] = query
-        raw_response = self._client.search(index=index, body=body)["aggregations"]
-        for nested in nesteds:
-            raw_response = raw_response[nested]
-        result = list(agg_node.extract_buckets(raw_response[agg_node.name]))
-
-        if raw_output:
-            return result
-        try:
-            import pandas as pd
-        except ImportError:
-            return result
-        keys = map(itemgetter(0), result)
-        raw_values = map(itemgetter(1), result)
-        return pd.DataFrame(index=keys, data=raw_values)
-    return aggregation
+    body = {"aggs": aggregation, "size": 0}
+    if query is not None:
+        body["query"] = query
+    raw_response = self._client.search(index=index, body=body)["aggregations"]
+    for nested in nesteds:
+        raw_response = raw_response[nested]
+    result = list(agg_node.extract_buckets(raw_response[agg_node.name]))
+
+    if raw_output:
+        return result
+    try:
+        import pandas as pd
+    except ImportError:
+        return result
+    keys = map(itemgetter(0), result)
+    raw_values = map(itemgetter(1), result)
+    return pd.DataFrame(index=keys, data=raw_values)
 
 
 def field_type_klass_factory(field_type):
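A standalone sketch of the nested-wrapping step in _operate above: each nested path returned by list_nesteds_at_field wraps the aggregation clause, so an aggregation on a nested field ends up inside the matching "nested" clause (field and aggregation names below are illustrative):

# e.g. an aggregation on the nested field "roles.role";
# list_nesteds_at_field("roles.role") would return ["roles"]
aggregation = {"role_agg": {"terms": {"field": "roles.role"}}}
for nested in ["roles"]:
    aggregation = {nested: {"nested": {"path": nested}, "aggs": aggregation}}

# aggregation is now:
# {"roles": {"nested": {"path": "roles"},
#            "aggs": {"role_agg": {"terms": {"field": "roles.role"}}}}}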
13 changes: 3 additions & 10 deletions pandagg/interactive/mapping.py
@@ -14,7 +14,7 @@ class IMapping(TreeBasedObj):
 
     def __init__(self, *args, **kwargs):
         self._client = kwargs.pop("client", None)
-        self._index_name = kwargs.pop("index_name", None)
+        self._index = kwargs.pop("index", None)
         root_path = kwargs.pop("root_path", None)
         depth = kwargs.pop("depth", 1)
         initial_tree = kwargs.pop("initial_tree", None)
@@ -25,21 +25,14 @@ def __init__(self, *args, **kwargs):
         # if we reached a leave, add aggregation capabilities based on reached mapping type
         self._set_agg_property_if_required()
 
-    def _bind(self, client, index_name=None):
-        self._client = client
-        if index_name is not None:
-            self._index_name = index_name
-        self._set_agg_property_if_required()
-        return self
-
     def _clone(self, nid, root_path, depth):
         return IMapping(
             self._tree.subtree(nid),
             client=self._client,
             root_path=root_path,
             depth=depth,
             initial_tree=self._initial_tree,
-            index_name=self._index_name,
+            index=self._index,
         )
 
     def _set_agg_property_if_required(self):
@@ -50,7 +43,7 @@ def _set_agg_property_if_required(self):
                 mapping_tree=self._initial_tree,
                 client=self._client,
                 field=self._initial_tree.node_path(field_node.identifier),
-                index_name=self._index_name,
+                index=self._index,
             )
 
     def __call__(self, *args, **kwargs):
8 changes: 5 additions & 3 deletions pandagg/node/aggs/abstract.py
@@ -50,10 +50,10 @@ def _type_deserializer(cls, name_or_agg, **params):
         - either Agg instance if provided
         """
         # hack for now
-        if isinstance(name_or_agg, Tree) and name_or_agg.__class__.__name__ == "Agg":
+        if isinstance(name_or_agg, Tree) and name_or_agg.__class__.__name__ == "Aggs":
             if params:
                 raise ValueError(
-                    "Cannot accept parameters when passing in an Agg object."
+                    "Cannot accept parameters when passing in an Aggs object."
                 )
             return name_or_agg
 
@@ -103,11 +103,13 @@ def _type_deserializer(cls, name_or_agg, **params):
         if not isinstance(name_or_agg, string_types):
             raise ValueError("Invalid")
         # "tags", size=10 (by default apply a terms agg)
-        if "name" not in params:
+        if "name" not in params and "field" not in params:
             return cls.get_dsl_class("terms")(
                 name=name_or_agg, field=name_or_agg, **params
             )
         # "terms", field="tags", name="per_tags"
+        if "name" not in params:
+            raise ValueError("Aggregation expects a 'name'. Got %s." % params)
         return cls.get_dsl_class(name_or_agg)(**params)
 
     def line_repr(self, depth, **kwargs):
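A standalone illustration of the string shorthands the deserializer distinguishes after this change; the helper below only mimics the dispatch and is not pandagg's API:

def deserialize(name_or_agg, **params):
    # "tags" with neither "name" nor "field": default to a terms
    # aggregation named after the field.
    if "name" not in params and "field" not in params:
        return {name_or_agg: {"terms": {"field": name_or_agg, **params}}}
    # "terms", field="tags" without an explicit name is rejected.
    if "name" not in params:
        raise ValueError("Aggregation expects a 'name'. Got %s." % params)
    # "terms", field="tags", name="per_tags": build the named clause.
    name = params.pop("name")
    return {name: {name_or_agg: params}}

print(deserialize("tags"))
# {'tags': {'terms': {'field': 'tags'}}}
print(deserialize("terms", field="tags", name="per_tags"))
# {'per_tags': {'terms': {'field': 'tags'}}}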
14 changes: 11 additions & 3 deletions pandagg/node/aggs/bucket.py
@@ -34,9 +34,17 @@ class Filter(UniqueBucketAgg):
     KEY = "filter"
     VALUE_ATTRS = ["doc_count"]
 
-    def __init__(self, name, filter, meta=None, aggs=None):
-        self.filter = filter.copy()
-        super(Filter, self).__init__(name=name, meta=meta, aggs=aggs, **filter)
+    def __init__(self, name, filter=None, meta=None, aggs=None, **kwargs):
+        if (filter is not None) != (not kwargs):
+            raise ValueError(
+                'Filter aggregation requires exactly one of "filter" or "kwargs"'
+            )
+        if filter:
+            filter_ = filter.copy()
+        else:
+            filter_ = kwargs.copy()
+        self.filter = filter_
+        super(Filter, self).__init__(name=name, meta=meta, aggs=aggs, **filter_)
 
     def get_filter(self, key):
         return self.filter
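A usage sketch of the two constructor forms the Filter bucket now accepts (import path as in this diff); passing both a "filter" dict and extra keyword arguments, or neither, raises the ValueError above:

from pandagg.node.aggs.bucket import Filter

# explicit "filter" body
idle = Filter("idle_tasks", filter={"term": {"state": "idle"}})
# the same clause expressed through keyword arguments
idle_kw = Filter("idle_tasks", term={"state": "idle"})

# Filter("idle_tasks") or
# Filter("idle_tasks", filter={"term": {...}}, term={...}) would raise ValueError.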