From 7811bda3022ed38eb6e4260ccd5df12d2e936617 Mon Sep 17 00:00:00 2001
From: Sietse Snel
Date: Tue, 24 Oct 2023 09:48:47 +0200
Subject: [PATCH] YDA-5415: improve revision cleanup stability

This modifies the revision cleanup process so that the scan stage
(collecting data about revisions) is separate from the collect stage
(collecting a list of all revisions). Scanning revision data is now done
in batches, rather than in a single scan of the whole environment. This
reduces the amount of memory needed for the scan stage, which helps make
the revision cleanup job usable in large environments.
---
 revisions.py               | 266 ++++++++++++++++++++++++++++---------
 tools/revision-clean-up.py |  41 +++---
 util/constants.py          |   7 +-
 3 files changed, 231 insertions(+), 83 deletions(-)

diff --git a/revisions.py b/revisions.py
index f5bc7f636..b7756d9d0 100644
--- a/revisions.py
+++ b/revisions.py
@@ -24,7 +24,8 @@
     'api_revisions_list',
     'rule_revision_batch',
     'rule_revisions_cleanup_collect',
-    'rule_revisions_cleanup_process']
+    'rule_revisions_cleanup_process',
+    'rule_revisions_cleanup_scan']
 
 
 @api.make()
@@ -473,7 +474,7 @@ def revision_create(ctx, resource, data_id, max_size, verbose):
     # All revisions are stored in a group with the same name as the research group in a system collection
     # When this collection is missing, no revisions will be created. When the group manager is used to
     # create new research groups, the revision collection will be created as well.
-    revision_store = "/" + user_zone + constants.UUREVISIONCOLLECTION + "/" + group_name
+    revision_store = os.path.join(get_revision_store_path(ctx), group_name)
 
     if collection.exists(ctx, revision_store):
         # Allow rodsadmin to create subcollections.
@@ -534,51 +535,60 @@ def revision_create(ctx, resource, data_id, max_size, verbose):
     return revision_created
 
 
-def revisions_info(ctx):
-    """Obtain information about all revisions.
+def revision_cleanup_scan_revision_objects(ctx, revision_list, verbose_mode):
+    """Obtain information about a batch of revision data objects.
 
     :param ctx: Combined type of a callback and rei struct
+    :param revision_list: List of revision data object IDs
+    :param verbose_mode: Whether to print additional information for troubleshooting (boolean)
+
+    :returns: Nested list, where the outer list represents revisioned data objects, and the inner list
+              represents revisions for that data object. Each revision is represented by a list of
+              length three (revision ID, modification epoch time, revision path).
     """
-    zone = user.zone(ctx)
-    revision_store = '/' + zone + constants.UUREVISIONCOLLECTION
+    QUERY_BATCH_SIZE = 20
+    ORIGINAL_PATH_ATTRIBUTE = constants.UUORGMETADATAPREFIX + 'original_path'
+    ORIGINAL_MODIFY_TIME_ATTRIBUTE = constants.UUORGMETADATAPREFIX + 'original_modify_time'
 
-    if user.user_type(ctx) == 'rodsadmin':
-        msi.set_acl(ctx, "recursive", "admin:own", user.full_name(ctx), revision_store)
-        msi.set_acl(ctx, "recursive", "inherit", user.full_name(ctx), revision_store)
-
-    # first, get original_path and ids for every revision
-    iter = genquery.row_iterator(
-        "order(META_DATA_ATTR_VALUE), order_desc(DATA_ID)",
-        "META_DATA_ATTR_NAME = '" + constants.UUORGMETADATAPREFIX + 'original_path' + "'"
-        " AND COLL_NAME like '" + revision_store + "%'",
-        genquery.AS_LIST, ctx
-    )
+    revision_store = get_revision_store_path(ctx)
+    ids = list(revision_list)
 
     path_dict = {}
-    for row in iter:
-        original_path = row[0]
-        revision_id = row[1]
-        if original_path in path_dict:
-            path_dict[original_path].append(revision_id)
-        else:
-            path_dict[original_path] = [revision_id]
-
-    # second, get id, path and modify time for every revision
-    iter = genquery.row_iterator(
-        "DATA_ID, COLL_NAME, DATA_NAME, META_DATA_ATTR_VALUE",
-        "META_DATA_ATTR_NAME = '" + constants.UUORGMETADATAPREFIX + 'original_modify_time' + "'"
-        " AND COLL_NAME like '" + revision_store + "%'",
-        genquery.AS_LIST, ctx
-    )
     rev_dict = {}
-    for row in iter:
-        revision_id = row[0]
-        path = row[1] + "/" + row[2]
-        modify_time = row[3]
-        rev_dict[revision_id] = [int(revision_id), int(modify_time), path]
+
+    while len(ids) > 0:
+        batch_ids = ids[:QUERY_BATCH_SIZE]
+        batch_id_string = "({})".format(",".join(map(lambda e: "'{}'".format(e), batch_ids)))
+        ids = ids[QUERY_BATCH_SIZE:]
+
+        # first, get original_path and ids for every revision
+        original_paths = genquery.row_iterator(
+            "order(META_DATA_ATTR_VALUE), order_desc(DATA_ID)",
+            "META_DATA_ATTR_NAME = '" + ORIGINAL_PATH_ATTRIBUTE + "'"
+            " AND COLL_NAME like '" + revision_store + "/%' AND DATA_ID IN " + batch_id_string,
+            genquery.AS_LIST, ctx)
+
+        for row in original_paths:
+            original_path = row[0]
+            revision_id = row[1]
+            if original_path in path_dict:
+                path_dict[original_path].append(revision_id)
+            else:
+                path_dict[original_path] = [revision_id]
+
+        # second, get id, path and modify time for every revision
+        modify_times = genquery.row_iterator(
+            "DATA_ID, COLL_NAME, DATA_NAME, META_DATA_ATTR_VALUE",
+            "META_DATA_ATTR_NAME = '" + ORIGINAL_MODIFY_TIME_ATTRIBUTE + "'"
+            " AND COLL_NAME like '" + revision_store + "/%' AND DATA_ID IN " + batch_id_string,
+            genquery.AS_LIST, ctx
+        )
+
+        for row in modify_times:
+            revision_id = row[0]
+            path = row[1] + "/" + row[2]
+            modify_time = row[3]
+            rev_dict[revision_id] = [int(revision_id), int(modify_time), path]
 
     # collate revision info
     revisions_info = []
@@ -591,13 +601,140 @@
     return revisions_info
 
 
+def get_all_revision_data_ids(ctx):
+    """Returns all data IDs of revision data objects.
+
+    :param ctx: Combined type of a callback and rei struct
+
+    :yields: 2-tuples containing a collection ID and a data object ID
+    """
+    revision_store = get_revision_store_path(ctx)
+
+    revision_objects = genquery.row_iterator(
+        "order_desc(COLL_ID), DATA_ID",
+        "META_DATA_ATTR_NAME = '" + constants.UUORGMETADATAPREFIX + 'original_path' + "'"
+        " AND COLL_NAME like '" + revision_store + "/%'",
+        genquery.AS_LIST, ctx)
+
+    for row in revision_objects:
+        yield (row[0], row[1])
+
+
+def _update_revision_store_acls(ctx):
+    """Sets ACLs on the revision store to grant the current rodsadmin user access.
+
+    :param ctx: Combined type of a callback and rei struct
+
+    :raises Exception: If the current user is not a rodsadmin
+    """
+    revision_store = get_revision_store_path(ctx)
+    if user.user_type(ctx) == 'rodsadmin':
+        msi.set_acl(ctx, "recursive", "admin:own", user.full_name(ctx), revision_store)
+        msi.set_acl(ctx, "recursive", "inherit", user.full_name(ctx), revision_store)
+    else:
+        raise Exception("Cannot update revision store ACLs, because the current user is not a rodsadmin.")
+
+
+def get_revision_store_path(ctx, trailing_slash=False):
+    """Produces the logical path of the revision store.
+
+    :param ctx:            Combined type of a callback and rei struct
+    :param trailing_slash: Add a trailing slash (default: False)
+
+    :returns: Logical path of the revision store
+    """
+    if trailing_slash:
+        return os.path.join("/" + user.zone(ctx), constants.UUREVISIONCOLLECTION.lstrip(os.path.sep), '')
+    else:
+        return os.path.join("/" + user.zone(ctx), constants.UUREVISIONCOLLECTION.lstrip(os.path.sep))
+
+
+@rule.make(inputs=[0], outputs=[1])
+def rule_revisions_cleanup_collect(ctx, target_batch_size):
+    """Collect a list of revision data object IDs and put them in the spool system for processing
+       by the revision cleanup scan job.
+
+    :param ctx:               Combined type of a callback and rei struct
+    :param target_batch_size: Number of revisions to aim for in one batch. The actual batch size can be
+                              larger, because all revision objects in one collection are always placed
+                              in the same batch.
+
+    :returns: Status
+
+    :raises Exception: If rule is executed by a non-rodsadmin user
+    """
+    if user.user_type(ctx) != 'rodsadmin':
+        raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.")
+
+    if has_spool_data(constants.PROC_REVISION_CLEANUP_SCAN):
+        return "Existing revision cleanup scan spool data present. Not adding new revision cleanup data."
+
+    log.write(ctx, "Starting revision cleanup collect process.")
+
+    target_batch_size = int(target_batch_size)
+    ingest_state = {
+        "batch": [],
+        "current_coll": None,
+        "objects_for_current_coll": []
+    }
+    number_revisions = 0
+
+    def ingest_new_data_id(ctx, coll_id, data_id, ingest_state, target_batch_size):
+        """Ingest one data object ID. IDs are accumulated in the ingest state for as long as their
+           collection ID is the same as the previous one, so that all data objects in the same
+           collection become part of the same batch.
+
+           If the new data object has a different collection ID than the previous one, flush the
+           previously collected data objects to the batch buffer, and if needed from there to the
+           spool queue.
+
+        :param ctx:               Combined type of a callback and rei struct
+        :param coll_id:           Collection ID
+        :param data_id:           Data object ID
+        :param ingest_state:      Ingest state dictionary
+        :param target_batch_size: Target batch size
+        """
+        if coll_id == ingest_state["current_coll"]:
+            ingest_state["objects_for_current_coll"].append(data_id)
+        else:
+            if (len(ingest_state["batch"]) > 0
+                    and len(ingest_state["batch"]) + len(ingest_state["objects_for_current_coll"]) >= target_batch_size):
+                put_spool_data(constants.PROC_REVISION_CLEANUP_SCAN, [ingest_state["batch"]])
+                ingest_state["batch"] = []
+
+            ingest_state["batch"].extend(ingest_state["objects_for_current_coll"])
+            ingest_state["objects_for_current_coll"] = [data_id]
+            ingest_state["current_coll"] = coll_id
+
+            if len(ingest_state["batch"]) >= target_batch_size:
+                log.write(ctx, "Flushing batch of {} revision data objects to the scan spool.".format(len(ingest_state["batch"])))
+                put_spool_data(constants.PROC_REVISION_CLEANUP_SCAN, [ingest_state["batch"]])
+                ingest_state["batch"] = []
+
+    for (coll_id, data_id) in get_all_revision_data_ids(ctx):
+        number_revisions += 1
+        ingest_new_data_id(ctx, coll_id, data_id, ingest_state, target_batch_size)
+
+    if (len(ingest_state["batch"]) > 0
+            and len(ingest_state["batch"]) + len(ingest_state["objects_for_current_coll"]) >= target_batch_size):
+        put_spool_data(constants.PROC_REVISION_CLEANUP_SCAN, [ingest_state["batch"]])
+        ingest_state["batch"] = []
+
+    ingest_state["batch"].extend(ingest_state["objects_for_current_coll"])
+    if len(ingest_state["batch"]) > 0:
+        put_spool_data(constants.PROC_REVISION_CLEANUP_SCAN, [ingest_state["batch"]])
+
+    log.write(ctx, "Collected {} revisions for revision cleanup scanning.".format(number_revisions))
+    return "Revision data has been spooled for scanning"
+
+
 @rule.make(inputs=[0], outputs=[1])
-def rule_revisions_cleanup_collect(ctx, batch_size):
-    """Collect revision data and put it in the spool system for processing by the revision cleanup
-       processing jobs
+def rule_revisions_cleanup_scan(ctx, verbose_flag):
+    """Scan spooled batches of revision data object IDs and put the collected revision data in the
+       spool system for processing by the revision cleanup processing job.
 
     :param ctx: Combined type of a callback and rei struct
-    :param batch_size: Number of revisions to include in one spool object
+    :param verbose_flag: "1" if the rule needs to print additional information for troubleshooting, else "0"
 
     :returns: Status
 
@@ -606,25 +743,27 @@
     if user.user_type(ctx) != 'rodsadmin':
         raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.")
 
-    if has_spool_data(constants.PROC_REVISION_CLEANUP):
-        return "Existing spool data present. Not adding new revision cleanup data."
+    log.write(ctx, 'Revision cleanup scan job starting.')
+    verbose = verbose_flag == "1"
+    revisions_list = get_spool_data(constants.PROC_REVISION_CLEANUP_SCAN)
+
+    if revisions_list is None:
+        log.write(ctx, 'Revision cleanup scan job stopping - no more spooled revision scan data.')
+        return "No more revision cleanup data"
 
-    data = revisions_info(ctx)
-    batch_size = int(batch_size)
-    number_revisions = len(data)
+    if verbose:
+        log.write(ctx, "Scanning revisions: " + str(revisions_list))
 
-    while len(data) > 0:
-        current_batch = data[:batch_size]
-        put_spool_data(constants.PROC_REVISION_CLEANUP, [current_batch])
-        data = data[batch_size:]
+    revision_data = revision_cleanup_scan_revision_objects(ctx, revisions_list, verbose)
+    put_spool_data(constants.PROC_REVISION_CLEANUP, [revision_data])
 
-    log.write(ctx, "Collected {} revisions for revision cleanup.".format(number_revisions))
-    return "Revision data has been spooled for cleanup"
+    log.write(ctx, 'Revision cleanup scan job finished.')
+    return 'Revision store cleanup scan job completed'
 
 
 @rule.make(inputs=[0, 1, 2], outputs=[3])
 def rule_revisions_cleanup_process(ctx, bucketcase, endOfCalendarDay, verbose_flag):
-    """Apply the selected revision strategy to a batch of spooled revision data
+    """Applies the selected revision strategy to a batch of spooled revision data
 
     :param ctx: Combined type of a callback and rei struct
     :param bucketcase: Select a bucketlist based on a string ('A', 'B', 'Simple'). If the value is an unknown case, the default
@@ -642,12 +781,13 @@
     if user.user_type(ctx) != 'rodsadmin':
         raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.")
 
-    log.write(ctx, '[revisions] Revision cleanup job starting.')
+    log.write(ctx, 'Revision cleanup processing job starting.')
     verbose = verbose_flag == "1"
+    _update_revision_store_acls(ctx)
     revisions_list = get_spool_data(constants.PROC_REVISION_CLEANUP)
 
     if revisions_list is None:
-        log.write(ctx, '[revisions] Revision cleanup job stopping - no more spooled revision data.')
+        log.write(ctx, 'Revision cleanup processing job stopping - no more spooled revision data.')
         return "No more revision cleanup data"
 
     end_of_calendar_day = int(endOfCalendarDay)
@@ -663,7 +803,7 @@
 
     for revisions in revisions_list:
         if verbose:
-            log.write(ctx, '[revisions] Processing revisions {} ...'.format(str(revisions)))
+            log.write(ctx, 'Processing revisions {} ...'.format(str(revisions)))
         # Process the original path conform the bucket settings
         candidates = get_deletion_candidates(ctx, buckets, revisions, end_of_calendar_day, verbose)
         num_candidates += len(candidates)
@@ -673,22 +813,22 @@
         rev_paths = {r[0]: r[2] for r in revisions}
 
         if verbose:
-            log.write(ctx, '[revisions] Candidates to be removed: {} ...'.format(str(candidates)))
+            log.write(ctx, 'Candidates to be removed: {} ...'.format(str(candidates)))
 
         # Delete the revisions that were found being obsolete
         for revision_id in candidates:
            rev_path = rev_paths[revision_id]
            if verbose:
-                log.write(ctx, '[revisions] Removing candidate: {} ...'.format(str(revision_id)))
+                log.write(ctx, 'Removing candidate: {} ...'.format(str(revision_id)))
            if not revision_remove(ctx, revision_id, rev_path):
                num_errors += 1
 
-    log.write(ctx, '[revisions] Revision cleanup job completed - {} candidates for {} versioned data objects ({} successful / {} errors).'.format(
+    log.write(ctx, 'Revision cleanup processing job completed - {} candidates for {} versioned data objects ({} successful / {} errors).'.format(
         str(num_candidates),
         str(len(revisions_list)),
         str(num_candidates - num_errors),
         str(num_errors)))
 
-    return 'Revision store cleanup job completed'
+    return 'Revision store cleanup processing job completed'
 
 
 def revision_remove(ctx, revision_id, revision_path):
@@ -702,7 +842,7 @@
 
     :returns: Boolean indicating if revision was removed
     """
-    revision_prefix = os.path.join("/" + user.zone(ctx), constants.UUREVISIONCOLLECTION.lstrip(os.path.sep), '')
+    revision_prefix = get_revision_store_path(ctx, trailing_slash=True)
     if not revision_path.startswith(revision_prefix):
         log.write(ctx, "ERROR - sanity check fail when removing revision <{}>: <{}>".format(
             revision_id,
@@ -829,16 +969,16 @@
             # Add revision to list of removal
             index = bucket_start_index + count
             if verbose:
-                log.write(ctx, '[revisions] Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
-                                                                                                         str(bucket)))
+                log.write(ctx, 'Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
+                                                                                             str(bucket)))
             deletion_candidates.append(rev_list[index])
             count += 1
         else:
             while count < nr_to_be_removed:
                 index = len(rev_list) + (bucket_start_index) - count
                 if verbose:
-                    log.write(ctx, '[revisions] Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
-                                                                                                             str(bucket)))
+                    log.write(ctx, 'Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
+                                                                                                 str(bucket)))
                 deletion_candidates.append(rev_list[index])
                 count += 1
 
diff --git a/tools/revision-clean-up.py b/tools/revision-clean-up.py
index 4cffebf31..2df9bc823 100755
--- a/tools/revision-clean-up.py
+++ b/tools/revision-clean-up.py
@@ -8,8 +8,9 @@
 import subprocess
 import sys
 
 NAME = os.path.basename(sys.argv[0])
 LOCKFILE_PATH = '/tmp/irods-{}.lock'.format(NAME)
+NO_MORE_WORK_STATUS = "No more revision cleanup data"
 
 
 def get_args():
@@ -43,22 +44,26 @@ def lock_or_die():
 
 
 def process_revision_cleanup_data(bucketcase, endofcalendarday, verbose_flag):
-    return subprocess.check_output([
-        'irule',
-        '-r',
-        'irods_rule_engine_plugin-irods_rule_language-instance',
-        "*out=''; rule_revisions_cleanup_process('{}', '{}', '{}', *out); writeString('stdout', *out);".format(bucketcase, endofcalendarday, verbose_flag),
-        'null',
-        'ruleExecOut'
-    ])
+    rule = "rule_revisions_cleanup_process('{}', '{}', '{}', *out);".format(bucketcase, endofcalendarday, verbose_flag)
+    return subprocess.check_output(_rule_command_for_rule(rule))
+
+
+def scan_revision_cleanup_data(verbose_flag):
+    rule = "rule_revisions_cleanup_scan('{}', *out);".format(verbose_flag)
+    return subprocess.check_output(_rule_command_for_rule(rule))
 
 
 def collect_revision_cleanup_data(batch_size):
-    return subprocess.check_output([
+    rule = "rule_revisions_cleanup_collect('{}', *out);".format(str(batch_size))
+    return subprocess.check_output(_rule_command_for_rule(rule))
+
+
+def _rule_command_for_rule(rule_text):
+    return ([
         'irule',
         '-r',
         'irods_rule_engine_plugin-irods_rule_language-instance',
-        "*out=''; rule_revisions_cleanup_collect('{}', *out); writeString('stdout', *out);".format(str(batch_size)),
+        "*out=''; " + rule_text + " writeString('stdout', *out);",
         'null',
         'ruleExecOut'
     ])
 
 
@@ -73,11 +78,13 @@ def main():
 
     collect_revision_cleanup_data(args.batch_size)
 
-    status = "INITIAL"
-    while status != "No more revision cleanup data":
-        status = process_revision_cleanup_data(args.bucketcase,
-                                               args.endofcalendarday,
-                                               "1" if args.verbose else "0")
+    (scan_status, process_status) = ("INITIAL", "INITIAL")
+    while scan_status != NO_MORE_WORK_STATUS and process_status != NO_MORE_WORK_STATUS:
+        scan_status = scan_revision_cleanup_data("1" if args.verbose else "0")
+        process_status = process_revision_cleanup_data(
+            args.bucketcase,
+            args.endofcalendarday,
+            "1" if args.verbose else "0")
 
     if args.verbose:
         print('END cleaning up revision store at ' + str(datetime.now()))
 
diff --git a/util/constants.py b/util/constants.py
index 0cc7f942d..b679d090b 100644
--- a/util/constants.py
+++ b/util/constants.py
@@ -20,10 +20,11 @@
 UUREVISIONCOLLECTION = UUSYSTEMCOLLECTION + '/revisions'
 """iRODS path where all revisions will be stored."""
 
 PROC_REVISION_CLEANUP = "revision-cleanup"
-"""Process name of the revision cleanup job. Used by the spooling system"""
+PROC_REVISION_CLEANUP_SCAN = "revision-cleanup-scan"
+"""Process names of the revision cleanup jobs. Used by the spooling system"""
 
-SPOOL_PROCESSES = {PROC_REVISION_CLEANUP}
+SPOOL_PROCESSES = {PROC_REVISION_CLEANUP, PROC_REVISION_CLEANUP_SCAN}
 """Set of process names recognized by the spooling system"""
 
 SPOOL_MAIN_DIRECTORY = "/var/lib/irods/yoda-spool"
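
Note on the collect-stage batching: the flushing rules in rule_revisions_cleanup_collect are easiest to follow in isolation. The sketch below is illustrative only and not part of the patch; the function name and test data are invented. It restates the same strategy as a pure Python generator: IDs accumulate per collection, a completed collection is moved into the batch buffer, and the buffer is flushed once it reaches the target size, so a collection is never split across batches.

    def batch_by_collection(rows, target_batch_size):
        """Group (collection ID, data object ID) pairs into batches of roughly
        target_batch_size IDs, never splitting a collection across batches.

        rows must be ordered so that pairs with the same collection ID are
        adjacent (as produced by the order_desc(COLL_ID) query in the patch).
        """
        batch = []            # IDs of completed collections, awaiting flush
        pending = []          # IDs of the collection currently being read
        current_coll = None

        for coll_id, data_id in rows:
            if coll_id == current_coll:
                pending.append(data_id)
                continue
            # A new collection starts, so the previous one is complete and
            # may be moved into the batch buffer. Flush the buffer first if
            # adding the completed collection would push it past the target.
            if batch and len(batch) + len(pending) >= target_batch_size:
                yield batch
                batch = []
            batch.extend(pending)
            pending = [data_id]
            current_coll = coll_id
            if len(batch) >= target_batch_size:
                yield batch
                batch = []

        # Same end-of-input handling as the patch: flush the buffer first if
        # the final collection would push it past the target size.
        if batch and len(batch) + len(pending) >= target_batch_size:
            yield batch
            batch = []
        batch.extend(pending)
        if batch:
            yield batch

    rows = [(1, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (3, 'e'), (4, 'f')]
    print(list(batch_by_collection(rows, 2)))
    # [['a', 'b'], ['c'], ['d', 'e'], ['f']]

With a target size of 2, collection 1 fills a batch on its own, and collections 3 and 4 each land whole in their own batch; a batch can exceed the target only when a single collection does.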
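Note on the scan-stage queries: revision_cleanup_scan_revision_objects slices the spooled ID list into groups of at most QUERY_BATCH_SIZE and turns each group into one quoted IN clause per genquery. A minimal sketch of that slicing follows, assuming 'org_' as the value of constants.UUORGMETADATAPREFIX and a made-up zone path; the helper name is hypothetical.

    QUERY_BATCH_SIZE = 20

    def id_in_clauses(ids, batch_size=QUERY_BATCH_SIZE):
        """Yield quoted IN-clause strings for consecutive slices of ids."""
        for start in range(0, len(ids), batch_size):
            batch = ids[start:start + batch_size]
            yield "({})".format(",".join("'{}'".format(i) for i in batch))

    ids = [str(10000 + n) for n in range(45)]
    for clause in id_in_clauses(ids):
        # 45 IDs yield three clauses (20 + 20 + 5 IDs); each condition would
        # be passed to genquery.row_iterator as in the patch.
        condition = ("META_DATA_ATTR_NAME = 'org_original_path'"
                     " AND COLL_NAME like '/tempZone/yoda/revisions/%'"
                     " AND DATA_ID IN " + clause)
        print(condition)

Bounding the IN clause keeps each catalog query small and predictable, which is what lets the scan stage run with a fixed memory footprint regardless of how many revisions the environment holds.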