From d3b21d59f7d54224e58c18da757c7a1b1e79ec0b Mon Sep 17 00:00:00 2001 From: Dan Kolbman Date: Fri, 21 Sep 2018 14:06:28 -0400 Subject: [PATCH 1/2] :sparkles: Add user agent to requests headers --- dataservice/extensions/flask_indexd.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dataservice/extensions/flask_indexd.py b/dataservice/extensions/flask_indexd.py index 2bbbb4d73..a574c8c6f 100644 --- a/dataservice/extensions/flask_indexd.py +++ b/dataservice/extensions/flask_indexd.py @@ -33,7 +33,8 @@ def new_session(self): s = requests.Session() s.auth = (current_app.config['INDEXD_USER'], current_app.config['INDEXD_PASS']) - s.headers.update({'Content-Type': 'application/json'}) + s.headers.update({'Content-Type': 'application/json', + 'User-Agent': 'Kids First Dataservice'}) return s def teardown(self, exception): From 33be6bad601c2b671a826787dba5a9cae41fcfe6 Mon Sep 17 00:00:00 2001 From: Dan Kolbman Date: Fri, 21 Sep 2018 15:58:22 -0400 Subject: [PATCH 2/2] :zap: Prefetch indexd documents in bulk --- dataservice/api/common/pagination.py | 29 ++++++++++++++++++++------ dataservice/extensions/flask_indexd.py | 26 ++++++++++++++++++++++- tests/mocks.py | 5 ++++- 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/dataservice/api/common/pagination.py b/dataservice/api/common/pagination.py index c7a15c87e..f595ffd0e 100644 --- a/dataservice/api/common/pagination.py +++ b/dataservice/api/common/pagination.py @@ -2,6 +2,7 @@ from functools import wraps from dateutil import parser from datetime import datetime +from dataservice.extensions import db, indexd def paginated(f): @@ -101,6 +102,18 @@ def indexd_pagination(q, after, limit): :returns: A Pagination object """ + def prefetch_indexd(after): + """ Compute dids for the page and have indexd fetch them in bulk """ + model = q._entities[0].mapper.entity + gfs = (q.order_by(model.created_at.asc()) + .filter(model.created_at > after) + .with_entities(model.latest_did) + .limit(limit).all()) + dids = [gf[0] for gf in gfs] + indexd.prefetch(dids) + + indexd.clear_cache() + prefetch_indexd(after) pager = Pagination(q, after, limit) keep = [] refresh = True @@ -108,18 +121,22 @@ def indexd_pagination(q, after, limit): # Continue updating the page until we get a page with no deleted files while (pager.total > 0 and refresh): refresh = False - # Move the cursor ahead to the last valid file - next_after = keep[-1].created_at if len(keep) > 0 else after - # Number of results needed to fulfill the original limit - remain = limit - len(keep) - pager = Pagination(q, next_after, remain) - for st in pager.items: if hasattr(st, 'was_deleted') and st.was_deleted: refresh = True else: keep.append(st) + # Only fetch more if we saw there were some items that were deleted + if refresh: + # Move the cursor ahead to the last valid file + next_after = keep[-1].created_at if len(keep) > 0 else after + # Number of results needed to fulfill the original limit + remain = limit - len(keep) + + prefetch_indexd(next_after) + pager = Pagination(q, next_after, remain) + # Replace original page's items with new list of valid files pager.items = keep pager.after = next_after if next_after else after diff --git a/dataservice/extensions/flask_indexd.py b/dataservice/extensions/flask_indexd.py index a574c8c6f..7da864325 100644 --- a/dataservice/extensions/flask_indexd.py +++ b/dataservice/extensions/flask_indexd.py @@ -17,12 +17,18 @@ class Indexd(object): def __init__(self, app=None): self.app = app + # Used to store documents prefetched for a page + self.page_cache = {} if app is not None: self.init_app(app) def init_app(self, app): app.config.setdefault('INDEXD_URL', None) self.url = app.config['INDEXD_URL'] + if self.url: + self.bulk_url = '/'.join(self.url.split('/')[:-1] + ['bulk/']) + else: + self.bulk_url = None if hasattr(app, 'teardown_appcontext'): app.teardown_appcontext(self.teardown) else: @@ -42,6 +48,20 @@ def teardown(self, exception): if hasattr(ctx, 'indexd_session'): ctx.indexd_session.close() + def prefetch(self, dids): + """ + Fetch a list of documents by did into the page cache. + """ + # If running in dev mode, don't call indexd + if self.url is None or self.bulk_url is None: + return + resp = self.session.post(self.bulk_url + 'documents', json=dids).json() + for doc in resp: + self.page_cache[doc['did']] = doc + + def clear_cache(self): + self.page_cache = {} + def get(self, record): """ Retrieves a record from indexd @@ -55,6 +75,9 @@ def get(self, record): if self.url is None: return record + if record.latest_did in self.page_cache: + return self.page_cache[record.latest_did] + url = self.url + record.latest_did resp = self.session.get(url) self.check_response(resp) @@ -171,7 +194,8 @@ def update(self, record): if record.acl != old['acl']: self._update_all_acls(record) - url = '{}{}?rev={}'.format(self.url, record.latest_did, record.rev) + url = '{}{}?rev={}'.format(self.url, record.latest_did, + record.rev) if 'size' in req_body or 'hashes' in req_body: # Create a new version in indxed req_body['form'] = 'object' diff --git a/tests/mocks.py b/tests/mocks.py index 8ca8bccdd..34380bbe7 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -66,7 +66,7 @@ def __init__(self, *args, status_code=200, **kwargs): def post(self, url, *args, **kwargs): """ - Mocks a response from POST /index/ + Mocks a response from POST /index/ and POST /bulk/documents """ resp = { @@ -83,6 +83,9 @@ def post(self, url, *args, **kwargs): # Otherwise, assume creation of a new doc and track the baseid self.baseid_by_did[resp['did']] = resp['baseid'] + if 'bulk/documents' in url: + resp = [resp] + mock_resp = MockResp(resp=resp, status_code=self.status_code) return mock_resp