diff --git a/revisions.py b/revisions.py index 15d139c33..8b5fa5286 100644 --- a/revisions.py +++ b/revisions.py @@ -578,17 +578,22 @@ def rule_revisions_info(ctx): return json.dumps(revisions_info) -@rule.make(inputs=[0, 1, 2], outputs=[3]) -def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay): +@rule.make(inputs=[0, 1, 2, 3], outputs=[4]) +def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay, verbose_flag): """Step through part of revision store and apply the chosen bucket strategy. :param ctx: Combined type of a callback and rei struct :param revisions_info: Json-encoded revision info. - :param bucketcase: Multiple ways of cleaning up revisions can be chosen. + :param bucketcase: Select a bucketlist based on a string ('A', 'B', 'Simple'). If the value is an unknown case, the default + value 'B' will be used. See https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisi + for an explanation. :param endOfCalendarDay: If zero, system will determine end of current day in seconds since epoch (1970-01-01 00:00 UTC) + :param verbose_flag "1" if rule needs to print additional information for troubleshooting, else "0" :returns: String with status of cleanup """ + log.write(ctx, '[revisions] Revision cleanup job starting.') + verbose = verbose_flag == "1" revisions_list = json.loads(revisions_info) end_of_calendar_day = int(endOfCalendarDay) @@ -598,16 +603,37 @@ def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay): # get definition of buckets buckets = revision_bucket_list(ctx, bucketcase) + # Statistics + num_candidates = 0 + num_errors = 0 + for revisions in revisions_list: + if verbose: + log.write(ctx, '[revisions] Processing revisions {} ...'.format(str(revisions))) # Process the original path conform the bucket settings - candidates = get_deletion_candidates(ctx, buckets, revisions, end_of_calendar_day) + candidates = get_deletion_candidates(ctx, buckets, revisions, end_of_calendar_day, verbose) + num_candidates += len(candidates) + + # Create lookup table for revision paths if needed + if len(candidates) > 0: + rev_paths = {r[0]: r[2] for r in revisions} + + if verbose: + log.write(ctx, '[revisions] Candidates to be removed: {} ...'.format(str(candidates))) # Delete the revisions that were found being obsolete for revision_id in candidates: - if not revision_remove(ctx, revision_id, rev_dict[revision_id][1]): - return 'Something went wrong cleaning up revision store' + rev_path = rev_paths[revision_id] + if verbose: + log.write(ctx, '[revisions] Removing candidate: {} ...'.format(str(revision_id))) + if not revision_remove(ctx, revision_id, rev_path): + num_errors += 1 - return 'Successfully cleaned up the revision store' + log.write(ctx, '[revisions] Revision cleanup job completed - {} candidates ({} successful / {} errors).'.format( + str(num_candidates), + str(num_candidates - num_errors), + str(num_errors))) + return 'Revision store cleanup completed' def revision_remove(ctx, revision_id, revision_path): @@ -621,11 +647,20 @@ def revision_remove(ctx, revision_id, revision_path): :returns: Boolean indicating if revision was removed """ + if not revision_path.startswith(constants.UUREVISIONCOLLECTION + "/"): + log.write(ctx, "ERROR - sanity check fail when removing revision <{}>: <{}>".format( + revision_id, + revision_path)) + return False + try: msi.data_obj_unlink(ctx, revision_path, irods_types.BytesBuf()) return True - except msi.Error: - log.write(ctx, "ERROR - Something went wrong deleting revision <{}>: <{}>.".format(revision_id, revision_path)) + except msi.Error as e: + log.write(ctx, "ERROR - could not remove revision <{}>: <{}> ({}).".format( + revision_id, + revision_path, + str(e))) return False log.write(ctx, "ERROR - Revision ID <{}> not found or permission denied.".format(revision_id)) @@ -641,7 +676,9 @@ def revision_bucket_list(ctx, case): revision after the current original (which should always be kept) , 1 the revision after that, etc. :param ctx: Combined type of a callback and rei struct - :param case: Select a bucketlist based on a string + :param case: Select a bucketlist based on a string ('A', 'B', 'Simple'). If the case is unknown, the default + value 'B' will be used. See https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md + for an explanation. :returns: List representing revision strategy """ @@ -687,13 +724,14 @@ def revision_bucket_list(ctx, case): ] -def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound): +def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound, verbose): """Get the candidates for deletion based on the active strategy case :param ctx: Combined type of a callback and rei struct :param buckets: List of buckets :param revisions: List of revisions :param initial_upper_time_bound: Initial upper time bound for first bucket + :param verbose Whether to print additional information for troubleshooting (boolean) :returns: List of candidates for deletion based on the active strategy case """ @@ -726,6 +764,8 @@ def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound): max_bucket_size = bucket[1] bucket_start_index = bucket[2] + if verbose: + log.write(ctx, '[revisions] Comparing revisions in bucket {} to max size ({} vs {})'.format(str(bucket), str(len(rev_list)), str(max_bucket_size))) if len(rev_list) > max_bucket_size: nr_to_be_removed = len(rev_list) - max_bucket_size diff --git a/tools/revision-clean-up.py b/tools/revision-clean-up.py index 9cc7d211f..78c96982c 100755 --- a/tools/revision-clean-up.py +++ b/tools/revision-clean-up.py @@ -1,19 +1,47 @@ #!/usr/bin/env python +"""This script cleans up data object revisions, by invoking the revision cleanup rules.""" +import argparse +import atexit import json +import os import subprocess import sys +NAME = os.path.basename(sys.argv[0]) +LOCKFILE_PATH = '/tmp/irods-{}.lock'.format(NAME) -if len(sys.argv) != 3: - print('Usage: {} endOfCalendarDay bucketcase'.format(sys.argv[0])) - exit(1) -endOfCalendarDay = sys.argv[1] -bucketcase = sys.argv[2] +def get_args(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("endofcalendarday", help="End of calendar day (epoch time)") + parser.add_argument("bucketcase", choices=["A", "B", "Simple"], help="Bucket case configuration name") + parser.add_argument("--batch-size", type=int, default=1, help="Number of revisions to process at a time (default: 1).", required=False) + parser.add_argument("-v", "--verbose", action="store_true", default=False, + help="Make the revision cleanup rules print additional information for troubleshooting purposes.") + return parser.parse_args() -def clean_up(revisions): +def lock_or_die(): + """Prevent running multiple instances of this job simultaneously""" + + # Create a lockfile for this job type, abort if it exists. + try: + fd = os.open(LOCKFILE_PATH, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) + except OSError: + if os.path.exists(LOCKFILE_PATH): + print('Not starting job: Lock file {} exists'.format(LOCKFILE_PATH)) + exit(1) + else: + raise + os.write(fd, bytes(str(os.getpid()).encode("utf-8"))) + os.close(fd) + + # Remove lock no matter how we exit. + atexit.register(lambda: os.unlink(LOCKFILE_PATH)) + + +def clean_up(revisions, bucketcase, endofcalendarday, verbose_flag): chunk = json.dumps(revisions) chunk = "\\\\".join(chunk.split("\\")) chunk = "\\'".join(chunk.split("'")) @@ -21,24 +49,43 @@ def clean_up(revisions): 'irule', '-r', 'irods_rule_engine_plugin-irods_rule_language-instance', - "*out=''; rule_revisions_clean_up('{}', '{}', '{}', *out); writeString('stdout', *out);".format(chunk, bucketcase, endOfCalendarDay), + "*out=''; rule_revisions_clean_up('{}', '{}', '{}', '{}', *out); writeString('stdout', *out);".format(chunk, bucketcase, endofcalendarday, verbose_flag), 'null', 'ruleExecOut' ]) -print('START cleaning up revision store') +def get_revisions_info(): + return json.loads(subprocess.check_output([ + 'irule', + '-r', + 'irods_rule_engine_plugin-irods_rule_language-instance', + '*out=""; rule_revisions_info(*out); writeString("stdout", *out);', + 'null', + 'ruleExecOut' + ])) + + +def main(): + args = get_args() + lock_or_die() + revisions_info = get_revisions_info() + + if args.verbose: + print('START cleaning up revision store') + + while len(revisions_info) > args.batch_size: + if args.verbose: + print("Clean up for " + str(revisions_info[:args.batch_size])) + clean_up(revisions_info[:args.batch_size], + args.bucketcase, + args.endofcalendarday, + "1" if args.verbose else "0") + revisions_info = revisions_info[args.batch_size:] + + if args.verbose: + print('END cleaning up revision store') -revisions_info = json.loads(subprocess.check_output([ - 'irule', - '-r', - 'irods_rule_engine_plugin-irods_rule_language-instance', - '*out=""; rule_revisions_info(*out); writeString("stdout", *out);', - 'null', - 'ruleExecOut' -])) -while len(revisions_info) > 30: - clean_up(revisions_info[:30]) - revisions_info = revisions_info[30:] -print(clean_up(revisions_info)) +if __name__ == "__main__": + main()