Skip to content

Commit

Permalink
YDA-5416: improve robustness rev cleanup process
Browse files Browse the repository at this point in the history
The solution involves a separate spooling system for batch jobs
that temporarily stores data for batch processing on disk. The
revision cleanup script now first triggers a rule to collect
all revision data, and stores this data in batches in the spooling
system. The cleanup script then runs rules that clean up revisions
in these batches.

The intent of this design is to minimize the impact of resource leaks in iRODS
(by processing revisions in batches) and to reduce the potential for
cascading failures (because batch processing now uses a separate
system for keeping track of batches, rather than a chain of rule
invocations).
  • Loading branch information
stsnel committed Sep 25, 2023
1 parent b81bcf9 commit 5332e1e
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 41 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ certifi==2021.10.8
pysqlcipher3==1.0.4
execnet==1.9.0
deepdiff==3.3.0
persist-queue==0.8.1
88 changes: 69 additions & 19 deletions revisions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import datetime
import hashlib
import json
import os
import random
import time
Expand All @@ -17,13 +16,14 @@
import folder
import groups
from util import *
from util.spool import get_spool_data, has_spool_data, put_spool_data

__all__ = ['api_revisions_restore',
'api_revisions_search_on_filename',
'api_revisions_list',
'rule_revision_batch',
'rule_revisions_info',
'rule_revisions_clean_up']
'rule_revisions_cleanup_collect',
'rule_revisions_cleanup_process']


@api.make()
Expand Down Expand Up @@ -523,12 +523,14 @@ def revision_create(ctx, resource, data_id, max_size, verbose):
return revision_created


@rule.make(inputs=[], outputs=[0])
def rule_revisions_info(ctx):
def revisions_info(ctx):
"""Obtain information about all revisions.
:param ctx: Combined type of a callback and rei struct
:returns: Json string with info about revisions
:returns: Nested list, where the outer list represents revisioned data objects,
and the inner list represents revisions for that data object.
Each revision is represented by a list of length three (revision ID,
modification epoch time, revision path)
"""
zone = user.zone(ctx)
revision_store = '/' + zone + constants.UUREVISIONCOLLECTION
Expand Down Expand Up @@ -575,26 +577,66 @@ def rule_revisions_info(ctx):
if revision_id in rev_dict:
revision_list.append(rev_dict[revision_id])
revisions_info.append(revision_list)
return json.dumps(revisions_info)
return revisions_info


