diff --git a/manage.py b/manage.py index b86b2051..54f76a73 100644 --- a/manage.py +++ b/manage.py @@ -58,15 +58,20 @@ def elastic_init(): @manager.option('-c', '--collection', dest='collection', default=None) @manager.option('-t', '--timestamp', dest='timestamp', default=None) @manager.option('-d', '--direction', dest='direction', choices=['older', 'newer'], default='older') -def index_from_mongo(hours, collection, timestamp, direction): - print('Checking if elastic index exists, a new one will be created if not') - app.data.init_elastic(app) - print('Elastic index check has been completed') +@manager.option('-s', '--start_id', dest='start_id', default=None) +@manager.option('-i', '--skip_init', dest='skip_init', default=False) +def index_from_mongo(hours, collection, timestamp, direction, start_id, skip_init): + if not skip_init: + print('Checking if elastic index exists, a new one will be created if not') + app.data.init_elastic(app) + print('Elastic index check has been completed') + else: + print('Skipping index initialisation') if timestamp: index_elastic_from_mongo_from_timestamp(collection, timestamp, direction) else: - index_elastic_from_mongo(hours=hours, collection=collection) + index_elastic_from_mongo(hours=hours, collection=collection, start_id=start_id) @manager.command diff --git a/newsroom/mongo_utils.py b/newsroom/mongo_utils.py index 892587d9..b06a0f82 100644 --- a/newsroom/mongo_utils.py +++ b/newsroom/mongo_utils.py @@ -2,6 +2,7 @@ import time import pymongo import superdesk +from bson import ObjectId from datetime import timedelta, datetime from flask import current_app as app @@ -13,7 +14,7 @@ default_page_size = 500 -def index_elastic_from_mongo(hours=None, collection=None): +def index_elastic_from_mongo(hours=None, collection=None, start_id=None): print('Starting indexing from mongodb for "{}" collection hours={}'.format(collection, hours)) resources = app.data.get_elastic_resources() @@ -25,7 +26,7 @@ def index_elastic_from_mongo(hours=None, collection=None): for resource in resources: print('Starting indexing collection {}'.format(resource)) - for items in _get_mongo_items(resource, hours): + for items in _get_mongo_items(resource, hours, start_id): print('{} Inserting {} items'.format(time.strftime('%X %x %Z'), len(items))) s = time.time() @@ -94,15 +95,16 @@ def index_elastic_from_mongo_from_timestamp(collection, timestamp_str, direction print('Finished indexing collection {}'.format(collection)) -def _get_mongo_items(mongo_collection_name, hours=None): +def _get_mongo_items(mongo_collection_name, hours=None, start_id=None): """Generate list of items from given mongo collection per default page size. :param mongo_collection_name: Name of the collection to get the items :return: list of items """ - print('Indexing data from mongo/{} to elastic/{} for hours={}'.format(mongo_collection_name, - mongo_collection_name, - hours)) + print('Indexing data from mongo/{} to elastic/{} for hours={} from id>{}'.format(mongo_collection_name, + mongo_collection_name, + hours, + start_id)) db = app.data.get_mongo_collection(mongo_collection_name) args = {'limit': default_page_size, 'sort': [(config.ID_FIELD, pymongo.ASCENDING)]} @@ -113,16 +115,24 @@ def _get_mongo_items(mongo_collection_name, hours=None): now = utcnow() args['filter'] = {} - last_id = None + if start_id: + last_id = ObjectId(start_id) + else: + last_id = None while True: if last_id: args['filter'].update({config.ID_FIELD: {'$gt': last_id}}) cursor = db.find(**args) - if not cursor.count(): - break items = list(cursor) + if not len(items): + break + last_id = items[-1][config.ID_FIELD] - yield items + + if start_id: + yield [item for item in items if isinstance(item.get('_id'), ObjectId)] + else: + yield items def _get_mongo_items_from_timestamp(collection, timestamp, direction):