From b5adb69b0d81ee8a00f0160722817213642348f9 Mon Sep 17 00:00:00 2001 From: claravox Date: Wed, 1 May 2024 11:46:31 +0200 Subject: [PATCH] Yda-5661 Do not put errors in logs for large files (#421) If a big file has a revision_scheduled flag, do not put this as an error in the logs. --- integration_tests.py | 5 +- replication.py | 6 +- revision_strategies.py | 2 +- revision_utils.py | 94 ++++++++-- revisions.py | 339 +++++++++++++++++++---------------- unit-tests/test_revisions.py | 82 ++++++--- util/data_object.py | 22 ++- 7 files changed, 358 insertions(+), 192 deletions(-) diff --git a/integration_tests.py b/integration_tests.py index 5afac324b..f8809f555 100644 --- a/integration_tests.py +++ b/integration_tests.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """Integration tests for the development environment.""" -__copyright__ = 'Copyright (c) 2019-2023, Utrecht University' +__copyright__ = 'Copyright (c) 2019-2024, Utrecht University' __license__ = 'GPLv3, see LICENSE' __all__ = ['rule_run_integration_tests'] @@ -70,6 +70,9 @@ def _call_msvc_stat_vault_check_exc(ctx, resc_name, data_path): {"name": "util.data_object.size", "test": lambda ctx: data_object.size(ctx, "/tempZone/home/research-initial/testdata/lorem.txt"), "check": lambda x: x == 1003240}, + {"name": "util.data_object.get_group_owners", + "test": lambda ctx: data_object.get_group_owners(ctx, "/tempZone/home/research-initial/testdata/lorem.txt"), + "check": lambda x: x == [['research-initial', 'tempZone']]}, {"name": "util.resource.exists.yes", "test": lambda ctx: resource.exists(ctx, "irodsResc"), "check": lambda x: x}, diff --git a/replication.py b/replication.py index 1c47923a9..4d851dff8 100644 --- a/replication.py +++ b/replication.py @@ -96,7 +96,7 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz minimum_timestamp = int(time.time() - config.async_replication_delay_time) log.write(ctx, "verbose = {}".format(verbose)) - if verbose: + if print_verbose: log.write(ctx, "async_replication_delay_time = {} seconds".format(config.async_replication_delay_time)) log.write(ctx, "max_rss = {} bytes".format(config.async_replication_max_rss)) log.write(ctx, "dry_run = {}".format(dry_run)) @@ -153,7 +153,7 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz # Skip this one and go to the next data object to be replicated. continue - # For totalization only count the dataobjects that are within the specified balancing range + # For totalization only count the data objects that are within the specified balancing range count += 1 data_resc_name = row[4] @@ -231,7 +231,7 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz def is_replication_blocked_by_admin(ctx): - """Admin can put the replication process on a hold by adding a file called 'stop_replication' in collection /yoda/flags. + """Admin can put the replication process on hold by adding a file called 'stop_replication' in collection /yoda/flags. :param ctx: Combined type of a callback and rei struct diff --git a/revision_strategies.py b/revision_strategies.py index 80865cea9..bfd8b5fb7 100644 --- a/revision_strategies.py +++ b/revision_strategies.py @@ -11,7 +11,7 @@ def get_revision_strategy(strategy_name): object can be used to obtain information about the revision strategy. :param strategy_name: Name of the strategy ("A", B", "Simple"). 
See
-        See https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
+        https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
         for an explanation.
 
     :returns: RevisionStrategy object
diff --git a/revision_utils.py b/revision_utils.py
index b81b560de..d77901ebb 100644
--- a/revision_utils.py
+++ b/revision_utils.py
@@ -1,21 +1,63 @@
 # -*- coding: utf-8 -*-
 """Utility functions for revision management."""
 
-__copyright__ = 'Copyright (c) 2019-2023, Utrecht University'
+__copyright__ = 'Copyright (c) 2019-2024, Utrecht University'
 __license__ = 'GPLv3, see LICENSE'
 
 import datetime
+import hashlib
 import os
 
 from revision_strategies import get_revision_strategy
-from util import constants, log
+from util import constants, log, pathutil
 
 
-def calculate_end_of_calendar_day(ctx):
-    """Calculate the unix timestamp for the end of the current day (Same as start of next day).
+def revision_eligible(max_size, data_obj_exists, size, path, groups, revision_store_exists):
+    """Determine whether a revision can be created for a given data object.
+
+    :param max_size: Maximum size (in bytes) a file may have for a revision to be created
+    :param data_obj_exists: Whether the data object exists
+    :param size: Size of the data object
+    :param path: Path to the given data object (for logging)
+    :param groups: List of groups retrieved for this data object
+    :param revision_store_exists: Whether the revision store for this group exists
+
+    :returns: 2-tuple containing True / False whether a revision should be created,
+              and the message (if this is an error condition)
+    """
+    if not data_obj_exists:
+        return False, "Data object <{}> was not found or path was collection".format(path)
+
+    if len(groups) == 0:
+        return False, "Cannot find owner of data object <{}>. It may have been removed. Skipping.".format(path)
+
+    if len(groups) > 1:
+        return False, "Cannot find unique owner of data object <{}>. Skipping.".format(path)
+
+    if not revision_store_exists:
+        return False, "Revision store collection does not exist for data object <{}>".format(path)
+
+    _, zone, _, _ = pathutil.info(path)
+
+    # A revision should not be created when the data object is too big,
+    # but this is not an error condition
+    if int(size) > max_size:
+        return False, ""
+
+    # Only create revisions for research space
+    if not path.startswith("/{}/home/{}".format(zone, constants.IIGROUPPREFIX)):
+        return False, ""
+
+    if pathutil.basename(path) in constants.UUBLOCKLIST:
+        return False, ""
+
+    return True, ""
 
-    :param ctx: Combined type of a callback and rei struct
+
+def calculate_end_of_calendar_day():
+    """Calculate the unix timestamp for the end of the current day (same as start of next day).
 
     :returns: End of calendar day - Timestamp of the end of the current day
     """
@@ -26,10 +68,9 @@
     return int(tomorrow.strftime("%s"))
 
 
-def get_revision_store_path(ctx, zone, trailing_slash=False):
+def get_revision_store_path(zone, trailing_slash=False):
     """Produces the logical path of the revision store
 
-    :param ctx: Combined type of a callback and rei struct
     :param zone: zone name
     :param trailing_slash: Add a trailing slash (default: False)
 
@@ -47,7 +88,7 @@ def get_deletion_candidates(ctx, revision_strategy, revisions, initial_upper_tim
 
     :param ctx: Combined type of a callback and rei struct
     :param revision_strategy: Revision strategy object
-    :param revisions: List of revisions for a particular data object. Each revision is represented by a 3-tupel
+    :param revisions: List of revisions for a particular data object. Each revision is represented by a 3-tuple
                       (revision ID, modification time in epoch time, original path)
     :param initial_upper_time_bound: Initial upper time bound for first bucket
    :param verbose: Whether to print additional information for troubleshooting (boolean)
@@ -144,7 +185,7 @@ def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, verb
 
     :param ctx: Combined type of a callback and rei struct
     :param revisions_list: List of versioned data objects. Each versioned data object is represented as a list of revisions,
-                           with each revision represented as a 3-tupel (revision ID, modification time in epoch time, original
+                           with each revision represented as a 3-tuple (revision ID, modification time in epoch time, original
                            path)
     :param revision_strategy_name: Select a revision strategy based on a string ('A', 'B', 'Simple').
                                    See https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
                                    for an explanation.
     :param verbose: Whether to print additional information for troubleshooting (boolean)
 
     :returns: List of versioned data objects, after prefiltered versioned data objects / revisions have been removed.
               Each versioned data object is represented as a list of revisions,
-              with each revision represented as a 3-tupel (revision ID, modification time in epoch time, original
+              with each revision represented as a 3-tuple (revision ID, modification time in epoch time, original
               path)
     """
     minimum_bucket_size = get_revision_strategy(revision_strategy_name).get_minimum_bucket_size()
     log.write(ctx, "Removing following revisioned data objects in prefiltering for cleanup: " + str([object for object in revisions_list if len(object) <= minimum_bucket_size]))
     return [object for object in revisions_list if len(object) > min(minimum_bucket_size, 1)]
+
+
+def get_resc(row):
+    """Get the resource name for a data object from the provided metadata (for the revision job).
+
+    :param row: Metadata row for the data object
+
+    :returns: Resource name
+    """
+    info = row[3].split(',')
+    if len(info) == 2:
+        return info[0]
+
+    # Backwards compatibility with revision metadata created in v1.8 or earlier.
+    return row[3]
+
+
+def get_balance_id(row, path):
+    """Get the balance id for a data object from the provided metadata (for the revision job).
+
+    :param row: Metadata row for the data object
+    :param path: Path to the data object
+
+    :returns: Balance id
+    """
+    info = row[3].split(',')
+    if len(info) == 2:
+        return int(info[1])
+
+    # Backwards compatibility with revision metadata created in v1.8 or earlier.
+    # Determine a balance_id for this data object based on its path.
+    # This determines whether this data object will be handled by this job/range or by another job running in parallel.
+    return int(hashlib.md5(path.encode('utf-8')).hexdigest(), 16) % 64 + 1
diff --git a/revisions.py b/revisions.py
index fc0c19196..d449caf87 100644
--- a/revisions.py
+++ b/revisions.py
@@ -1,11 +1,10 @@
 # -*- coding: utf-8 -*-
 """Functions for revision management."""
 
-__copyright__ = 'Copyright (c) 2019-2023, Utrecht University'
+__copyright__ = 'Copyright (c) 2019-2024, Utrecht University'
 __license__ = 'GPLv3, see LICENSE'
 
 import datetime
-import hashlib
 import os
 import random
 import re
@@ -18,7 +17,7 @@
 import folder
 import groups
 from revision_strategies import get_revision_strategy
-from revision_utils import calculate_end_of_calendar_day, get_deletion_candidates, get_revision_store_path, revision_cleanup_prefilter
+from revision_utils import calculate_end_of_calendar_day, get_balance_id, get_deletion_candidates, get_resc, get_revision_store_path, revision_cleanup_prefilter, revision_eligible
 from util import *
 from util.spool import get_spool_data, has_spool_data, put_spool_data
 
@@ -258,11 +257,16 @@ def resource_modified_post_revision(ctx, resource, zone, path):
     :param zone:     Zone where the original can be found
     :param path:     Path of the original
     """
-    # Only create revisions for research space
-    if not path.startswith("/{}/home/{}".format(zone, constants.IIGROUPPREFIX)):
-        return
+    size = data_object.size(ctx, path)
+    groups = data_object.get_group_owners(ctx, path)
+    if groups:
+        revision_store = get_revision_store(ctx, groups[0][0])
+        revision_store_exists = revision_store is not None
+    else:
+        revision_store_exists = False
 
-    if pathutil.basename(path) in constants.UUBLOCKLIST:
+    should_create_rev, _ = revision_eligible(constants.UUMAXREVISIONSIZE, size is not None, size, path, groups, revision_store_exists)
+    if not should_create_rev:
         return
 
     revision_avu_name = constants.UUORGMETADATAPREFIX + "revision_scheduled"
@@ -335,7 +339,7 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size
     # Get list of up to batch size limit of data objects (in research space) scheduled for revision, taking into account
     # modification time.
     log.write(ctx, "verbose = {}".format(verbose))
-    if verbose:
+    if print_verbose:
         log.write(ctx, "async_revision_delay_time = {} seconds".format(config.async_revision_delay_time))
         log.write(ctx, "max_rss = {} bytes".format(config.async_revision_max_rss))
         log.write(ctx, "dry_run = {}".format(dry_run))
@@ -365,24 +369,16 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size
         data_id = row[0]
         path = row[1] + "/" + row[2]
 
-        # Metadata value contains resc and balace id for load balancing purposes.
-        info = row[3].split(',')
-        if len(info) == 2:
-            resc = info[0]
-            balance_id = int(info[1])
-        else:
-            # Backwards compatibility with revision metadata created in v1.8 or earlier.
-            resc = row[3]
-            # Determine a balance_id for this dataobject based on its path.
-            # This will determine whether this dataobject will be taken into account in this job/range or another that is running parallel
-            balance_id = int(hashlib.md5(path.encode('utf-8')).hexdigest(), 16) % 64 + 1
+        # Metadata value contains resc and balance id for load balancing purposes.
+        resc = get_resc(row)
+        balance_id = get_balance_id(row, path)
 
         # Check whether balance id is within the range for this job.
        if balance_id < int(balance_id_min) or balance_id > int(balance_id_max):
             # Skip this one and go to the next data object for revision creation.
             continue
 
-        # For getting the total count only the data objects within the wanted range
+        # For getting the total count, only count the data objects within the wanted range
         count += 1
 
         # "No action" is meant for easier memory usage debugging.
@@ -394,52 +390,11 @@ def rule_revision_batch(ctx, verbose, balance_id_min, balance_id_max, batch_size
         if print_verbose:
             log.write(ctx, "Batch revision: creating revision for {} on resc {}".format(path, resc))
 
-        revision_created = revision_create(ctx, resc, data_id, constants.UUMAXREVISIONSIZE, verbose)
-
-        # Remove revision_scheduled flag no matter if it succeeded or not.
-        # rods should have been given own access via policy to allow AVU
-        # changes.
-        if print_verbose:
-            log.write(ctx, "Batch revision: removing AVU for {}".format(path))
-
-        # try removing attr/resc meta data
-        avu_deleted = False
-        try:
-            avu.rmw_from_data(ctx, path, attr, "%")  # use wildcard cause rm_from_data causes problems
-            avu_deleted = True
-        except Exception:
-            avu_deleted = False
-
-        # try removing attr/resc meta data again with other ACL's
-        if not avu_deleted:
-            try:
-                # The object's ACLs may have changed.
-                # Force the ACL and try one more time.
-                msi.sudo_obj_acl_set(ctx, "", "own", user.full_name(ctx), path, "")
-                avu.rmw_from_data(ctx, path, attr, "%")  # use wildcard cause rm_from_data causes problems
-            except Exception:
-                log.write(ctx, "ERROR - Scheduled revision creation of <{}>: could not remove schedule flag".format(path))
-
-        # now back to the created revision
+        revision_created = check_eligible_and_create_revision(ctx, print_verbose, attr, errorattr, data_id, resc, path)
         if revision_created:
-            log.write(ctx, "Revision created for {}".format(path))
             count_ok += 1
-            # Revision creation OK. Remove any existing error indication attribute.
-            iter2 = genquery.row_iterator(
-                "DATA_NAME",
-                "DATA_ID = '{}' AND META_DATA_ATTR_NAME = '{}' AND META_DATA_ATTR_VALUE = 'true'".format(data_id, errorattr),
-                genquery.AS_LIST, ctx
-            )
-            for row2 in iter2:
-                # Only try to remove it if we know for sure it exists,
-                # otherwise we get useless errors in the log.
-                avu.rmw_from_data(ctx, path, errorattr, "%")
-                # all items removed in one go -> so break from this loop through each individual item
-                break
         else:
             count_ignored += 1
-            log.write(ctx, "ERROR - Scheduled revision creation of <{}> failed".format(path))
-            avu.set_on_data(ctx, path, errorattr, "true")
 
         if print_verbose:
             show_memory_usage(ctx)
@@ -449,6 +404,91 @@
     log.write(ctx, "Batch revision job ignored {} data objects in research area, excluding data objects postponed because of delay time.".format(count_ignored))
 
 
+def check_eligible_and_create_revision(ctx, print_verbose, attr, errorattr, data_id, resc, path):
+    """Check that a data object is eligible for a revision, and if so, create a revision.
+       Then remove or add revision flags as appropriate.
+
+    :param ctx: Combined type of a callback and rei struct
+    :param print_verbose: Whether to log verbose messages for troubleshooting (Boolean)
+    :param attr: revision_scheduled flag name
+    :param errorattr: revision_failed flag name
+    :param data_id: data_id of the data object
+    :param resc: Name of resource
+    :param path: Path to the data object
+
+    :returns: Whether revision was created
+    """
+    revision_created = False
+    size = data_object.size(ctx, path)
+    groups = data_object.get_group_owners(ctx, path)
+    if groups:
+        revision_store = get_revision_store(ctx, groups[0][0])
+        revision_store_exists = revision_store is not None
+    else:
+        revision_store_exists = False
+
+    should_create_rev, revision_error_msg = revision_eligible(constants.UUMAXREVISIONSIZE, size is not None, size, path, groups, revision_store_exists)
+    if should_create_rev:
+        revision_created = revision_create(ctx, print_verbose, data_id, resc, groups[0][0], revision_store)
+    elif revision_error_msg:
+        log.write(ctx, revision_error_msg)
+
+    remove_revision_scheduled_flag(ctx, print_verbose, path, attr)
+
+    # Handle the outcome of the revision creation attempt.
+    if revision_created:
+        log.write(ctx, "Revision created for {}".format(path))
+        remove_revision_error_flag(ctx, data_id, path, errorattr)
+    elif should_create_rev:
+        # Revision should have been created but it was not
+        log.write(ctx, "ERROR - Scheduled revision creation of <{}> failed".format(path))
+        avu.set_on_data(ctx, path, errorattr, "true")
+
+    return revision_created
+
+
+def remove_revision_error_flag(ctx, data_id, path, errorattr):
+    """Remove the revision error flag from a data object after successful revision creation."""
+    # Revision creation OK. Remove any existing error indication attribute.
+    iter2 = genquery.row_iterator(
+        "DATA_NAME",
+        "DATA_ID = '{}' AND META_DATA_ATTR_NAME = '{}' AND META_DATA_ATTR_VALUE = 'true'".format(data_id, errorattr),
+        genquery.AS_LIST, ctx
+    )
+    for row in iter2:
+        # Only try to remove it if we know for sure it exists,
+        # otherwise we get useless errors in the log.
+        avu.rmw_from_data(ctx, path, errorattr, "%")
+        # all items removed in one go -> so break from this loop through each individual item
+        break
+
+
+def remove_revision_scheduled_flag(ctx, print_verbose, path, attr):
+    """Remove the revision_scheduled flag, regardless of whether revision creation succeeded."""
+    # rods should have been given own access via policy to allow AVU
+    # changes.
+    if print_verbose:
+        log.write(ctx, "Batch revision: removing AVU for {}".format(path))
+
+    # Try removing the attr/resc metadata.
+    avu_deleted = False
+    try:
+        avu.rmw_from_data(ctx, path, attr, "%")  # use wildcard because rm_from_data causes problems
+        avu_deleted = True
+    except Exception:
+        avu_deleted = False
+
+    # Try removing the attr/resc metadata again with other ACLs.
+    if not avu_deleted:
+        try:
+            # The object's ACLs may have changed.
+            # Force the ACL and try one more time.
+            msi.sudo_obj_acl_set(ctx, "", "own", user.full_name(ctx), path, "")
+            avu.rmw_from_data(ctx, path, attr, "%")  # use wildcard because rm_from_data causes problems
+        except Exception:
+            log.write(ctx, "ERROR - Scheduled revision creation of <{}>: could not remove schedule flag".format(path))
+
+
 def is_revision_blocked_by_admin(ctx):
     """Admin can put the revision process on a hold by adding a file called 'stop_revisions' in collection /yoda/flags.
 
@@ -461,21 +501,8 @@
     return collection.exists(ctx, path)
 
 
-def revision_create(ctx, resource, data_id, max_size, verbose):
-    """Create a revision of a dataobject in a revision folder.
- - :param ctx: Combined type of a callback and rei struct - :param resource: Resource to retrieve original from - :param data_id: Data id of data object to create a revision for - :param max_size: Max size of files in bytes - :param verbose: Whether to print messages for troubleshooting to log (1: yes, 0: no) - - :returns: True / False as an indication whether a revision was successfully created - """ - revision_created = False - print_verbose = verbose - found = False - +def get_data_object(ctx, data_id, resource): + """Return data on data object necessary to create a revision.""" iter = genquery.row_iterator( "DATA_ID, DATA_MODIFY_TIME, DATA_OWNER_NAME, DATA_SIZE, COLL_ID, DATA_RESC_HIER, DATA_NAME, COLL_NAME", "DATA_ID = '{}' AND DATA_RESC_HIER like '{}%'".format(data_id, resource), @@ -489,103 +516,105 @@ def revision_create(ctx, resource, data_id, max_size, verbose): data_owner = row[2] basename = row[6] parent = row[7] - found = True break - path = '{}/{}'.format(parent, basename) - - if not found: - log.write(ctx, "Data object <{}> was not found or path was collection".format(path)) - return False + return modify_time, data_size, coll_id, data_owner, basename, parent - if int(data_size) > max_size: - log.write(ctx, "Files larger than {} bytes cannot store revisions".format(max_size)) - return False - groups = list(genquery.row_iterator( - "USER_NAME, USER_ZONE", - "DATA_ID = '" + data_id + "' AND USER_TYPE = 'rodsgroup' AND DATA_ACCESS_NAME = 'own'", - genquery.AS_LIST, ctx - )) +def get_revision_store(ctx, group_name): + """Get path to revision store for group if the path exists. - if len(groups) == 1: - (group_name, user_zone) = groups[0] - elif len(groups) == 0: - log.write(ctx, "Cannot find owner of data object <{}>. It may have been removed. Skipping.".format(path)) - return False - else: - log.write(ctx, "Cannot find unique owner of data object <{}>. Skipping.".format(path)) - return False + :param ctx: Combined type of a callback and rei struct + :param group_name: Name of group for the revision store + :returns: path to group's revision store if exists, else None + """ # All revisions are stored in a group with the same name as the research group in a system collection # When this collection is missing, no revisions will be created. When the group manager is used to # create new research groups, the revision collection will be created as well. - revision_store = os.path.join(get_revision_store_path(ctx, user.zone(ctx)), group_name) + revision_store = os.path.join(get_revision_store_path(user.zone(ctx)), group_name) + revision_store_exists = collection.exists(ctx, revision_store) + return revision_store if revision_store_exists else None - if collection.exists(ctx, revision_store): - # Allow rodsadmin to create subcollections. - msi.set_acl(ctx, "default", "admin:own", "rods#{}".format(user.zone(ctx)), revision_store) - # generate a timestamp in iso8601 format to append to the filename of the revised file. - # 2019-09-07T15:50-04:00 - iso8601 = datetime.datetime.now().replace(microsecond=0).isoformat() +def revision_create(ctx, print_verbose, data_id, resource, group_name, revision_store): + """Create a revision of a data object in a revision folder. 
- rev_filename = basename + "_" + iso8601 + data_owner - rev_coll = revision_store + "/" + coll_id + :param ctx: Combined type of a callback and rei struct + :param print_verbose: Whether to print messages for troubleshooting to log (Boolean) + :param data_id: data_id of the data object + :param resource: Name of resource + :param group_name: Group name + :param revision_store: Revision store - read_access = msi.check_access(ctx, path, 'read object', irods_types.BytesBuf())['arguments'][2] - if read_access != b'\x01': - try: - msi.set_acl(ctx, "default", "read", "rods#{}".format(user.zone(ctx)), path) - except msi.Error: - return False + :returns: True / False as an indication whether a revision was successfully created + """ + revision_created = False + modify_time, data_size, coll_id, data_owner, basename, parent = get_data_object(ctx, data_id, resource) + path = '{}/{}'.format(parent, basename) - if collection.exists(ctx, rev_coll): - # Rods may not have own access yet. - msi.set_acl(ctx, "default", "own", "rods#{}".format(user.zone(ctx)), rev_coll) - else: - # Inheritance is enabled - ACLs are already good. - # (rods and the research group both have own) - try: - msi.coll_create(ctx, rev_coll, '1', irods_types.BytesBuf()) - except error.UUError: - log.write(ctx, "ERROR - Failed to create staging area at <{}>".format(rev_coll)) - return False + # Allow rodsadmin to create subcollections. + msi.set_acl(ctx, "default", "admin:own", "rods#{}".format(user.zone(ctx)), revision_store) - rev_path = rev_coll + "/" + rev_filename + # generate a timestamp in iso8601 format to append to the filename of the revised file. + # 2019-09-07T15:50-04:00 + iso8601 = datetime.datetime.now().replace(microsecond=0).isoformat() - if print_verbose: - log.write(ctx, "Creating revision {} -> {}".format(path, rev_path)) + rev_filename = basename + "_" + iso8601 + data_owner + rev_coll = revision_store + "/" + coll_id + + read_access = msi.check_access(ctx, path, 'read object', irods_types.BytesBuf())['arguments'][2] + if read_access != b'\x01': + try: + msi.set_acl(ctx, "default", "read", "rods#{}".format(user.zone(ctx)), path) + except msi.Error: + return False - # actual copying to revision store + if collection.exists(ctx, rev_coll): + # Rods may not have own access yet. + msi.set_acl(ctx, "default", "own", "rods#{}".format(user.zone(ctx)), rev_coll) + else: + # Inheritance is enabled - ACLs are already good. + # (rods and the research group both have own) try: - # Workaround the PREP deadlock issue: Restrict threads to 1. - data_object.copy(ctx, path, rev_path, True) - - revision_created = True - - # Add original metadata to revision data object. 
- avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_data_id", data_id) - avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_path", path) - avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_coll_name", parent) - avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_data_name", basename) - avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_data_owner_name", data_owner) - avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_coll_id", coll_id) - avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_modify_time", modify_time) - avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_group_name", group_name) - avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_filesize", data_size) - except msi.Error as e: - log.write(ctx, 'ERROR - The file could not be copied: {}'.format(str(e))) + msi.coll_create(ctx, rev_coll, '1', irods_types.BytesBuf()) + except error.UUError: + log.write(ctx, "ERROR - Failed to create staging area at <{}>".format(rev_coll)) + return False + + rev_path = rev_coll + "/" + rev_filename + + if print_verbose: + log.write(ctx, "Creating revision {} -> {}".format(path, rev_path)) + + # Actual copying to revision store + try: + # Workaround the PREP deadlock issue: Restrict threads to 1. + data_object.copy(ctx, path, rev_path, True) + + revision_created = True + + # Add original metadata to revision data object. + avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_data_id", data_id) + avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_path", path) + avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_coll_name", parent) + avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_data_name", basename) + avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_data_owner_name", data_owner) + avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_coll_id", coll_id) + avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_modify_time", modify_time) + avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_group_name", group_name) + avu.set_on_data(ctx, rev_path, constants.UUORGMETADATAPREFIX + "original_filesize", data_size) + except msi.Error as e: + log.write(ctx, 'ERROR - The file could not be copied: {}'.format(str(e))) return revision_created -def revision_cleanup_scan_revision_objects(ctx, revision_list, verbose_mode): +def revision_cleanup_scan_revision_objects(ctx, revision_list): """Obtain information about all revisions. :param ctx: Combined type of a callback and rei struct :param revision_list: List of revision data object IDs - :param verbose_mode: Whether to print additional information for troubleshooting (boolean) :returns: Nested list, where the outer list represents revisioned data objects, and the inner list represents revisions for that data object. 
@@ -596,7 +625,7 @@ def revision_cleanup_scan_revision_objects(ctx, revision_list, verbose_mode): ORIGINAL_PATH_ATTRIBUTE = constants.UUORGMETADATAPREFIX + 'original_path' ORIGINAL_MODIFY_TIME_ATTRIBUTE = constants.UUORGMETADATAPREFIX + 'original_modify_time' - revision_store = get_revision_store_path(ctx, user.zone(ctx)) + revision_store = get_revision_store_path(user.zone(ctx)) ids = list(revision_list) path_dict = {} @@ -652,9 +681,9 @@ def get_all_revision_data_ids(ctx): :param ctx: Combined type of a callback and rei struct - :yields: iterator of 2-tupels containing collection and data object IDs + :yields: iterator of 2-tuples containing collection and data object IDs """ - revision_store = get_revision_store_path(ctx, user.zone(ctx)) + revision_store = get_revision_store_path(user.zone(ctx)) revision_objects = genquery.row_iterator( "order_desc(COLL_ID), DATA_ID", @@ -673,7 +702,7 @@ def _update_revision_store_acls(ctx): :raises Exception: if current user is not a rodsadmin """ - revision_store = get_revision_store_path(ctx, user.zone(ctx)) + revision_store = get_revision_store_path(user.zone(ctx)) if user.user_type(ctx) == 'rodsadmin': msi.set_acl(ctx, "recursive", "admin:own", user.full_name(ctx), revision_store) msi.set_acl(ctx, "recursive", "inherit", user.full_name(ctx), revision_store) @@ -790,7 +819,7 @@ def rule_revisions_cleanup_scan(ctx, revision_strategy_name, verbose_flag): log.write(ctx, "Number of revisions to scan: " + str(len(revisions_list))) log.write(ctx, "Scanning revisions: " + str(revisions_list)) - revision_data = revision_cleanup_scan_revision_objects(ctx, revisions_list, verbose) + revision_data = revision_cleanup_scan_revision_objects(ctx, revisions_list) prefiltered_revision_data = revision_cleanup_prefilter(ctx, revision_data, revision_strategy_name, verbose) output_data_size = len(prefiltered_revision_data) if output_data_size > 0: @@ -835,7 +864,7 @@ def rule_revisions_cleanup_process(ctx, revision_strategy_name, endOfCalendarDay end_of_calendar_day = int(endOfCalendarDay) if end_of_calendar_day == 0: - end_of_calendar_day = calculate_end_of_calendar_day(ctx) + end_of_calendar_day = calculate_end_of_calendar_day() revision_strategy = get_revision_strategy(revision_strategy_name) @@ -884,7 +913,7 @@ def revision_remove(ctx, revision_id, revision_path): :returns: Boolean indicating if revision was removed """ - revision_prefix = get_revision_store_path(ctx, user.zone(ctx), trailing_slash=True) + revision_prefix = get_revision_store_path(user.zone(ctx), trailing_slash=True) if not revision_path.startswith(revision_prefix): log.write(ctx, "ERROR - sanity check fail when removing revision <{}>: <{}>".format( revision_id, diff --git a/unit-tests/test_revisions.py b/unit-tests/test_revisions.py index 5393b2d7c..f718e955b 100644 --- a/unit-tests/test_revisions.py +++ b/unit-tests/test_revisions.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """Unit tests for the revision functions""" -__copyright__ = 'Copyright (c) 2023, Utrecht University' +__copyright__ = 'Copyright (c) 2023-2024, Utrecht University' __license__ = 'GPLv3, see LICENSE' import sys @@ -10,48 +10,90 @@ sys.path.append('..') from revision_strategies import get_revision_strategy -from revision_utils import get_deletion_candidates, revision_cleanup_prefilter +from revision_utils import get_deletion_candidates, revision_cleanup_prefilter, revision_eligible class RevisionTest(TestCase): + def test_revision_eligible(self): + # Happy flow + eligible, msg = revision_eligible(100, True, 2, 
"/zone/home/research-test/obj", [["research-initial"]], True) + self.assertTrue(eligible) + self.assertEqual(msg, "") + + # Data obj does not exist + eligible, msg = revision_eligible(100, False, None, "/zone/home/research-test/obj", [["research-initial"]], True) + self.assertFalse(eligible) + self.assertIn("was not found", msg) + + # No groups + eligible, msg = revision_eligible(100, True, 2, "/zone/home/research-test/obj", [], True) + self.assertFalse(eligible) + self.assertIn("Cannot find owner", msg) + + # Too many groups + eligible, msg = revision_eligible(100, True, 2, "/zone/home/research-test/obj", [["research-initial"], ["research-initial1"]], True) + self.assertFalse(eligible) + self.assertIn("Cannot find unique owner", msg) + + # No revision store + eligible, msg = revision_eligible(100, True, 2, "/zone/home/research-test/obj", [["research-initial"]], False) + self.assertFalse(eligible) + self.assertIn("Revision store", msg) + self.assertIn("does not exist", msg) + + # Not in research space + eligible, msg = revision_eligible(100, True, "5", "/zone/home/vault-test/obj", [["research-initial"]], True) + self.assertFalse(eligible) + self.assertEqual(msg, "") + + # Blocklist file + eligible, msg = revision_eligible(100, True, 2, "/zone/home/research-test/.DS_Store", [["research-initial"]], True) + self.assertFalse(eligible) + self.assertEqual(msg, "") + + # Too large data object (not an error) + eligible, msg = revision_eligible(2, True, "100", "/zone/home/research-test/obj", [["research-initial"]], True) + self.assertFalse(eligible) + self.assertEqual(msg, "") + def test_revision_strategy(self): strategy = get_revision_strategy("B") - self.assertEquals(len(strategy.get_buckets()), 8) - self.assertEquals(strategy.get_minimum_bucket_size(), 2) + self.assertEqual(len(strategy.get_buckets()), 8) + self.assertEqual(strategy.get_minimum_bucket_size(), 2) # Tests total length of all buckets in seconds; equivalent to roughly 29 weeks. - self.assertEquals(strategy.get_total_bucket_timespan(), 17755200) + self.assertEqual(strategy.get_total_bucket_timespan(), 17755200) def test_revision_cleanup_prefilter(self): empty_input = [] empty_output = revision_cleanup_prefilter(None, empty_input, "B", False) - self.assertEquals(empty_output, []) + self.assertEqual(empty_output, []) single_input = [[(1, 123, "/foo/bar/baz")]] single_output = revision_cleanup_prefilter(None, single_input, "B", False) - self.assertEquals(single_output, []) # Does not exceed min. bucket size for strategy B + self.assertEqual(single_output, []) # Does not exceed min. bucket size for strategy B two_input = [[(1, 123, "/foo/bar/baz"), (2, 234, "/foo/bar/baz")]] two_output = revision_cleanup_prefilter(None, two_input, "B", False) # Does not exceed min. bucket size for strategy B # But more than 1 revision (so cannot prefilter, because # revisions could be outside defined buckets) - self.assertEquals(two_output, two_input) + self.assertEqual(two_output, two_input) three_input = [[(1, 123, "/foo/bar/baz"), (2, 234, "/foo/bar/baz"), (3, 345, "/foo/bar/baz")]] three_output = revision_cleanup_prefilter(None, three_input, "B", False) - self.assertEquals(three_output, three_input) # Exceeds min. bucket size for strategy B + self.assertEqual(three_output, three_input) # Exceeds min. 
bucket size for strategy B def test_revision_deletion_candidates_empty(self): dummy_time = 1000000000 revision_strategy = get_revision_strategy("B") revisions = [] output = get_deletion_candidates(None, revision_strategy, revisions, dummy_time, False) - self.assertEquals(output, []) + self.assertEqual(output, []) def test_revision_deletion_candidates_1_bucket_no_exceed(self): dummy_time = 1000000000 revision_strategy = get_revision_strategy("B") revisions = [(1, dummy_time - 60, "/foo/bar/baz")] output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False) - self.assertEquals(output, []) + self.assertEqual(output, []) def test_revision_deletion_candidates_2_bucket_no_exceed(self): dummy_time = 1000000000 @@ -59,7 +101,7 @@ def test_revision_deletion_candidates_2_bucket_no_exceed(self): revisions = [(1, dummy_time - 60, "/foo/bar/baz"), (2, dummy_time - 120, "/foo/bar/baz")] output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False) - self.assertEquals(output, []) + self.assertEqual(output, []) def test_revision_deletion_candidates_2_multi_bucket_no_exceed(self): dummy_time = 1000000000 @@ -67,7 +109,7 @@ def test_revision_deletion_candidates_2_multi_bucket_no_exceed(self): revisions = [(1, dummy_time - 60, "/foo/bar/baz"), (2, dummy_time - 13 * 3600, "/foo/bar/baz")] output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False) - self.assertEquals(output, []) + self.assertEqual(output, []) def test_revision_deletion_candidates_4_multi_bucket_no_exceed(self): dummy_time = 1000000000 @@ -77,7 +119,7 @@ def test_revision_deletion_candidates_4_multi_bucket_no_exceed(self): (3, dummy_time - 3600 * 16, "/foo/bar/baz"), (4, dummy_time - 3600 * 17, "/foo/bar/baz")] output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False) - self.assertEquals(output, []) + self.assertEqual(output, []) def test_revision_deletion_candidates_3_bucket_exceed(self): dummy_time = 1000000000 @@ -86,7 +128,7 @@ def test_revision_deletion_candidates_3_bucket_exceed(self): (2, dummy_time - 120, "/foo/bar/baz"), (3, dummy_time - 180, "/foo/bar/baz")] output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False) - self.assertEquals(output, [2]) + self.assertEqual(output, [2]) def test_revision_deletion_candidates_6_buckets_exceed(self): dummy_time = 1000000000 @@ -98,14 +140,14 @@ def test_revision_deletion_candidates_6_buckets_exceed(self): (5, dummy_time - 3600 * 16 - 120, "/foo/bar/baz"), (6, dummy_time - 3600 * 16 - 180, "/foo/bar/baz")] output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False) - self.assertEquals(output, [2, 5]) + self.assertEqual(output, [2, 5]) def test_revision_deletion_1_before_buckets(self): dummy_time = 1000000000 revision_strategy = get_revision_strategy("B") revisions = [(1, dummy_time - 365 * 24 * 3600, "/foo/bar/baz")] output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False) - self.assertEquals(output, []) + self.assertEqual(output, []) def test_revision_deletion_1_bucket_1_before(self): dummy_time = 1000000000 @@ -113,7 +155,7 @@ def test_revision_deletion_1_bucket_1_before(self): revisions = [(1, dummy_time - 60, "/foo/bar/baz"), (2, dummy_time - 365 * 24 * 3600, "/foo/bar/baz")] output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False) - self.assertEquals(output, [2]) + self.assertEqual(output, [2]) def test_revision_deletion_1_bucket_2_before(self): dummy_time = 
1000000000
@@ -122,7 +164,7 @@
                      (2, dummy_time - 365 * 24 * 3600 - 60, "/foo/bar/baz"),
                      (3, dummy_time - 365 * 24 * 3600 - 90, "/foo/bar/baz")]
         output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
-        self.assertEquals(output, [2, 3])
+        self.assertEqual(output, [2, 3])
 
     def test_revision_deletion_3_before_buckets(self):
         dummy_time = 1000000000
@@ -131,4 +173,4 @@
                      (2, dummy_time - 365 * 24 * 3600 - 120, "/foo/bar/baz"),
                      (3, dummy_time - 365 * 24 * 3600 - 180, "/foo/bar/baz")]
         output = get_deletion_candidates(None, revision_strategy, revisions, 1000000000, False)
-        self.assertEquals(output, [2, 3])
+        self.assertEqual(output, [2, 3])
diff --git a/util/data_object.py b/util/data_object.py
index 876a2440f..b625672fb 100644
--- a/util/data_object.py
+++ b/util/data_object.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 """Utility / convenience functions for data object IO."""
 
-__copyright__ = 'Copyright (c) 2019-2023, Utrecht University'
+__copyright__ = 'Copyright (c) 2019-2024, Utrecht University'
 __license__ = 'GPLv3, see LICENSE'
 
 import binascii
@@ -34,7 +34,14 @@ def owner(ctx, path):
 
 
 def size(ctx, path):
-    """Get a data object's size in bytes."""
+    """Get a data object's size in bytes.
+
+    :param ctx:  Combined type of a callback and rei struct
+    :param path: Path to iRODS data object
+
+    :returns: Data object's size or None if object is not found
+    """
     iter = genquery.row_iterator(
         "DATA_SIZE, order_desc(DATA_MODIFY_TIME)",
         "COLL_NAME = '%s' AND DATA_NAME = '%s'" % pathutil.chop(path),
@@ -202,3 +209,14 @@ def decode_checksum(checksum):
         return "0"
     else:
         return binascii.hexlify(binascii.a2b_base64(checksum[5:])).decode("UTF-8")
+
+
+def get_group_owners(ctx, path):
+    """Get the list of groups that own a data object.
+
+    :param ctx:  Combined type of a callback and rei struct
+    :param path: Path to iRODS data object
+
+    :returns: List of [group name, zone] pairs that have ownership of the data object
+    """
+    parent, basename = pathutil.chop(path)
+    groups = list(genquery.row_iterator(
+        "USER_NAME, USER_ZONE",
+        "COLL_NAME = '{}' and DATA_NAME = '{}' AND USER_TYPE = 'rodsgroup' AND DATA_ACCESS_NAME = 'own'".format(parent, basename),
+        genquery.AS_LIST, ctx
+    ))
+    return groups
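
Illustrative notes (not part of the patch). The sketches below are plain-Python illustrations of the behavior this diff introduces; any values marked as samples are hypothetical.

First, the contract of revision_eligible: it returns a 2-tuple (eligible, message), and the message is non-empty only for genuine error conditions. Oversized files, non-research paths, and blocklisted names are skipped silently, which is the point of YDA-5661. A minimal sketch, assuming the ruleset modules are importable, mirroring the unit tests above:

```python
# Sketch of revision_eligible's contract; sample values mirror the unit tests.
from revision_utils import revision_eligible

MAX_SIZE = 100  # bytes; the batch job itself passes constants.UUMAXREVISIONSIZE

cases = [
    # (data_obj_exists, size, path, groups, revision_store_exists)
    (True, 2,   "/zone/home/research-test/obj", [["research-initial", "zone"]], True),   # eligible
    (True, 500, "/zone/home/research-test/obj", [["research-initial", "zone"]], True),   # too big
    (True, 2,   "/zone/home/research-test/obj", [], True),                               # no owner
]

for exists, size, path, groups, store_exists in cases:
    eligible, msg = revision_eligible(MAX_SIZE, exists, size, path, groups, store_exists)
    if eligible:
        print("create revision for {}".format(path))
    elif msg:
        print("error: {}".format(msg))            # only real error conditions produce a message
    else:
        print("skip silently: {}".format(path))   # e.g. files over max_size (YDA-5661)
```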
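The flag handling in check_eligible_and_create_revision can be read as a small decision table: the revision_scheduled flag is always removed, an error is logged only when revision_eligible returned a message, and revision_failed is set only when an eligible object actually fails revision creation. A pure-logic sketch of those branches (the iRODS calls are replaced by strings; this is not the ruleset's API):

```python
def flag_actions(should_create_rev, error_msg, revision_created):
    """Sketch of the AVU handling in check_eligible_and_create_revision:
    revision_scheduled always goes, revision_failed is only set when an
    eligible data object fails revision creation."""
    actions = ["remove revision_scheduled"]
    if not should_create_rev and error_msg:
        actions.append("log error message")
    if revision_created:
        actions.append("remove revision_failed if present")
    elif should_create_rev:
        actions.append("log error and set revision_failed")
    return actions

print(flag_actions(False, "", False))  # oversized file: ['remove revision_scheduled'] - no error in logs
print(flag_actions(True, "", False))   # eligible but copy failed: [..., 'log error and set revision_failed']
print(flag_actions(True, "", True))    # success: [..., 'remove revision_failed if present']
```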
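get_balance_id's fallback hashes the object path onto buckets 1..64, so parallel batch jobs given disjoint [balance_id_min, balance_id_max] ranges partition pre-1.9 objects deterministically and without coordination. A standalone sketch using the same formula (the paths and job ranges are hypothetical):

```python
import hashlib

def fallback_balance_id(path):
    # Same formula as get_balance_id's legacy branch: a stable hash of the
    # path, folded onto the 1..64 range used by the batch jobs.
    return int(hashlib.md5(path.encode('utf-8')).hexdigest(), 16) % 64 + 1

# Two hypothetical parallel jobs covering disjoint halves of the range:
job_ranges = {"job-a": (1, 32), "job-b": (33, 64)}

for path in ["/tempZone/home/research-initial/a.txt",
             "/tempZone/home/research-initial/b.txt"]:
    bid = fallback_balance_id(path)
    owners = [name for name, (lo, hi) in job_ranges.items() if lo <= bid <= hi]
    print("{} -> balance_id {} handled by {}".format(path, bid, owners))  # exactly one job
```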
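get_group_owners returns [group name, zone] pairs, and the revision code requires exactly one owning group: groups[0][0] is what feeds get_revision_store. A sketch of that unpacking, using the sample value asserted by the integration test above:

```python
# Sample return value from the integration test for util.data_object.get_group_owners:
groups = [['research-initial', 'tempZone']]

if len(groups) == 0:
    print("no owning group - data object may have been removed")
elif len(groups) > 1:
    print("no unique owning group - skip revision")
else:
    group_name, zone = groups[0]
    print("look up revision store for group {}".format(group_name))  # research-initial
```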
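Finally, revision_create names each revision by appending a second-resolution ISO 8601 timestamp and the original owner to the base name, inside a subcollection named after the source collection id. A sketch of the naming only; the revision store path shown is an assumed layout, not taken from this diff:

```python
import datetime

# Hypothetical inputs; the real values come from get_data_object and from
# the group's revision store as resolved by get_revision_store.
basename = "lorem.txt"
data_owner = "rods"
coll_id = "12345"
revision_store = "/tempZone/yoda/revisions/research-initial"  # assumed layout

# Same construction as revision_create: ISO 8601 timestamp plus original owner.
iso8601 = datetime.datetime.now().replace(microsecond=0).isoformat()
rev_filename = basename + "_" + iso8601 + data_owner
rev_path = revision_store + "/" + coll_id + "/" + rev_filename
print(rev_path)
# e.g. /tempZone/yoda/revisions/research-initial/12345/lorem.txt_2024-05-01T11:46:31rods
```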