@rule.make(inputs=[0, 1, 2, 3], outputs=[4])
def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay, verbose_flag):
"""Step through part of revision store and apply the chosen bucket strategy.
@rule.make(inputs=[0], outputs=[1])
def rule_revisions_cleanup_collect(ctx, batch_size):
    """Collect revision data and put it in the spool system for processing by the revision cleanup
    processing jobs.

    :param ctx:        Combined type of a callback and rei struct
    :param batch_size: Number of entries of the collected revision data to include in one spool
                       object. Must be convertible to a positive integer.

    :returns: Status

    :raises Exception: If rule is executed by non-rodsadmin user, or if batch_size is not positive
    """
    if user.user_type(ctx) != 'rodsadmin':
        raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.")

    if has_spool_data(constants.PROC_REVISION_CLEANUP):
        return "Existing spool data present. Not adding new revision cleanup data."

    # Validate before collecting data: a batch size of zero or less would never consume
    # any data, so the batching loop below would not terminate.
    batch_size = int(batch_size)
    if batch_size < 1:
        raise Exception("The revision cleanup batch size must be a positive integer.")

    data = revisions_info(ctx)
    number_revisions = len(data)

    # Step through the data by index rather than re-slicing the remainder on every iteration.
    for start in range(0, number_revisions, batch_size):
        current_batch = data[start:start + batch_size]
        # put_spool_data stores each element of the list it receives as a separate spool
        # object, so the batch is wrapped in a list to keep it together as one object.
        put_spool_data(constants.PROC_REVISION_CLEANUP, [current_batch])

    log.write(ctx, "Collected {} revisions for revision cleanup.".format(number_revisions))
    return "Revision data has been spooled for cleanup"


@rule.make(inputs=[0, 1, 2], outputs=[3])
def rule_revisions_cleanup_process(ctx, bucketcase, endOfCalendarDay, verbose_flag):
"""Apply the selected revision strategy to spooled revision data
:param ctx: Combined type of a callback and rei struct
:param revisions_info: Json-encoded revision info.
:param bucketcase: Select a bucketlist based on a string ('A', 'B', 'Simple'). If the value is an unknown case, the default
value 'B' will be used. See https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisi
for an explanation.
:param endOfCalendarDay: If zero, system will determine end of current day in seconds since epoch (1970-01-01 00:00 UTC)
:param verbose_flag: "1" if rule needs to print additional information for troubleshooting, else "0"
:returns: String with status of cleanup
:raises Exception: If rule is executed by non-rodsadmin user
"""

if user.user_type(ctx) != 'rodsadmin':
raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.")

log.write(ctx, '[revisions] Revision cleanup job starting.')
verbose = verbose_flag == "1"
revisions_list = json.loads(revisions_info)
revisions_list = get_spool_data(constants.PROC_REVISION_CLEANUP)

if revisions_list is None:
log.write(ctx, '[revisions] Revision cleanup job stopping - no more spooled revision data.')
return "No more revision cleanup data"

end_of_calendar_day = int(endOfCalendarDay)
if end_of_calendar_day == 0:
Expand Down Expand Up @@ -629,11 +671,12 @@ def rule_revisions_clean_up(ctx, revisions_info, bucketcase, endOfCalendarDay, v
if not revision_remove(ctx, revision_id, rev_path):
num_errors += 1

log.write(ctx, '[revisions] Revision cleanup job completed - {} candidates ({} successful / {} errors).'.format(
log.write(ctx, '[revisions] Revision cleanup job completed - {} candidates for {} revisions ({} successful / {} errors).'.format(
str(num_candidates),
str(len(revisions_list)),
str(num_candidates - num_errors),
str(num_errors)))
return 'Revision store cleanup completed'
return 'Revision store cleanup job completed'


def revision_remove(ctx, revision_id, revision_path):
Expand All @@ -647,7 +690,8 @@ def revision_remove(ctx, revision_id, revision_path):
:returns: Boolean indicating if revision was removed
"""
if not revision_path.startswith(constants.UUREVISIONCOLLECTION + "/"):
revision_prefix = os.path.join("/" + user.zone(ctx), constants.UUREVISIONCOLLECTION.lstrip(os.path.sep), '')
if not revision_path.startswith(revision_prefix):
log.write(ctx, "ERROR - sanity check fail when removing revision <{}>: <{}>".format(
revision_id,
revision_path))
Expand Down Expand Up @@ -764,20 +808,26 @@ def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound, v
max_bucket_size = bucket[1]
bucket_start_index = bucket[2]

if verbose:
log.write(ctx, '[revisions] Comparing revisions in bucket {} to max size ({} vs {})'.format(str(bucket), str(len(rev_list)), str(max_bucket_size)))
if len(rev_list) > max_bucket_size:
nr_to_be_removed = len(rev_list) - max_bucket_size

count = 0
if bucket_start_index >= 0:
while count < nr_to_be_removed:
# Add revision to list of removal
deletion_candidates.append(rev_list[bucket_start_index + count])
index = bucket_start_index + count
if verbose:
log.write(ctx, '[revisions] Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
str(bucket)))
deletion_candidates.append(rev_list[index])
count += 1
else:
while count < nr_to_be_removed:
deletion_candidates.append(rev_list[len(rev_list) + (bucket_start_index) - count])
index = len(rev_list) + (bucket_start_index) - count
if verbose:
log.write(ctx, '[revisions] Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
str(bucket)))
deletion_candidates.append(rev_list[index])
count += 1

