Merge pull request #241 from HSF/flin
Add queue limits attributes by cores and memory
mightqxc authored Aug 26, 2024
2 parents c33a9be + 0941543 commit f85d3ed
Showing 10 changed files with 405 additions and 58 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
@@ -1 +1 @@
timestamp = "23-07-2024 10:30:54 on flin (by mightqxc)"
timestamp = "15-08-2024 15:48:43 on flin (by mightqxc)"
40 changes: 25 additions & 15 deletions pandaharvester/harvesterbody/job_fetcher.py
@@ -1,4 +1,5 @@
import datetime
import math
import random
import socket

@@ -41,16 +42,22 @@ def run(self):
mainLog = self.make_logger(_logger, f"id={self.get_pid()}", method_name="run")
mainLog.debug("getting number of jobs to be fetched")
# get number of jobs to be fetched
nJobsPerQueue = self.dbProxy.get_num_jobs_to_fetch(harvester_config.jobfetcher.nQueues, harvester_config.jobfetcher.lookupTime)
mainLog.debug(f"got {len(nJobsPerQueue)} queues")
job_limit_to_fetch_dict = self.dbProxy.get_num_jobs_to_fetch(
harvester_config.jobfetcher.nQueues, harvester_config.jobfetcher.lookupTime, self.queueConfigMapper
)
mainLog.debug(f"got {len(job_limit_to_fetch_dict)} queues")
# get up to date queue configuration
pandaQueueDict = PandaQueuesDict(filter_site_list=nJobsPerQueue.keys())
pandaQueueDict = PandaQueuesDict(filter_site_list=job_limit_to_fetch_dict.keys())
# get job statistics
job_stats_dict = self.dbProxy.get_job_stats_full()
if job_stats_dict is None:
mainLog.warning(f"cannot get job stats")
# loop over all queues
for queueName, nJobs in nJobsPerQueue.items():
for queueName, value_dict in job_limit_to_fetch_dict.items():
n_jobs = value_dict["jobs"]
n_cores = value_dict["cores"]
if n_cores is None:
n_cores = math.inf
# check queue
if not self.queueConfigMapper.has_queue(queueName):
continue
@@ -59,9 +66,9 @@
queueConfig = self.queueConfigMapper.get_queue(queueName)
siteName = queueConfig.siteName
# upper limit
if nJobs > harvester_config.jobfetcher.maxJobs:
nJobs = harvester_config.jobfetcher.maxJobs
if nJobs == 0:
if n_jobs > harvester_config.jobfetcher.maxJobs:
n_jobs = harvester_config.jobfetcher.maxJobs
if n_jobs == 0:
tmpLog.debug("no job to fetch; skip")
continue
# get jobs
@@ -84,7 +91,8 @@ def run(self):
resource_type_limits_dict[new_key] = val
# FIXME: all parts about HIMEM are temporary as HIMEM rtypes and parameters will be replaced or reimplemented
# compute cores of active (submitted and running) jobs
n_jobs_rem = nJobs
n_jobs_rem = n_jobs
n_cores_rem = n_cores
pq_mcore_corecount = pandaQueueDict.get("corecount", 8) or 8
rt_n_jobs_dict = {}
rt_n_cores_dict = {
@@ -109,8 +117,12 @@
rt_n_cores_dict["normal"][tmp_status] += increment
# compute n_jobs to fetch for resource types
for j, resource_type in enumerate(random.sample(list(all_resource_types), k=len(all_resource_types))):
# corecount
rt_corecount = 1
if not rt_mapper.is_single_core_resource_type(resource_type):
rt_corecount = pq_mcore_corecount
# compute n jobs to get for this resource type
rt_n_jobs = n_jobs_rem / (len(all_resource_types) - j)
rt_n_jobs = min(n_jobs_rem / (len(all_resource_types) - j), n_cores_rem // rt_corecount)
if job_stats_dict and queueName in job_stats_dict:
pq_rt_job_stats_dict = job_stats_dict[queueName].get(resource_type, {}).get("jobs", {})
rt_n_active_jobs = pq_rt_job_stats_dict.get("starting", 0) + pq_rt_job_stats_dict.get("running", 0)
@@ -120,15 +132,13 @@
if "HIMEM" in resource_type_limits_dict and rt_mapper.is_high_memory_resource_type(resource_type):
# capped by total cores of HIMEM
rt_n_active_himem_cores = rt_n_cores_dict["HIMEM"]["starting"] + rt_n_cores_dict["HIMEM"]["running"]
rt_corecount = 1
if not rt_mapper.is_single_core_resource_type(resource_type):
rt_corecount = pq_mcore_corecount
rt_n_jobs = min(rt_n_jobs, (resource_type_limits_dict["HIMEM"] - rt_n_active_himem_cores) / rt_corecount)
rt_n_jobs = max(rt_n_jobs, 0)
rt_n_jobs_dict[resource_type] = rt_n_jobs
n_jobs_rem -= rt_n_jobs
n_cores_rem -= rt_n_jobs * rt_corecount

# fucntion to call get jobs
# function to call get jobs
def _get_jobs(resource_type=None, n_jobs=0):
# custom criteria from queueconfig
additional_criteria = queueConfig.getJobCriteria
@@ -208,10 +218,10 @@ def _get_jobs(resource_type=None, n_jobs=0):
# call get jobs
if all([val > 0 for val in rt_n_jobs_dict.values()]):
# no n_jobs limit on any resourcetypes, call get_jobs without constraint
_get_jobs(resource_type=None, n_jobs=nJobs)
_get_jobs(resource_type=None, n_jobs=n_jobs)
else:
# call get_jobs for each resourcetype with calculated rt_n_jobs
n_jobs_rem = nJobs
n_jobs_rem = n_jobs
for resource_type, rt_n_jobs in rt_n_jobs_dict.items():
n_jobs_to_get = max(min(round(rt_n_jobs), n_jobs_rem), 0)
got_n_jobs = _get_jobs(resource_type=resource_type, n_jobs=n_jobs_to_get)
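For reference, the per-resource-type split introduced in this hunk can be read as a small standalone sketch. The function name split_jobs_by_resource_type and the name-prefix check standing in for rt_mapper.is_single_core_resource_type are illustrative only, not part of harvester; the arithmetic mirrors the change above (an even split of the remaining job budget, additionally capped by the remaining core budget).

import math
import random

def split_jobs_by_resource_type(n_jobs, n_cores, resource_types, mcore_corecount=8):
    # Evenly split the remaining job budget over the resource types (visited in
    # random order), capping each share by the remaining core budget divided by
    # that resource type's per-job core count.
    n_jobs_rem = n_jobs
    n_cores_rem = math.inf if n_cores is None else n_cores
    allocation = {}
    for j, resource_type in enumerate(random.sample(list(resource_types), k=len(resource_types))):
        # illustrative stand-in for rt_mapper.is_single_core_resource_type()
        rt_corecount = 1 if resource_type.startswith("SCORE") else mcore_corecount
        rt_n_jobs = min(n_jobs_rem / (len(resource_types) - j), n_cores_rem // rt_corecount)
        rt_n_jobs = max(rt_n_jobs, 0)
        allocation[resource_type] = rt_n_jobs
        n_jobs_rem -= rt_n_jobs
        n_cores_rem -= rt_n_jobs * rt_corecount
    return allocation

# e.g. a queue allowed to fetch up to 100 jobs but only 96 more cores
print(split_jobs_by_resource_type(100, 96, ["SCORE", "MCORE"]))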
44 changes: 40 additions & 4 deletions pandaharvester/harvesterbody/worker_adjuster.py
@@ -1,11 +1,14 @@
import copy
import math
import traceback

from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
from pandaharvester.harvestercore.plugin_factory import PluginFactory
from pandaharvester.harvestercore.resource_type_mapper import ResourceTypeMapper
from pandaharvester.harvestermisc.apfmon import Apfmon
from pandaharvester.harvestermisc.info_utils import PandaQueuesDict

# logger
_logger = core_utils.setup_logger("worker_adjuster")
@@ -116,21 +119,35 @@ def define_num_workers(self, static_num_workers, site_name):
if job_stats is not None:
job_stats = job_stats.data

# get panda queues dict from CRIC
panda_queues_dict = PandaQueuesDict()

# get resource type mapper
rt_mapper = ResourceTypeMapper()

# define num of new workers
for queue_name in static_num_workers:
# get queue
queue_config = self.queue_configMapper.get_queue(queue_name)
worker_limits_dict = self.dbProxy.get_worker_limits(queue_name)
worker_limits_dict, worker_stats_map = self.dbProxy.get_worker_limits(queue_name, queue_config)
max_workers = worker_limits_dict.get("maxWorkers", 0)
n_queue_limit = worker_limits_dict.get("nQueueLimitWorker", 0)
n_queue_limit_per_rt = worker_limits_dict["nQueueLimitWorkerPerRT"]
n_queue_limit_per_rt = n_queue_limit
queue_limit_cores = worker_limits_dict["nQueueWorkerCores"]
queue_limit_memory = worker_limits_dict["nQueueWorkerMemory"]
cores_queue = worker_stats_map["queue"]["core"]
memory_queue = worker_stats_map["queue"]["mem"]
n_queue_total, n_ready_total, n_running_total = 0, 0, 0
apf_msg = None
apf_data = None
for job_type, jt_values in static_num_workers[queue_name].items():
for resource_type, tmp_val in jt_values.items():
tmp_log.debug(f"Processing queue {queue_name} job_type {job_type} resource_type {resource_type} with static_num_workers {tmp_val}")

# get cores and memory request per worker of this resource_type
queue_dict = panda_queues_dict.get(queue_name, {})
rtype_request_cores, rtype_request_memory = rt_mapper.calculate_worker_requirements(resource_type, queue_dict)

# set 0 to num of new workers when the queue is disabled
if queue_name in queue_stat and queue_stat[queue_name]["status"] in ["offline", "standby", "maintenance"]:
dyn_num_workers[queue_name][job_type][resource_type]["nNewWorkers"] = 0
Expand Down Expand Up @@ -192,8 +209,19 @@ def define_num_workers(self, static_num_workers, site_name):
pass
elif (n_queue + n_ready + n_running) >= max_workers > 0:
# enough workers in the system
ret_msg = f"No n_new_workers since n_queue({n_queue}) + n_ready({n_ready}) + n_running({n_running}) "
ret_msg += f">= max_workers({max_workers})"
ret_msg = (
f"No n_new_workers since n_queue({n_queue}) + n_ready({n_ready}) + n_running({n_running}) " f">= max_workers({max_workers})"
)
tmp_log.debug(ret_msg)
pass
elif queue_limit_cores is not None and cores_queue >= queue_limit_cores:
# enough queuing cores
ret_msg = f"No n_new_workers since cores_queue({cores_queue}) >= " f"queue_limit_cores({queue_limit_cores})"
tmp_log.debug(ret_msg)
pass
elif queue_limit_memory is not None and memory_queue >= queue_limit_memory:
# enough queuing memory
ret_msg = f"No n_new_workers since memory_queue({memory_queue} MB) >= " f"queue_limit_memory({queue_limit_memory} MB)"
tmp_log.debug(ret_msg)
pass
else:
@@ -250,6 +278,14 @@
if max_workers > 0:
n_new_workers = min(n_new_workers, max(max_workers - n_queue - n_ready - n_running, 0))
tmp_log.debug(f"setting n_new_workers to {n_new_workers} to respect max_workers")
if queue_limit_cores:
new_worker_cores_max = max(queue_limit_cores - cores_queue, 0)
n_new_workers = min(n_new_workers, math.ceil(new_worker_cores_max / rtype_request_cores))
tmp_log.debug(f"setting n_new_workers to {n_new_workers} to respect queue_limit_cores")
if queue_limit_memory:
new_worker_memory_max = max(queue_limit_memory - memory_queue, 0)
n_new_workers = min(n_new_workers, math.ceil(new_worker_memory_max / rtype_request_memory))
tmp_log.debug(f"setting n_new_workers to {n_new_workers} to respect queue_limit_memory")
if queue_config.maxNewWorkersPerCycle > 0:
n_new_workers = min(n_new_workers, queue_config.maxNewWorkersPerCycle)
tmp_log.debug(f"setting n_new_workers to {n_new_workers} in order to respect maxNewWorkersPerCycle")
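For reference, the new capping step added to define_num_workers can be summarized in the following standalone sketch. The function name cap_new_workers and its argument list are illustrative, while the arithmetic (headroom divided by the per-worker core or memory request, rounded up) follows the hunk above.

import math

def cap_new_workers(n_new_workers, n_queued_cores, n_queued_memory,
                    queue_limit_cores, queue_limit_memory,
                    request_cores, request_memory):
    # Cap the candidate number of new workers so that the cores and memory of
    # queued workers stay within the configured per-queue limits.
    if queue_limit_cores:
        cores_headroom = max(queue_limit_cores - n_queued_cores, 0)
        n_new_workers = min(n_new_workers, math.ceil(cores_headroom / request_cores))
    if queue_limit_memory:
        memory_headroom = max(queue_limit_memory - n_queued_memory, 0)
        n_new_workers = min(n_new_workers, math.ceil(memory_headroom / request_memory))
    return n_new_workers

# e.g. 50 candidate workers, 800 of 1000 allowed cores already queued, 8-core workers
print(cap_new_workers(50, 800, 0, 1000, None, 8, 16000))  # -> 25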
4 changes: 2 additions & 2 deletions pandaharvester/harvestercore/db_interface.py
@@ -60,8 +60,8 @@ def set_file_group(self, file_specs, group_id, status_string):
return self.dbProxy.set_file_group(file_specs, group_id, status_string)

# get queue status
def get_worker_limits(self, site_name):
return self.dbProxy.get_worker_limits(site_name)
def get_worker_limits(self, site_name, queue_config):
return self.dbProxy.get_worker_limits(site_name, queue_config)

# get worker CE stats
def get_worker_ce_stats(self, site_name):
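A usage sketch of the updated two-value return of get_worker_limits: the helper summarize_worker_limits and the sample numbers are hypothetical, while the dictionary keys match those read in the worker_adjuster.py hunk above.

def summarize_worker_limits(worker_limits_dict, worker_stats_map):
    # Collect the fields that define_num_workers() reads from the two return
    # values of the updated get_worker_limits() call.
    return {
        "max_workers": worker_limits_dict.get("maxWorkers", 0),
        "n_queue_limit": worker_limits_dict.get("nQueueLimitWorker", 0),
        "queue_limit_cores": worker_limits_dict["nQueueWorkerCores"],
        "queue_limit_memory": worker_limits_dict["nQueueWorkerMemory"],
        "cores_queued": worker_stats_map["queue"]["core"],
        "memory_queued": worker_stats_map["queue"]["mem"],
    }

# hypothetical values for illustration only
limits = {"maxWorkers": 1000, "nQueueLimitWorker": 200, "nQueueWorkerCores": 4096, "nQueueWorkerMemory": 8192000}
stats = {"queue": {"core": 2048, "mem": 4096000}}
print(summarize_worker_limits(limits, stats))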