Skip to content

Commit

Permalink
YDA-5345: implement ARB
Browse files Browse the repository at this point in the history
Automatic resource balancing ensures that new data objects are
only created on resources that have enough space available.

See /docs/design/processes/automatic-resource-balancing.md in the Yoda
documentation for design information.
  • Loading branch information
stsnel committed Oct 17, 2023
1 parent 9ed3e88 commit f773acb
Show file tree
Hide file tree
Showing 12 changed files with 697 additions and 2 deletions.
15 changes: 15 additions & 0 deletions policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,5 +561,20 @@ def py_acPostProcForObjRename(ctx, src, dst):
if len(info.subpath) and info.group != pathutil.info(src).group:
ctx.uuEnforceGroupAcl(dst)


@rule.make(inputs=[0, 1, 2, 3, 4, 5, 6], outputs=[2])
def pep_resource_resolve_hierarchy_pre(ctx, resource, _ctx, out, operation, host, parser, vote):
    """Produce a read/write vote string for a resource, based on its ARB status.

    Nothing is returned when ARB is disabled in the configuration or when the
    operation is anything other than "CREATE".

    :param ctx:       Combined type of a callback and rei struct
    :param resource:  Name of the resource whose ARB status is looked up
    :param _ctx:      Not used by this rule
    :param out:       Not used by this rule
    :param operation: Operation being resolved; only "CREATE" is handled
    :param host:      Not used by this rule
    :param parser:    Not used by this rule
    :param vote:      Not used by this rule

    :returns: "read=1.0;write=0.0" if the ARB status of the resource is FULL,
              "read=1.0;write=1.0" for any other status
    """
    arb_applies = config.arb_enabled and operation == "CREATE"
    if not arb_applies:
        return

    status = arb_data_manager.ARBDataManager().get(ctx, resource)

    # Only a FULL resource gets its write vote zeroed; the read vote is never changed.
    return "read=1.0;write=0.0" if status == constants.arb_status.FULL else "read=1.0;write=1.0"


# }}}
# }}}
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ execnet==1.9.0
deepdiff==3.3.0
persist-queue==0.8.1
pyblake2==1.1.2
redis==3.5.3
74 changes: 74 additions & 0 deletions resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
'rule_resource_store_storage_statistics',
'rule_resource_transform_old_storage_data',
'rule_resource_research',
'rule_resource_update_resc_arb_data',
'rule_resource_update_misc_arb_data',
'rule_resource_vault']


Expand Down Expand Up @@ -641,6 +643,78 @@ def rule_resource_store_storage_statistics(ctx):
return 'ok'


@rule.make(inputs=[0, 1, 2], outputs=[])
def rule_resource_update_resc_arb_data(ctx, resc_name, bytes_free, bytes_total):
    """
    Update ARB data for a specific resource

    The resource is classified as EXEMPT (listed in config.arb_exempt_resources),
    AVAILABLE (enough free space, both in GB and as a percentage) or FULL.
    The resulting status is stored for the passthru parent of the resource, if it
    has one; the resource itself is always stored with status IGNORE.

    The rule only logs an error and returns if the caller is not a rodsadmin, or
    if the resource does not exist.

    :param ctx:         Combined type of a callback and rei struct
    :param resc_name:   Name of a particular unixfilesystem resource
    :param bytes_free:  Free size on this resource, in bytes
    :param bytes_total: Total size of this resource, in bytes
    """
    if user.user_type(ctx) != 'rodsadmin':
        log.write(ctx, "Error: insufficient permissions to run ARB data update rule.")
        return

    if not resource.exists(ctx, resc_name):
        log.write(ctx, "Error: could not find resource named '{}' for ARB update.".format(resc_name))
        return

    # NOTE(review): whether this is integer or true division depends on the Python
    # version that runs the ruleset - confirm the intended rounding.
    bytes_free_gb = int(bytes_free) / 2 ** 30

    # A reported total size of zero (or less) would make the percentage calculation
    # divide by zero. Treat such a resource as having no free space at all.
    total = float(bytes_total)
    if total > 0:
        bytes_free_percent = 100 * (float(bytes_free) / total)
    else:
        log.write(ctx, "Warning: total size of resource '{}' is not positive for ARB update.".format(resc_name))
        bytes_free_percent = 0.0

    if resc_name in config.arb_exempt_resources:
        arb_status = constants.arb_status.EXEMPT
    elif bytes_free_gb >= config.arb_min_gb_free and bytes_free_percent > config.arb_min_percent_free:
        arb_status = constants.arb_status.AVAILABLE
    else:
        arb_status = constants.arb_status.FULL

    parent_resc_name = resource.get_parent_by_name(ctx, resc_name)

    manager = arb_data_manager.ARBDataManager()

    # NOTE(review): the resource itself is always stored as IGNORE, so the computed
    # status is lost if it has no passthru parent - confirm this is intended.
    manager.put(ctx, resc_name, constants.arb_status.IGNORE)

    if parent_resc_name is not None and resource.get_type_by_name(ctx, parent_resc_name) == "passthru":
        manager.put(ctx, parent_resc_name, arb_status)


