Skip to content

Commit

Permalink
Revision cleanup: improve robustness, logging
Browse files Browse the repository at this point in the history
- Add verbose mode for troubleshooting
- Improve default logging (error statistics etc.)
- More input validation for removing revisions and starting job
- Introduce job-level lock for revision cleanups to prevent errors
  due to multiple jobs running simultaneously.
  • Loading branch information
stsnel committed Sep 25, 2023
1 parent a57610b commit 74cfc98
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 31 deletions.
62 changes: 51 additions & 11 deletions revisions.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,17 +563,22 @@ def rule_revisions_info(ctx):
return json.dumps(revisions_info)


@rule.make(inputs=[0, 1, 2], outputs=[3])
def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay):
@rule.make(inputs=[0, 1, 2, 3], outputs=[4])
def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay, verbose_flag):
"""Step through part of revision store and apply the chosen bucket strategy.
:param ctx: Combined type of a callback and rei struct
:param revisions_info: Json-encoded revision info.
:param bucketcase: Multiple ways of cleaning up revisions can be chosen.
:param bucketcase: Select a bucketlist based on a string ('A', 'B', 'Simple'). If the value is an unknown case, the default
value 'B' will be used. See https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
for an explanation.
:param endOfCalendarDay: If zero, system will determine end of current day in seconds since epoch (1970-01-01 00:00 UTC)
:param verbose_flag: "1" if rule needs to print additional information for troubleshooting, else "0"
:returns: String with status of cleanup
"""
log.write(ctx, '[revisions] Revision cleanup job starting.')
verbose = verbose_flag == "1"
revisions_list = json.loads(revisions_info)

end_of_calendar_day = int(endOfCalendarDay)
Expand All @@ -583,16 +588,37 @@ def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay):
# get definition of buckets
buckets = revision_bucket_list(ctx, bucketcase)

# Statistics
num_candidates = 0
num_errors = 0

for revisions in revisions_list:
if verbose:
log.write(ctx, '[revisions] Processing revisions {} ...'.format(str(revisions)))
# Process the original path conform the bucket settings
candidates = get_deletion_candidates(ctx, buckets, revisions, end_of_calendar_day)
candidates = get_deletion_candidates(ctx, buckets, revisions, end_of_calendar_day, verbose)
num_candidates += len(candidates)

# Create lookup table for revision paths if needed
if len(candidates) > 0:
rev_paths = {r[0]: r[2] for r in revisions}

if verbose:
log.write(ctx, '[revisions] Candidates to be removed: {} ...'.format(str(candidates)))

# Delete the revisions that were found being obsolete
for revision_id in candidates:
if not revision_remove(ctx, revision_id, rev_dict[revision_id][1]):
return 'Something went wrong cleaning up revision store'
rev_path = rev_paths[revision_id]
if verbose:
log.write(ctx, '[revisions] Removing candidate: {} ...'.format(str(revision_id)))
if not revision_remove(ctx, revision_id, rev_path):
num_errors += 1

return 'Successfully cleaned up the revision store'
log.write(ctx, '[revisions] Revision cleanup job completed - {} candidates ({} successful / {} errors).'.format(
str(num_candidates),
str(num_candidates - num_errors),
str(num_errors)))
return 'Revision store cleanup completed'


def revision_remove(ctx, revision_id, revision_path):
Expand All @@ -606,11 +632,20 @@ def revision_remove(ctx, revision_id, revision_path):
:returns: Boolean indicating if revision was removed
"""
if not revision_path.startswith(constants.UUREVISIONCOLLECTION + "/"):
log.write(ctx, "ERROR - sanity check fail when removing revision <{}>: <{}>".format(
revision_id,
revision_path))
return False

try:
msi.data_obj_unlink(ctx, revision_path, irods_types.BytesBuf())
return True
except msi.Error:
log.write(ctx, "ERROR - Something went wrong deleting revision <{}>: <{}>.".format(revision_id, revision_path))
except msi.Error as e:
log.write(ctx, "ERROR - could not remove revision <{}>: <{}> ({}).".format(
revision_id,
revision_path,
str(e)))
return False

log.write(ctx, "[revisions] ERROR - Revision ID <{}> not found or permission denied.".format(revision_id))
Expand All @@ -626,7 +661,9 @@ def revision_bucket_list(ctx, case):
revision after the current original (which should always be kept) , 1 the revision after that, etc.
:param ctx: Combined type of a callback and rei struct
:param case: Select a bucketlist based on a string
:param case: Select a bucketlist based on a string ('A', 'B', 'Simple'). If the case is unknown, the default
value 'B' will be used. See https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
for an explanation.
:returns: List representing revision strategy
"""
Expand Down Expand Up @@ -672,13 +709,14 @@ def revision_bucket_list(ctx, case):
]


