From 47875c8cef98619ca4f4e536bb57dadc29c342a3 Mon Sep 17 00:00:00 2001 From: Thanayut Seethongchuen Date: Fri, 3 Nov 2023 16:08:26 +0100 Subject: [PATCH] Move Rucio check write logic to ServerUtilities.py (#7987) --- src/python/ServerUtilities.py | 78 ++++++++++++++++++- .../TaskWorker/Actions/StageoutCheck.py | 18 ++++- 2 files changed, 91 insertions(+), 5 deletions(-) diff --git a/src/python/ServerUtilities.py b/src/python/ServerUtilities.py index 60b20eeb17..a4ff09ff95 100644 --- a/src/python/ServerUtilities.py +++ b/src/python/ServerUtilities.py @@ -4,8 +4,9 @@ IN BOTH PYTHON2 AND PYTHON3 WITHOUT CALLS TO FUTURIZE OR OTHER UTILITIES NOT PRESENT IN EARLIER CMSSW RELEASES """ -# this is a weird and unclear message from pylint, better to ignore it -# pylint: disable=W0715 +# W0715: this is a weird and unclear message from pylint, better to ignore it +# W1514,W0707 for python2 compatibility. Will refactor later in https://github.com/dmwm/CRABServer/issues/7765 task. +# pylint: disable=W0715,W1514,W0707 from __future__ import print_function from __future__ import division @@ -130,6 +131,9 @@ 5: "NOT_REQUIRED"} USER_ALLOWED_PUBLICATIONDB_STATUSES = dict((v, k) for k, v in PUBLICATIONDB_STATES.items()) +RUCIO_QUOTA_WARNING_GB = 10 # when available Rucio quota is less than this, warn users +RUCIO_QUOTA_MINIMUM_GB = 1 # when available Rucio quota is less thatn this, refuse submi + def USER_SANDBOX_EXCLUSIONS(tarmembers): """ The function is used by both the client and the crabcache to get a list of files to exclude during the calculation of the checksum of the user input sandbox. @@ -984,3 +988,73 @@ def isDatasetUserDataset(inputDataset, dbsInstance): """ return (dbsInstance.split('/')[1] != 'global') and \ (inputDataset.split('/')[-1] == 'USER') + +def isEnoughRucioQuota(rucioClient, site, account=''): + """ + Check quota with Rucio server. + + Return dict of result to construct message on caller, where + - `hasQuota`: `True` if user has quota on that site + - `isEnough`: `True` if user has remain quota more than `RUCIO_QUOTA_MINIMUM_GB` + - `isQuotaWarning`: `True` if user has remain quota more than + `RUCIO_QUOTA_MINIMUM_GB`, but less than `RUCIO_QUOTA_WARNING_GB` + - `total`: (float) total quota in GiB + - `used`: (float) used quota in GiB + - `free`: (float) remain quota in GiB + + :param rucioClient: Rucio's client object + :type rucioClient: rucio.client.client.Client + :param site: RSE name, e.g. T2_CH_CERN + :type site: str + :param account: (optional) Rucio account name (for server) + :type account: str + + :return: dict of result of quota checking (see details above) + :rtype: dict + """ + ret = { + 'hasQuota': False, + 'isEnough': False, + 'isQuotaWarning': False, + 'total': 0, + 'used': 0, + 'free':0, + } + if not account: + account = rucioClient.account + quotas = list(rucioClient.get_local_account_usage(account, site)) + if quotas: + ret['hasQuota'] = True + quota = quotas[0] + ret['total'] = quota['bytes_limit'] / 2**(10*3) # GiB + ret['used'] = quota['bytes'] / 2**(10*3) # GiB + ret['free'] = quota['bytes_remaining'] / 2**(10*3) # GiB + if ret['free'] > RUCIO_QUOTA_MINIMUM_GB: + ret['isEnough'] = True + if ret['free'] <= RUCIO_QUOTA_WARNING_GB: + ret['isQuotaWarning'] = True + return ret + +def getRucioAccountFromLFN(lfn): + """ + Extract Rucio account from LFN. + For Rucio's group account, account always has `_group` suffix, but path and + scope do not contains `_group` suffix. + + >>> getRucioAccountFromLFN(lfn='/store/user/rucio/cmsbot/path/to/dir') + 'cmsbot' + >>> getRucioAccountFromLFN(lfn='/store/group/rucio/crab_test/path/to/dir') + 'crab_test_group' + + :param lfn: LFN + :type lfn: str + + :return: Rucio's account + :rtype: str + """ + if lfn.startswith('/store/user/rucio') or lfn.startswith('/store/group/rucio'): + account = lfn.split('/')[4] + if lfn.startswith('/store/group/rucio/'): + return f'{account}_group' + return account + raise Exception(f'Expected /store/{{user,group}}/rucio/, got {lfn}') diff --git a/src/python/TaskWorker/Actions/StageoutCheck.py b/src/python/TaskWorker/Actions/StageoutCheck.py index 70a8f9c2c6..378fe645c7 100644 --- a/src/python/TaskWorker/Actions/StageoutCheck.py +++ b/src/python/TaskWorker/Actions/StageoutCheck.py @@ -5,7 +5,7 @@ from TaskWorker.WorkerExceptions import TaskWorkerException from ServerUtilities import isFailurePermanent from ServerUtilities import getCheckWriteCommand, createDummyFile -from ServerUtilities import removeDummyFile, execute_command +from ServerUtilities import removeDummyFile, execute_command, isEnoughRucioQuota, getRucioAccountFromLFN from RucioUtils import getWritePFN class StageoutCheck(TaskAction): @@ -19,6 +19,7 @@ class StageoutCheck(TaskAction): def __init__(self, config, crabserver, procnum=-1, rucioClient=None): TaskAction.__init__(self, config, crabserver, procnum) self.rucioClient = rucioClient + self.privilegedRucioClient = None self.task = None self.proxy = None self.workflow = None @@ -82,8 +83,19 @@ def execute(self, *args, **kw): # by Rucio robot without using user credentials if self.task['tm_output_lfn'].startswith('/store/user/rucio') or \ self.task['tm_output_lfn'].startswith('/store/group/rucio'): - # to be filled with actual quota check, for the time being.. just go - return + rucioAccount = getRucioAccountFromLFN(self.task['tm_output_lfn']) + self.logger.info(f"Checking Rucio quota from account {rucioAccount}.") + quotaCheck = isEnoughRucioQuota(self.rucioClient, self.task['tm_asyncdest'], rucioAccount) + if not quotaCheck['isEnough']: + msg = f"Not enough Rucio quota at {self.task['tm_asyncdest']}:{self.task['tm_output_lfn']}."\ + f" Remain quota: {quotaCheck['free']} GB." + raise TaskWorkerException(msg) + self.logger.info(f" Remain quota: {quotaCheck['free']} GB.") + if quotaCheck['isQuotaWarning']: + msg = 'Rucio Quota is very little and although CRAB will submit, stageout may fail.' + self.logger.warning(msg) + self.uploadWarning(msg, self.task['user_proxy'], self.task['tm_taskname']) + # if not using Rucio, old code: else: cpCmd, rmCmd, append = getCheckWriteCommand(self.proxy, self.logger)