diff --git a/policies.py b/policies.py index 36b4f7b19..fe31ac240 100644 --- a/policies.py +++ b/policies.py @@ -561,5 +561,19 @@ def py_acPostProcForObjRename(ctx, src, dst): if len(info.subpath) and info.group != pathutil.info(src).group: ctx.uuEnforceGroupAcl(dst) + +@rule.make(inputs=range(7), outputs=[7]) +def pep_resource_resolve_hierarchy_pre(ctx, resource, _ctx, out, operation, host, parser, vote): + if not config.arb_enabled or operation != "CREATE": + return + + arb_data = arb_data_manager.ARBDataManager() + arb_status = arb_data.get(ctx, resource) + + if arb_status == "AVAILABLE": + return "read=1.0;write=1.0" + elif arb_status == "FULL": + return "read=1.0;write=0.0" + # }}} # }}} diff --git a/resources.py b/resources.py index e497e4b80..337afe2c8 100644 --- a/resources.py +++ b/resources.py @@ -19,6 +19,8 @@ 'rule_resource_store_storage_statistics', 'rule_resource_transform_old_storage_data', 'rule_resource_research', + 'rule_resource_update_resc_arb_data', + 'rule_resource_update_misc_arb_data', 'rule_resource_vault'] @@ -641,6 +643,78 @@ def rule_resource_store_storage_statistics(ctx): return 'ok' +@rule.make(inputs=[0, 1, 2], outputs=[]) +def rule_resource_update_resc_arb_data(ctx, resc_name, bytes_free, bytes_total): + """ + Update ARB data for a specific resource + + :param ctx: Combined type of a callback and rei struct + :param resc_name: Name of a particular unixfilesystem resource + :param bytes_free: Free size on this resource, in bytes + :param bytes_total: Total size of this resource, in bytes + """ + if user.user_type(ctx) != 'rodsadmin': + log.write(ctx, "Error: insufficient permissions to run ARB data update rule.") + return + + if not resource.exists(ctx, resc_name): + log.write(ctx, "Error: could not find resource named '{}' for ARB update.".format(resc_name)) + return + + bytes_free_gb = int(bytes_free) / 2 ** 30 + bytes_free_percent = 100 * (float(bytes_free) / float(bytes_total)) + + if resc_name in config.arb_exempt_resources: + arb_status = "EXEMPT" + elif bytes_free_gb >= config.arb_min_gb_free and bytes_free_percent > config.arb_min_percent_free: + arb_status = "AVAILABLE" + else: + arb_status = "FULL" + + parent_resc_name = resource.get_parent_by_name(ctx, resc_name) + + manager = arb_data_manager.ARBDataManager() + manager.put(ctx, resc_name, "IGNORE") + + if parent_resc_name is not None and resource.get_type_by_name(ctx, parent_resc_name) == "passthru": + manager.put(ctx, parent_resc_name, arb_status) + + +@rule.make() +def rule_resource_update_misc_arb_data(ctx): + """Update ARB data for resources that are not covered by the regular process. That is, + all resources that are neither unixfilesystem nor passthrough resources, as well as + passthrough resources that do not have a unixfilesystem child resource. + + :param ctx: Combined type of a callback and rei struct + """ + if user.user_type(ctx) != 'rodsadmin': + log.write(ctx, "Error: insufficient permissions to run ARB data update rule.") + return + + manager = arb_data_manager.ARBDataManager() + + all_resources = resource.get_all_resource_names(ctx) + ufs_resources = set(resource.get_resource_names_by_type(ctx, "unixfilesystem") + + resource.get_resource_names_by_type(ctx, "unix file system")) + pt_resources = set(resource.get_resource_names_by_type(ctx, "passthru")) + + for resc in all_resources: + if resc in ufs_resources: + pass + elif resc not in pt_resources: + manager.put(ctx, resc, "IGNORE") + else: + child_resources = resource.get_children_by_name(ctx, resc) + child_found = False + for child_resource in child_resources: + if child_resource in ufs_resources: + child_found = True + # Ignore the passthrough resource if it does not have a UFS child resource + if not child_found: + manager.put(ctx, resc, "IGNORE") + + def get_categories(ctx): """Get all categories for current user. diff --git a/rules_uu.cfg.template b/rules_uu.cfg.template index 674f03585..76f1e8edb 100644 --- a/rules_uu.cfg.template +++ b/rules_uu.cfg.template @@ -64,3 +64,9 @@ sram_rest_api_url = sram_api_key = sram_verbose_logging = sram_tls_verify = + + +arb_enabled = +arb_exempt_resources = +arb_min_gb_free = +arb_min_percent_free = diff --git a/setup.cfg b/setup.cfg index 7172709f4..6ba7aaa36 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,4 +5,4 @@ strictness=short docstring_style=sphinx max-line-length=127 exclude=__init__.py,tools,tests/env/ -application-import-names=avu_json,conftest,util,api,config,constants,data_access_token,datacite,datarequest,data_object,epic,error,folder,groups,intake,intake_dataset,intake_lock,intake_scan,intake_utils,intake_vault,json_datacite,json_landing_page,jsonutil,log,mail,meta,meta_form,msi,notifications,schema,schema_transformation,schema_transformations,settings,pathutil,provenance,policies_intake,policies_datamanager,policies_datapackage_status,policies_folder_status,policies_datarequest_status,publication,query,replication,revisions,rule,user,vault,sram +application-import-names=avu_json,conftest,util,api,config,constants,data_access_token,datacite,datarequest,data_object,epic,error,folder,groups,intake,intake_dataset,intake_lock,intake_scan,intake_utils,intake_vault,json_datacite,json_landing_page,jsonutil,log,mail,meta,meta_form,msi,notifications,schema,schema_transformation,schema_transformations,settings,pathutil,provenance,policies_intake,policies_datamanager,policies_datapackage_status,policies_folder_status,policies_datarequest_status,publication,query,replication,revisions,rule,user,vault,sram,arb_data_manager,cached_data_manger,resource diff --git a/tools/arb-update-resources.py b/tools/arb-update-resources.py new file mode 100644 index 000000000..5e4d9b192 --- /dev/null +++ b/tools/arb-update-resources.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 + +""" This script collects and submits data needed for Automatic Resource Balancing (ARB): the + process of ensuring that new data objects get created on resources that still have space available. + + It does the following things: + - Gather free space and total space for all local unixfilesystem resources + - Pass this data to the ARB update rule, so that it can be taken into account by ARB. + - If the script is run on the provider, it also invokes the rule that initializes ARB data + for resources that are not relevant to ARB. This makes ARB ignore these resources. +""" + +import argparse +import json +import os +import psutil +import socket +import ssl + +from io import StringIO +from collections import OrderedDict + + +from irods.column import In +from irods.models import Resource +from irods.password_obfuscation import decode as password_decode +from irods.rule import Rule +from irods.session import iRODSSession + + +def get_hostname(): + return socket.getfqdn() + + +def get_volume_total(path): + return psutil.disk_usage(path).total + + +def get_volume_free(path): + return psutil.disk_usage(path).free + + +def parse_args(): + '''Parse command line arguments''' + + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawTextHelpFormatter) + parser.add_argument("-v", "--verbose", action="store_true", default=False, + help="Show verbose information about what the script is doing.") + + return parser.parse_args() + + +def get_irods_environment( + irods_environment_file="/var/lib/irods/.irods/irods_environment.json"): + """Reads the irods_environment.json file, which contains the environment + configuration. + + :param str irods_environment_file filename of the iRODS environment file.; + :return Data structure containing the configuration""" + with open(irods_environment_file, 'r') as f: + return json.load(f) + + +def setup_session(irods_environment_config, + ca_file="/etc/pki/tls/certs/chain.crt"): + """Use irods environment files to configure a iRODSSession""" + + irodsA = os.path.expanduser("~/.irods/.irodsA") + with open(irodsA, "r") as r: + scrambled_password = r.read() + password = password_decode(scrambled_password) + + ssl_context = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, + cafile=ca_file, + capath=None, + cadata=None) + ssl_settings = {'client_server_negotiation': 'request_server_negotiation', + 'client_server_policy': 'CS_NEG_REQUIRE', + 'encryption_algorithm': 'AES-256-CBC', + 'encryption_key_size': 32, + 'encryption_num_hash_rounds': 16, + 'encryption_salt_size': 8, + 'ssl_context': ssl_context} + settings = dict() + settings.update(irods_environment_config) + settings.update(ssl_settings) + settings["password"] = password + session = iRODSSession(**settings) + return session + + +def get_local_ufs_resources(session): + results = session.query(Resource.name).filter( + In(Resource.type, ["unixfilesystem", "unix file system"])).filter( + Resource.location == get_hostname()).all() + return sorted(list(map(lambda g: g[Resource.name], results))) + + +def process_ufs_resources(session, resource_names, verbose_mode): + for resource_name in resource_names: + if verbose_mode: + print("Processing resource {} ...".format(resource_name)) + + resource = session.resources.get(resource_name) + + free_space = get_volume_free(resource.vault_path) + total_space = get_volume_total(resource.vault_path) + + if verbose_mode: + print("Setting free / total space of resource {} to {} / {}.".format(resource_name, free_space, total_space)) + call_rule_update_resc(session, resource_name, free_space, total_space) + + +def call_rule(session, rulename, params, number_outputs, + rule_engine='irods_rule_engine_plugin-irods_rule_language-instance'): + """Run a rule + + :param rulename: name of the rule + :param params: dictionary of rule input parameters and their values + :param number_output: number of output parameters + :param rule_engine: rule engine to run rule on (defaults to legacy rule engine if none provided) + """ + body = 'myRule {{\n {}('.format(rulename) + + for input_var in params.keys(): + body += "'*{}',".format(input_var) + + if len(params) > 0: + # Remove trailing comma from input argument list + body = body[:-1] + + body += '); writeLine("stdout","");}' + + input_params = {"*{}".format(k): '"{}"'.format(v) for (k, v) in params.items()} + output_params = 'ruleExecOut' + + re_config = {'instance_name': rule_engine} + + myrule = Rule( + session, + rule_file=StringIO(body), + params=input_params, + output=output_params, + **re_config) + + outArray = myrule.execute() + buf = outArray.MsParam_PI[0].inOutStruct.stdoutBuf.buf.decode( + 'utf-8').splitlines() + + return buf[:number_outputs] + + +def call_rule_update_resc(session, resource, bytes_free, bytes_total): + """ Calls rule to update data for a specific resource (and its parent resource) + """ + parms = OrderedDict([ + ('resource', resource), + ('bytes_free', bytes_free), + ('bytes_total', bytes_total)]) + [out] = call_rule(session, 'rule_resource_update_resc_arb_data', parms, 1) + + +def call_rule_update_misc(session): + """Calls rule to update resources to be ignored by ARB + + """ + parms = OrderedDict([]) + [out] = call_rule(session, 'rule_resource_update_misc_arb_data', parms, 1) + + +def is_on_provider(): + with open('/etc/irods/server_config.json', 'r') as f: + config = json.load(f) + return config["icat_host"] == get_hostname() + + +def main(): + args = parse_args() + env = get_irods_environment() + print("Env: " + str(env)) + session = setup_session(env) + local_ufs_resources = get_local_ufs_resources(session) + process_ufs_resources(session, + local_ufs_resources, + args.verbose) + + if is_on_provider(): + if args.verbose: + print("Updating misc resources ...") + call_rule_update_misc(session) + + +if __name__ == '__main__': + main() diff --git a/util/__init__.py b/util/__init__.py index 17530501a..f6f9a1b77 100644 --- a/util/__init__.py +++ b/util/__init__.py @@ -28,6 +28,9 @@ import avu import misc import config +import resource +import arb_data_manager +import cached_data_manager # Config items can be accessed directly as 'config.foo' by any module # that imports * from util. diff --git a/util/arb_data_manager.py b/util/arb_data_manager.py new file mode 100644 index 000000000..3eec49df6 --- /dev/null +++ b/util/arb_data_manager.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +"""This file contain functions that implement cached data storage for automatic resource + balancing, which takes care of ensuring that new data objects are put on resources that + have enough space available. +""" + +__copyright__ = 'Copyright (c) 2019-2023, Utrecht University' +__license__ = 'GPLv3, see LICENSE' + +import genquery + +import cached_data_manager +import log +import msi + + +class ARBDataManager(cached_data_manager.CachedDataManager): + AVU_NAME = "yoda::arb" + + def _get_context_string(self): + """ :returns: a string that identifies the particular type of data manager + + :returns: context string for this type of data manager + """ + return "arb" + + def _get_original_data(self, ctx, keyname): + """This function is called when data needs to be retrieved from the original + (non-cached) location. + + :param ctx: Combined type of a callback and rei struct + :param keyname: name of the key + """ + arb_data = list(genquery.row_iterator( + "META_RESC_ATTR_VALUE", + "META_RESC_ATTR_NAME = '{}' AND RESC_NAME = '{}'".format(self.AVU_NAME, keyname), + genquery.AS_LIST, ctx)) + + if len(arb_data) == 0: + # If we don't have an ARB value, ARB should ignore this resource + return "IGNORE" + elif len(arb_data) == 1: + return arb_data[0][0] + else: + log.write(ctx, "WARNING: multiple ARB AVUs present for resource '{}'. ARB will ignore it.") + return "IGNORE" + + def _put_original_data(self, ctx, keyname, data): + """This function is called when data needs to be updated in the original + (non-cached) location. + + :param ctx: Combined type of a callback and rei struct + :param keyname: name of the key + :param data: Data for this key + """ + msi.mod_avu_metadata(ctx, "-r", keyname, "set", self.AVU_NAME, data, "") + + def _should_populate_cache_on_get(self): + """This function controls whether the manager populates the cache + after retrieving original data. + + :returns: Boolean value that states whether the cache should be populated when original data + is retrieved. + """ + return True diff --git a/util/cached_data_manager.py b/util/cached_data_manager.py new file mode 100644 index 000000000..57614855f --- /dev/null +++ b/util/cached_data_manager.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- + +__copyright__ = 'Copyright (c) 2019-2023, Utrecht University' +__license__ = 'GPLv3, see LICENSE' + +import redis + + +class CachedDataManager: + """This class contains a framework that subclasses can use + to create a manager for cache data. The basic idea is that + the subclass defines functions to access some data (e.g. in AVUs + on particular iRODS objects. The responses are then cached. + """ + + # Internal methods to implement by subclass + def _get_context_string(self): + """This function should be implemented by subclasses. + + :raises Exception: if function has not been implemented in subclass. + """ + raise Exception("Context string not provided by CacheDataManager.") + + def _get_original_data(self, ctx, keyname): + """This function is called when data needs to be retrieved from the original + (non-cached) location. + + :param ctx: Combined type of a callback and rei struct + :param keyname: name of the key + + :raises Exception: if function has not been implemented in subclass. + """ + raise Exception("Get original data not implemented by CacheDataManager,") + + def _put_original_data(self, ctx, keyname, data): + """This function is called when data needs to be updated in the original + (non-cached) location. + + :param ctx: Combined type of a callback and rei struct + :param keyname: name of the key + :param data: Data to store for this key + + :raises Exception: if function has not been implemented in subclass. + """ + raise Exception("Put original data not implemented by CacheDataManager.") + + # Internal methods that have a default implementation. Can optionally + # be re-implemented by subclass. + + def __init__(self, *args, **kwargs): + self._connection = redis.Redis(host="localhost") + + def _get_connection(self): + return self._connection + + def _get_cache_keyname(self, keyname): + return self._get_context_string() + "::" + keyname + + def get(self, ctx, keyname): + """Retrieves data from the cache if possible, otherwise retrieves + the original. + + :param ctx: Combined type of a callback and rei struct + :param keyname: name of the key + + :returns: Data for this key + """ + connection = self._get_connection() + cache_keyname = self._get_cache_keyname(keyname) + + if connection.ping(): + cached_result = self._get_connection().get(cache_keyname) + else: + cached_result = None + + if cached_result is None: + original_result = self._get_original_data(keyname) + if self._should_populate_cache_on_get(): + self._update_cache(ctx, keyname, original_result) + return original_result + else: + return cached_result + + def put(self, ctx, keyname, data): + """Update both the original value and cached value (if cache is not available, it is not updated) + + :param ctx: Combined type of a callback and rei struct + :param keyname: name of the key + :param data: data for this key + """ + self._put_original_data(ctx, keyname, data) + if self._get_connection().ping(): + self._update_cache(ctx, self._get_cache_keyname(keyname), data) + + def _update_cache(self, ctx, keyname, data): + """Update a value in the cache + + :param ctx: Combined type of a callback and rei struct + :param keyname: name of the key + :param data: data for this key + """ + cache_keyname = self._get_cache_keyname(keyname) + self._get_connection().set(cache_keyname, data) + + def clear(self, ctx, keyname): + """Clears cached data for a key if present. + + :param ctx: Combined type of a callback and rei struct + :param keyname: name of the key + """ + cache_keyname = self._get_cache_keyname(keyname) + self._get_connection().delete(cache_keyname) + + def _should_populate_cache_on_get(self): + """This function controls whether the manager populates the cache + after retrieving original data. + + :returns: Boolean value that determines whether the cache should be populated + when original value are retrieved. + """ + return False diff --git a/util/config.py b/util/config.py index b009ef82f..9a3ad5c81 100644 --- a/util/config.py +++ b/util/config.py @@ -131,7 +131,11 @@ def __repr__(self): sram_api_key=None, sram_flow=None, sram_verbose_logging=False, - sram_tls_verify=True) + sram_tls_verify=True, + arb_enabled=False, + arb_exempt_resources=[], + arb_min_gb_free=0, + arb_min_percent_free=5) # }}} diff --git a/util/resource.py b/util/resource.py new file mode 100644 index 000000000..f54faca5a --- /dev/null +++ b/util/resource.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +"""Utility / convenience functions for dealing with resources.""" + +__copyright__ = 'Copyright (c) 2019-2023, Utrecht University' +__license__ = 'GPLv3, see LICENSE' + +import genquery + + +def exists(ctx, name): + """Check if a resource with a given name exists.""" + return len(list(genquery.row_iterator( + "RESC_ID", "RESC_NAME = '{}'".format(name), + genquery.AS_LIST, ctx))) > 0 + + +def id_from_name(ctx, resc_name): + """Get resource ID from resource name. + + :param ctx: Combined type of a callback and rei struct + :param resc_name: Name of resource + + :returns: Resource ID + """ + return genquery.Query(ctx, ["RESC_ID"], "RESC_NAME = '{}'".format(resc_name)).first() + + +def name_from_id(ctx, resc_id): + """Get resource name from resource ID. + + :param ctx: Combined type of a callback and rei struct + :param resc_id: Resource ID + + :returns: Resource name + """ + return genquery.Query(ctx, ["RESC_NAME"], "RESC_ID = '{}'".format(resc_id)).first() + + +def get_parent_by_id(ctx, resc_id): + """Get resource parent ID from resource ID + + :param ctx: Combined type of a callback and rei struct + :param resc_id: Resource ID + + :returns: Parent resource ID (or None if it has no parent) + """ + result = genquery.Query(ctx, ["RESC_PARENT"], "RESC_ID = '{}'".format(resc_id)).first() + return None if result == "" else result + + +def get_parent_by_name(ctx, resc_name): + """Get resource parent name from resource name + + :param ctx: Combined type of a callback and rei struct + :param resc_name: Resource name + + :returns: Parent resource name (or None if it has no parent) + """ + resource_id = id_from_name(ctx, resc_name) + parent_resource_id = get_parent_by_id(ctx, resource_id) + return None if parent_resource_id is None else name_from_id(ctx, parent_resource_id) + + +def get_children_by_id(ctx, resc_id): + """Get resource children IDs from resource ID + + :param ctx: Combined type of a callback and rei struct + :param resc_id: Resource ID + + :returns: list of child resource IDs + """ + result = genquery.Query(ctx, ["RESC_PARENT"], "RESC_ID = '{}'".format(resc_id)).first() + + result = list(genquery.row_iterator( + "RESC_ID", + "RESC_PARENT = '{}'".format(resc_id), + genquery.AS_LIST, ctx)) + return [r[0] for r in result] + + +def get_children_by_name(ctx, resc_name): + """Get resource children names from resource name + + :param ctx: Combined type of a callback and rei struct + :param resc_name: Resource name + + :returns: Parent resource name (or None if it has no parent) + """ + resource_id = id_from_name(ctx, resc_name) + child_resource_ids = get_children_by_id(ctx, resource_id) + return [name_from_id(ctx, child_id) for child_id in child_resource_ids] + + +def get_type_by_id(ctx, resc_id): + """Get resource type from resource ID + + :param ctx: Combined type of a callback and rei struct + :param resc_id: Resource ID + + :returns: Resource type (e.g. "passhru") + """ + return genquery.Query(ctx, ["RESC_TYPE_NAME"], "RESC_ID = '{}'".format(resc_id)).first() + + +def get_type_by_name(ctx, resc_name): + """Get resource type from resource name + + :param ctx: Combined type of a callback and rei struct + :param resc_name: Resource name + + :returns: Resource type (e.g. "passthru") + """ + return genquery.Query(ctx, ["RESC_TYPE_NAME"], "RESC_NAME = '{}'".format(resc_name)).first() + + +def get_resource_names_by_type(ctx, resc_type): + """Get resource names by type + + :param ctx: Combined type of a callback and rei struct + :param resc_type: Resource type (e.g. "passthru" or "unixfilesystem") + + :returns: List of matching resource names + """ + result = list(genquery.row_iterator( + "RESC_NAME", + "RESC_TYPE_NAME = '{}'".format(resc_type), + genquery.AS_LIST, ctx)) + return [r[0] for r in result] + + +def get_all_resource_names(ctx): + """Get a list of all resource names + + :param ctx: Combined type of a callback and rei struct + + :returns: list of all resource names + """ + result = list(genquery.row_iterator( + "RESC_NAME", + "", + genquery.AS_LIST, ctx)) + return [r[0] for r in result]