diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py
index 1d873af6..56c5c11f 100644
--- a/pandaharvester/commit_timestamp.py
+++ b/pandaharvester/commit_timestamp.py
@@ -1 +1 @@
-timestamp = "02-04-2024 07:28:13 on flin (by mightqxc)"
+timestamp = "25-04-2024 07:49:43 on flin (by mightqxc)"
diff --git a/pandaharvester/harvesterbody/job_fetcher.py b/pandaharvester/harvesterbody/job_fetcher.py
index 34227eb9..c163d62c 100644
--- a/pandaharvester/harvesterbody/job_fetcher.py
+++ b/pandaharvester/harvesterbody/job_fetcher.py
@@ -99,6 +99,8 @@ def run(self):
                         }
                         if job_stats_dict and queueName in job_stats_dict:
                             for tmp_rt, val_dict in job_stats_dict[queueName].items():
+                                if tmp_rt == "_total":
+                                    continue
                                 for tmp_status in ["starting", "running"]:
                                     increment = val_dict["cores"][tmp_status]
                                     if rt_mapper.is_high_memory_resource_type(tmp_rt):
diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py
index 97aa655c..ae91e1c2 100644
--- a/pandaharvester/harvestercore/db_proxy.py
+++ b/pandaharvester/harvestercore/db_proxy.py
@@ -26,10 +26,10 @@
 from .panda_queue_spec import PandaQueueSpec
 from .process_lock_spec import ProcessLockSpec
 from .queue_config_dump_spec import QueueConfigDumpSpec
+from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE
 from .seq_number_spec import SeqNumberSpec
 from .service_metrics_spec import ServiceMetricSpec
 from .work_spec import WorkSpec
-from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE

 # logger
 _logger = core_utils.setup_logger("db_proxy")
@@ -3577,8 +3577,18 @@ def get_worker_stats_full(self, filter_site_list=None):
                 resourceType = str(resourceType)
                 retMap.setdefault(computingSite, {})
                 retMap[computingSite].setdefault(jobType, {})
+                retMap[computingSite].setdefault("_total", {})
                 retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0})
+                retMap[computingSite][jobType].setdefault("_total", {"running": 0, "submitted": 0, "to_submit": 0})
+                retMap[computingSite]["_total"].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0})
+                retMap[computingSite]["_total"].setdefault("_total", {"running": 0, "submitted": 0, "to_submit": 0})
                 retMap[computingSite][jobType][resourceType][workerStatus] = cnt
+                retMap[computingSite][jobType]["_total"].setdefault(workerStatus, 0)
+                retMap[computingSite][jobType]["_total"][workerStatus] += cnt
+                retMap[computingSite]["_total"][resourceType].setdefault(workerStatus, 0)
+                retMap[computingSite]["_total"][resourceType][workerStatus] += cnt
+                retMap[computingSite]["_total"]["_total"].setdefault(workerStatus, 0)
+                retMap[computingSite]["_total"]["_total"][workerStatus] += cnt
             # commit
             self.commit()
             tmpLog.debug(f"got {str(retMap)}")
@@ -5722,10 +5732,15 @@ def get_job_stats_full(self, filter_site_list=None):
                     },
                 },
             )
+            retMap[computingSite].setdefault("_total", {"jobs": {}, "cores": {}})
             retMap[computingSite][resourceType]["jobs"].setdefault(jobStatus, 0)
             retMap[computingSite][resourceType]["cores"].setdefault(jobStatus, 0)
+            retMap[computingSite]["_total"]["jobs"].setdefault(jobStatus, 0)
+            retMap[computingSite]["_total"]["cores"].setdefault(jobStatus, 0)
             retMap[computingSite][resourceType]["jobs"][jobStatus] += cnt
             retMap[computingSite][resourceType]["cores"][jobStatus] += nCore * cnt
+            retMap[computingSite]["_total"]["jobs"][jobStatus] += cnt
+            retMap[computingSite]["_total"]["cores"][jobStatus] += nCore * cnt
             # commit
             self.commit()
             tmpLog.debug(f"got {str(retMap)}")
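The db_proxy.py hunks above add a "_total" pseudo-key at every aggregation level of the worker and job stats maps, so callers get precomputed totals without re-summing; the job_fetcher.py hunk is the matching consumer-side guard. A minimal runnable sketch of the resulting get_job_stats_full() shape follows; the queue name, resource types, and counts are made up for illustration:

# Illustrative shape of the dict returned by get_job_stats_full();
# "SOME_QUEUE", "SCORE"/"MCORE", and all counts are hypothetical.
job_stats_dict = {
    "SOME_QUEUE": {
        "SCORE": {"jobs": {"running": 3}, "cores": {"running": 3}},
        "MCORE": {"jobs": {"running": 2}, "cores": {"running": 16}},
        "_total": {"jobs": {"running": 5}, "cores": {"running": 19}},
    },
}

# Per-resource-type consumers now have to skip the pseudo-key, exactly as
# the job_fetcher.py hunk does, or the totals would be double counted.
n_cores = 0
for tmp_rt, val_dict in job_stats_dict["SOME_QUEUE"].items():
    if tmp_rt == "_total":
        continue
    n_cores += val_dict["cores"].get("running", 0)
assert n_cores == job_stats_dict["SOME_QUEUE"]["_total"]["cores"]["running"]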
diff --git a/pandaharvester/harvestercore/queue_config_mapper.py b/pandaharvester/harvestercore/queue_config_mapper.py
index 316dee5d..071ffdd8 100644
--- a/pandaharvester/harvestercore/queue_config_mapper.py
+++ b/pandaharvester/harvestercore/queue_config_mapper.py
@@ -555,10 +555,16 @@ def load_data(self, refill_table=False):
                 queueConfig.getJobCriteria = None
             else:
                 queueConfig.getJobCriteria = tmpCriteria
-            # nullify job attributes if NoJob mapType
+            # nullify all job limit attributes if NoJob mapType (PULL)
             if queueConfig.mapType == WorkSpec.MT_NoJob:
                 for attName in ["nQueueLimitJob", "nQueueLimitJobRatio", "nQueueLimitJobMax", "nQueueLimitJobMin"]:
-                    setattr(queueConfig, attName, None)
+                    if hasattr(queueConfig, attName):
+                        setattr(queueConfig, attName, None)
+            # nullify worker ratio limit attributes for jobful mapTypes (PUSH)
+            if queueConfig.mapType != WorkSpec.MT_NoJob:
+                for attName in ["nQueueLimitWorkerRatio", "nQueueLimitWorkerMin"]:
+                    if hasattr(queueConfig, attName):
+                        setattr(queueConfig, attName, None)
             # heartbeat suppression
             if queueConfig.truePilot and queueConfig.noHeartbeat == "":
                 queueConfig.noHeartbeat = "running,transferring,finished,failed"
diff --git a/pandaharvester/harvestermessenger/shared_file_messenger.py b/pandaharvester/harvestermessenger/shared_file_messenger.py
index 65722993..5182fcd2 100644
--- a/pandaharvester/harvestermessenger/shared_file_messenger.py
+++ b/pandaharvester/harvestermessenger/shared_file_messenger.py
@@ -457,6 +457,8 @@ def feed_jobs(self, workspec, jobspec_list):
                 pfcFile.write(pfc)
             # make symlink
             for fileSpec in jobSpec.inFiles:
+                if fileSpec.path is None:
+                    continue
                 dstPath = os.path.join(accessPoint, fileSpec.lfn)
                 if fileSpec.path != dstPath:
                     # test if symlink exists if so remove it
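The queue_config_mapper.py hunk makes the two limit families mutually exclusive by mapType: PULL (NoJob) queues have their job limit attributes nullified, PUSH (jobful) queues their worker ratio limits, and the new hasattr guards tolerate queue configurations that never defined an attribute. A self-contained sketch of that rule, using a hypothetical QueueConfig stand-in; only the attribute names and the mapType test come from the diff:

# Hypothetical stand-in for the real QueueConfig object; only mapType and
# the nQueueLimit* attribute names mirror the hunk above.
class QueueConfig:
    def __init__(self, mapType, **attrs):
        self.mapType = mapType
        for name, value in attrs.items():
            setattr(self, name, value)

MT_NO_JOB = "NoJob"  # WorkSpec.MT_NoJob in the real code
JOB_LIMIT_ATTRS = ["nQueueLimitJob", "nQueueLimitJobRatio", "nQueueLimitJobMax", "nQueueLimitJobMin"]
WORKER_LIMIT_ATTRS = ["nQueueLimitWorkerRatio", "nQueueLimitWorkerMin"]

def nullify_irrelevant_limits(qc):
    # PULL queues fetch no jobs, so job limits are meaningless there;
    # PUSH queues derive workers from fetched jobs, so worker ratio
    # limits are meaningless there.
    doomed = JOB_LIMIT_ATTRS if qc.mapType == MT_NO_JOB else WORKER_LIMIT_ATTRS
    for name in doomed:
        if hasattr(qc, name):
            setattr(qc, name, None)

qc = QueueConfig(MT_NO_JOB, nQueueLimitJob=100, nQueueLimitWorkerRatio=50)
nullify_irrelevant_limits(qc)
assert qc.nQueueLimitJob is None and qc.nQueueLimitWorkerRatio == 50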
diff --git a/pandaharvester/harvesterscripts/harvester_admin.py b/pandaharvester/harvesterscripts/harvester_admin.py
index 6cb80a79..334dba3b 100644
--- a/pandaharvester/harvesterscripts/harvester_admin.py
+++ b/pandaharvester/harvesterscripts/harvester_admin.py
@@ -307,6 +307,18 @@ def query_workers(arguments):
         raise


+def query_jobs(arguments):
+    dbProxy = DBProxy()
+    try:
+        if arguments.all:
+            res_obj = dbProxy.get_job_stats_full()
+        else:
+            res_obj = dbProxy.get_job_stats_full(filter_site_list=arguments.queue_list)
+        json_print(res_obj)
+    except TypeError as e:
+        raise
+
+
 # === Command map =======================================================


@@ -329,6 +341,7 @@ def query_workers(arguments):
     "kill_workers": kill_workers,
     # query commands
     "query_workers": query_workers,
+    "query_jobs": query_jobs,
 }

 # === Main ======================================================
@@ -383,7 +396,7 @@ def main():
     qconf_dump_parser.set_defaults(which="qconf_dump")
     qconf_dump_parser.add_argument("-J", "--json", dest="json", action="store_true", help="Dump configuration in JSON format")
     qconf_dump_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Dump configuration of all active queues")
-    qconf_dump_parser.add_argument("queue_list", nargs="*", type=str, action="store", metavar="", help="Name of active queue")
+    qconf_dump_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="", help="Name of active queue")
     qconf_dump_parser.add_argument(
         "-i", "--id", dest="id_list", nargs="+", type=int, action="store", metavar="", help="Dump configuration of queue with configID"
     )
@@ -434,7 +447,12 @@
     query_workers_parser = query_subparsers.add_parser("workers", help="Query statistiscs of workers in queues")
     query_workers_parser.set_defaults(which="query_workers")
     query_workers_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Show results of all queues")
-    query_workers_parser.add_argument("queue_list", nargs="*", type=str, action="store", metavar="", help="Name of active queue")
+    query_workers_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="", help="Name of active queue")
+    # query job_stats command
+    query_jobs_parser = query_subparsers.add_parser("jobs", help="Query statistics of jobs in queues")
+    query_jobs_parser.set_defaults(which="query_jobs")
+    query_jobs_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Show results of all queues")
+    query_jobs_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="", help="Name of active queue")

     # start parsing
     if len(sys.argv) == 1:
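With the new subparser wired to query_jobs, job statistics become queryable from the admin CLI alongside the existing query workers command. A hedged usage sketch: the harvester-admin entry point name is assumed to be the installed front end for harvester_admin.py, the queue name is hypothetical, and the commented output merely mirrors the get_job_stats_full() structure shown earlier:

# assumed entry point; positional queue names are required (nargs="+"),
# while -a/--all requests statistics for all queues per its help text
harvester-admin query jobs SOME_QUEUE

# hypothetical output (json_print of the get_job_stats_full() result):
# {
#   "SOME_QUEUE": {
#     "SCORE": {"jobs": {"running": 3}, "cores": {"running": 3}},
#     "_total": {"jobs": {"running": 3}, "cores": {"running": 3}}
#   }
# }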