def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound):
def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound, verbose):
"""Get the candidates for deletion based on the active strategy case
:param ctx: Combined type of a callback and rei struct
:param buckets: List of buckets
:param revisions: List of revisions
:param initial_upper_time_bound: Initial upper time bound for first bucket
:param verbose: Whether to print additional information for troubleshooting (boolean)
:returns: List of candidates for deletion based on the active strategy case
"""
Expand Down Expand Up @@ -711,6 +749,8 @@ def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound):
max_bucket_size = bucket[1]
bucket_start_index = bucket[2]

if verbose:
log.write(ctx, '[revisions] Comparing revisions in bucket {} to max size ({} vs {})'.format(str(bucket), str(len(rev_list)), str(max_bucket_size)))
if len(rev_list) > max_bucket_size:
nr_to_be_removed = len(rev_list) - max_bucket_size

Expand Down
87 changes: 67 additions & 20 deletions tools/revision-clean-up.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,91 @@
#!/usr/bin/env python
"""This script cleans up data object revisions, by invoking the revision cleanup rules."""

import argparse
import atexit
import json
import os
import subprocess
import sys

# Base name of this script as it was invoked (taken from argv[0]).
NAME = os.path.basename(sys.argv[0])
# Path of the job-level lock file; the script name is embedded in the file name.
LOCKFILE_PATH = '/tmp/irods-{}.lock'.format(NAME)

if len(sys.argv) != 3:
print('Usage: {} endOfCalendarDay bucketcase'.format(sys.argv[0]))
exit(1)

endOfCalendarDay = sys.argv[1]
bucketcase = sys.argv[2]
def get_args():
    """Parse and validate the command line arguments of the cleanup job.

    Invalid values are rejected by argparse (usage message, exit status 2)
    before any rule is invoked.

    :returns: Namespace with attributes endofcalendarday (string holding an integer),
              bucketcase (string), batch_size (int >= 1) and verbose (boolean)
    """
    def epoch_time(value):
        # The cleanup rule converts this value with int(), so reject anything
        # that is not an integer up front. The original string is returned
        # unchanged, so it is passed on exactly as the user typed it.
        try:
            int(value)
        except ValueError:
            raise argparse.ArgumentTypeError("{!r} is not an integer epoch time".format(value))
        return value

    def positive_int(value):
        # A batch size below 1 can never make progress through the revision list.
        try:
            number = int(value)
        except ValueError:
            raise argparse.ArgumentTypeError("{!r} is not an integer".format(value))
        if number < 1:
            raise argparse.ArgumentTypeError("batch size must be at least 1")
        return number

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("endofcalendarday", type=epoch_time,
                        help="End of calendar day (epoch time)")
    parser.add_argument("bucketcase", choices=["A", "B", "Simple"],
                        help="Bucket case configuration name")
    parser.add_argument("--batch-size", type=positive_int, default=1, required=False,
                        help="Number of revisions to process at a time (default: 1).")
    parser.add_argument("-v", "--verbose", action="store_true", default=False,
                        help="Make the revision cleanup rules print additional information for troubleshooting purposes.")
    return parser.parse_args()


