YDA-5415: improve revision cleanup stability
This modifies the revision cleanup process so that the scan stage
(collecting data about revisions) is separate from the collect stage
(collecting a list of all revisions). Revision data is now scanned in
batches, rather than in a single scan of the whole environment.

This reduces the amount of memory needed for the scan stage, which
helps make the revision cleanup job usable in large environments.
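
As a rough illustration of the new approach, the sketch below shows the batched collect/scan pattern in plain Python. This is an editor's sketch under stated assumptions, not the actual Yoda implementation: fetch_revision_ids and scan_batch are hypothetical stand-ins for the genquery-based helpers, and a plain list stands in for the spool system (put_spool_data / get_spool_data). The real code additionally keeps all revisions of one collection in the same batch.

# Editor's sketch of the batched collect/scan pattern (assumptions noted above).

BATCH_SIZE = 1000  # target number of revision IDs per spooled batch


def collect(spool, fetch_revision_ids):
    """Collect stage: stream revision IDs into fixed-size spooled batches,
    so the full ID list is never held in memory at once."""
    batch = []
    for rev_id in fetch_revision_ids():
        batch.append(rev_id)
        if len(batch) >= BATCH_SIZE:
            spool.append(batch)
            batch = []
    if batch:  # flush the final, possibly partial batch
        spool.append(batch)


def scan(spool, scan_batch):
    """Scan stage: each invocation takes one spooled batch, so a scan job
    queries metadata for at most one batch of revisions."""
    if not spool:
        return None  # no spooled batches left
    return scan_batch(spool.pop(0))


# Usage: collect once, then run scan jobs until the spool is empty.
spool = []
collect(spool, lambda: iter(range(2500)))
while True:
    scanned = scan(spool, len)  # stand-in scan: just count the IDs
    if scanned is None:
        break
    print("scanned {} revisions".format(scanned))

The point of the split is that neither stage ever holds more than one batch in memory at a time.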
stsnel committed Oct 25, 2023
1 parent 3be38bd commit 7811bda
Showing 3 changed files with 231 additions and 83 deletions.
266 changes: 203 additions & 63 deletions revisions.py
@@ -24,7 +24,8 @@
'api_revisions_list',
'rule_revision_batch',
'rule_revisions_cleanup_collect',
'rule_revisions_cleanup_process']
'rule_revisions_cleanup_process',
'rule_revisions_cleanup_scan']


@api.make()
@@ -473,7 +474,7 @@ def revision_create(ctx, resource, data_id, max_size, verbose):
# All revisions are stored in a group with the same name as the research group in a system collection
# When this collection is missing, no revisions will be created. When the group manager is used to
# create new research groups, the revision collection will be created as well.
revision_store = "/" + user_zone + constants.UUREVISIONCOLLECTION + "/" + group_name
revision_store = os.path.join(get_revision_store_path(ctx), group_name)

if collection.exists(ctx, revision_store):
# Allow rodsadmin to create subcollections.
@@ -534,51 +535,60 @@ def revision_create(ctx, resource, data_id, max_size, verbose):
return revision_created


def revisions_info(ctx):
def revision_cleanup_scan_revision_objects(ctx, revision_list, verbose_mode):
"""Obtain information about all revisions.
:param ctx: Combined type of a callback and rei struct
:param revision_list: List of revision data object IDs
:param verbose_mode: Whether to print additional information for troubleshooting (boolean)
:returns: Nested list, where the outer list represents revisioned data objects,
and the inner list represents revisions for that data object.
Each revision is represented by a list of length three (revision ID,
modification epoch time, revision path)
"""
zone = user.zone(ctx)
revision_store = '/' + zone + constants.UUREVISIONCOLLECTION
QUERY_BATCH_SIZE = 20
ORIGINAL_PATH_ATTRIBUTE = constants.UUORGMETADATAPREFIX + 'original_path'
ORIGINAL_MODIFY_TIME_ATTRIBUTE = constants.UUORGMETADATAPREFIX + 'original_modify_time'

if user.user_type(ctx) == 'rodsadmin':
msi.set_acl(ctx, "recursive", "admin:own", user.full_name(ctx), revision_store)
msi.set_acl(ctx, "recursive", "inherit", user.full_name(ctx), revision_store)

# first, get original_path and ids for every revision
iter = genquery.row_iterator(
"order(META_DATA_ATTR_VALUE), order_desc(DATA_ID)",
"META_DATA_ATTR_NAME = '" + constants.UUORGMETADATAPREFIX + 'original_path' + "'"
" AND COLL_NAME like '" + revision_store + "%'",
genquery.AS_LIST, ctx
)
revision_store = get_revision_store_path(ctx)
ids = list(revision_list)
path_dict = {}
for row in iter:
original_path = row[0]
revision_id = row[1]
if original_path in path_dict:
path_dict[original_path].append(revision_id)
else:
path_dict[original_path] = [revision_id]

# second, get id, path and modify time for every revision
iter = genquery.row_iterator(
"DATA_ID, COLL_NAME, DATA_NAME, META_DATA_ATTR_VALUE",
"META_DATA_ATTR_NAME = '" + constants.UUORGMETADATAPREFIX + 'original_modify_time' + "'"
" AND COLL_NAME like '" + revision_store + "%'",
genquery.AS_LIST, ctx
)
rev_dict = {}
for row in iter:
revision_id = row[0]
path = row[1] + "/" + row[2]
modify_time = row[3]
rev_dict[revision_id] = [int(revision_id), int(modify_time), path]

while len(ids) > 0:
batch_ids = ids[:QUERY_BATCH_SIZE]
batch_id_string = "({})".format(",".join(map(lambda e: "'{}'".format(e), batch_ids)))
ids = ids[QUERY_BATCH_SIZE:]

# first, get original_path and ids for every revision
original_paths = genquery.row_iterator(
"order(META_DATA_ATTR_VALUE), order_desc(DATA_ID)",
"META_DATA_ATTR_NAME = '" + ORIGINAL_PATH_ATTRIBUTE + "'"
" AND COLL_NAME like '" + revision_store + "/%' AND DATA_ID IN " + batch_id_string,
genquery.AS_LIST, ctx)

for row in original_paths:
original_path = row[0]
revision_id = row[1]
if original_path in path_dict:
path_dict[original_path].append(revision_id)
else:
path_dict[original_path] = [revision_id]

# second, get id, path and modify time for every revision
modify_times = genquery.row_iterator(
"DATA_ID, COLL_NAME, DATA_NAME, META_DATA_ATTR_VALUE",
"META_DATA_ATTR_NAME = '" + ORIGINAL_MODIFY_TIME_ATTRIBUTE + "'"
" AND COLL_NAME like '" + revision_store + "/%' AND DATA_ID IN " + batch_id_string,
genquery.AS_LIST, ctx
)

for row in modify_times:
revision_id = row[0]
path = row[1] + "/" + row[2]
modify_time = row[3]
rev_dict[revision_id] = [int(revision_id), int(modify_time), path]

# collate revision info
revisions_info = []
@@ -591,13 +601,140 @@ def revisions_info(ctx):
return revisions_info


def get_all_revision_data_ids(ctx):
""""Returns all data IDs of revision data objects
:param ctx: Combined type of a callback and rei struct
:yields: iterator of 2-tupels containing collection and data object IDs
"""
revision_store = get_revision_store_path(ctx)

revision_objects = genquery.row_iterator(
"order_desc(COLL_ID), DATA_ID",
"META_DATA_ATTR_NAME = '" + constants.UUORGMETADATAPREFIX + 'original_path' + "'"
" AND COLL_NAME like '" + revision_store + "/%'",
genquery.AS_LIST, ctx)

for row in revision_objects:
yield (row[0], row[1])


def _update_revision_store_acls(ctx):
"""Sets the revision store ACL to grant present rodsadmin user access
:param ctx: Combined type of a callback and rei struct
:raises Exception: if current user is not a rodsadmin
"""
revision_store = get_revision_store_path(ctx)
if user.user_type(ctx) == 'rodsadmin':
msi.set_acl(ctx, "recursive", "admin:own", user.full_name(ctx), revision_store)
msi.set_acl(ctx, "recursive", "inherit", user.full_name(ctx), revision_store)
else:
raise Exception("Cannot update revision store ACLs, because present user is not rodsadmin.")


def get_revision_store_path(ctx, trailing_slash=False):
"""Produces the logical path of the revision store
:param ctx: Combined type of a callback and rei struct
:param trailing_slash: Add a trailing slash (default: False)
:returns: Logical path of revision store
"""
if trailing_slash:
return os.path.join("/" + user.zone(ctx), constants.UUREVISIONCOLLECTION.lstrip(os.path.sep), '')
else:
return os.path.join("/" + user.zone(ctx), constants.UUREVISIONCOLLECTION.lstrip(os.path.sep))


@rule.make(inputs=[0], outputs=[1])
def rule_revisions_cleanup_collect(ctx, target_batch_size):
"""Collect a list of revision data object IDs and puts them in the spool system for processing
by the revision cleanup scan job.
:param ctx: Combined type of a callback and rei struct
:param target_batch_size: Number of revisions to aim for in one batch. The actual batch size can be
larger, because all revision objects in one collection are always placed
in the same batch.
:returns: Status
:raises Exception: If rule is executed by non-rodsadmin user
"""

if user.user_type(ctx) != 'rodsadmin':
raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.")

if has_spool_data(constants.PROC_REVISION_CLEANUP_SCAN):
return "Existing revision cleanup scan spool data present. Not adding new revision cleanup data."

log.write(ctx, "Starting revision cleanup collect process.")

target_batch_size = int(target_batch_size)
ingest_state = {
"batch": [],
"current_coll": None,
"objects_for_current_coll": []
}
number_revisions = 0

def ingest_new_data_id(ctx, coll_id, data_id, ingest_state, target_batch_size):
"""Read data object. Store it in ingest state as long as its collection ID is the same as
the previous one, so that all data objects in the same collection are
part of the same batch.
If the new data object has a different collection ID from the previous
ones, flush previously collected data objects to the batch buffer, and if
needed from there to the spool queue.
:param ctx: combined type of a callback and rei struct
:param coll_id: collection ID
:param data_id: data object ID
:param ingest_state: ingest state dictionary
:param target_batch_size: target batch size
"""
if coll_id == ingest_state["current_coll"]:
ingest_state["objects_for_current_coll"].append(data_id)
else:
if (len(ingest_state["batch"]) > 0
and len(ingest_state["batch"]) + len(ingest_state["objects_for_current_coll"]) >= target_batch_size):
put_spool_data(constants.PROC_REVISION_CLEANUP_SCAN, [ingest_state["batch"]])
ingest_state["batch"] = []

ingest_state["batch"].extend(ingest_state["objects_for_current_coll"])
ingest_state["objects_for_current_coll"] = [data_id]
ingest_state["current_coll"] = coll_id

if len(ingest_state["batch"]) >= target_batch_size:
log.write(ctx, "Flush batch 2 " + str(ingest_state["batch"]))
put_spool_data(constants.PROC_REVISION_CLEANUP_SCAN, [ingest_state["batch"]])
ingest_state["batch"] = []

for (coll_id, data_id) in get_all_revision_data_ids(ctx):
number_revisions += 1
ingest_new_data_id(ctx, coll_id, data_id, ingest_state, target_batch_size)

if (len(ingest_state["batch"]) > 0
and len(ingest_state["batch"]) + len(ingest_state["objects_for_current_coll"]) >= target_batch_size):
put_spool_data(constants.PROC_REVISION_CLEANUP_SCAN, [ingest_state["batch"]])
ingest_state["batch"] = []

ingest_state["batch"].extend(ingest_state["objects_for_current_coll"])
if len(ingest_state["batch"]) > 0:
put_spool_data(constants.PROC_REVISION_CLEANUP_SCAN, [ingest_state["batch"]])

log.write(ctx, "Collected {} revisions for revision cleanup scanning.".format(number_revisions))
return "Revision data has been spooled for scanning"


@rule.make(inputs=[0], outputs=[1])
def rule_revisions_cleanup_collect(ctx, batch_size):
def rule_revisions_cleanup_scan(ctx, verbose_flag):
"""Collect revision data and put it in the spool system for processing by the revision cleanup
processing jobs
scan jobs
:param ctx: Combined type of a callback and rei struct
:param batch_size: Number of revisions to include in one spool object
:param verbose_flag: "1" if rule needs to print additional information for troubleshooting, else "0"
:returns: Status
@@ -606,25 +743,27 @@ def rule_revisions_cleanup_collect(ctx, batch_size):
if user.user_type(ctx) != 'rodsadmin':
raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.")

if has_spool_data(constants.PROC_REVISION_CLEANUP):
return "Existing spool data present. Not adding new revision cleanup data."
log.write(ctx, 'Revision cleanup scan job starting.')
verbose = verbose_flag == "1"
revisions_list = get_spool_data(constants.PROC_REVISION_CLEANUP_SCAN)

if revisions_list is None:
log.write(ctx, 'Revision cleanup scan job stopping - no more spooled revision scan data.')
return "No more revision cleanup data"

data = revisions_info(ctx)
batch_size = int(batch_size)
number_revisions = len(data)
if verbose:
log.write(ctx, "Scanning revisions: " + str(revisions_list))

while len(data) > 0:
current_batch = data[:batch_size]
put_spool_data(constants.PROC_REVISION_CLEANUP, [current_batch])
data = data[batch_size:]
revision_data = revision_cleanup_scan_revision_objects(ctx, revisions_list, verbose)
put_spool_data(constants.PROC_REVISION_CLEANUP, [revision_data])

log.write(ctx, "Collected {} revisions for revision cleanup.".format(number_revisions))
return "Revision data has been spooled for cleanup"
log.write(ctx, 'Revision cleanup scan job finished.')
return 'Revision store cleanup scan job completed'


@rule.make(inputs=[0, 1, 2], outputs=[3])
def rule_revisions_cleanup_process(ctx, bucketcase, endOfCalendarDay, verbose_flag):
"""Apply the selected revision strategy to a batch of spooled revision data
"""Applies the selected revision strategy to a batch of spooled revision data
:param ctx: Combined type of a callback and rei struct
:param bucketcase: Select a bucketlist based on a string ('A', 'B', 'Simple'). If the value is an unknown case, the default
@@ -642,12 +781,13 @@ def rule_revisions_cleanup_process(ctx, bucketcase, endOfCalendarDay, verbose_fl
if user.user_type(ctx) != 'rodsadmin':
raise Exception("The revision cleanup jobs can only be started by a rodsadmin user.")

log.write(ctx, '[revisions] Revision cleanup job starting.')
log.write(ctx, 'Revision cleanup processing job starting.')
verbose = verbose_flag == "1"
_update_revision_store_acls(ctx)
revisions_list = get_spool_data(constants.PROC_REVISION_CLEANUP)

if revisions_list is None:
log.write(ctx, '[revisions] Revision cleanup job stopping - no more spooled revision data.')
log.write(ctx, 'Revision cleanup processing job stopping - no more spooled revision data.')
return "No more revision cleanup data"

end_of_calendar_day = int(endOfCalendarDay)
@@ -663,7 +803,7 @@ def rule_revisions_cleanup_process(ctx, bucketcase, endOfCalendarDay, verbose_fl

for revisions in revisions_list:
if verbose:
log.write(ctx, '[revisions] Processing revisions {} ...'.format(str(revisions)))
log.write(ctx, 'Processing revisions {} ...'.format(str(revisions)))
# Process the original path conform the bucket settings
candidates = get_deletion_candidates(ctx, buckets, revisions, end_of_calendar_day, verbose)
num_candidates += len(candidates)
Expand All @@ -673,22 +813,22 @@ def rule_revisions_cleanup_process(ctx, bucketcase, endOfCalendarDay, verbose_fl
rev_paths = {r[0]: r[2] for r in revisions}

if verbose:
log.write(ctx, '[revisions] Candidates to be removed: {} ...'.format(str(candidates)))
log.write(ctx, 'Candidates to be removed: {} ...'.format(str(candidates)))

# Delete the revisions that were found being obsolete
for revision_id in candidates:
rev_path = rev_paths[revision_id]
if verbose:
log.write(ctx, '[revisions] Removing candidate: {} ...'.format(str(revision_id)))
log.write(ctx, 'Removing candidate: {} ...'.format(str(revision_id)))
if not revision_remove(ctx, revision_id, rev_path):
num_errors += 1

log.write(ctx, '[revisions] Revision cleanup job completed - {} candidates for {} versioned data objects ({} successful / {} errors).'.format(
log.write(ctx, 'Revision cleanup processing job completed - {} candidates for {} versioned data objects ({} successful / {} errors).'.format(
str(num_candidates),
str(len(revisions_list)),
str(num_candidates - num_errors),
str(num_errors)))
return 'Revision store cleanup job completed'
return 'Revision store cleanup processing job completed'


def revision_remove(ctx, revision_id, revision_path):
@@ -702,7 +842,7 @@ def revision_remove(ctx, revision_id, revision_path):
:returns: Boolean indicating if revision was removed
"""
revision_prefix = os.path.join("/" + user.zone(ctx), constants.UUREVISIONCOLLECTION.lstrip(os.path.sep), '')
revision_prefix = get_revision_store_path(ctx, trailing_slash=True)
if not revision_path.startswith(revision_prefix):
log.write(ctx, "ERROR - sanity check fail when removing revision <{}>: <{}>".format(
revision_id,
@@ -829,16 +969,16 @@ def get_deletion_candidates(ctx, buckets, revisions, initial_upper_time_bound, v
# Add revision to list of removal
index = bucket_start_index + count
if verbose:
log.write(ctx, '[revisions] Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
str(bucket)))
log.write(ctx, 'Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
str(bucket)))
deletion_candidates.append(rev_list[index])
count += 1
else:
while count < nr_to_be_removed:
index = len(rev_list) + (bucket_start_index) - count
if verbose:
log.write(ctx, '[revisions] Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
str(bucket)))
log.write(ctx, 'Scheduling revision <{}> in bucket <{}> for removal.'.format(str(index),
str(bucket)))
deletion_candidates.append(rev_list[index])
count += 1
