diff --git a/dataservice/api/common/pagination.py b/dataservice/api/common/pagination.py index c7a15c87e..8795f03bc 100644 --- a/dataservice/api/common/pagination.py +++ b/dataservice/api/common/pagination.py @@ -2,6 +2,7 @@ from functools import wraps from dateutil import parser from datetime import datetime +from dataservice.extensions import db, indexd def paginated(f): @@ -101,6 +102,17 @@ def indexd_pagination(q, after, limit): :returns: A Pagination object """ + def prefetch_indexd(): + """ Compute dids for the page and have indexd fetch them in bulk """ + model = q._entities[0].mapper.entity + gfs = (q.order_by(model.created_at.asc()) + .filter(model.created_at > after) + .with_entities(model.latest_did) + .limit(limit).all()) + dids = [gf[0] for gf in gfs] + indexd.prefetch(dids) + + prefetch_indexd() pager = Pagination(q, after, limit) keep = [] refresh = True @@ -112,6 +124,8 @@ def indexd_pagination(q, after, limit): next_after = keep[-1].created_at if len(keep) > 0 else after # Number of results needed to fulfill the original limit remain = limit - len(keep) + + prefetch_indexd() pager = Pagination(q, next_after, remain) for st in pager.items: diff --git a/dataservice/extensions/flask_indexd.py b/dataservice/extensions/flask_indexd.py index a574c8c6f..88eceeb48 100644 --- a/dataservice/extensions/flask_indexd.py +++ b/dataservice/extensions/flask_indexd.py @@ -17,12 +17,18 @@ class Indexd(object): def __init__(self, app=None): self.app = app + # Used to store documents prefetched for a page + self.page_cache = {} if app is not None: self.init_app(app) def init_app(self, app): app.config.setdefault('INDEXD_URL', None) self.url = app.config['INDEXD_URL'] + if self.url: + self.bulk_url = '/'.join(self.url.split('/')[:-1] + ['bulk/']) + else: + self.bulk_url = None if hasattr(app, 'teardown_appcontext'): app.teardown_appcontext(self.teardown) else: @@ -42,6 +48,19 @@ def teardown(self, exception): if hasattr(ctx, 'indexd_session'): ctx.indexd_session.close() + def prefetch(self, dids): + """ + Fetch a list of documents by did into the page cache. + """ + # If running in dev mode, don't call indexd + if self.url is None or self.bulk_url is None: + return + print(self.url, self.bulk_url) + self.page_cache = {} + resp = self.session.post(self.bulk_url + 'documents', json=dids).json() + for doc in resp: + self.page_cache[doc['did']] = doc + def get(self, record): """ Retrieves a record from indexd @@ -55,6 +74,9 @@ def get(self, record): if self.url is None: return record + if record.latest_did in self.page_cache: + return self.page_cache[record.latest_did] + url = self.url + record.latest_did resp = self.session.get(url) self.check_response(resp) @@ -171,7 +193,8 @@ def update(self, record): if record.acl != old['acl']: self._update_all_acls(record) - url = '{}{}?rev={}'.format(self.url, record.latest_did, record.rev) + url = '{}{}?rev={}'.format(self.url, record.latest_did, + record.rev) if 'size' in req_body or 'hashes' in req_body: # Create a new version in indxed req_body['form'] = 'object' diff --git a/tests/mocks.py b/tests/mocks.py index 8ca8bccdd..34380bbe7 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -66,7 +66,7 @@ def __init__(self, *args, status_code=200, **kwargs): def post(self, url, *args, **kwargs): """ - Mocks a response from POST /index/ + Mocks a response from POST /index/ and POST /bulk/documents """ resp = { @@ -83,6 +83,9 @@ def post(self, url, *args, **kwargs): # Otherwise, assume creation of a new doc and track the baseid self.baseid_by_did[resp['did']] = resp['baseid'] + if 'bulk/documents' in url: + resp = [resp] + mock_resp = MockResp(resp=resp, status_code=self.status_code) return mock_resp