bucket_counter += 1 # To keep coinciding with strategy list
Expand Down
40 changes: 18 additions & 22 deletions tools/revision-clean-up.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import argparse
import atexit
import json
from datetime import datetime
import os
import subprocess
import sys
Expand All @@ -16,7 +16,8 @@ def get_args():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("endofcalendarday", help="End of calendar day (epoch time)")
parser.add_argument("bucketcase", choices=["A", "B", "Simple"], help="Bucket case configuration name")
parser.add_argument("--batch-size", type=int, default=1, help="Number of revisions to process at a time (default: 1).", required=False)
parser.add_argument("--batch-size", type=int, default=1000,
help="Number of revisions to process at a time (default: 1000).", required=False)
parser.add_argument("-v", "--verbose", action="store_true", default=False,
help="Make the revision cleanup rules print additional information for troubleshooting purposes.")
return parser.parse_args()
Expand All @@ -41,50 +42,45 @@ def lock_or_die():
atexit.register(lambda: os.unlink(LOCKFILE_PATH))


def clean_up(revisions, bucketcase, endofcalendarday, verbose_flag):
chunk = json.dumps(revisions)
chunk = "\\\\".join(chunk.split("\\"))
chunk = "\\'".join(chunk.split("'"))
def process_revision_cleanup_data(bucketcase, endofcalendarday, verbose_flag):
return subprocess.check_output([
'irule',
'-r',
'irods_rule_engine_plugin-irods_rule_language-instance',
"*out=''; rule_revisions_clean_up('{}', '{}', '{}', '{}', *out); writeString('stdout', *out);".format(chunk, bucketcase, endofcalendarday, verbose_flag),
"*out=''; rule_revisions_cleanup_process('{}', '{}', '{}', *out); writeString('stdout', *out);".format(bucketcase, endofcalendarday, verbose_flag),
'null',
'ruleExecOut'
])


def get_revisions_info():
return json.loads(subprocess.check_output([
def collect_revision_cleanup_data(batch_size):
return subprocess.check_output([
'irule',
'-r',
'irods_rule_engine_plugin-irods_rule_language-instance',
'*out=""; rule_revisions_info(*out); writeString("stdout", *out);',
"*out=''; rule_revisions_cleanup_collect('{}', *out); writeString('stdout', *out);".format(str(batch_size)),
'null',
'ruleExecOut'
]))
])


def main():
args = get_args()
lock_or_die()
revisions_info = get_revisions_info()

if args.verbose:
print('START cleaning up revision store')
print('START cleaning up revision store at ' + str(datetime.now()))

collect_revision_cleanup_data(args.batch_size)

while len(revisions_info) > args.batch_size:
if args.verbose:
print("Clean up for " + str(revisions_info[:args.batch_size]))
clean_up(revisions_info[:args.batch_size],
args.bucketcase,
args.endofcalendarday,
"1" if args.verbose else "0")
revisions_info = revisions_info[args.batch_size:]
status = "INITIAL"
while status != "No more revision cleanup data":
status = process_revision_cleanup_data(args.bucketcase,
args.endofcalendarday,
"1" if args.verbose else "0")

if args.verbose:
print('END cleaning up revision store')
print('END cleaning up revision store at ' + str(datetime.now()))


if __name__ == "__main__":
Expand Down
9 changes: 9 additions & 0 deletions util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@
UUREVISIONCOLLECTION = UUSYSTEMCOLLECTION + '/revisions'
"""iRODS path where all revisions will be stored."""

PROC_REVISION_CLEANUP = "revision-cleanup"
"""Process name of the revision cleanup job. Used by the spooling system"""

SPOOL_PROCESSES = {PROC_REVISION_CLEANUP}
"""Set of process names recognized by the spooling system"""

SPOOL_MAIN_DIRECTORY = "/var/lib/irods/yoda-spool"
"""Directory that is used for storing Yoda batch process spool data on the provider"""

UUBLOCKLIST = ["._*", ".DS_Store"]
""" List of file extensions not to be copied to revision"""

