From 3986dbd9ce23ea97121fb8986bc853f6ee4aa8a1 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Tue, 23 Apr 2024 16:00:37 +0200 Subject: [PATCH 1/6] shared file messenger: skip symlink if filespec.path is NULL --- pandaharvester/commit_timestamp.py | 2 +- pandaharvester/harvestermessenger/shared_file_messenger.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 1d873af6..bbb036fe 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "02-04-2024 07:28:13 on flin (by mightqxc)" +timestamp = "23-04-2024 14:00:39 on flin (by mightqxc)" diff --git a/pandaharvester/harvestermessenger/shared_file_messenger.py b/pandaharvester/harvestermessenger/shared_file_messenger.py index 65722993..5182fcd2 100644 --- a/pandaharvester/harvestermessenger/shared_file_messenger.py +++ b/pandaharvester/harvestermessenger/shared_file_messenger.py @@ -457,6 +457,8 @@ def feed_jobs(self, workspec, jobspec_list): pfcFile.write(pfc) # make symlink for fileSpec in jobSpec.inFiles: + if fileSpec.path is None: + continue dstPath = os.path.join(accessPoint, fileSpec.lfn) if fileSpec.path != dstPath: # test if symlink exists if so remove it From 3a6dbb435af52ddfe54c725751e2886fa2dc1f32 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 24 Apr 2024 16:48:17 +0200 Subject: [PATCH 2/6] nullify worker ratio limits in push queues --- pandaharvester/commit_timestamp.py | 2 +- pandaharvester/harvestercore/queue_config_mapper.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index bbb036fe..9453c480 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "23-04-2024 14:00:39 on flin (by mightqxc)" +timestamp = "24-04-2024 14:48:18 on flin (by mightqxc)" diff --git a/pandaharvester/harvestercore/queue_config_mapper.py b/pandaharvester/harvestercore/queue_config_mapper.py index 316dee5d..abb2625c 100644 --- a/pandaharvester/harvestercore/queue_config_mapper.py +++ b/pandaharvester/harvestercore/queue_config_mapper.py @@ -555,10 +555,14 @@ def load_data(self, refill_table=False): queueConfig.getJobCriteria = None else: queueConfig.getJobCriteria = tmpCriteria - # nullify job attributes if NoJob mapType + # nullify all job limit attributes if NoJob mapType (PULL) if queueConfig.mapType == WorkSpec.MT_NoJob: for attName in ["nQueueLimitJob", "nQueueLimitJobRatio", "nQueueLimitJobMax", "nQueueLimitJobMin"]: setattr(queueConfig, attName, None) + # nullify worker ratio limit attributes if jobful mapTypes (PUSH) + if queueConfig.mapType != WorkSpec.MT_NoJob: + for attName in ["nQueueLimitWorkerRatio", "nQueueLimitWorkerMin"]: + setattr(queueConfig, attName, None) # heartbeat suppression if queueConfig.truePilot and queueConfig.noHeartbeat == "": queueConfig.noHeartbeat = "running,transferring,finished,failed" From 73157f33bfd328b43cf438456df37d0ca1d8a8ed Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 24 Apr 2024 16:56:18 +0200 Subject: [PATCH 3/6] fix pretty --- pandaharvester/commit_timestamp.py | 2 +- pandaharvester/harvestercore/queue_config_mapper.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 9453c480..67bf2cd4 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "24-04-2024 14:48:18 on flin (by mightqxc)" +timestamp = "24-04-2024 14:56:20 on flin (by mightqxc)" diff --git a/pandaharvester/harvestercore/queue_config_mapper.py b/pandaharvester/harvestercore/queue_config_mapper.py index abb2625c..071ffdd8 100644 --- a/pandaharvester/harvestercore/queue_config_mapper.py +++ b/pandaharvester/harvestercore/queue_config_mapper.py @@ -558,11 +558,13 @@ def load_data(self, refill_table=False): # nullify all job limit attributes if NoJob mapType (PULL) if queueConfig.mapType == WorkSpec.MT_NoJob: for attName in ["nQueueLimitJob", "nQueueLimitJobRatio", "nQueueLimitJobMax", "nQueueLimitJobMin"]: - setattr(queueConfig, attName, None) + if hasattr(queueConfig, attName): + setattr(queueConfig, attName, None) # nullify worker ratio limit attributes if jobful mapTypes (PUSH) if queueConfig.mapType != WorkSpec.MT_NoJob: for attName in ["nQueueLimitWorkerRatio", "nQueueLimitWorkerMin"]: - setattr(queueConfig, attName, None) + if hasattr(queueConfig, attName): + setattr(queueConfig, attName, None) # heartbeat suppression if queueConfig.truePilot and queueConfig.noHeartbeat == "": queueConfig.noHeartbeat = "running,transferring,finished,failed" From eed1b9dbb5fd8e13f47f992cc66b2ba22cfc32fc Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 24 Apr 2024 21:27:09 +0200 Subject: [PATCH 4/6] add total in worker_stats_full --- pandaharvester/commit_timestamp.py | 2 +- pandaharvester/harvestercore/db_proxy.py | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 67bf2cd4..ccd9d891 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "24-04-2024 14:56:20 on flin (by mightqxc)" +timestamp = "24-04-2024 19:27:14 on flin (by mightqxc)" diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 97aa655c..9a1868c8 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -26,10 +26,10 @@ from .panda_queue_spec import PandaQueueSpec from .process_lock_spec import ProcessLockSpec from .queue_config_dump_spec import QueueConfigDumpSpec +from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE from .seq_number_spec import SeqNumberSpec from .service_metrics_spec import ServiceMetricSpec from .work_spec import WorkSpec -from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE # logger _logger = core_utils.setup_logger("db_proxy") @@ -3577,8 +3577,18 @@ def get_worker_stats_full(self, filter_site_list=None): resourceType = str(resourceType) retMap.setdefault(computingSite, {}) retMap[computingSite].setdefault(jobType, {}) + retMap[computingSite].setdefault("_total", {}) retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0}) + retMap[computingSite][jobType].setdefault("_total", {"running": 0, "submitted": 0, "to_submit": 0}) + retMap[computingSite]["_total"].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0}) + retMap[computingSite]["_total"].setdefault("_total", {"running": 0, "submitted": 0, "to_submit": 0}) retMap[computingSite][jobType][resourceType][workerStatus] = cnt + retMap[computingSite][jobType]["_total"].setdefault(workerStatus, 0) + retMap[computingSite][jobType]["_total"][workerStatus] += cnt + retMap[computingSite]["_total"][resourceType].setdefault(workerStatus, 0) + retMap[computingSite]["_total"][resourceType][workerStatus] += cnt + retMap[computingSite]["_total"]["_total"].setdefault(workerStatus, 0) + retMap[computingSite]["_total"]["_total"][workerStatus] += cnt # commit self.commit() tmpLog.debug(f"got {str(retMap)}") From abeb215a9399edc0c484fb540918bcc26e0702f8 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Wed, 24 Apr 2024 22:10:54 +0200 Subject: [PATCH 5/6] harvester_admin add query jobs --- pandaharvester/commit_timestamp.py | 2 +- .../harvesterscripts/harvester_admin.py | 22 +++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index ccd9d891..04a789c5 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "24-04-2024 19:27:14 on flin (by mightqxc)" +timestamp = "24-04-2024 20:10:55 on flin (by mightqxc)" diff --git a/pandaharvester/harvesterscripts/harvester_admin.py b/pandaharvester/harvesterscripts/harvester_admin.py index 6cb80a79..334dba3b 100644 --- a/pandaharvester/harvesterscripts/harvester_admin.py +++ b/pandaharvester/harvesterscripts/harvester_admin.py @@ -307,6 +307,18 @@ def query_workers(arguments): raise +def query_jobs(arguments): + dbProxy = DBProxy() + try: + if arguments.all: + res_obj = dbProxy.get_job_stats_full() + else: + res_obj = dbProxy.get_job_stats_full(filter_site_list=arguments.queue_list) + json_print(res_obj) + except TypeError as e: + raise + + # === Command map ======================================================= @@ -329,6 +341,7 @@ def query_workers(arguments): "kill_workers": kill_workers, # query commands "query_workers": query_workers, + "query_jobs": query_jobs, } # === Main ====================================================== @@ -383,7 +396,7 @@ def main(): qconf_dump_parser.set_defaults(which="qconf_dump") qconf_dump_parser.add_argument("-J", "--json", dest="json", action="store_true", help="Dump configuration in JSON format") qconf_dump_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Dump configuration of all active queues") - qconf_dump_parser.add_argument("queue_list", nargs="*", type=str, action="store", metavar="", help="Name of active queue") + qconf_dump_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="", help="Name of active queue") qconf_dump_parser.add_argument( "-i", "--id", dest="id_list", nargs="+", type=int, action="store", metavar="", help="Dump configuration of queue with configID" ) @@ -434,7 +447,12 @@ def main(): query_workers_parser = query_subparsers.add_parser("workers", help="Query statistiscs of workers in queues") query_workers_parser.set_defaults(which="query_workers") query_workers_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Show results of all queues") - query_workers_parser.add_argument("queue_list", nargs="*", type=str, action="store", metavar="", help="Name of active queue") + query_workers_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="", help="Name of active queue") + # query job_stats command + query_jobs_parser = query_subparsers.add_parser("jobs", help="Query statistiscs of jobs in queues") + query_jobs_parser.set_defaults(which="query_jobs") + query_jobs_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Show results of all queues") + query_jobs_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="", help="Name of active queue") # start parsing if len(sys.argv) == 1: From 882d5802686d9f687aab9e91e872ad2005225f42 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Thu, 25 Apr 2024 09:49:42 +0200 Subject: [PATCH 6/6] add total in get_job_stats_full --- pandaharvester/commit_timestamp.py | 2 +- pandaharvester/harvesterbody/job_fetcher.py | 2 ++ pandaharvester/harvestercore/db_proxy.py | 5 +++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 04a789c5..56c5c11f 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "24-04-2024 20:10:55 on flin (by mightqxc)" +timestamp = "25-04-2024 07:49:43 on flin (by mightqxc)" diff --git a/pandaharvester/harvesterbody/job_fetcher.py b/pandaharvester/harvesterbody/job_fetcher.py index 34227eb9..c163d62c 100644 --- a/pandaharvester/harvesterbody/job_fetcher.py +++ b/pandaharvester/harvesterbody/job_fetcher.py @@ -99,6 +99,8 @@ def run(self): } if job_stats_dict and queueName in job_stats_dict: for tmp_rt, val_dict in job_stats_dict[queueName].items(): + if tmp_rt == "_total": + continue for tmp_status in ["starting", "running"]: increment = val_dict["cores"][tmp_status] if rt_mapper.is_high_memory_resource_type(tmp_rt): diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 9a1868c8..ae91e1c2 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -5732,10 +5732,15 @@ def get_job_stats_full(self, filter_site_list=None): }, }, ) + retMap[computingSite].setdefault("_total", {"jobs": {}, "cores": {}}) retMap[computingSite][resourceType]["jobs"].setdefault(jobStatus, 0) retMap[computingSite][resourceType]["cores"].setdefault(jobStatus, 0) + retMap[computingSite]["_total"]["jobs"].setdefault(jobStatus, 0) + retMap[computingSite]["_total"]["cores"].setdefault(jobStatus, 0) retMap[computingSite][resourceType]["jobs"][jobStatus] += cnt retMap[computingSite][resourceType]["cores"][jobStatus] += nCore * cnt + retMap[computingSite]["_total"]["jobs"][jobStatus] += cnt + retMap[computingSite]["_total"]["cores"][jobStatus] += nCore * cnt # commit self.commit() tmpLog.debug(f"got {str(retMap)}")