def clean_up(revisions):
def lock_or_die():
    """Prevent running multiple instances of this job simultaneously.

    Creates the lock file exclusively and writes the current process ID into
    it. If the lock file already exists, a message is printed and the process
    exits with status 1. Any other error while creating the file is re-raised.
    """
    # O_CREAT | O_EXCL makes creation fail if the lock file already exists.
    try:
        fd = os.open(LOCKFILE_PATH, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
    except OSError:
        if os.path.exists(LOCKFILE_PATH):
            print('Not starting job: Lock file {} exists'.format(LOCKFILE_PATH))
            sys.exit(1)
        raise

    # Register removal as soon as we own the lock file, so that a failure
    # while writing the PID below cannot leave a stale lock behind.
    atexit.register(lambda: os.unlink(LOCKFILE_PATH))

    try:
        os.write(fd, str(os.getpid()).encode("utf-8"))
    finally:
        # Always release the descriptor, also when the write fails.
        os.close(fd)


def clean_up(revisions, bucketcase, endofcalendarday, verbose_flag):
    """Invoke rule_revisions_clean_up through irule for one batch of revisions.

    :param revisions:        Batch of revision info; passed to the rule JSON-encoded
    :param bucketcase:       Bucket case configuration name
    :param endofcalendarday: End of calendar day (epoch time)
    :param verbose_flag:     "1" to make the rule print additional information, else "0"

    :returns: Output of the irule command
    """
    def quote(value):
        # Escape backslashes first, then single quotes, so that the value can
        # be embedded in a single-quoted string literal of the rule text.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    # Exactly one rule text is passed to irule: the four-input form that
    # matches the current signature of rule_revisions_clean_up.
    rule_text = "*out=''; rule_revisions_clean_up('{}', '{}', '{}', '{}', *out); writeString('stdout', *out);".format(
        quote(json.dumps(revisions)),
        quote(bucketcase),
        quote(endofcalendarday),
        quote(verbose_flag))

    return subprocess.check_output([
        'irule',
        '-r',
        'irods_rule_engine_plugin-irods_rule_language-instance',
        rule_text,
        'null',
        'ruleExecOut'
    ])


print('START cleaning up revision store')
def get_revisions_info():
    """Run rule_revisions_info through irule and decode its output.

    :returns: The JSON-decoded output written to stdout by the rule
    """
    command = [
        'irule',
        '-r',
        'irods_rule_engine_plugin-irods_rule_language-instance',
        '*out=""; rule_revisions_info(*out); writeString("stdout", *out);',
        'null',
        'ruleExecOut',
    ]
    raw_output = subprocess.check_output(command)
    return json.loads(raw_output)


def main():
    """Run the revision cleanup job.

    Parses the arguments, takes the job-level lock, retrieves the revision
    info and hands it to the cleanup rule in batches of at most
    ``--batch-size`` entries. Every entry is processed, including the final
    (possibly smaller) batch.
    """
    args = get_args()
    if args.batch_size < 1:
        # A batch size below 1 would never make progress through the list.
        sys.exit('Batch size must be at least 1')

    lock_or_die()
    revisions_info = get_revisions_info()
    verbose_flag = "1" if args.verbose else "0"

    if args.verbose:
        print('START cleaning up revision store')

    # Step through the list in batch-sized slices; the last slice may be
    # shorter than the batch size and must not be skipped.
    for start in range(0, len(revisions_info), args.batch_size):
        batch = revisions_info[start:start + args.batch_size]
        if args.verbose:
            print("Clean up for " + str(batch))
        clean_up(batch, args.bucketcase, args.endofcalendarday, verbose_flag)

    if args.verbose:
        print('END cleaning up revision store')

revisions_info = json.loads(subprocess.check_output([
'irule',
'-r',
'irods_rule_engine_plugin-irods_rule_language-instance',
'*out=""; rule_revisions_info(*out); writeString("stdout", *out);',
'null',
'ruleExecOut'
]))

while len(revisions_info) > 100:
clean_up(revisions_info[:100])
revisions_info = revisions_info[100:]
print(clean_up(revisions_info))
# Only run the job when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

0 comments on commit 74cfc98

Please sign in to comment.