@rule.make()
def rule_resource_update_misc_arb_data(ctx):
    """Mark resources that fall outside the regular ARB update process as IGNORE.

    This concerns every resource that is neither a unixfilesystem resource nor a
    passthrough resource, and every passthrough resource that has no
    unixfilesystem child resource.

    The rule only logs an error and returns if the caller is not a rodsadmin.

    :param ctx: Combined type of a callback and rei struct
    """
    if user.user_type(ctx) != 'rodsadmin':
        log.write(ctx, "Error: insufficient permissions to run ARB data update rule.")
        return

    manager = arb_data_manager.ARBDataManager()

    all_resources = resource.get_all_resource_names(ctx)

    # Both spellings of the unixfilesystem resource type are taken into account.
    ufs_names = resource.get_resource_names_by_type(ctx, "unixfilesystem")
    ufs_names = ufs_names + resource.get_resource_names_by_type(ctx, "unix file system")
    ufs_resources = set(ufs_names)
    pt_resources = set(resource.get_resource_names_by_type(ctx, "passthru"))

    for resc in all_resources:
        if resc in ufs_resources:
            # Unixfilesystem resources are not touched by this rule.
            continue

        if resc in pt_resources:
            children = resource.get_children_by_name(ctx, resc)
            if any(child in ufs_resources for child in children):
                # Passthrough resource with a UFS child: not touched by this rule.
                continue

        manager.put(ctx, resc, constants.arb_status.IGNORE)


