Skip to content

Commit

Permalink
Merge pull request #226 from HSF/flin
Browse files Browse the repository at this point in the history
htcondor: Add custom submit attributes support; minor update
  • Loading branch information
mightqxc authored May 7, 2024
2 parents 79939f8 + ffe1b54 commit 31cbce7
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "25-04-2024 07:49:43 on flin (by mightqxc)"
timestamp = "07-05-2024 09:25:45 on flin (by mightqxc)"
12 changes: 11 additions & 1 deletion pandaharvester/harvestermonitor/htcondor_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ class HTCondorMonitor(PluginBase):
# constructor
def __init__(self, **kwarg):
PluginBase.__init__(self, **kwarg)
extra_plugin_configs = {}
try:
extra_plugin_configs = harvester_config.master.extraPluginConfigs["HTCondorMonitor"]
except AttributeError:
pass
except KeyError:
pass
try:
self.nProcesses
except AttributeError:
Expand All @@ -228,7 +235,10 @@ def __init__(self, **kwarg):
try:
self.useCondorHistory
except AttributeError:
self.useCondorHistory = True
if extra_plugin_configs.get("use_condor_history") is False:
self.useCondorHistory = False
else:
self.useCondorHistory = True
try:
self.submissionHost_list
except AttributeError:
Expand Down
62 changes: 46 additions & 16 deletions pandaharvester/harvestersubmitter/htcondor_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ def make_a_jdl(
pilot_type_opt = pilot_opt_dict["pilot_type_opt"]
pilot_url_str = pilot_opt_dict["pilot_url_str"]
pilot_debug_str = pilot_opt_dict["pilot_debug_str"]
tmpLog.debug(f"pilot options: {pilot_opt_dict}")
# get token filename according to CE
token_filename = None
if token_dir is not None and ce_info_dict.get("ce_endpoint"):
Expand Down Expand Up @@ -343,13 +344,19 @@ class HTCondorSubmitter(PluginBase):
def __init__(self, **kwarg):
tmpLog = core_utils.make_logger(baseLogger, method_name="__init__")
self.logBaseURL = None
self.templateFile = None
if hasattr(self, "useFQDN") and self.useFQDN:
self.hostname = socket.getfqdn()
else:
self.hostname = socket.gethostname().split(".")[0]
PluginBase.__init__(self, **kwarg)

# extra plugin configs
extra_plugin_configs = {}
try:
extra_plugin_configs = harvester_config.master.extraPluginConfigs["HTCondorSubmitter"]
except AttributeError:
pass
except KeyError:
pass
# number of processes
try:
self.nProcesses
Expand Down Expand Up @@ -437,6 +444,11 @@ def __init__(self, **kwarg):
self.useCRICGridCE = False
finally:
self.useCRIC = self.useCRIC or self.useCRICGridCE
# sdf template
try:
self.templateFile
except AttributeError:
self.templateFile = None
# sdf template directories of CEs; ignored if templateFile is set
try:
self.CEtemplateDir
Expand Down Expand Up @@ -503,24 +515,20 @@ def __init__(self, **kwarg):
self.rcPilotRandomWeightPermille = 0
# submission to ARC CE's with nordugrid (gridftp) or arc (REST) grid type
self.submit_arc_grid_type = "arc"
try:
extra_plugin_configs = harvester_config.master.extraPluginConfigs["HTCondorSubmitter"]
except AttributeError:
pass
except KeyError:
pass
else:
if extra_plugin_configs.get("submit_arc_grid_type") == "nordugrid":
self.submit_arc_grid_type = "nordugrid"
if extra_plugin_configs.get("submit_arc_grid_type") == "nordugrid":
self.submit_arc_grid_type = "nordugrid"
# record of information of CE statistics
self.ceStatsLock = threading.Lock()
self.ceStats = dict()
# allowed associated parameters from CRIC
self._allowed_cric_attrs = (
# allowed associated parameters and paramester prefixes from CRIC
self._allowed_cric_attrs = [
"pilot_url",
"pilot_args",
"unified_dispatch",
)
]
self._allowed_cric_attr_prefixes = [
"jdl.plusattr.",
]

# get CE statistics of a site
def get_ce_statistics(self, site_name, n_new_workers, time_window=21600):
Expand Down Expand Up @@ -577,7 +585,9 @@ def submit_workers(self, workspec_list):
# tmpLog.debug('panda_queues_name and queue_info: {0}, {1}'.format(self.queueName, panda_queues_dict[self.queueName]))
# associated params on CRIC
for key, val in panda_queues_dict.get_harvester_params(self.queueName).items():
if key in self._allowed_cric_attrs:
if not isinstance(key, str):
continue
if key in self._allowed_cric_attrs or any([key.startswith(the_prefix) for the_prefix in self._allowed_cric_attr_prefixes]):
if isinstance(val, str):
# sanitized list the value
val = re.sub(r"[;$~`]*", "", val)
Expand All @@ -594,8 +604,26 @@ def submit_workers(self, workspec_list):
pilot_url = associated_params_dict.get("pilot_url")
pilot_args = associated_params_dict.get("pilot_args", "")
pilot_version = str(this_panda_queue_dict.get("pilot_version", "current"))
python_version = str(this_panda_queue_dict.get("python_version", "2"))
python_version = str(this_panda_queue_dict.get("python_version", "3"))
is_gpu_resource = this_panda_queue_dict.get("resource_type", "") == "gpu"
custom_submit_attr_dict = {}
for k, v in associated_params_dict.items():
# fill custom submit attributes for adding to JDL
try:
the_prefix = "jdl.plusattr."
if k.startswith(the_prefix):
attr_key = k[len(the_prefix) :]
attr_value = str(v)
if not re.fullmatch(r"[a-zA-Z_0-9][a-zA-Z_0-9.\-]*", attr_key):
# skip invalid key
continue
if not re.fullmatch(r"[a-zA-Z_0-9.\-,]+", attr_value):
# skip invalid value
continue
custom_submit_attr_dict[attr_key] = attr_value
except Exception as e:
tmpLog.warning(f'Got {e} with custom submit attributes "{k}: {v}"; skipped')
continue

# get override requirements from queue configured
try:
Expand Down Expand Up @@ -763,6 +791,7 @@ def _choose_credential(workspec):
ce_info_dict["ce_endpoint"] = self.ceEndpoint
except AttributeError:
pass
tmpLog.debug(f"Got pilot version: \"{pilot_version}\"; CE endpoint: \"{ce_info_dict.get('ce_endpoint')}\"")
try:
# Manually define ceQueueName
if self.ceQueueName:
Expand Down Expand Up @@ -872,6 +901,7 @@ def _choose_credential(workspec):
"is_unified_dispatch": is_unified_dispatch,
"prod_rc_permille": self.rcPilotRandomWeightPermille,
"is_gpu_resource": is_gpu_resource,
"custom_submit_attr_dict": custom_submit_attr_dict,
}
)
return data
Expand Down
2 changes: 1 addition & 1 deletion pandaharvester/panda_pkg_info.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.5.2"
release_version = "0.5.3"

0 comments on commit 31cbce7

Please sign in to comment.