Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cursor traversal when indexing #1180

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions manage.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -58,15 +58,20 @@ def elastic_init():
@manager.option('-c', '--collection', dest='collection', default=None)
@manager.option('-t', '--timestamp', dest='timestamp', default=None)
@manager.option('-d', '--direction', dest='direction', choices=['older', 'newer'], default='older')
def index_from_mongo(hours, collection, timestamp, direction):
print('Checking if elastic index exists, a new one will be created if not')
app.data.init_elastic(app)
print('Elastic index check has been completed')
@manager.option('-s', '--start_id', dest='start_id', default=None)
@manager.option('-i', '--skip_init', dest='skip_init', default=False)
def index_from_mongo(hours, collection, timestamp, direction, start_id, skip_init):
if not skip_init:
print('Checking if elastic index exists, a new one will be created if not')
app.data.init_elastic(app)
print('Elastic index check has been completed')
else:
print('Skipping index initialisation')

if timestamp:
index_elastic_from_mongo_from_timestamp(collection, timestamp, direction)
else:
index_elastic_from_mongo(hours=hours, collection=collection)
index_elastic_from_mongo(hours=hours, collection=collection, start_id=start_id)


@manager.command
Expand Down
30 changes: 20 additions & 10 deletions newsroom/mongo_utils.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
import pymongo
import superdesk
from bson import ObjectId
from datetime import timedelta, datetime

from flask import current_app as app
Expand All @@ -13,7 +14,7 @@
default_page_size = 500


def index_elastic_from_mongo(hours=None, collection=None):
def index_elastic_from_mongo(hours=None, collection=None, start_id=None):
print('Starting indexing from mongodb for "{}" collection hours={}'.format(collection, hours))

resources = app.data.get_elastic_resources()
Expand All @@ -25,7 +26,7 @@ def index_elastic_from_mongo(hours=None, collection=None):
for resource in resources:
print('Starting indexing collection {}'.format(resource))

for items in _get_mongo_items(resource, hours):
for items in _get_mongo_items(resource, hours, start_id):
print('{} Inserting {} items'.format(time.strftime('%X %x %Z'), len(items)))
s = time.time()

Expand Down Expand Up @@ -94,15 +95,16 @@ def index_elastic_from_mongo_from_timestamp(collection, timestamp_str, direction
print('Finished indexing collection {}'.format(collection))


def _get_mongo_items(mongo_collection_name, hours=None):
def _get_mongo_items(mongo_collection_name, hours=None, start_id=None):
"""Generate list of items from given mongo collection per default page size.

:param mongo_collection_name: Name of the collection to get the items
:return: list of items
"""
print('Indexing data from mongo/{} to elastic/{} for hours={}'.format(mongo_collection_name,
mongo_collection_name,
hours))
print('Indexing data from mongo/{} to elastic/{} for hours={} from id>{}'.format(mongo_collection_name,
mongo_collection_name,
hours,
start_id))

db = app.data.get_mongo_collection(mongo_collection_name)
args = {'limit': default_page_size, 'sort': [(config.ID_FIELD, pymongo.ASCENDING)]}
Expand All @@ -113,16 +115,24 @@ def _get_mongo_items(mongo_collection_name, hours=None):
now = utcnow()
args['filter'] = {}

last_id = None
if start_id:
last_id = ObjectId(start_id)
else:
last_id = None
while True:
if last_id:
args['filter'].update({config.ID_FIELD: {'$gt': last_id}})
cursor = db.find(**args)
if not cursor.count():
break
items = list(cursor)
if not len(items):
break

last_id = items[-1][config.ID_FIELD]
yield items

if start_id:
yield [item for item in items if isinstance(item.get('_id'), ObjectId)]
else:
yield items


def _get_mongo_items_from_timestamp(collection, timestamp, direction):
Expand Down
Loading