def get_categories(ctx):
"""Get all categories for current user.
Expand Down
6 changes: 6 additions & 0 deletions rules_uu.cfg.template
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,9 @@ sram_rest_api_url =
sram_api_key =
sram_verbose_logging =
sram_tls_verify =


arb_enabled =
arb_exempt_resources =
arb_min_gb_free =
arb_min_percent_free =
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ strictness=short
docstring_style=sphinx
max-line-length=127
exclude=__init__.py,tools,tests/env/
application-import-names=avu_json,conftest,util,api,config,constants,data_access_token,datacite,datarequest,data_object,epic,error,folder,groups,intake,intake_dataset,intake_lock,intake_scan,intake_utils,intake_vault,json_datacite,json_landing_page,jsonutil,log,mail,meta,meta_form,msi,notifications,schema,schema_transformation,schema_transformations,settings,pathutil,provenance,policies_intake,policies_datamanager,policies_datapackage_status,policies_folder_status,policies_datarequest_status,publication,query,replication,revisions,rule,user,vault,sram
application-import-names=avu_json,conftest,util,api,config,constants,data_access_token,datacite,datarequest,data_object,epic,error,folder,groups,intake,intake_dataset,intake_lock,intake_scan,intake_utils,intake_vault,json_datacite,json_landing_page,jsonutil,log,mail,meta,meta_form,msi,notifications,schema,schema_transformation,schema_transformations,settings,pathutil,provenance,policies_intake,policies_datamanager,policies_datapackage_status,policies_folder_status,policies_datarequest_status,publication,query,replication,revisions,rule,user,vault,sram,arb_data_manager,cached_data_manager,resource
216 changes: 216 additions & 0 deletions tools/arb-update-resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#!/usr/bin/env python3

""" This script collects and submits data needed for Automatic Resource Balancing (ARB): the
process of ensuring that new data objects get created on resources that still have space available.
It does the following things:
- Gather free space and total space for all local unixfilesystem resources
- Pass this data to the ARB update rule, so that it can be taken into account by ARB.
- If the script is run on the provider, it also invokes the rule that initializes ARB data
for resources that are not relevant to ARB. This makes ARB ignore these resources.
"""

import argparse
import json
import os
import psutil
import socket
import ssl

from io import StringIO
from collections import OrderedDict


from irods.column import In
from irods.models import Resource
from irods.password_obfuscation import decode as password_decode
from irods.rule import Rule
from irods.session import iRODSSession


def get_hostname():
    """Return the fully qualified domain name of the local host."""
    fqdn = socket.getfqdn()
    return fqdn


def get_volume_total(path):
    """Return the total size reported by psutil for the volume containing path.

    :param path: Path on the volume to inspect

    :returns: The 'total' field of psutil's disk usage data for this path
    """
    usage = psutil.disk_usage(path)
    return usage.total


def get_volume_free(path):
    """Return the free space reported by psutil for the volume containing path.

    :param path: Path on the volume to inspect

    :returns: The 'free' field of psutil's disk usage data for this path
    """
    usage = psutil.disk_usage(path)
    return usage.free


def parse_args():
    """Parse command line arguments.

    :returns: Namespace with the attributes 'verbose' (bool), 'override_free' (str)
              and 'override_total' (str)
    """
    arg_parser = argparse.ArgumentParser(description=__doc__,
                                         formatter_class=argparse.RawTextHelpFormatter)

    # store_true already implies a default of False.
    arg_parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Show verbose information about what the script is doing.")

    arg_parser.add_argument(
        "--override-free",
        type=str,
        default="",
        help="Comma-separated list of free space overrides, e.g. 'resc1:1024,resc2:2048'")

    arg_parser.add_argument(
        "--override-total",
        type=str,
        default="",
        help="Comma-separated list of total space overrides, e.g. 'resc1:1024,resc2:2048'")

    parsed = arg_parser.parse_args()
    return parsed


def parse_cs_values(input):
    """Parse a comma-separated list of key:value pairs into a dict.

    Empty list items are skipped. Keys and values are kept as strings. If an item
    contains more than one colon, only the text between the first and the second
    colon is used as the value.

    :param input: String such as 'resc1:1024,resc2:2048'

    :returns: Dictionary that maps each key to its value

    :raises Exception: If a non-empty item does not contain a colon
    """
    pairs = {}

    for item in input.split(","):
        if not item:
            continue

        if ":" not in item:
            raise Exception("Could not parse KV pair: " + item)

        fields = item.split(":")
        pairs[fields[0]] = fields[1]

    return pairs


def get_irods_environment(
        irods_environment_file="/var/lib/irods/.irods/irods_environment.json"):
    """Read the iRODS environment file, which contains the environment configuration.

    :param irods_environment_file: Filename of the iRODS environment file

    :returns: Data structure containing the configuration (the parsed JSON content)
    """
    with open(irods_environment_file, 'r') as env_file:
        raw_config = env_file.read()

    return json.loads(raw_config)


def setup_session(irods_environment_config,
                  ca_file="/etc/pki/tls/certs/chain.crt"):
    """Configure an iRODSSession using the iRODS environment and the .irodsA file.

    The password is read from ~/.irods/.irodsA and decoded. The session settings
    consist of the environment configuration, overlaid with fixed SSL settings and
    the decoded password.

    :param irods_environment_config: Dictionary with the iRODS environment configuration
    :param ca_file:                  CA certificate file used for the SSL context

    :returns: iRODSSession object
    """
    with open(os.path.expanduser("~/.irods/.irodsA"), "r") as auth_file:
        scrambled_password = auth_file.read()
    password = password_decode(scrambled_password)

    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=ca_file,
                                             capath=None,
                                             cadata=None)

    # Later updates take precedence over values from the environment file.
    session_settings = dict()
    session_settings.update(irods_environment_config)
    session_settings.update({
        'client_server_negotiation': 'request_server_negotiation',
        'client_server_policy': 'CS_NEG_REQUIRE',
        'encryption_algorithm': 'AES-256-CBC',
        'encryption_key_size': 32,
        'encryption_num_hash_rounds': 16,
        'encryption_salt_size': 8,
        'ssl_context': ssl_context,
    })
    session_settings["password"] = password

    return iRODSSession(**session_settings)


def get_local_ufs_resources(session):
    """Return the sorted names of unixfilesystem resources located on this host.

    A resource matches if its type is "unixfilesystem" or "unix file system" and
    its location equals the hostname returned by get_hostname().

    :param session: iRODSSession used to run the query

    :returns: Sorted list of resource names
    """
    ufs_types = ["unixfilesystem", "unix file system"]

    query = session.query(Resource.name)
    query = query.filter(In(Resource.type, ufs_types))
    query = query.filter(Resource.location == get_hostname())

    return sorted(row[Resource.name] for row in query.all())


def process_ufs_resources(session, resource_names, override_free_dict, override_total_dict, verbose_mode):
    """Determine free and total space of each resource and submit them to the ARB update rule.

    For each resource, a value from the override dictionaries takes precedence. The
    volume that holds the vault path of the resource is only inspected for values
    that have not been overridden.

    :param session:             iRODSSession used to look up resources and call the rule
    :param resource_names:      Names of the resources to process
    :param override_free_dict:  Dictionary that maps resource names to a free space override
    :param override_total_dict: Dictionary that maps resource names to a total space override
    :param verbose_mode:        Whether to print progress information
    """
    for resource_name in resource_names:
        if verbose_mode:
            print("Processing resource {} ...".format(resource_name))

        resource = session.resources.get(resource_name)

        # Do not use dict.get(key, default) here: the default would be evaluated
        # eagerly, so the volume would be probed (and could raise an error) even
        # when an override has been provided.
        if resource_name in override_free_dict:
            free_space = override_free_dict[resource_name]
        else:
            free_space = get_volume_free(resource.vault_path)

        if resource_name in override_total_dict:
            total_space = override_total_dict[resource_name]
        else:
            total_space = get_volume_total(resource.vault_path)

        if verbose_mode:
            print("Setting free / total space of resource {} to {} / {}.".format(resource_name, free_space, total_space))
        call_rule_update_resc(session, resource_name, free_space, total_space)


def call_rule(session, rulename, params, number_outputs,
              rule_engine='irods_rule_engine_plugin-irods_rule_language-instance'):
    """Run a rule and return the first lines of its standard output.

    :param session:        iRODSSession used to execute the rule
    :param rulename:       Name of the rule
    :param params:         Dictionary of rule input parameters and their values
    :param number_outputs: Maximum number of stdout lines to return
    :param rule_engine:    Rule engine instance to run the rule on (defaults to the
                           iRODS rule language instance)

    :returns: List with at most number_outputs lines of the rule's stdout
    """
    # Every parameter is passed to the rule as a quoted '*name' argument.
    arguments = ",".join("'*{}'".format(name) for name in params)
    body = 'myRule {{\n {}('.format(rulename) + arguments + '); writeLine("stdout","");}'

    input_params = {}
    for name, value in params.items():
        input_params["*{}".format(name)] = '"{}"'.format(value)

    myrule = Rule(session,
                  rule_file=StringIO(body),
                  params=input_params,
                  output='ruleExecOut',
                  instance_name=rule_engine)

    exec_out = myrule.execute()
    stdout_text = exec_out.MsParam_PI[0].inOutStruct.stdoutBuf.buf.decode('utf-8')

    return stdout_text.splitlines()[:number_outputs]


def call_rule_update_resc(session, resource, bytes_free, bytes_total):
    """Call the rule that updates ARB data for a specific resource (and its parent resource).

    :param session:     iRODSSession used to execute the rule
    :param resource:    Name of the resource
    :param bytes_free:  Free space of the resource
    :param bytes_total: Total space of the resource
    """
    # The rule receives its arguments in this order.
    parms = OrderedDict()
    parms['resource'] = resource
    parms['bytes_free'] = bytes_free
    parms['bytes_total'] = bytes_total

    output_lines = call_rule(session, 'rule_resource_update_resc_arb_data', parms, 1)

    # Exactly one output line is expected; unpacking fails otherwise.
    (_,) = output_lines


def call_rule_update_misc(session):
    """Call the rule that updates the resources to be ignored by ARB.

    :param session: iRODSSession used to execute the rule
    """
    no_parms = OrderedDict()
    output_lines = call_rule(session, 'rule_resource_update_misc_arb_data', no_parms, 1)

    # Exactly one output line is expected; unpacking fails otherwise.
    (_,) = output_lines


def is_on_provider():
    """Check whether "icat_host" in /etc/irods/server_config.json equals the local hostname.

    :returns: Boolean that is True if the configured icat_host matches get_hostname()
    """
    with open('/etc/irods/server_config.json', 'r') as config_file:
        server_config = json.load(config_file)
        icat_host = server_config["icat_host"]
        return icat_host == get_hostname()


def main():
    """Submit space data of all local unixfilesystem resources to the ARB update rule.

    If the script runs on the provider, the rule that updates the resources to be
    ignored by ARB is called afterwards.
    """
    args = parse_args()
    session = setup_session(get_irods_environment())

    free_overrides = parse_cs_values(args.override_free)
    total_overrides = parse_cs_values(args.override_total)

    resource_names = get_local_ufs_resources(session)
    process_ufs_resources(session, resource_names, free_overrides, total_overrides, args.verbose)

    if not is_on_provider():
        return

    if args.verbose:
        print("Updating misc resources ...")
    call_rule_update_misc(session)


if __name__ == '__main__':
    main()
3 changes: 3 additions & 0 deletions util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import avu
import misc
import config
import resource
import arb_data_manager
import cached_data_manager

# Config items can be accessed directly as 'config.foo' by any module
# that imports * from util.
Expand Down
Loading

0 comments on commit f773acb

Please sign in to comment.