Skip to content

Commit

Permalink
Move Rucio check write logic to ServerUtilities.py (#7987)
Browse files Browse the repository at this point in the history
  • Loading branch information
novicecpp authored Nov 3, 2023
1 parent 2c72941 commit 47875c8
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 5 deletions.
78 changes: 76 additions & 2 deletions src/python/ServerUtilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
IN BOTH PYTHON2 AND PYTHON3 WITHOUT CALLS TO FUTURIZE OR OTHER UTILITIES NOT
PRESENT IN EARLIER CMSSW RELEASES
"""
# this is a weird and unclear message from pylint, better to ignore it
# pylint: disable=W0715
# W0715: this is a weird and unclear message from pylint, better to ignore it
# W1514,W0707 for python2 compatibility. Will refactor later in https://github.com/dmwm/CRABServer/issues/7765 task.
# pylint: disable=W0715,W1514,W0707

from __future__ import print_function
from __future__ import division
Expand Down Expand Up @@ -130,6 +131,9 @@
5: "NOT_REQUIRED"}
USER_ALLOWED_PUBLICATIONDB_STATUSES = dict((v, k) for k, v in PUBLICATIONDB_STATES.items())

RUCIO_QUOTA_WARNING_GB = 10 # when available Rucio quota is less than this, warn users
RUCIO_QUOTA_MINIMUM_GB = 1 # when available Rucio quota is less thatn this, refuse submi

def USER_SANDBOX_EXCLUSIONS(tarmembers):
""" The function is used by both the client and the crabcache to get a list of files to exclude during the
calculation of the checksum of the user input sandbox.
Expand Down Expand Up @@ -984,3 +988,73 @@ def isDatasetUserDataset(inputDataset, dbsInstance):
"""
return (dbsInstance.split('/')[1] != 'global') and \
(inputDataset.split('/')[-1] == 'USER')

def isEnoughRucioQuota(rucioClient, site, account=''):
"""
Check quota with Rucio server.
Return dict of result to construct message on caller, where
- `hasQuota`: `True` if user has quota on that site
- `isEnough`: `True` if user has remain quota more than `RUCIO_QUOTA_MINIMUM_GB`
- `isQuotaWarning`: `True` if user has remain quota more than
`RUCIO_QUOTA_MINIMUM_GB`, but less than `RUCIO_QUOTA_WARNING_GB`
- `total`: (float) total quota in GiB
- `used`: (float) used quota in GiB
- `free`: (float) remain quota in GiB
:param rucioClient: Rucio's client object
:type rucioClient: rucio.client.client.Client
:param site: RSE name, e.g. T2_CH_CERN
:type site: str
:param account: (optional) Rucio account name (for server)
:type account: str
:return: dict of result of quota checking (see details above)
:rtype: dict
"""
ret = {
'hasQuota': False,
'isEnough': False,
'isQuotaWarning': False,
'total': 0,
'used': 0,
'free':0,
}
if not account:
account = rucioClient.account
quotas = list(rucioClient.get_local_account_usage(account, site))
if quotas:
ret['hasQuota'] = True
quota = quotas[0]
ret['total'] = quota['bytes_limit'] / 2**(10*3) # GiB
ret['used'] = quota['bytes'] / 2**(10*3) # GiB
ret['free'] = quota['bytes_remaining'] / 2**(10*3) # GiB
if ret['free'] > RUCIO_QUOTA_MINIMUM_GB:
ret['isEnough'] = True
if ret['free'] <= RUCIO_QUOTA_WARNING_GB:
ret['isQuotaWarning'] = True
return ret

def getRucioAccountFromLFN(lfn):
"""
Extract Rucio account from LFN.
For Rucio's group account, account always has `_group` suffix, but path and
scope do not contains `_group` suffix.
>>> getRucioAccountFromLFN(lfn='/store/user/rucio/cmsbot/path/to/dir')
'cmsbot'
>>> getRucioAccountFromLFN(lfn='/store/group/rucio/crab_test/path/to/dir')
'crab_test_group'
:param lfn: LFN
:type lfn: str
:return: Rucio's account
:rtype: str
"""
if lfn.startswith('/store/user/rucio') or lfn.startswith('/store/group/rucio'):
account = lfn.split('/')[4]
if lfn.startswith('/store/group/rucio/'):
return f'{account}_group'
return account
raise Exception(f'Expected /store/{{user,group}}/rucio/<account>, got {lfn}')
18 changes: 15 additions & 3 deletions src/python/TaskWorker/Actions/StageoutCheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from TaskWorker.WorkerExceptions import TaskWorkerException
from ServerUtilities import isFailurePermanent
from ServerUtilities import getCheckWriteCommand, createDummyFile
from ServerUtilities import removeDummyFile, execute_command
from ServerUtilities import removeDummyFile, execute_command, isEnoughRucioQuota, getRucioAccountFromLFN
from RucioUtils import getWritePFN

class StageoutCheck(TaskAction):
Expand All @@ -19,6 +19,7 @@ class StageoutCheck(TaskAction):
def __init__(self, config, crabserver, procnum=-1, rucioClient=None):
TaskAction.__init__(self, config, crabserver, procnum)
self.rucioClient = rucioClient
self.privilegedRucioClient = None
self.task = None
self.proxy = None
self.workflow = None
Expand Down Expand Up @@ -82,8 +83,19 @@ def execute(self, *args, **kw):
# by Rucio robot without using user credentials
if self.task['tm_output_lfn'].startswith('/store/user/rucio') or \
self.task['tm_output_lfn'].startswith('/store/group/rucio'):
# to be filled with actual quota check, for the time being.. just go
return
rucioAccount = getRucioAccountFromLFN(self.task['tm_output_lfn'])
self.logger.info(f"Checking Rucio quota from account {rucioAccount}.")
quotaCheck = isEnoughRucioQuota(self.rucioClient, self.task['tm_asyncdest'], rucioAccount)
if not quotaCheck['isEnough']:
msg = f"Not enough Rucio quota at {self.task['tm_asyncdest']}:{self.task['tm_output_lfn']}."\
f" Remain quota: {quotaCheck['free']} GB."
raise TaskWorkerException(msg)
self.logger.info(f" Remain quota: {quotaCheck['free']} GB.")
if quotaCheck['isQuotaWarning']:
msg = 'Rucio Quota is very little and although CRAB will submit, stageout may fail.'
self.logger.warning(msg)
self.uploadWarning(msg, self.task['user_proxy'], self.task['tm_taskname'])

# if not using Rucio, old code:
else:
cpCmd, rmCmd, append = getCheckWriteCommand(self.proxy, self.logger)
Expand Down

0 comments on commit 47875c8

Please sign in to comment.