diff --git a/requirements.txt b/requirements.txt index b21c5acc0..ba2a5b1d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ python2-secrets==1.0.5 python-dateutil==2.7.0 certifi==2021.10.8 pysqlcipher3==1.0.4 +persist-queue==0.8.1 diff --git a/revisions.py b/revisions.py index 9c86f9db0..190910568 100644 --- a/revisions.py +++ b/revisions.py @@ -5,7 +5,6 @@ __license__ = 'GPLv3, see LICENSE' import datetime -import json import os import time @@ -15,13 +14,14 @@ import folder import groups from util import * +from util.spool import get_spool_data, has_spool_data, put_spool_data __all__ = ['api_revisions_restore', 'api_revisions_search_on_filename', 'api_revisions_list', 'rule_revision_batch', - 'rule_revisions_info', - 'rule_revisions_clean_up'] + 'rule_revisions_cleanup_collect', + 'rule_revisions_cleanup_process'] @api.make() @@ -509,12 +509,14 @@ def revision_create(ctx, resource, path, max_size, verbose): return revision_id -@rule.make(inputs=[], outputs=[0]) -def rule_revisions_info(ctx): +def revisions_info(ctx): """Obtain information about all revisions. :param ctx: Combined type of a callback and rei struct - :returns: Json string with info about revisions + :returns: Nested list, where the outer list represents revisioned data objects, + and the inner list represents revisions for that data object. + Each revision is represented by a list of length three (revision ID, + modification epoch time, revision path) """ zone = user.zone(ctx) revision_store = '/' + zone + constants.UUREVISIONCOLLECTION @@ -560,15 +562,45 @@ def rule_revisions_info(ctx): for revision_id in revisions: revision_list.append(rev_dict[revision_id]) revisions_info.append(revision_list) - return json.dumps(revisions_info) + return revisions_info -@rule.make(inputs=[0, 1, 2, 3], outputs=[4]) -def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay, verbose_flag): - """Step through part of revision store and apply the chosen bucket strategy. 
+@rule.make(inputs=[0], outputs=[1]) +def rule_revisions_cleanup_collect(ctx, batch_size): + """Collect revision data and put it in the spool system for processing by the revision cleanup + processing jobs + + :param ctx: Combined type of a callback and rei struct + :param batch_size: Number of revisions to include in one spool object + + :returns: Status + + :raises Exception: If rule is executed by non-rodsadmin user + """ + if user.user_type(ctx) != 'rodsadmin': + raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.") + + if has_spool_data(constants.PROC_REVISION_CLEANUP): + return "Existing spool data present. Not adding new revision cleanup data." + + data = revisions_info(ctx) + batch_size = int(batch_size) + number_revisions = len(data) + + while len(data) > 0: + current_batch = data[:batch_size] + put_spool_data(constants.PROC_REVISION_CLEANUP, [current_batch]) + data = data[batch_size:] + + log.write(ctx, "Collected {} revisions for revision cleanup.".format(number_revisions)) + return "Revision data has been spooled for cleanup" + + +@rule.make(inputs=[0, 1, 2], outputs=[3]) +def rule_revisions_cleanup_process(ctx, bucketcase, endOfCalendarDay, verbose_flag): + """Apply the selected revision strategy to spooled revision data :param ctx: Combined type of a callback and rei struct - :param revisions_info: Json-encoded revision info. :param bucketcase: Select a bucketlist based on a string ('A', 'B', 'Simple'). If the value is an unknown case, the default value 'B' will be used. See https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisi for an explanation. 
@@ -576,10 +608,20 @@ def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay, v :param verbose_flag: "1" if rule needs to print additional information for troubleshooting, else "0" :returns: String with status of cleanup + + :raises Exception: If rule is executed by non-rodsadmin user """ + + if user.user_type(ctx) != 'rodsadmin': + raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.") + log.write(ctx, '[revisions] Revision cleanup job starting.') verbose = verbose_flag == "1" - revisions_list = json.loads(revisions_info) + revisions_list = get_spool_data(constants.PROC_REVISION_CLEANUP) + + if revisions_list is None: + log.write(ctx, '[revisions] Revision cleanup job stopping - no more spooled revision data.') + return "No more revision cleanup data" end_of_calendar_day = int(endOfCalendarDay) if end_of_calendar_day == 0: @@ -614,11 +656,12 @@ def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay, v if not revision_remove(ctx, revision_id, rev_path): num_errors += 1 - log.write(ctx, '[revisions] Revision cleanup job completed - {} candidates ({} successful / {} errors).'.format( + log.write(ctx, '[revisions] Revision cleanup job completed - {} candidates for {} revisions ({} successful / {} errors).'.format( str(num_candidates), + str(len(revisions_list)), str(num_candidates - num_errors), str(num_errors))) - return 'Revision store cleanup completed' + return 'Revision store cleanup job completed' def revision_remove(ctx, revision_id, revision_path): @@ -632,7 +675,8 @@ def revision_remove(ctx, revision_id, revision_path): :returns: Boolean indicating if revision was removed """ - if not revision_path.startswith(constants.UUREVISIONCOLLECTION + "/"): + revision_prefix = os.path.join("/" + user.zone(ctx), constants.UUREVISIONCOLLECTION.lstrip(os.path.sep), '') + if not revision_path.startswith(revision_prefix): log.write(ctx, "ERROR - sanity check fail when removing revision <{}>: 
<{}>".format( revision_id, revision_path)) @@ -749,8 +793,6 @@ def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound, v max_bucket_size = bucket[1] bucket_start_index = bucket[2] - if verbose: - log.write(ctx, '[revisions] Comparing revisions in bucket {} to max size ({} vs {})'.format(str(bucket), str(len(rev_list)), str(max_bucket_size))) if len(rev_list) > max_bucket_size: nr_to_be_removed = len(rev_list) - max_bucket_size @@ -758,11 +800,19 @@ def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound, v if bucket_start_index >= 0: while count < nr_to_be_removed: # Add revision to list of removal - deletion_candidates.append(rev_list[bucket_start_index + count]) + index = bucket_start_index + count + if verbose: + log.write(ctx, '[revisions] Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index), + str(bucket))) + deletion_candidates.append(rev_list[index]) count += 1 else: while count < nr_to_be_removed: - deletion_candidates.append(rev_list[len(rev_list) + (bucket_start_index) - count]) + index = len(rev_list) + (bucket_start_index) - count + if verbose: + log.write(ctx, '[revisions] Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index), + str(bucket))) + deletion_candidates.append(rev_list[index]) count += 1 bucket_counter += 1 # To keep conciding with strategy list diff --git a/tools/revision-clean-up.py b/tools/revision-clean-up.py index 78c96982c..4cffebf31 100755 --- a/tools/revision-clean-up.py +++ b/tools/revision-clean-up.py @@ -3,7 +3,7 @@ import argparse import atexit -import json +from datetime import datetime import os import subprocess import sys @@ -16,7 +16,8 @@ def get_args(): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("endofcalendarday", help="End of calendar day (epoch time)") parser.add_argument("bucketcase", choices=["A", "B", "Simple"], help="Bucket case configuration name") - parser.add_argument("--batch-size", type=int, 
default=1, help="Number of revisions to process at a time (default: 1).", required=False) + parser.add_argument("--batch-size", type=int, default=1000, + help="Number of revisions to process at a time (default: 1000).", required=False) parser.add_argument("-v", "--verbose", action="store_true", default=False, help="Make the revision cleanup rules print additional information for troubleshooting purposes.") return parser.parse_args() @@ -41,50 +42,45 @@ def lock_or_die(): atexit.register(lambda: os.unlink(LOCKFILE_PATH)) -def clean_up(revisions, bucketcase, endofcalendarday, verbose_flag): - chunk = json.dumps(revisions) - chunk = "\\\\".join(chunk.split("\\")) - chunk = "\\'".join(chunk.split("'")) +def process_revision_cleanup_data(bucketcase, endofcalendarday, verbose_flag): return subprocess.check_output([ 'irule', '-r', 'irods_rule_engine_plugin-irods_rule_language-instance', - "*out=''; rule_revisions_clean_up('{}', '{}', '{}', '{}', *out); writeString('stdout', *out);".format(chunk, bucketcase, endofcalendarday, verbose_flag), + "*out=''; rule_revisions_cleanup_process('{}', '{}', '{}', *out); writeString('stdout', *out);".format(bucketcase, endofcalendarday, verbose_flag), 'null', 'ruleExecOut' ]) -def get_revisions_info(): - return json.loads(subprocess.check_output([ +def collect_revision_cleanup_data(batch_size): + return subprocess.check_output([ 'irule', '-r', 'irods_rule_engine_plugin-irods_rule_language-instance', - '*out=""; rule_revisions_info(*out); writeString("stdout", *out);', + "*out=''; rule_revisions_cleanup_collect('{}', *out); writeString('stdout', *out);".format(str(batch_size)), 'null', 'ruleExecOut' - ])) + ]) def main(): args = get_args() lock_or_die() - revisions_info = get_revisions_info() if args.verbose: - print('START cleaning up revision store') + print('START cleaning up revision store at ' + str(datetime.now())) + + collect_revision_cleanup_data(args.batch_size) - while len(revisions_info) > args.batch_size: - if args.verbose: - 
print("Clean up for " + str(revisions_info[:args.batch_size])) - clean_up(revisions_info[:args.batch_size], - args.bucketcase, - args.endofcalendarday, - "1" if args.verbose else "0") - revisions_info = revisions_info[args.batch_size:] + status = "INITIAL" + while status != "No more revision cleanup data": + status = process_revision_cleanup_data(args.bucketcase, + args.endofcalendarday, + "1" if args.verbose else "0") if args.verbose: - print('END cleaning up revision store') + print('END cleaning up revision store at ' + str(datetime.now())) if __name__ == "__main__": diff --git a/util/constants.py b/util/constants.py index 4de6e68b5..e7a7356fb 100644 --- a/util/constants.py +++ b/util/constants.py @@ -20,6 +20,15 @@ UUREVISIONCOLLECTION = UUSYSTEMCOLLECTION + '/revisions' """iRODS path where all revisions will be stored.""" +PROC_REVISION_CLEANUP = "revision-cleanup" +"""Process name of the revision cleanup job. Used by the spooling system""" + +SPOOL_PROCESSES = {PROC_REVISION_CLEANUP} +"""Set of process names recognized by the spooling system""" + +SPOOL_MAIN_DIRECTORY = "/var/lib/irods/yoda-spool" +"""Directory that is used for storing Yoda batch process spool data on the provider""" + UUBLOCKLIST = ["._*", ".DS_Store"] """ List of file extensions not to be copied to revision""" diff --git a/util/spool.py b/util/spool.py new file mode 100644 index 000000000..d23ede0ab --- /dev/null +++ b/util/spool.py @@ -0,0 +1,108 @@ +"""This file contains an interface to the spooling subsystem. The spooling system can be used to store + temporary data for batch processing. The intended use case is that one job collects data to be processed + and stores it in the spooling system, while another job retrieves the data and processes it. + + The current implementation does not have confirmations. 
It assumes that after spool data has been retrieved + from the spool system it is either processed, or any errors during processing will be taken + care of outside the spooling system (e.g. by having the job that collects data for the spooling system + re-submit spool data that has not been processed for a retry). + + It is assumed that functions that use the spool subsystem take care of authorization and logging. +""" + +import os + +import persistqueue +import persistqueue.serializers.json + +import constants + + +def get_spool_data(process): + """Retrieves one data object for a given batch process for processing. + This function is non-blocking. + + :param process: Spool process name (see util.constants for defined names) + + :returns: Spool data object, or None if there is no spool data for this process + """ + _ensure_spool_process_initialized(process) + q = _get_spool_queue(process) + + try: + result = q.get(block=False) + q.task_done() + except persistqueue.exceptions.Empty: + result = None + + return result + + +def put_spool_data(process, data_list): + """Stores data structures in the spooling subsystem for batch processing. 
+ + :param process: Spool process name (see util.constants for defined names) + :param data_list: List (or other iterable) of arbitrary serializable data objects to store + in the spooling system + """ + _ensure_spool_process_initialized(process) + q = _get_spool_queue(process) + for data in data_list: + q.put(data) + + +def has_spool_data(process): + """ Checks whether there are any data objects in the spool system for a given process + + :param process: Spool process name (see util.constants for defined names) + + :returns: Boolean value that represents whether there is any spool data + present for this process + """ + return num_spool_data(process) > 0 + + +def num_spool_data(process): + """ Returns the number of items in the spool system for a given process + + :param process: Spool process name (see util.constants for defined names) + + :returns: The number of data items in the spool system for this process + """ + _ensure_spool_process_initialized(process) + return _get_spool_queue(process).qsize() + + +def _get_spool_directory(process): + if process in constants.SPOOL_PROCESSES: + return os.path.join(constants.SPOOL_MAIN_DIRECTORY, process, "spool") + else: + raise Exception("Spool process {} not found.".format(process)) + + +def _get_temp_directory(process): + if process in constants.SPOOL_PROCESSES: + return os.path.join(constants.SPOOL_MAIN_DIRECTORY, process, "tmp") + else: + raise Exception("Spool process {} not found.".format(process)) + + +def _get_spool_queue(process): + directory = _get_spool_directory(process) + # JSON serialization is used to make it easier to examine spooled objects manually + return persistqueue.Queue(directory, + tempdir=_get_temp_directory(process), + serializer=persistqueue.serializers.json, + chunksize=1) + + +def _ensure_spool_process_initialized(process): + if process not in constants.SPOOL_PROCESSES: + raise Exception("Spool process {} not found.".format(process)) + + for directory in [constants.SPOOL_MAIN_DIRECTORY, + 
os.path.join(constants.SPOOL_MAIN_DIRECTORY, process), + _get_spool_directory(process), + _get_temp_directory(process)]: + if not os.path.exists(directory): + os.mkdir(directory)