⚡️ Indexd bulk fetching #448

Closed
wants to merge 2 commits into from
29 changes: 23 additions & 6 deletions dataservice/api/common/pagination.py
@@ -2,6 +2,7 @@
from functools import wraps
from dateutil import parser
from datetime import datetime
from dataservice.extensions import db, indexd


def paginated(f):
@@ -101,25 +102,41 @@ def indexd_pagination(q, after, limit):

:returns: A Pagination object
"""
def prefetch_indexd(after):
Member

Hmm, indexd_pagination is confusing to me now... What I would think you want is:

  1. Execute the query to get all of the objs for the page like you normally would with Pagination - without executing the request to indexd per instantiation of an indexd model.
  2. Collect the dids from the objs in the page of results
  3. Send the bulk load request out to indexd to get the page of indexd docs
  4. Iterate over the indexd docs and merge with the query results from step 1
  5. Return the results

I realize you'd have to refactor (maybe remove merge_indexd from constructor) and decouple things quite a bit (separate the indexd get request from the actual merging of an indexd doc with an indexd model instance). So maybe that's why you didn't do it this way.

But if you were able to implement the above, then you could probably also get rid of the entire while loop that checks for deleted files, right? You would just be able to return the page right away.
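
Roughly, the flow being proposed might look like this (a sketch only: `fetch_bulk` and a standalone `merge_indexd` are hypothetical helpers, since the current code does the merge inside the model constructor):

```python
# Sketch of the proposed flow; fetch_bulk and a standalone merge_indexd
# are hypothetical and not part of this diff.
def indexd_pagination(q, after, limit):
    # 1. Page the query without any per-object indexd requests
    pager = Pagination(q, after, limit)
    # 2. Collect the dids from the page of results
    dids = [obj.latest_did for obj in pager.items]
    # 3. One bulk request to indexd for the whole page
    docs = {doc['did']: doc for doc in indexd.fetch_bulk(dids)}
    # 4. Merge each indexd doc into its query result
    for obj in pager.items:
        merge_indexd(obj, docs.get(obj.latest_did))
    # 5. Return the merged page
    return pager
```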

Contributor Author

That's true, but it would require decoupling the indexd property loading on construction of the object. Perhaps you could load the indexd properties only when you attempt to access them, but I'm not sure if that is possible.
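
One possible shape for that kind of lazy loading, assuming the indexd fields currently set in the constructor were moved behind a property (hypothetical sketch, not part of this diff):

```python
class LazyIndexdMixin:
    """Hypothetical mixin that defers the indexd call until first access."""
    _indexd_doc = None

    @property
    def indexd_doc(self):
        # Hits indexd (or the prefetched page cache) only when a field is read
        if self._indexd_doc is None:
            self._indexd_doc = indexd.get(self)
        return self._indexd_doc

    @property
    def size(self):
        return self.indexd_doc.get('size')
```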

""" Compute dids for the page and have indexd fetch them in bulk """
model = q._entities[0].mapper.entity
gfs = (q.order_by(model.created_at.asc())
.filter(model.created_at > after)
.with_entities(model.latest_did)
.limit(limit).all())
dids = [gf[0] for gf in gfs]
indexd.prefetch(dids)

indexd.clear_cache()
prefetch_indexd(after)
Contributor
@fiendish Oct 11, 2018

If the indexd bulk fetch were to happen after the Pagination object is initialized instead of before, then you could also speed up empty returns by not checking indexd the first time when total is 0. Is that doable?

Contributor Author

Yeah, there are probably a couple of optimizations to be made around reducing the number of db queries here. It would involve adding some sort of branching to the Pagination object, though, and I'd prefer to keep it simple since it's generalized to all entities at the moment.

Right now, if the Pagination object were constructed first, it would result in the old behavior of constructing each object with its own request to indexd. Note that if total is 0, indexd won't actually be called, as the query for gfs will return empty.
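
A minimal sketch of that empty-page path; the explicit guard on an empty did list is a hypothetical addition, not part of this diff:

```python
# Sketch only; q, model, and limit come from the enclosing indexd_pagination
def prefetch_indexd(after):
    gfs = (q.order_by(model.created_at.asc())
           .filter(model.created_at > after)
           .with_entities(model.latest_did)
           .limit(limit).all())
    dids = [gf[0] for gf in gfs]   # [] when the page has no results
    if not dids:
        return                     # hypothetical guard: skip the bulk request
    indexd.prefetch(dids)
```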

Contributor
@fiendish Oct 11, 2018

Got it. All clear from me, then.

pager = Pagination(q, after, limit)
keep = []
refresh = True
next_after = None
# Continue updating the page until we get a page with no deleted files
while (pager.total > 0 and refresh):
refresh = False
# Move the cursor ahead to the last valid file
next_after = keep[-1].created_at if len(keep) > 0 else after
# Number of results needed to fulfill the original limit
remain = limit - len(keep)
pager = Pagination(q, next_after, remain)

for st in pager.items:
if hasattr(st, 'was_deleted') and st.was_deleted:
Member

Isn't this never going to happen now? Before the loop, you first populate the indexd page cache. Then you construct the Pagination obj, which results in a bunch of calls to indexd.get(), which results in a bunch of indexd page cache lookups, which means the requests to indexd never go out. So then you won't know if a file was deleted?

Contributor Author

I believe this will still happen because merge_indexd() is still called for every object in the page; it just reads from the cache when available. If the doc isn't in the cache, it will attempt to get it from indexd, which will return a 404, and the object will be marked as was_deleted.
Will look more closely at this to confirm.
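
For reference, a rough sketch of the path being described; merge_indexd itself isn't part of this diff, so its exact shape, and in particular how a 404 surfaces from indexd.get(), is assumed here:

```python
# Hypothetical sketch; assumes indexd.get() returns None on a 404
def merge_indexd(self):
    doc = indexd.get(self)  # cache hit -> prefetched doc; miss -> real GET
    if doc is None:
        # indexd answered 404 for the cache miss: the file was deleted
        self.was_deleted = True
        return self
    # Copy indexd fields onto the model
    for prop in ('size', 'hashes', 'urls', 'acl'):
        setattr(self, prop, doc.get(prop))
    return self
```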

refresh = True
else:
keep.append(st)

# Only fetch more if we saw there were some items that were deleted
if refresh:
# Move the cursor ahead to the last valid file
next_after = keep[-1].created_at if len(keep) > 0 else after
# Number of results needed to fulfill the original limit
remain = limit - len(keep)

prefetch_indexd(next_after)
pager = Pagination(q, next_after, remain)

# Replace original page's items with new list of valid files
pager.items = keep
pager.after = next_after if next_after else after
29 changes: 27 additions & 2 deletions dataservice/extensions/flask_indexd.py
@@ -17,12 +17,18 @@ class Indexd(object):

def __init__(self, app=None):
self.app = app
# Used to store documents prefetched for a page
self.page_cache = {}
if app is not None:
self.init_app(app)

def init_app(self, app):
app.config.setdefault('INDEXD_URL', None)
self.url = app.config['INDEXD_URL']
if self.url:
self.bulk_url = '/'.join(self.url.split('/')[:-1] + ['bulk/'])
else:
self.bulk_url = None
if hasattr(app, 'teardown_appcontext'):
app.teardown_appcontext(self.teardown)
else:
@@ -33,14 +39,29 @@ def new_session(self):
s = requests.Session()
s.auth = (current_app.config['INDEXD_USER'],
current_app.config['INDEXD_PASS'])
s.headers.update({'Content-Type': 'application/json'})
s.headers.update({'Content-Type': 'application/json',
'User-Agent': 'Kids First Dataservice'})
return s

def teardown(self, exception):
ctx = stack.top
if hasattr(ctx, 'indexd_session'):
ctx.indexd_session.close()

def prefetch(self, dids):
"""
Fetch a list of documents by did into the page cache.
"""
# If running in dev mode, don't call indexd
if self.url is None or self.bulk_url is None:
return
resp = self.session.post(self.bulk_url + 'documents', json=dids).json()
for doc in resp:
self.page_cache[doc['did']] = doc

def clear_cache(self):
self.page_cache = {}

def get(self, record):
"""
Retrieves a record from indexd
@@ -54,6 +75,9 @@ def get(self, record):
if self.url is None:
return record

if record.latest_did in self.page_cache:
return self.page_cache[record.latest_did]

url = self.url + record.latest_did
resp = self.session.get(url)
self.check_response(resp)
@@ -170,7 +194,8 @@ def update(self, record):
if record.acl != old['acl']:
self._update_all_acls(record)

url = '{}{}?rev={}'.format(self.url, record.latest_did, record.rev)
url = '{}{}?rev={}'.format(self.url, record.latest_did,
record.rev)
if 'size' in req_body or 'hashes' in req_body:
# Create a new version in indexd
req_body['form'] = 'object'
5 changes: 4 additions & 1 deletion tests/mocks.py
@@ -66,7 +66,7 @@ def __init__(self, *args, status_code=200, **kwargs):

def post(self, url, *args, **kwargs):
"""
Mocks a response from POST /index/
Mocks a response from POST /index/ and POST /bulk/documents
"""

resp = {
@@ -83,6 +83,9 @@ def post(self, url, *args, **kwargs):
# Otherwise, assume creation of a new doc and track the baseid
self.baseid_by_did[resp['did']] = resp['baseid']

if 'bulk/documents' in url:
resp = [resp]

mock_resp = MockResp(resp=resp, status_code=self.status_code)
return mock_resp
