Merge pull request #70 from alk-lbinet/clarifications
source include doesn't need to be validated against mappings
alk-lbinet authored Jul 5, 2021
2 parents e978774 + 398c688 commit 521776e
Showing 3 changed files with 46 additions and 57 deletions.
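In practical terms, the change means that selecting columns on a `Search` no longer requires the requested fields to be present in the provided mappings. A minimal before/after sketch, assuming the `Search` and `Mappings` classes that appear in the diffs below (field names and import paths are illustrative, not taken from this commit):

```python
from pandagg.mappings import Mappings
from pandagg.search import Search

search = Search(mappings=Mappings(properties={"example": {"type": "keyword"}}))

# Before this commit, every requested field was looked up in the mappings and
# an unknown field raised KeyError("col1 not in index").
# After this commit, the list is forwarded as-is to a "_source.includes"
# clause, so the line below simply builds the source-filtering query.
search[["col1", "col2"]]
```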
72 changes: 37 additions & 35 deletions pandagg/response.py
@@ -163,45 +163,47 @@ def _parse_group_by(
Yields each row for which last bucket aggregation generated buckets.
"""
# initialization: find ancestors once for faster computation
# initialization: cache ancestors once for faster computation; they will be passed as arguments to
# downstream recursive calls
if ancestors is None:
until_id = self._aggs.id_from_key(until)
ancestors = self._aggs.ancestors(until_id, include_current=True)
# remove eventual fake root
ancestors = [(k, n) for k, n in ancestors if k is not None]
# remove root (not an aggregation clause)
ancestors = [(k, n) for k, n in ancestors[:-1]]
agg_name, agg_node = ancestors[-1]

if agg_name not in response:
return

if not row:
row = [] if row_as_tuple else {}
if agg_name in response:
agg_node = [n for k, n in ancestors if k == agg_name][0]
for key, raw_bucket in agg_node.extract_buckets(response[agg_name]):
sub_row = copy.copy(row)
if (
not isinstance(agg_node, UniqueBucketAgg)
or with_single_bucket_groups
):
if row_as_tuple:
sub_row.append(key)
else:
sub_row[agg_name] = key
if agg_name == until:
# end real yield
if row_as_tuple:
yield tuple(sub_row), raw_bucket
else:
yield sub_row, raw_bucket
elif agg_name in {k for k, _ in ancestors}:
# yield children
for child_key, _ in self._aggs.children(agg_node.identifier):
for nrow, nraw_bucket in self._parse_group_by(
row=sub_row,
response=raw_bucket,
agg_name=child_key,
until=until,
row_as_tuple=row_as_tuple,
ancestors=ancestors,
):
yield nrow, nraw_bucket

agg_node = [n for k, n in ancestors if k == agg_name][0]
for key, raw_bucket in agg_node.extract_buckets(response[agg_name]):
sub_row = copy.copy(row)
if not isinstance(agg_node, UniqueBucketAgg) or with_single_bucket_groups:
if row_as_tuple:
sub_row.append(key)
else:
sub_row[agg_name] = key
if agg_name == until:
# end real yield
if row_as_tuple:
yield tuple(sub_row), raw_bucket
else:
yield sub_row, raw_bucket
elif agg_name in {k for k, _ in ancestors}:
# yield children
for child_key, _ in self._aggs.children(agg_node.identifier):
for nrow, nraw_bucket in self._parse_group_by(
row=sub_row,
response=raw_bucket,
agg_name=child_key,
until=until,
row_as_tuple=row_as_tuple,
ancestors=ancestors,
):
yield nrow, nraw_bucket

def _normalize_buckets(self, agg_response, agg_name=None):
"""
@@ -293,7 +295,7 @@ def to_tabular(
:param index_orient: if True, level-key samples are returned as tuples, else in a dictionary
:param grouped_by: name of the aggregation node used as last grouping level
:param normalize: if True, normalize columns buckets
:return: index, index_names, values
:return: index_names, values
"""
grouping_key, grouping_agg = self._grouping_agg(grouped_by)
if grouping_key is None:
@@ -431,7 +433,7 @@ def serialize(self, output="tabular", **kwargs):
elif output == "dataframe":
return self.to_dataframe(**kwargs)
else:
raise NotImplementedError("Unkown %s output format." % output)
raise NotImplementedError("Unknown %s output format." % output)

def __repr__(self):
if not self.keys():
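The `_parse_group_by` hunk above is mostly a control-flow flattening: the `if agg_name in response:` wrapper around the whole bucket loop becomes an early `return`, so the recursive yielding code loses one indentation level without changing behaviour. A generic sketch of that guard-clause rewrite on a recursive generator (names are illustrative, not pandagg's API):

```python
def walk_nested(node, path=()):
    # Original shape: the entire body nested under one membership test.
    if "children" in node:
        for child in node["children"]:
            yield path + (child["key"],)
            yield from walk_nested(child, path + (child["key"],))


def walk_flat(node, path=()):
    # Refactored shape: bail out early, keep the main loop at the top level.
    if "children" not in node:
        return
    for child in node["children"]:
        yield path + (child["key"],)
        yield from walk_flat(child, path + (child["key"],))
```

Both generators yield the same key paths; only the nesting differs.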
26 changes: 9 additions & 17 deletions pandagg/search.py
@@ -3,7 +3,6 @@
import json

from elasticsearch.helpers import scan
from lighttree.exceptions import NotFoundNodeError

from pandagg.connections import get_connection
from pandagg.query import Bool
@@ -338,22 +337,15 @@ def __getitem__(self, n):
s._params["from"] = n.start or 0
s._params["size"] = n.stop - (n.start or 0) if n.stop is not None else 10
return s
elif isinstance(n, list):
# Columns selection
if self._mappings:
for key in n:
try:
self._mappings.id_from_key(key)
except NotFoundNodeError:
raise KeyError("%s not in index" % key)
return self.source(includes=n)
else: # This is an index lookup, equivalent to slicing by [n:n+1].
# If negative index, abort.
if n < 0:
raise ValueError("Search does not support negative indexing.")
s._params["from"] = n
s._params["size"] = 1
return s
if isinstance(n, list):
return s.source(includes=n)
# This is an index lookup, equivalent to slicing by [n:n+1].
# If negative index, abort.
if n < 0:
raise ValueError("Search does not support negative indexing.")
s._params["from"] = n
s._params["size"] = 1
return s

def size(self, size):
"""
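After this change, `Search.__getitem__` handles three argument shapes, all visible in the hunk above: a slice sets the `from`/`size` parameters, an integer is a single-document lookup equivalent to `[n:n+1]`, and a list becomes a `_source.includes` clause. A minimal sketch of those semantics, assuming a bare `Search()` can be instantiated and that `to_dict()` only reflects the clauses that were set, as in the test comparison below (field names are illustrative):

```python
from pandagg.search import Search

s = Search()

# slice -> pagination parameters
paged = s[10:20]
assert paged._params["from"] == 10 and paged._params["size"] == 10

# integer -> single-document lookup, same as s[3:4]
single = s[3]
assert single._params["from"] == 3 and single._params["size"] == 1

# list -> source field selection (no longer validated against mappings)
selected = s[["col1", "col2"]]
assert selected.to_dict() == {"_source": {"includes": ["col1", "col2"]}}

# negative indexing is rejected
try:
    s[-1]
except ValueError:
    pass
```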
5 changes: 0 additions & 5 deletions tests/test_search.py
@@ -46,11 +46,6 @@ def test_search_column_selection(self):
{"_source": {"includes": ["col1", "col2"]}},
)

with self.assertRaises(KeyError):
Search(mappings=Mappings(properties={"example": {"type": "keyword"}}))[
["col1", "col2"]
]

def test_using(self):
o = object()
o2 = object()
