From 5332e1e672f716f79355eec09082251a91ef8a71 Mon Sep 17 00:00:00 2001 From: Sietse Snel Date: Mon, 25 Sep 2023 16:53:04 +0200 Subject: [PATCH] YDA-5416: improve robustness rev cleanup process The solution involves a separate spooling system for batch jobs that temporarily stores data for batch processing on disk. The revision cleanup script now first triggers a rule to collect all revision data, and stores this data in batches in the spooling system. The cleanup script then runs rules that clean up revisions in these batches. The intent of this design is to minimize the impact of resource leaks in iRODS (by processing revisions in batches) and to reduce the potential for cascading failures (because batch processing now uses a separate system for keeping track of batches, rather than a chain of rule invocations). --- requirements.txt | 1 + revisions.py | 88 +++++++++++++++++++++++------- tools/revision-clean-up.py | 40 +++++++------- util/constants.py | 9 ++++ util/spool.py | 108 +++++++++++++++++++++++++++++++++++++ 5 files changed, 205 insertions(+), 41 deletions(-) create mode 100644 util/spool.py diff --git a/requirements.txt b/requirements.txt index 884307da1..7cfc35673 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,3 +19,4 @@ certifi==2021.10.8 pysqlcipher3==1.0.4 execnet==1.9.0 deepdiff==3.3.0 +persist-queue==0.8.1 diff --git a/revisions.py b/revisions.py index 9d99deffe..54d4b3b23 100644 --- a/revisions.py +++ b/revisions.py @@ -6,7 +6,6 @@ import datetime import hashlib -import json import os import random import time @@ -17,13 +16,14 @@ import folder import groups from util import * +from util.spool import get_spool_data, has_spool_data, put_spool_data __all__ = ['api_revisions_restore', 'api_revisions_search_on_filename', 'api_revisions_list', 'rule_revision_batch', - 'rule_revisions_info', - 'rule_revisions_clean_up'] + 'rule_revisions_cleanup_collect', + 'rule_revisions_cleanup_process'] @api.make() @@ -523,12 +523,14 @@ def revision_create(ctx, resource, data_id, max_size, verbose): return revision_created -@rule.make(inputs=[], outputs=[0]) -def rule_revisions_info(ctx): +def revisions_info(ctx): """Obtain information about all revisions. :param ctx: Combined type of a callback and rei struct - :returns: Json string with info about revisions + :returns: Nested list, where the outer list represents revisioned data objects, + and the inner list represents revisions for that data object. + Each revision is represented by a list of length three (revision ID, + modification epoch time, revision path) """ zone = user.zone(ctx) revision_store = '/' + zone + constants.UUREVISIONCOLLECTION @@ -575,15 +577,45 @@ def rule_revisions_info(ctx): if revision_id in rev_dict: revision_list.append(rev_dict[revision_id]) revisions_info.append(revision_list) - return json.dumps(revisions_info) + return revisions_info -@rule.make(inputs=[0, 1, 2, 3], outputs=[4]) -def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay, verbose_flag): - """Step through part of revision store and apply the chosen bucket strategy. +@rule.make(inputs=[0], outputs=[1]) +def rule_revisions_cleanup_collect(ctx, batch_size): + """Collect revision data and put it in the spool system for processing by the revision cleanup + processing jobs + + :param ctx: Combined type of a callback and rei struct + :param batch_size: Number of revisions to include in one spool object + + :returns: Status + + :raises Exception: If rule is executed by non-rodsadmin user + """ + if user.user_type(ctx) != 'rodsadmin': + raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.") + + if has_spool_data(constants.PROC_REVISION_CLEANUP): + return "Existing spool data present. Not adding new revision cleanup data." + + data = revisions_info(ctx) + batch_size = int(batch_size) + number_revisions = len(data) + + while len(data) > 0: + current_batch = data[:batch_size] + put_spool_data(constants.PROC_REVISION_CLEANUP, [current_batch]) + data = data[batch_size:] + + log.write(ctx, "Collected {} revisions for revision cleanup.".format(number_revisions)) + return "Revision data has been spooled for cleanup" + + +@rule.make(inputs=[0, 1, 2], outputs=[3]) +def rule_revisions_cleanup_process(ctx, bucketcase, endOfCalendarDay, verbose_flag): + """Apply the chosen the selected revision strategy to spooled revision data :param ctx: Combined type of a callback and rei struct - :param revisions_info: Json-encoded revision info. :param bucketcase: Select a bucketlist based on a string ('A', 'B', 'Simple'). If the value is an unknown case, the default value 'B' will be used. See https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisi for an explanation. @@ -591,10 +623,20 @@ def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay, v :param verbose_flag: "1" if rule needs to print additional information for troubleshooting, else "0" :returns: String with status of cleanup + + :raises Exception: If rule is executed by non-rodsadmin user """ + + if user.user_type(ctx) != 'rodsadmin': + raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.") + log.write(ctx, '[revisions] Revision cleanup job starting.') verbose = verbose_flag == "1" - revisions_list = json.loads(revisions_info) + revisions_list = get_spool_data(constants.PROC_REVISION_CLEANUP) + + if revisions_list is None: + log.write(ctx, '[revisions] Revision cleanup job stopping - no more spooled revision data.') + return "No more revision cleanup data" end_of_calendar_day = int(endOfCalendarDay) if end_of_calendar_day == 0: @@ -629,11 +671,12 @@ def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay, v if not revision_remove(ctx, revision_id, rev_path): num_errors += 1 - log.write(ctx, '[revisions] Revision cleanup job completed - {} candidates ({} successful / {} errors).'.format( + log.write(ctx, '[revisions] Revision cleanup job completed - {} candidates for {} revisions ({} successful / {} errors).'.format( str(num_candidates), + str(len(revisions_list)), str(num_candidates - num_errors), str(num_errors))) - return 'Revision store cleanup completed' + return 'Revision store cleanup job completed' def revision_remove(ctx, revision_id, revision_path): @@ -647,7 +690,8 @@ def revision_remove(ctx, revision_id, revision_path): :returns: Boolean indicating if revision was removed """ - if not revision_path.startswith(constants.UUREVISIONCOLLECTION + "/"): + revision_prefix = os.path.join("/" + user.zone(ctx), constants.UUREVISIONCOLLECTION.lstrip(os.path.sep), '') + if not revision_path.startswith(revision_prefix): log.write(ctx, "ERROR - sanity check fail when removing revision <{}>: <{}>".format( revision_id, revision_path)) @@ -764,8 +808,6 @@ def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound, v max_bucket_size = bucket[1] bucket_start_index = bucket[2] - if verbose: - log.write(ctx, '[revisions] Comparing revisions in bucket {} to max size ({} vs {})'.format(str(bucket), str(len(rev_list)), str(max_bucket_size))) if len(rev_list) > max_bucket_size: nr_to_be_removed = len(rev_list) - max_bucket_size @@ -773,11 +815,19 @@ def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound, v if bucket_start_index >= 0: while count < nr_to_be_removed: # Add revision to list of removal - deletion_candidates.append(rev_list[bucket_start_index + count]) + index = bucket_start_index + count + if verbose: + log.write(ctx, '[revisions] Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index), + str(bucket))) + deletion_candidates.append(rev_list[index]) count += 1 else: while count < nr_to_be_removed: - deletion_candidates.append(rev_list[len(rev_list) + (bucket_start_index) - count]) + index = len(rev_list) + (bucket_start_index) - count + if verbose: + log.write(ctx, '[revisions] Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index), + str(bucket))) + deletion_candidates.append(rev_list[index]) count += 1 bucket_counter += 1 # To keep conciding with strategy list diff --git a/tools/revision-clean-up.py b/tools/revision-clean-up.py index 78c96982c..4cffebf31 100755 --- a/tools/revision-clean-up.py +++ b/tools/revision-clean-up.py @@ -3,7 +3,7 @@ import argparse import atexit -import json +from datetime import datetime import os import subprocess import sys @@ -16,7 +16,8 @@ def get_args(): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("endofcalendarday", help="End of calendar day (epoch time)") parser.add_argument("bucketcase", choices=["A", "B", "Simple"], help="Bucket case configuration name") - parser.add_argument("--batch-size", type=int, default=1, help="Number of revisions to process at a time (default: 1).", required=False) + parser.add_argument("--batch-size", type=int, default=1000, + help="Number of revisions to process at a time (default: 1000).", required=False) parser.add_argument("-v", "--verbose", action="store_true", default=False, help="Make the revision cleanup rules print additional information for troubleshooting purposes.") return parser.parse_args() @@ -41,50 +42,45 @@ def lock_or_die(): atexit.register(lambda: os.unlink(LOCKFILE_PATH)) -def clean_up(revisions, bucketcase, endofcalendarday, verbose_flag): - chunk = json.dumps(revisions) - chunk = "\\\\".join(chunk.split("\\")) - chunk = "\\'".join(chunk.split("'")) +def process_revision_cleanup_data(bucketcase, endofcalendarday, verbose_flag): return subprocess.check_output([ 'irule', '-r', 'irods_rule_engine_plugin-irods_rule_language-instance', - "*out=''; rule_revisions_clean_up('{}', '{}', '{}', '{}', *out); writeString('stdout', *out);".format(chunk, bucketcase, endofcalendarday, verbose_flag), + "*out=''; rule_revisions_cleanup_process('{}', '{}', '{}', *out); writeString('stdout', *out);".format(bucketcase, endofcalendarday, verbose_flag), 'null', 'ruleExecOut' ]) -def get_revisions_info(): - return json.loads(subprocess.check_output([ +def collect_revision_cleanup_data(batch_size): + return subprocess.check_output([ 'irule', '-r', 'irods_rule_engine_plugin-irods_rule_language-instance', - '*out=""; rule_revisions_info(*out); writeString("stdout", *out);', + "*out=''; rule_revisions_cleanup_collect('{}', *out); writeString('stdout', *out);".format(str(batch_size)), 'null', 'ruleExecOut' - ])) + ]) def main(): args = get_args() lock_or_die() - revisions_info = get_revisions_info() if args.verbose: - print('START cleaning up revision store') + print('START cleaning up revision store at ' + str(datetime.now())) + + collect_revision_cleanup_data(args.batch_size) - while len(revisions_info) > args.batch_size: - if args.verbose: - print("Clean up for " + str(revisions_info[:args.batch_size])) - clean_up(revisions_info[:args.batch_size], - args.bucketcase, - args.endofcalendarday, - "1" if args.verbose else "0") - revisions_info = revisions_info[args.batch_size:] + status = "INITIAL" + while status != "No more revision cleanup data": + status = process_revision_cleanup_data(args.bucketcase, + args.endofcalendarday, + "1" if args.verbose else "0") if args.verbose: - print('END cleaning up revision store') + print('END cleaning up revision store at ' + str(datetime.now())) if __name__ == "__main__": diff --git a/util/constants.py b/util/constants.py index ba5a2efde..6cb644792 100644 --- a/util/constants.py +++ b/util/constants.py @@ -20,6 +20,15 @@ UUREVISIONCOLLECTION = UUSYSTEMCOLLECTION + '/revisions' """iRODS path where all revisions will be stored.""" +PROC_REVISION_CLEANUP = "revision-cleanup" +"""Process name of the revision cleanup job. Used by the spooling system""" + +SPOOL_PROCESSES = {PROC_REVISION_CLEANUP} +"""Set of process names recognized by the spooling system""" + +SPOOL_MAIN_DIRECTORY = "/var/lib/irods/yoda-spool" +"""Directory that is used for storing Yoda batch process spool data on the provider""" + UUBLOCKLIST = ["._*", ".DS_Store"] """ List of file extensions not to be copied to revision""" diff --git a/util/spool.py b/util/spool.py new file mode 100644 index 000000000..d23ede0ab --- /dev/null +++ b/util/spool.py @@ -0,0 +1,108 @@ +"""This file contains an interface to the spooling subsystem. The spooling system can be used to store + temporary data for batch processing. The intended use case is that one job collects data to be processed + and stores it in the spooling system, while another job retrieves the data and processes it. + + The current implementation does not have confirmations. It assumes that after spool data has been retrieved + from the spool system it is either processed, or any errors during processing will be taken + care of outside the spooling system (e.g. by having the job that collects data for the spooling system + re-submit spool data that has not been processed for a retry). + + It is assumed that functions that use the spool subsystem take care of authorization and logging. +""" + +import os + +import persistqueue +import persistqueue.serializers.json + +import constants + + +def get_spool_data(process): + """Retrieves one data object for a given batch process for processing. + This function is non-blocking. + + :param process: Spool process name (see util.constants for defined names) + + :returns: Spool data object, or None if there is no spool data for this process + """ + _ensure_spool_process_initialized(process) + q = _get_spool_queue(process) + + try: + result = q.get(block=False) + q.task_done() + except persistqueue.exceptions.Empty: + result = None + + return result + + +def put_spool_data(process, data_list): + """Stores data structures in the spooling subsystem for batch processing. + + :param process: Spool process name (see util.constants for defined names) + :param data_list: List (or other iterable) of arbitrary serializable data objects to store + in the spooling system + """ + _ensure_spool_process_initialized(process) + q = _get_spool_queue(process) + for data in data_list: + q.put(data) + + +def has_spool_data(process): + """ Checks whether there any data objects in the spool system for a given process + + :param process: Spool process name (see util.constants for defined names) + + :returns: Boolean value that represents whether there is any spool data + present for this process + """ + return num_spool_data(process) > 0 + + +def num_spool_data(process): + """ Returns the number of items in the spool system for a given process + + :param process: Spool process name (see util.constants for defined names) + + :returns: The number of data items in the spool system for this process + """ + _ensure_spool_process_initialized(process) + return _get_spool_queue(process).qsize() + + +def _get_spool_directory(process): + if process in constants.SPOOL_PROCESSES: + return os.path.join(constants.SPOOL_MAIN_DIRECTORY, process, "spool") + else: + raise Exception("Spool process {} not found.".format(process)) + + +def _get_temp_directory(process): + if process in constants.SPOOL_PROCESSES: + return os.path.join(constants.SPOOL_MAIN_DIRECTORY, process, "tmp") + else: + raise Exception("Spool process {} not found.".format(process)) + + +def _get_spool_queue(process): + directory = _get_spool_directory(process) + # JSON serialization is used to make it easier to examine spooled objects manually + return persistqueue.Queue(directory, + tempdir=_get_temp_directory(process), + serializer=persistqueue.serializers.json, + chunksize=1) + + +def _ensure_spool_process_initialized(process): + if process not in constants.SPOOL_PROCESSES: + raise Exception("Spool process {} not found.".format(process)) + + for directory in [constants.SPOOL_MAIN_DIRECTORY, + os.path.join(constants.SPOOL_MAIN_DIRECTORY, process), + _get_spool_directory(process), + _get_temp_directory(process)]: + if not os.path.exists(directory): + os.mkdir(directory)