Expand Down
108 changes: 108 additions & 0 deletions util/spool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""This file contains an interface to the spooling subsystem. The spooling system can be used to store
temporary data for batch processing. The intended use case is that one job collects data to be processed
and stores it in the spooling system, while another job retrieves the data and processes it.
The current implementation does not have confirmations. It assumes that after spool data has been retrieved
from the spool system it is either processed, or any errors during processing will be taken
care of outside the spooling system (e.g. by having the job that collects data for the spooling system
re-submit spool data that has not been processed for a retry).
It is assumed that functions that use the spool subsystem take care of authorization and logging.
"""

import os

import persistqueue
import persistqueue.serializers.json

import constants


def get_spool_data(process):
    """Fetch one spooled data object of a batch process, without blocking.

    After an item has been taken from the queue, task_done() is called on the queue
    right away.

    :param process: Spool process name (see util.constants for defined names)

    :returns: Spool data object, or None if there is no spool data for this process
    """
    _ensure_spool_process_initialized(process)
    spool_queue = _get_spool_queue(process)

    try:
        item = spool_queue.get(block=False)
        spool_queue.task_done()
        return item
    except persistqueue.exceptions.Empty:
        # An empty queue is reported to the caller as None rather than as an error
        return None


def put_spool_data(process, data_list):
    """Add data structures to the spool queue of a batch process.

    Every element of data_list is put on the queue as a separate item.

    :param process:   Spool process name (see util.constants for defined names)
    :param data_list: List (or other iterable) of arbitrary serializable data objects to store
                      in the spooling system
    """
    _ensure_spool_process_initialized(process)
    spool_queue = _get_spool_queue(process)
    enqueue = spool_queue.put
    for item in data_list:
        enqueue(item)


def has_spool_data(process):
    """Check whether there are any data objects in the spool system for a given process.

    :param process: Spool process name (see util.constants for defined names)

    :returns: Boolean value that represents whether there is any spool data
              present for this process
    """
    item_count = num_spool_data(process)
    return item_count > 0


def num_spool_data(process):
    """Count the items that are present in the spool system for a given process.

    :param process: Spool process name (see util.constants for defined names)

    :returns: The number of data items in the spool system for this process
    """
    _ensure_spool_process_initialized(process)
    spool_queue = _get_spool_queue(process)
    return spool_queue.qsize()


def _get_spool_directory(process):
    """Return the path of the "spool" subdirectory of a spool process.

    :param process: Spool process name (see util.constants for defined names)

    :returns: Path of the spool directory below the main spool directory

    :raises Exception: If the process name is not a recognized spool process
    """
    if process not in constants.SPOOL_PROCESSES:
        raise Exception("Spool process {} not found.".format(process))

    return os.path.join(constants.SPOOL_MAIN_DIRECTORY, process, "spool")


def _get_temp_directory(process):
    """Return the path of the "tmp" subdirectory of a spool process.

    :param process: Spool process name (see util.constants for defined names)

    :returns: Path of the temporary directory below the main spool directory

    :raises Exception: If the process name is not a recognized spool process
    """
    if process not in constants.SPOOL_PROCESSES:
        raise Exception("Spool process {} not found.".format(process))

    return os.path.join(constants.SPOOL_MAIN_DIRECTORY, process, "tmp")


def _get_spool_queue(process):
    """Open the persistent queue that holds the spool data of a process.

    :param process: Spool process name (see util.constants for defined names)

    :returns: persistqueue.Queue object for the spool directory of this process
    """
    spool_dir = _get_spool_directory(process)
    temp_dir = _get_temp_directory(process)

    # JSON serialization is used to make it easier to examine spooled objects manually
    json_serializer = persistqueue.serializers.json

    return persistqueue.Queue(spool_dir,
                              tempdir=temp_dir,
                              serializer=json_serializer,
                              chunksize=1)


def _ensure_spool_process_initialized(process):
    """Create the spool directories of a process, if they do not exist yet.

    :param process: Spool process name (see util.constants for defined names)

    :raises Exception: If the process name is not a recognized spool process
    :raises OSError:   If a directory cannot be created for any reason other than that
                       it already exists
    """
    import errno

    if process not in constants.SPOOL_PROCESSES:
        raise Exception("Spool process {} not found.".format(process))

    # Parent directories are listed before their children, because os.mkdir
    # only creates a single directory level.
    for directory in [constants.SPOOL_MAIN_DIRECTORY,
                      os.path.join(constants.SPOOL_MAIN_DIRECTORY, process),
                      _get_spool_directory(process),
                      _get_temp_directory(process)]:
        # Attempt the creation directly rather than checking for existence first: a
        # separate check leaves a window in which something else could create the
        # directory, which would make os.mkdir fail.
        try:
            os.mkdir(directory)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

0 comments on commit 5332e1e

Please sign in to comment.