Yda-5661 Do not put errors in logs for large files (#421)
If a big file has a revision_scheduled flag, do not report this as an error in the logs.
claravox authored May 1, 2024
1 parent 1886415 commit b5adb69
Showing 7 changed files with 358 additions and 192 deletions.
5 changes: 4 additions & 1 deletion integration_tests.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
"""Integration tests for the development environment."""

__copyright__ = 'Copyright (c) 2019-2023, Utrecht University'
__copyright__ = 'Copyright (c) 2019-2024, Utrecht University'
__license__ = 'GPLv3, see LICENSE'

__all__ = ['rule_run_integration_tests']
@@ -70,6 +70,9 @@ def _call_msvc_stat_vault_check_exc(ctx, resc_name, data_path):
{"name": "util.data_object.size",
"test": lambda ctx: data_object.size(ctx, "/tempZone/home/research-initial/testdata/lorem.txt"),
"check": lambda x: x == 1003240},
{"name": "util.data_object.get_group_owners",
"test": lambda ctx: data_object.get_group_owners(ctx, "/tempZone/home/research-initial/testdata/lorem.txt"),
"check": lambda x: x == [['research-initial', 'tempZone']]},
{"name": "util.resource.exists.yes",
"test": lambda ctx: resource.exists(ctx, "irodsResc"),
"check": lambda x: x},
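
The new integration test exercises data_object.get_group_owners, which returns the owning group(s) of a data object as a list of [group name, zone] pairs. A minimal sketch of how downstream code might consume the result, assuming the same util helpers (data_object, log) used elsewhere in this repository; the path is the test fixture above:

# Sketch: resolve the owning group of a data object before acting on it.
groups = data_object.get_group_owners(ctx, "/tempZone/home/research-initial/testdata/lorem.txt")
if len(groups) == 1:
    group_name, zone = groups[0]  # e.g. ['research-initial', 'tempZone']
    log.write(ctx, "Data object is owned by group {} in zone {}".format(group_name, zone))
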
6 changes: 3 additions & 3 deletions replication.py
@@ -96,7 +96,7 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz
minimum_timestamp = int(time.time() - config.async_replication_delay_time)

log.write(ctx, "verbose = {}".format(verbose))
if verbose:
if print_verbose:
log.write(ctx, "async_replication_delay_time = {} seconds".format(config.async_replication_delay_time))
log.write(ctx, "max_rss = {} bytes".format(config.async_replication_max_rss))
log.write(ctx, "dry_run = {}".format(dry_run))
@@ -153,7 +153,7 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz
# Skip this one and go to the next data object to be replicated.
continue

# For totalization only count the dataobjects that are within the specified balancing range
# For totalization only count the data objects that are within the specified balancing range
count += 1
data_resc_name = row[4]

@@ -231,7 +231,7 @@ def rule_replicate_batch(ctx, verbose, balance_id_min, balance_id_max, batch_siz


def is_replication_blocked_by_admin(ctx):
"""Admin can put the replication process on a hold by adding a file called 'stop_replication' in collection /yoda/flags.
"""Admin can put the replication process on hold by adding a file called 'stop_replication' in collection /yoda/flags.
:param ctx: Combined type of a callback and rei struct
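
The body of is_replication_blocked_by_admin is not shown in this diff. A minimal sketch of how such a flag check could look, assuming util helpers like data_object.exists and user.zone; their use here is an assumption for illustration:

def is_replication_blocked_by_admin(ctx):
    """Return True if an admin has placed a 'stop_replication' flag file in /yoda/flags."""
    # Assumption: data_object.exists and user.zone are available util helpers.
    return data_object.exists(ctx, "/{}/yoda/flags/stop_replication".format(user.zone(ctx)))
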
2 changes: 1 addition & 1 deletion revision_strategies.py
@@ -11,7 +11,7 @@ def get_revision_strategy(strategy_name):
object can be used to obtain information about the revision strategy.
:param strategy_name: Name of the strategy ("A", "B", "Simple"). See
See https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
for an explanation.
:returns: RevisionStrategy object
94 changes: 84 additions & 10 deletions revision_utils.py
@@ -1,21 +1,63 @@
# -*- coding: utf-8 -*-
"""Utility functions for revision management."""

__copyright__ = 'Copyright (c) 2019-2023, Utrecht University'
__copyright__ = 'Copyright (c) 2019-2024, Utrecht University'
__license__ = 'GPLv3, see LICENSE'


import datetime
import hashlib
import os

from revision_strategies import get_revision_strategy
from util import constants, log
from util import constants, log, pathutil


def calculate_end_of_calendar_day(ctx):
"""Calculate the unix timestamp for the end of the current day (Same as start of next day).
def revision_eligible(max_size, data_obj_exists, size, path, groups, revision_store_exists):
"""Determine whether can create a revision of given data object.
:param max_size: Max size that file can be to create a revision (in bytes)
:param data_obj_exists: Whether the data object exists
:param size: Size of the data object
:param path: Path to the given data object (for logging)
:param groups: List of groups retrieved for this data object
:param revision_store_exists: Whether revision store for this group exists
:returns: 2-tuple containing True / False whether a revision should be created,
and the message (if this is an error condition)
"""

if not data_obj_exists:
return False, "Data object <{}> was not found or path was collection".format(path)

if len(groups) == 0:
return False, "Cannot find owner of data object <{}>. It may have been removed. Skipping.".format(path)

if len(groups) > 1:
return False, "Cannot find unique owner of data object <{}>. Skipping.".format(path)

if not revision_store_exists:
return False, "Revision store collection does not exist for data object <{}>".format(path)

_, zone, _, _ = pathutil.info(path)

# A revision should not be created when the data object is too big,
# but this is not an error condition
if int(size) > max_size:
return False, ""

# Only create revisions for research space
if not path.startswith("/{}/home/{}".format(zone, constants.IIGROUPPREFIX)):
return False, ""

if pathutil.basename(path) in constants.UUBLOCKLIST:
return False, ""

return True, ""

:param ctx: Combined type of a callback and rei struct

def calculate_end_of_calendar_day():
"""Calculate the unix timestamp for the end of the current day (Same as start of next day).
:returns: End of calendar day - Timestamp of the end of the current day
"""
@@ -26,10 +68,9 @@ def calculate_end_of_calendar_day(ctx):
return int(tomorrow.strftime("%s"))


def get_revision_store_path(ctx, zone, trailing_slash=False):
def get_revision_store_path(zone, trailing_slash=False):
"""Produces the logical path of the revision store
:param ctx: Combined type of a callback and rei struct
:param zone: zone name
:param trailing_slash: Add a trailing slash (default: False)
@@ -47,7 +88,7 @@ def get_deletion_candidates(ctx, revision_strategy, revisions, initial_upper_tim
:param ctx: Combined type of a callback and rei struct
:param revision_strategy: Revision strategy object
:param revisions: List of revisions for a particular data object. Each revision is represented by a 3-tupel
:param revisions: List of revisions for a particular data object. Each revision is represented by a 3-tuple
(revision ID, modification time in epoch time, original path)
:param initial_upper_time_bound: Initial upper time bound for first bucket
:param verbose: Whether to print additional information for troubleshooting (boolean)
@@ -144,7 +185,7 @@ def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, verb
:param ctx: Combined type of a callback and rei struct
:param revisions_list: List of versioned data objects. Each versioned data object is represented as a list of revisions,
with each revision represented as a 3-tupel (revision ID, modification time in epoch time, original
with each revision represented as a 3-tuple (revision ID, modification time in epoch time, original
path)
:param revision_strategy_name: Select a revision strategy based on a string ('A', 'B', 'Simple'). See
https://github.com/UtrechtUniversity/yoda/blob/development/docs/design/processes/revisions.md
@@ -153,11 +194,44 @@ def revision_cleanup_prefilter(ctx, revisions_list, revision_strategy_name, verb
:returns: List of versioned data objects, after prefiltered versioned data objects / revisions have been
removed. Each versioned data object is represented as a list of revisions,
with each revision represented as a 3-tupel (revision ID, modification time in epoch time, original
with each revision represented as a 3-tuple (revision ID, modification time in epoch time, original
path)
"""
minimum_bucket_size = get_revision_strategy(revision_strategy_name).get_minimum_bucket_size()
if verbose:
log.write(ctx, "Removing following revisioned data objects in prefiltering for cleanup: "
+ str([object for object in revisions_list if len(object) <= minimum_bucket_size]))
return [object for object in revisions_list if len(object) > min(minimum_bucket_size, 1)]
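
To make the prefilter concrete: a versioned data object with a single revision cannot produce deletion candidates, so it is dropped before the per-bucket processing. An illustrative call, with made-up IDs and paths, assuming the chosen strategy's minimum bucket size is at least one:

# Each inner list is one versioned data object; each tuple is
# (revision ID, modification time in epoch time, original path).
revisions_list = [
    [("10001", 1714000000, "/tempZone/home/research-initial/a.txt")],
    [("10002", 1714000000, "/tempZone/home/research-initial/b.txt"),
     ("10003", 1714086400, "/tempZone/home/research-initial/b.txt")],
]
# Only the second versioned data object survives the prefilter.
kept = revision_cleanup_prefilter(ctx, revisions_list, "B", False)
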


def get_resc(row):
"""Get the resc id for a data object given the metadata provided (for revision job).
:param row: metadata for the data object
:returns: resc
"""
info = row[3].split(',')
if len(info) == 2:
return info[0]

# Backwards compatibility with revision metadata created in v1.8 or earlier.
return row[3]


def get_balance_id(row, path):
"""Get the balance id for a data object given the metadata provided (for revision job).
:param row: metadata for the data object
:param path: path to the data object
:returns: Balance id
"""
info = row[3].split(',')
if len(info) == 2:
return int(info[1])

# Backwards compatibility with revision metadata created in v1.8 or earlier.
# Determine a balance_id for this data object based on its path.
# This determines whether the data object is handled by this job/range or by another one running in parallel.
return int(hashlib.md5(path.encode('utf-8')).hexdigest(), 16) % 64 + 1
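
The two helpers above parse the revision job metadata stored on a data object: newer metadata packs the resource and a balance id into one comma-separated value, while metadata written by v1.8 or earlier holds only the resource, in which case the balance id falls back to a stable hash of the path. An illustrative example (the row values are made up):

# row[3] as written by current versions: "<resource>,<balance id>"
row_new = [None, None, None, "irodsResc,17"]
resc = get_resc(row_new)                                                       # "irodsResc"
balance_id = get_balance_id(row_new, "/tempZone/home/research-initial/a.txt")  # 17

# row[3] as written by v1.8 or earlier: resource only.
row_old = [None, None, None, "irodsResc"]
resc = get_resc(row_old)                                                       # "irodsResc"
# Balance id derived from an MD5 hash of the path, mapped into the range 1..64.
balance_id = get_balance_id(row_old, "/tempZone/home/research-initial/a.txt")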