diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py
index 6090cf73..71627b2e 100644
--- a/common/lib/idds/common/utils.py
+++ b/common/lib/idds/common/utils.py
@@ -41,6 +41,12 @@ DATE_FORMAT = '%a, %d %b %Y %H:%M:%S UTC'
 
 
+def get_log_dir():
+    if config_has_section('common') and config_has_option('common', 'logdir'):
+        return config_get('common', 'logdir')
+    return "/var/log/idds"
+
+
 def setup_logging(name, stream=None, loglevel=None):
     """
     Setup logging
     """
@@ -588,3 +594,18 @@ def pid_exists(pid):
 def get_list_chunks(full_list, bulk_size=2000):
     chunks = [full_list[i:i + bulk_size] for i in range(0, len(full_list), bulk_size)]
     return chunks
+
+
+def report_availability(availability):
+    try:
+        log_dir = get_log_dir()
+        if log_dir:
+            filename = os.path.join(log_dir, 'idds_availability')
+            with open(filename, 'w') as f:
+                json.dump(availability, f)
+        else:
+            print("availability: %s" % str(availability))
+    except Exception as ex:
+        error = "Failed to report availability: %s" % str(ex)
+        print(error)
+        logging.debug(error)
diff --git a/main/config_default/idds.cfg b/main/config_default/idds.cfg
index afa1db6a..564f9df8 100755
--- a/main/config_default/idds.cfg
+++ b/main/config_default/idds.cfg
@@ -36,8 +36,8 @@ coordination_interval_delay = 300
 
 [clerk]
-num_threads = 16
-max_number_workers = 16
+num_threads = 4
+max_number_workers = 4
 poll_period = 300
 new_poll_period = 10
 update_poll_period = 300
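The two helpers above write the availability snapshot as JSON to `<logdir>/idds_availability` (defaulting to `/var/log/idds`). A minimal sketch of a downstream consumer, assuming the file location produced by `report_availability` and the `{agent: {'num_hang_workers': ..., 'num_active_workers': ...}}` structure built by `get_availability` later in this patch; the "degraded" rule is only an example, not part of the patch.

```python
# Hypothetical reader for the idds_availability file written by report_availability().
import json
import os


def read_availability(log_dir="/var/log/idds"):
    filename = os.path.join(log_dir, 'idds_availability')
    if not os.path.exists(filename):
        return {}
    with open(filename) as f:
        return json.load(f)


def degraded_agents(availability):
    # Example rule: flag agents where every active worker looks hung.
    return [agent for agent, stats in availability.items()
            if stats.get('num_active_workers', 0) > 0
            and stats.get('num_hang_workers', 0) >= stats.get('num_active_workers', 0)]


if __name__ == '__main__':
    avail = read_availability()
    print("availability: %s" % avail)
    print("degraded agents: %s" % degraded_agents(avail))
```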
diff --git a/main/lib/idds/agents/carrier/finisher.py b/main/lib/idds/agents/carrier/finisher.py
index 4825ef98..68d04d7d 100644
--- a/main/lib/idds/agents/carrier/finisher.py
+++ b/main/lib/idds/agents/carrier/finisher.py
@@ -31,12 +31,12 @@ class Finisher(Poller):
     Finisher works to submit and running tasks to WFMS.
     """
 
-    def __init__(self, num_threads=1, finisher_max_number_workers=3, max_number_workers=3, poll_time_period=10, retries=3, retrieve_bulk_size=2,
+    def __init__(self, num_threads=1, finisher_max_number_workers=None, max_number_workers=3, poll_time_period=10, retries=3, retrieve_bulk_size=2,
                  message_bulk_size=1000, **kwargs):
-        if finisher_max_number_workers > num_threads:
-            self.max_number_workers = finisher_max_number_workers
+        if finisher_max_number_workers:
+            self.max_number_workers = int(finisher_max_number_workers)
         else:
-            self.max_number_workers = max_number_workers
+            self.max_number_workers = int(max_number_workers)
 
         self.set_max_workers()
         num_threads = int(self.max_number_workers)
diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py
index 89bec25c..4bc07e48 100644
--- a/main/lib/idds/agents/carrier/poller.py
+++ b/main/lib/idds/agents/carrier/poller.py
@@ -35,7 +35,7 @@ class Poller(BaseAgent):
     """
 
     def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
-                 name='Poller', message_bulk_size=1000, **kwargs):
+                 max_updates_per_round=2000, name='Poller', message_bulk_size=1000, **kwargs):
         self.max_number_workers = max_number_workers
         if int(num_threads) < int(self.max_number_workers):
             num_threads = int(self.max_number_workers)
@@ -83,6 +83,9 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=
         else:
             self.max_number_workers = int(self.max_number_workers)
 
+        self.max_updates_per_round = max_updates_per_round
+        self.logger.info("max_updates_per_round: %s" % self.max_updates_per_round)
+
         self.show_queue_size_time = None
 
     def is_ok_to_run_more_processings(self):
@@ -279,6 +282,7 @@ def handle_update_processing(self, processing):
         log_prefix = self.get_log_prefix(processing)
 
         ret_handle_update_processing = handle_update_processing(processing, self.agent_attributes,
+                                                                max_updates_per_round=self.max_updates_per_round,
                                                                 logger=self.logger,
                                                                 log_prefix=log_prefix)
diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py
index 2b6b18b2..817268df 100644
--- a/main/lib/idds/agents/carrier/submitter.py
+++ b/main/lib/idds/agents/carrier/submitter.py
@@ -109,6 +109,7 @@ def handle_new_processing(self, processing):
 
         ret_new_processing = handle_new_processing(processing, self.agent_attributes,
                                                    func_site_to_cloud=self.get_site_to_cloud,
+                                                   max_updates_per_round=self.max_updates_per_round,
                                                    logger=self.logger,
                                                    log_prefix=log_prefix)
         status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing
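Finisher above (and Trigger below) now treat the agent-specific `finisher_max_number_workers` / `trigger_max_number_workers` settings as optional overrides rather than comparing them with `num_threads`. A standalone sketch of that fallback, with made-up values only to show which setting wins:

```python
# Sketch of the override-then-fallback rule used by Finisher/Trigger in this patch.
def resolve_max_workers(agent_specific_max, max_number_workers=3):
    # An agent-specific value (e.g. finisher_max_number_workers) wins when it is set;
    # otherwise the generic max_number_workers applies. Both are coerced to int,
    # since values read from idds.cfg arrive as strings.
    if agent_specific_max:
        return int(agent_specific_max)
    return int(max_number_workers)


assert resolve_max_workers(None, max_number_workers=4) == 4
assert resolve_max_workers("8", max_number_workers=4) == 8
```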
diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py
index 1cc33643..83b5705e 100644
--- a/main/lib/idds/agents/carrier/trigger.py
+++ b/main/lib/idds/agents/carrier/trigger.py
@@ -32,18 +32,18 @@ class Trigger(Poller):
     Trigger works to trigger to release jobs
     """
 
-    def __init__(self, num_threads=1, trigger_max_number_workers=3, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
+    def __init__(self, num_threads=1, trigger_max_number_workers=None, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
                  name='Trigger', message_bulk_size=1000, max_updates_per_round=2000, **kwargs):
-        if trigger_max_number_workers > num_threads:
-            self.max_number_workers = trigger_max_number_workers
+        if trigger_max_number_workers:
+            self.max_number_workers = int(trigger_max_number_workers)
         else:
-            self.max_number_workers = max_number_workers
+            self.max_number_workers = int(max_number_workers)
 
         self.set_max_workers()
         num_threads = int(self.max_number_workers)
         super(Trigger, self).__init__(num_threads=num_threads, name=name,
                                       max_number_workers=self.max_number_workers,
-                                      retrieve_bulk_size=retrieve_bulk_size, **kwargs)
+                                      max_updates_per_round=max_updates_per_round, retrieve_bulk_size=retrieve_bulk_size, **kwargs)
         self.logger.info("num_threads: %s" % num_threads)
         self.max_updates_per_round = max_updates_per_round
@@ -103,7 +103,7 @@
                                                              max_updates_per_round=self.max_updates_per_round,
                                                              logger=self.logger, log_prefix=log_prefix)
 
-            process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents, ret_update_transforms = ret_trigger_processing
+            process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents, ret_update_transforms, has_updates = ret_trigger_processing
 
             self.logger.debug(log_prefix + "handle_trigger_processing: ret_update_transforms: %s" % str(ret_update_transforms))
@@ -127,7 +127,8 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False):
                    'new_update_contents': new_update_contents,
                    'update_transforms': ret_update_transforms,
                    'update_dep_contents': (processing['request_id'], update_dep_contents_status_name, update_dep_contents_status),
-                   'processing_status': new_process_status}
+                   'processing_status': new_process_status,
+                   'has_updates': has_updates}
         except exceptions.ProcessFormatNotSupported as ex:
             self.logger.error(ex)
             self.logger.error(traceback.format_exc())
@@ -199,6 +200,7 @@ def process_trigger_processing_real(self, event):
                 self.update_processing(ret, pr)
                 update_transforms = ret.get('update_transforms', None)
+                has_updates = ret.get('has_updates', None)
                 if update_transforms:
                     # self.logger.info(log_pre + "update_contents_to_others_by_dep_id")
                     # core_catalog.update_contents_to_others_by_dep_id(request_id=pr['request_id'], transform_id=pr['transform_id'])
@@ -231,7 +233,8 @@
                 if ((event._content and 'has_updates' in event._content and event._content['has_updates'])
                     or ('update_contents' in ret and ret['update_contents'])  # noqa W503
                     or ('new_contents' in ret and ret['new_contents'])  # noqa W503
-                    or ('messages' in ret and ret['messages'])):  # noqa E129
+                    or ('messages' in ret and ret['messages'])  # noqa W503
+                    or has_updates):  # noqa E129
                     self.logger.info(log_pre + "SyncProcessingEvent(processing_id: %s)" % pr['processing_id'])
                     event = SyncProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'],
                                                 content=event._content,
diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py
index 9f4546d8..507d45d9 100644
--- a/main/lib/idds/agents/carrier/utils.py
+++ b/main/lib/idds/agents/carrier/utils.py
@@ -24,7 +24,7 @@
                                    MessageStatus, MessageSource, MessageDestination,
                                    get_work_status_from_transform_processing_status)
 
-from idds.common.utils import setup_logging
+from idds.common.utils import setup_logging, get_list_chunks
 from idds.core import (transforms as core_transforms,
                        processings as core_processings,
                        catalog as core_catalog)
@@ -205,13 +205,14 @@ def get_ext_contents(transform_id, work):
     return contents_ids
 
 
-def get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, logger=None, log_prefix=''):
+def get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, max_updates_per_round=2000, logger=None, log_prefix=''):
     logger = get_logger(logger)
     logger.debug(log_prefix + "get_new_contents")
 
     new_input_contents, new_output_contents, new_log_contents = [], [], []
     new_input_dependency_contents = []
     new_input_dep_coll_ids = []
+    chunks = []
     for map_id in new_input_output_maps:
         inputs = new_input_output_maps[map_id]['inputs'] if 'inputs' in new_input_output_maps[map_id] else []
         inputs_dependency = new_input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in new_input_output_maps[map_id] else []
@@ -233,7 +234,21 @@
             content = get_new_content(request_id, transform_id, workload_id, map_id, log_content, content_relation_type=ContentRelationType.Log)
             new_log_contents.append(content)
 
-    return new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
+        total_num_updates = len(new_input_contents) + len(new_output_contents) + len(new_log_contents) + len(new_input_dependency_contents)
+        if total_num_updates > max_updates_per_round:
+            chunk = new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
+            chunks.append(chunk)
+
+            new_input_contents, new_output_contents, new_log_contents = [], [], []
+            new_input_dependency_contents = []
+
+    total_num_updates = len(new_input_contents) + len(new_output_contents) + len(new_log_contents) + len(new_input_dependency_contents)
+    if total_num_updates > 0:
+        chunk = new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
+        chunks.append(chunk)
+
+    # return new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
+    return chunks
 
 
 def get_update_content(content):
@@ -466,7 +481,7 @@ def generate_messages(request_id, transform_id, workload_id, work, msg_type='fil
     return msgs
 
 
-def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None, logger=None, log_prefix=''):
+def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None, max_updates_per_round=2000, logger=None, log_prefix=''):
     logger = get_logger(logger)
 
     proc = processing['processing_metadata']['processing']
@@ -502,19 +517,27 @@ def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None,
         request_id = processing['request_id']
         transform_id = processing['transform_id']
         workload_id = processing['workload_id']
-        ret_new_contents = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, logger=logger, log_prefix=log_prefix)
-        new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
-        # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
-        new_contents = new_input_contents + new_output_contents + new_log_contents
-
-        # not generate new messages
-        # if new_input_contents:
-        #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='input')
-        #     ret_msgs = ret_msgs + msgs
-        # if new_output_contents:
-        #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='output')
-        #     ret_msgs = ret_msgs + msgs
-        return True, processing, update_collections, new_contents, new_input_dependency_contents, ret_msgs, errors
+        ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps,
+                                                   max_updates_per_round=max_updates_per_round, logger=logger, log_prefix=log_prefix)
+        for ret_new_contents in ret_new_contents_chunks:
+            new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
+            # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
+            new_contents = new_input_contents + new_output_contents + new_log_contents
+
+            # not generate new messages
+            # if new_input_contents:
+            #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='input')
+            #     ret_msgs = ret_msgs + msgs
+            # if new_output_contents:
+            #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='output')
+            #     ret_msgs = ret_msgs + msgs
+            logger.debug(log_prefix + "handle_new_processing: add %s new contents" % (len(new_contents)))
+            core_processings.update_processing_contents(update_processing=None,
+                                                        new_contents=new_contents,
+                                                        new_input_dependency_contents=new_input_dependency_contents,
+                                                        messages=ret_msgs)
+        # return True, processing, update_collections, new_contents, new_input_dependency_contents, ret_msgs, errors
+        return True, processing, update_collections, [], [], ret_msgs, errors
 
 
 def get_updated_contents_by_request(request_id, transform_id, workload_id, work, terminated=False, input_output_maps=None,
@@ -951,9 +974,10 @@ def trigger_release_inputs(request_id, transform_id, workload_id, work, updated_
     return update_contents, update_input_contents_full, update_contents_status_name, update_contents_status
 
 
-def poll_missing_outputs(input_output_maps):
+def poll_missing_outputs(input_output_maps, max_updates_per_round=2000):
     content_updates_missing, updated_contents_full_missing = [], []
+    chunks = []
     for map_id in input_output_maps:
         inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
         inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
@@ -979,7 +1003,15 @@
                     content_updates_missing.append(u_content)
                     updated_contents_full_missing.append(content)
 
-    return content_updates_missing, updated_contents_full_missing
+        if len(content_updates_missing) > max_updates_per_round:
+            chunk = content_updates_missing, updated_contents_full_missing
+            chunks.append(chunk)
+            content_updates_missing, updated_contents_full_missing = [], []
+    if len(content_updates_missing) > 0:
+        chunk = content_updates_missing, updated_contents_full_missing
+        chunks.append(chunk)
+    # return content_updates_missing, updated_contents_full_missing
+    return chunks
 
 
 def has_external_content_id(input_output_maps):
@@ -1020,7 +1052,7 @@ def get_update_external_content_ids(input_output_maps, external_content_ids):
     return update_contents
 
 
-def handle_update_processing(processing, agent_attributes, logger=None, log_prefix=''):
+def handle_update_processing(processing, agent_attributes, max_updates_per_round=2000, logger=None, log_prefix=''):
     logger = get_logger(logger)
 
     ret_msgs = []
"handle_update_processing: add %s new contents" % (len(new_contents))) + core_processings.update_processing_contents(update_processing=None, + new_contents=new_contents, + new_input_dependency_contents=new_input_dependency_contents, + messages=ret_msgs) + + ret_msgs = [] + content_updates_missing_chunks = poll_missing_outputs(input_output_maps, max_updates_per_round=max_updates_per_round) + for content_updates_missing_chunk in content_updates_missing_chunks: + content_updates_missing, updated_contents_full_missing = content_updates_missing_chunk + msgs = [] + if updated_contents_full_missing: + msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', + files=updated_contents_full, relation_type='output') + logger.debug(log_prefix + "handle_update_processing: update %s missing contents" % (len(content_updates_missing))) + core_processings.update_processing_contents(update_processing=None, + update_contents=content_updates_missing, + messages=msgs) - updated_contents_full = updated_contents_full + updated_contents_full_missing if updated_contents_full: - msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', - files=updated_contents_full, relation_type='output') - ret_msgs = ret_msgs + msgs + updated_contents_full_chunks = get_list_chunks(updated_contents_full, bulk_size=max_updates_per_round) + for updated_contents_full_chunk in updated_contents_full_chunks: + msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', + files=updated_contents_full_chunk, relation_type='output') + core_processings.update_processing_contents(update_processing=None, + messages=msgs) + + if new_contents_ext: + new_contents_ext_chunks = get_list_chunks(new_contents_ext, bulk_size=max_updates_per_round) + for new_contents_ext_chunk in new_contents_ext_chunks: + logger.debug(log_prefix + "handle_update_processing: add %s ext contents" % (len(new_contents_ext_chunk))) + core_processings.update_processing_contents(update_processing=None, + new_contents_ext=new_contents_ext_chunk) + if update_contents_ext: + update_contents_ext_chunks = get_list_chunks(update_contents_ext, bulk_size=max_updates_per_round) + for update_contents_ext_chunk in update_contents_ext_chunks: + logger.debug(log_prefix + "handle_update_processing: update %s ext contents" % (len(update_contents_ext_chunk))) + core_processings.update_processing_contents(update_processing=None, + update_contents_ext=update_contents_ext_chunk) + + if content_updates: + content_updates_chunks = get_list_chunks(content_updates, bulk_size=max_updates_per_round) + for content_updates_chunk in content_updates_chunks: + logger.debug(log_prefix + "handle_update_processing: update %s contents" % (len(content_updates_chunk))) + core_processings.update_processing_contents(update_processing=None, + update_contents=content_updates_chunk) - return process_status, new_contents, new_input_dependency_contents, ret_msgs, content_updates + content_updates_missing, parameters, new_contents_ext, update_contents_ext + # return process_status, new_contents, new_input_dependency_contents, ret_msgs, content_updates + content_updates_missing, parameters, new_contents_ext, update_contents_ext + return process_status, [], [], ret_msgs, [], parameters, [], [] def get_transform_id_dependency_map(transform_id, logger=None, log_prefix=''): @@ -1118,6 +1193,7 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None, def handle_trigger_processing(processing, agent_attributes, 
@@ -1132,7 +1208,7 @@
             work.set_agent_attributes(agent_attributes, processing)
 
             if (not work.use_dependency_to_release_jobs()) or workload_id is None:
-                return processing['substatus'], [], [], {}, {}, {}, [], []
+                return processing['substatus'], [], [], {}, {}, {}, [], [], has_updates
             else:
                 if trigger_new_updates:
                     # delete information in the contents_update table, to invoke the trigger.
@@ -1154,6 +1230,8 @@
                     #     contents_id_list.append(con['content_id'])
                     new_contents_update_list_chunks = [new_contents_update_list[i:i + max_updates_per_round] for i in range(0, len(new_contents_update_list), max_updates_per_round)]
                     for chunk in new_contents_update_list_chunks:
+                        has_updates = True
+                        logger.debug(log_prefix + "new_contents_update chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])))
                         core_catalog.update_contents(chunk)
                     # core_catalog.delete_contents_update(contents=contents_id_list)
                     core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
@@ -1164,6 +1242,8 @@
                 to_triggered_contents = core_catalog.get_update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
                 to_triggered_contents_chunks = [to_triggered_contents[i:i + max_updates_per_round] for i in range(0, len(to_triggered_contents), max_updates_per_round)]
                 for chunk in to_triggered_contents_chunks:
+                    has_updates = True
+                    logger.debug(log_prefix + "update_contents_from_others_by_dep_id chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])))
                     core_catalog.update_contents(chunk)
                 logger.debug(log_prefix + "update_contents_from_others_by_dep_id done")
@@ -1182,10 +1262,9 @@
                                                                        logger=logger, log_prefix=log_prefix)
 
-            has_updates = False
             for updated_contents_ret in updated_contents_ret_chunks:
                 updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret
-                logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] %s" % (updated_contents[:3]))
+                logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] (total: %s): %s" % (len(updated_contents), updated_contents[:3]))
 
                 if updated_contents_full_input:
                     # if the content is updated by receiver, here is the place to broadcast the messages
@@ -1205,7 +1284,7 @@
                 core_processings.update_processing_contents(update_processing=None,
                                                             update_contents=updated_contents,
-                                                            new_update_contents=new_update_contents,
+                                                            # new_update_contents=new_update_contents,
                                                             messages=ret_msgs)
                 updated_contents = []
                 new_update_contents = []
@@ -1227,7 +1306,7 @@
     # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms
     # return processing['substatus'], content_updates, ret_msgs, {}, update_dep_contents_status_name, update_dep_contents_status, [], ret_update_transforms
     # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, [], ret_update_transforms
-    return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms
+    return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms, has_updates
 
 
 def get_content_status_from_panda_msg_status(status):
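`get_new_contents` and `poll_missing_outputs` now return lists of chunks instead of flat lists, and the `handle_*` functions flush each chunk to the database as it is produced, so a processing with a very large number of contents no longer turns into one huge transaction. A simplified, self-contained sketch of that accumulate-and-flush pattern (the `flush` callback stands in for `core_processings.update_processing_contents`):

```python
# Simplified version of the max_updates_per_round chunking used in this patch.
def chunk_items(items, max_updates_per_round=2000):
    """Group items into chunks of at most max_updates_per_round entries."""
    chunks, current = [], []
    for item in items:
        current.append(item)
        if len(current) >= max_updates_per_round:
            chunks.append(current)
            current = []
    if current:
        chunks.append(current)
    return chunks


def flush_in_chunks(items, flush, max_updates_per_round=2000):
    # flush() is called once per chunk, mirroring the per-chunk calls to
    # core_processings.update_processing_contents() in handle_update_processing().
    for chunk in chunk_items(items, max_updates_per_round):
        flush(chunk)


flush_in_chunks(list(range(5000)),
                flush=lambda chunk: print("flushing %s items" % len(chunk)),
                max_updates_per_round=2000)   # -> 2000, 2000, 1000
```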
diff --git a/main/lib/idds/agents/common/baseagent.py b/main/lib/idds/agents/common/baseagent.py
index 271cd6a9..0f8730f3 100644
--- a/main/lib/idds/agents/common/baseagent.py
+++ b/main/lib/idds/agents/common/baseagent.py
@@ -21,7 +21,7 @@
                                     ReturnCode)
 from idds.common.plugin.plugin_base import PluginBase
 from idds.common.plugin.plugin_utils import load_plugins, load_plugin_sequence
-from idds.common.utils import setup_logging, pid_exists, json_dumps
+from idds.common.utils import setup_logging, pid_exists, json_dumps, json_loads
 from idds.core import health as core_health, messages as core_messages
 from idds.agents.common.timerscheduler import TimerScheduler
 from idds.agents.common.eventbus.eventbus import EventBus
@@ -64,6 +64,12 @@ def __init__(self, num_threads=1, name=None, logger=None, **kwargs):
         else:
             self.event_interval_delay = int(self.event_interval_delay)
 
+        if not hasattr(self, 'max_worker_exec_time'):
+            self.max_worker_exec_time = 3600
+        else:
+            self.max_worker_exec_time = int(self.max_worker_exec_time)
+        self.num_hang_workers, self.num_active_workers = 0, 0
+
         self.plugins = {}
         self.plugin_sequence = []
 
@@ -130,6 +136,9 @@ def load_plugins(self):
                 raise AgentPluginError("Plugin %s is defined but it is not defined in plugin_sequence" % plugin_name)
         """
 
+    def get_num_hang_active_workers(self):
+        return self.num_hang_workers, self.num_active_workers
+
     def init_event_function_map(self):
         self.event_func_map = {}
 
@@ -138,6 +147,8 @@ def get_event_function_map(self):
 
     def execute_event_schedule(self):
         event_ids = list(self.event_futures.keys())
+        self.num_hang_workers, self.num_active_workers = 0, len(event_ids)
+
         for event_id in event_ids:
             event, future, start_time = self.event_futures[event_id]
             if future.done():
@@ -158,6 +169,9 @@
                         self.logger.warning("Corresponding resource is locked, put the event back again: %s" % json_dumps(event))
                         event.requeue()
                         self.event_bus.send(event)
+            else:
+                if time.time() - start_time > self.max_worker_exec_time:
+                    self.num_hang_workers += 1
 
         event_funcs = self.get_event_function_map()
         for event_type in event_funcs:
@@ -233,7 +247,8 @@ def is_self(self, health_item):
         return ret
 
     def get_health_payload(self):
-        return None
+        num_hang_workers, num_active_workers = self.get_num_hang_active_workers()
+        return {'num_hang_workers': num_hang_workers, 'num_active_workers': num_active_workers}
 
     def is_ready(self):
         return True
@@ -246,6 +261,8 @@
         thread_id = self.get_thread_id()
         thread_name = self.get_thread_name()
         payload = self.get_health_payload()
+        if payload:
+            payload = json_dumps(payload)
         if self.is_ready():
             self.logger.debug("health heartbeat: agent %s, pid %s, thread %s, delay %s, payload %s" % (self.get_name(), pid, thread_name, self.heartbeat_delay, payload))
             core_health.add_health_item(agent=self.get_name(), hostname=hostname, pid=pid,
@@ -265,6 +282,55 @@
         if pid_not_exists:
             core_health.clean_health(hostname=hostname, pids=pid_not_exists, older_than=None)
 
+    def get_health_items(self):
+        try:
+            hostname = socket.getfqdn()
+            core_health.clean_health(older_than=self.heartbeat_delay * 2)
+            health_items = core_health.retrieve_health_items()
+            pids, pid_not_exists = [], []
+            for health_item in health_items:
+                if health_item['hostname'] == hostname:
+                    pid = health_item['pid']
+                    if pid not in pids:
+                        pids.append(pid)
+            for pid in pids:
+                if not pid_exists(pid):
+                    pid_not_exists.append(pid)
+            if pid_not_exists:
+                core_health.clean_health(hostname=hostname, pids=pid_not_exists, older_than=None)
+
+            health_items = core_health.retrieve_health_items()
+            return health_items
+        except Exception as ex:
+            self.logger.warn("Failed to get health items: %s" % str(ex))
+
+        return []
+
+    def get_availability(self):
+        try:
+            availability = {}
+            health_items = self.get_health_items()
+            hostname = socket.getfqdn()
+            for item in health_items:
+                if item['hostname'] == hostname:
+                    if item['agent'] not in availability:
+                        availability[item['agent']] = {}
+                    payload = item['payload']
+                    num_hang_workers = 0
+                    num_active_workers = 0
+                    if payload:
+                        payload = json_loads(payload)
+                        num_hang_workers = payload.get('num_hang_workers', 0)
+                        num_active_workers = payload.get('num_active_workers', 0)
+
+                    availability[item['agent']]['num_hang_workers'] = num_hang_workers
+                    availability[item['agent']]['num_active_workers'] = num_active_workers
+
+            return availability
+        except Exception as ex:
+            self.logger.warn("Failed to get availability: %s" % str(ex))
+            return {}
+
     def add_default_tasks(self):
         task = self.create_task(task_func=self.health_heartbeat, task_output_queue=None, task_args=tuple(),
                                 task_kwargs={}, delay_time=self.heartbeat_delay,
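The payload returned by `get_health_payload` is serialized with `json_dumps` before `health_heartbeat` stores it, and `get_availability` parses it back with `json_loads`, yielding one worker-count entry per agent on the local host. A rough sketch of that round trip with plain `json` (iDDS's `json_dumps`/`json_loads` add extra type handling, so this is only an approximation):

```python
import json

# What a single agent reports in its heartbeat (see get_health_payload above).
payload = {'num_hang_workers': 1, 'num_active_workers': 4}
stored = json.dumps(payload)           # kept in the health table as a string

# What get_availability() reconstructs for the local host, keyed by agent name.
availability = {'Trigger': json.loads(stored)}
print(availability)   # {'Trigger': {'num_hang_workers': 1, 'num_active_workers': 4}}
```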
diff --git a/main/lib/idds/agents/common/plugins/messaging.py b/main/lib/idds/agents/common/plugins/messaging.py
index 13bcd7c8..ace76953 100644
--- a/main/lib/idds/agents/common/plugins/messaging.py
+++ b/main/lib/idds/agents/common/plugins/messaging.py
@@ -149,7 +149,7 @@ def connect_to_messaging_brokers(self, sender=True):
         for broker, port in broker_addresses:
             conn = stomp.Connection12(host_and_ports=[(broker, port)],
                                       keepalive=True,
-                                      heartbeats=(30000, 0),     # one minute
+                                      heartbeats=(30000, 30000),     # 30000 ms = 30 seconds
                                       timeout=timeout)
             conns.append(conn)
         channel_conns[name] = conns
diff --git a/main/lib/idds/agents/coordinator/coordinator.py b/main/lib/idds/agents/coordinator/coordinator.py
index a148aed6..1421828e 100644
--- a/main/lib/idds/agents/coordinator/coordinator.py
+++ b/main/lib/idds/agents/coordinator/coordinator.py
@@ -16,7 +16,7 @@
 from idds.common.constants import (Sections)
 from idds.common.exceptions import IDDSException
 from idds.common.event import EventPriority
-from idds.common.utils import setup_logging, get_logger, json_dumps, json_loads
+from idds.common.utils import setup_logging, get_logger, json_loads
 from idds.core import health as core_health
 
 from idds.agents.common.baseagent import BaseAgent
@@ -88,9 +88,14 @@ def is_ready(self):
         return True
 
     def get_health_payload(self):
+        payload = super(Coordinator, self).get_health_payload()
+
         manager = self.event_bus.get_manager()
-        payload = {'manager': manager}
-        payload = json_dumps(payload)
+        if payload:
+            payload['manager'] = manager
+        else:
+            payload = {'manager': manager}
+        # payload = json_dumps(payload)
         return payload
 
     def select_coordinator(self):
diff --git a/main/lib/idds/agents/main.py b/main/lib/idds/agents/main.py
index aff5ffb3..af82f8bb 100755
--- a/main/lib/idds/agents/main.py
+++ b/main/lib/idds/agents/main.py
@@ -20,7 +20,7 @@
 from idds.common.constants import Sections
 from idds.common.config import config_has_section, config_has_option, config_list_options, config_get
-from idds.common.utils import setup_logging
+from idds.common.utils import setup_logging, report_availability
 
 setup_logging('idds.log')
 
@@ -111,6 +111,12 @@ def run_agents():
                 logging.critical("Exit main run loop.")
                 break
 
+        # select one agent to get the health items
+        candidate = RUNNING_AGENTS[0]
+        availability = candidate.get_availability()
+        logging.debug("availability: %s" % availability)
+        report_availability(availability)
+
 
 def stop(signum=None, frame=None):
     global RUNNING_AGENTS
diff --git a/main/lib/idds/core/processings.py b/main/lib/idds/core/processings.py
index b336f48e..338eae84 100644
--- a/main/lib/idds/core/processings.py
+++ b/main/lib/idds/core/processings.py
@@ -314,7 +314,7 @@ def resolve_input_dependency_id(new_input_dependency_contents, session=None):
 
 
 @transactional_session
-def update_processing_contents(update_processing, update_contents, update_messages=None, new_contents=None,
+def update_processing_contents(update_processing, update_contents=None, update_messages=None, new_contents=None,
                                update_dep_contents=None, update_collections=None, messages=None,
                                new_update_contents=None, new_input_dependency_contents=None,
                                new_contents_ext=None, update_contents_ext=None,
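With `update_contents` now defaulting to `None`, the carrier code above can call `update_processing_contents` with only the keyword groups it actually has for a given chunk. Hypothetical calls illustrating that pattern (the argument values are placeholders, not real content dictionaries):

```python
# Illustrative only: empty lists stand in for real content dictionaries.
from idds.core import processings as core_processings

new_contents = []            # e.g. one chunk from get_new_contents()
content_updates_chunk = []   # e.g. one chunk from get_list_chunks(content_updates, ...)

# Flush new contents for one chunk, without passing update_contents at all.
core_processings.update_processing_contents(update_processing=None, new_contents=new_contents)

# Flush one chunk of content updates only.
core_processings.update_processing_contents(update_processing=None, update_contents=content_updates_chunk)
```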