Merge pull request #225 from HSF/flin
Ignore worker ratio limits in push queues; fix messenger feed_jobs
mightqxc authored Apr 26, 2024
2 parents 11787d4 + 882d580 commit 79939f8
Showing 6 changed files with 49 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
@@ -1 +1 @@
timestamp = "02-04-2024 07:28:13 on flin (by mightqxc)"
timestamp = "25-04-2024 07:49:43 on flin (by mightqxc)"
2 changes: 2 additions & 0 deletions pandaharvester/harvesterbody/job_fetcher.py
@@ -99,6 +99,8 @@ def run(self):
                 }
                 if job_stats_dict and queueName in job_stats_dict:
                     for tmp_rt, val_dict in job_stats_dict[queueName].items():
+                        if tmp_rt == "_total":
+                            continue
                         for tmp_status in ["starting", "running"]:
                             increment = val_dict["cores"][tmp_status]
                             if rt_mapper.is_high_memory_resource_type(tmp_rt):
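Note: the "_total" key skipped here is the per-site aggregate that get_job_stats_full now injects next to the real resource types (see the db_proxy.py hunks below); without this guard its cores would be counted a second time. A minimal sketch of the accounting, with a hypothetical queue name and counts:

# Hypothetical stats for one queue, shaped like get_job_stats_full output
job_stats_dict = {
    "CERN-PROD": {
        "SCORE": {"jobs": {"running": 3}, "cores": {"starting": 0, "running": 3}},
        "MCORE": {"jobs": {"running": 2}, "cores": {"starting": 8, "running": 16}},
        "_total": {"jobs": {"running": 5}, "cores": {"starting": 8, "running": 19}},
    }
}
n_cores = 0
for tmp_rt, val_dict in job_stats_dict["CERN-PROD"].items():
    if tmp_rt == "_total":  # aggregate entry; its cores are already in SCORE/MCORE
        continue
    for tmp_status in ["starting", "running"]:
        n_cores += val_dict["cores"][tmp_status]
assert n_cores == 27  # same as the "_total" cores, counted exactly once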
17 changes: 16 additions & 1 deletion pandaharvester/harvestercore/db_proxy.py
@@ -26,10 +26,10 @@
 from .panda_queue_spec import PandaQueueSpec
 from .process_lock_spec import ProcessLockSpec
 from .queue_config_dump_spec import QueueConfigDumpSpec
+from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE
 from .seq_number_spec import SeqNumberSpec
 from .service_metrics_spec import ServiceMetricSpec
 from .work_spec import WorkSpec
-from .resource_type_constants import BASIC_RESOURCE_TYPE_SINGLE_CORE

 # logger
 _logger = core_utils.setup_logger("db_proxy")
@@ -3577,8 +3577,18 @@ def get_worker_stats_full(self, filter_site_list=None):
                 resourceType = str(resourceType)
                 retMap.setdefault(computingSite, {})
                 retMap[computingSite].setdefault(jobType, {})
+                retMap[computingSite].setdefault("_total", {})
                 retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0})
+                retMap[computingSite][jobType].setdefault("_total", {"running": 0, "submitted": 0, "to_submit": 0})
+                retMap[computingSite]["_total"].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0})
+                retMap[computingSite]["_total"].setdefault("_total", {"running": 0, "submitted": 0, "to_submit": 0})
                 retMap[computingSite][jobType][resourceType][workerStatus] = cnt
+                retMap[computingSite][jobType]["_total"].setdefault(workerStatus, 0)
+                retMap[computingSite][jobType]["_total"][workerStatus] += cnt
+                retMap[computingSite]["_total"][resourceType].setdefault(workerStatus, 0)
+                retMap[computingSite]["_total"][resourceType][workerStatus] += cnt
+                retMap[computingSite]["_total"]["_total"].setdefault(workerStatus, 0)
+                retMap[computingSite]["_total"]["_total"][workerStatus] += cnt
             # commit
             self.commit()
             tmpLog.debug(f"got {str(retMap)}")
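With these additions, get_worker_stats_full returns "_total" aggregates at both nesting levels, so callers can read per-site totals without summing them. A sketch of the resulting shape for one site (the job type, resource type, and counts are illustrative):

# Hypothetical retMap entry for a single computing site
retMap = {
    "CERN-PROD": {
        "managed": {  # per job type
            "SCORE": {"running": 4, "submitted": 1, "to_submit": 0},
            "_total": {"running": 4, "submitted": 1, "to_submit": 0},  # over resource types
        },
        "_total": {  # over job types
            "SCORE": {"running": 4, "submitted": 1, "to_submit": 0},
            "_total": {"running": 4, "submitted": 1, "to_submit": 0},  # grand total
        },
    }
}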
@@ -5722,10 +5732,15 @@ def get_job_stats_full(self, filter_site_list=None):
                         },
                     },
                 )
+                retMap[computingSite].setdefault("_total", {"jobs": {}, "cores": {}})
                 retMap[computingSite][resourceType]["jobs"].setdefault(jobStatus, 0)
                 retMap[computingSite][resourceType]["cores"].setdefault(jobStatus, 0)
+                retMap[computingSite]["_total"]["jobs"].setdefault(jobStatus, 0)
+                retMap[computingSite]["_total"]["cores"].setdefault(jobStatus, 0)
                 retMap[computingSite][resourceType]["jobs"][jobStatus] += cnt
                 retMap[computingSite][resourceType]["cores"][jobStatus] += nCore * cnt
+                retMap[computingSite]["_total"]["jobs"][jobStatus] += cnt
+                retMap[computingSite]["_total"]["cores"][jobStatus] += nCore * cnt
             # commit
             self.commit()
             tmpLog.debug(f"got {str(retMap)}")
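This is the producer of the "_total" pseudo resource type that the job_fetcher.py hunk above skips. One site's entry would look roughly like this (illustrative values, assuming nCore is 8 for MCORE):

# Hypothetical retMap entry: per-resource-type rows plus the "_total" aggregate
retMap = {
    "CERN-PROD": {
        "SCORE": {"jobs": {"running": 3}, "cores": {"running": 3}},
        "MCORE": {"jobs": {"running": 2}, "cores": {"running": 16}},  # 2 jobs x 8 cores
        "_total": {"jobs": {"running": 5}, "cores": {"running": 19}},
    }
}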
10 changes: 8 additions & 2 deletions pandaharvester/harvestercore/queue_config_mapper.py
@@ -555,10 +555,16 @@ def load_data(self, refill_table=False):
                         queueConfig.getJobCriteria = None
                     else:
                         queueConfig.getJobCriteria = tmpCriteria
-                # nullify job attributes if NoJob mapType
+                # nullify all job limit attributes if NoJob mapType (PULL)
                 if queueConfig.mapType == WorkSpec.MT_NoJob:
                     for attName in ["nQueueLimitJob", "nQueueLimitJobRatio", "nQueueLimitJobMax", "nQueueLimitJobMin"]:
-                        setattr(queueConfig, attName, None)
+                        if hasattr(queueConfig, attName):
+                            setattr(queueConfig, attName, None)
+                # nullify worker ratio limit attributes if jobful mapTypes (PUSH)
+                if queueConfig.mapType != WorkSpec.MT_NoJob:
+                    for attName in ["nQueueLimitWorkerRatio", "nQueueLimitWorkerMin"]:
+                        if hasattr(queueConfig, attName):
+                            setattr(queueConfig, attName, None)
                 # heartbeat suppression
                 if queueConfig.truePilot and queueConfig.noHeartbeat == "":
                     queueConfig.noHeartbeat = "running,transferring,finished,failed"
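This hunk implements the headline change: in PUSH queues (any mapType other than MT_NoJob) the worker count is driven by the jobs actually fetched, so ratio-based worker limits do not apply and are now nullified; the hasattr guards avoid touching attributes a config never defined. A self-contained sketch of the effect, using a dummy config object and the literal "NoJob" in place of WorkSpec.MT_NoJob:

class DummyQueueConfig:  # stand-in for harvester's queue configuration object
    mapType = "OneToOne"  # a jobful (PUSH) mapType, i.e. not "NoJob"
    nQueueLimitWorkerRatio = 50
    nQueueLimitWorkerMin = 10

queueConfig = DummyQueueConfig()
if queueConfig.mapType != "NoJob":  # PUSH queue
    for attName in ["nQueueLimitWorkerRatio", "nQueueLimitWorkerMin"]:
        if hasattr(queueConfig, attName):
            setattr(queueConfig, attName, None)

assert queueConfig.nQueueLimitWorkerRatio is None  # ratio limit now ignored
assert queueConfig.nQueueLimitWorkerMin is None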
2 changes: 2 additions & 0 deletions pandaharvester/harvestermessenger/shared_file_messenger.py
@@ -457,6 +457,8 @@ def feed_jobs(self, workspec, jobspec_list):
                     pfcFile.write(pfc)
                 # make symlink
                 for fileSpec in jobSpec.inFiles:
+                    if fileSpec.path is None:
+                        continue
                     dstPath = os.path.join(accessPoint, fileSpec.lfn)
                     if fileSpec.path != dstPath:
                         # test if symlink exists if so remove it
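The feed_jobs fix: input files whose local path is unset (presumably not yet staged or resolved) are now skipped rather than symlinked, since path operations on None raise TypeError. A minimal sketch with a hypothetical FileSpec stand-in and access point:

import os

class FileSpec:  # minimal stand-in for harvester's FileSpec
    def __init__(self, lfn, path=None):
        self.lfn = lfn
        self.path = path

accessPoint = "/tmp/harvester_access_point"  # hypothetical worker access point
inFiles = [
    FileSpec("EVNT.001.pool.root", "/data/cache/EVNT.001.pool.root"),
    FileSpec("EVNT.002.pool.root"),  # path is None: would crash os.path.join
]
for fileSpec in inFiles:
    if fileSpec.path is None:  # same guard as in feed_jobs
        continue
    dstPath = os.path.join(accessPoint, fileSpec.lfn)
    if fileSpec.path != dstPath:
        print(f"would symlink {fileSpec.path} -> {dstPath}")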
22 changes: 20 additions & 2 deletions pandaharvester/harvesterscripts/harvester_admin.py
@@ -307,6 +307,18 @@ def query_workers(arguments):
         raise


+def query_jobs(arguments):
+    dbProxy = DBProxy()
+    try:
+        if arguments.all:
+            res_obj = dbProxy.get_job_stats_full()
+        else:
+            res_obj = dbProxy.get_job_stats_full(filter_site_list=arguments.queue_list)
+        json_print(res_obj)
+    except TypeError as e:
+        raise
+
+
 # === Command map =======================================================

@@ -329,6 +341,7 @@ def query_workers(arguments):
     "kill_workers": kill_workers,
     # query commands
     "query_workers": query_workers,
+    "query_jobs": query_jobs,
 }

 # === Main ======================================================
@@ -383,7 +396,7 @@ def main():
     qconf_dump_parser.set_defaults(which="qconf_dump")
     qconf_dump_parser.add_argument("-J", "--json", dest="json", action="store_true", help="Dump configuration in JSON format")
     qconf_dump_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Dump configuration of all active queues")
-    qconf_dump_parser.add_argument("queue_list", nargs="*", type=str, action="store", metavar="<queue_name>", help="Name of active queue")
+    qconf_dump_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="<queue_name>", help="Name of active queue")
     qconf_dump_parser.add_argument(
         "-i", "--id", dest="id_list", nargs="+", type=int, action="store", metavar="<configID>", help="Dump configuration of queue with configID"
     )
@@ -434,7 +447,12 @@ def main():
     query_workers_parser = query_subparsers.add_parser("workers", help="Query statistics of workers in queues")
     query_workers_parser.set_defaults(which="query_workers")
     query_workers_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Show results of all queues")
-    query_workers_parser.add_argument("queue_list", nargs="*", type=str, action="store", metavar="<queue_name>", help="Name of active queue")
+    query_workers_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="<queue_name>", help="Name of active queue")
+    # query job_stats command
+    query_jobs_parser = query_subparsers.add_parser("jobs", help="Query statistics of jobs in queues")
+    query_jobs_parser.set_defaults(which="query_jobs")
+    query_jobs_parser.add_argument("-a", "--all", dest="all", action="store_true", help="Show results of all queues")
+    query_jobs_parser.add_argument("queue_list", nargs="+", type=str, action="store", metavar="<queue_name>", help="Name of active queue")

     # start parsing
     if len(sys.argv) == 1:
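These parser hunks expose the new function as a query subcommand mirroring query workers. Note that changing nargs from "*" to "+" makes the positional queue_list mandatory in argparse, so at least one queue name must be given on the command line even though query_jobs ignores the list when --all is set. Assuming the script is run directly (queue names are hypothetical), usage would look like:

python harvester_admin.py query jobs CERN-PROD BNL_OSG   # job stats of the named queues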
