From f5c3794588906d2bee51d069c54466634816d115 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Wed, 25 Oct 2023 10:55:22 +0200
Subject: [PATCH 01/11] update conductor heartbeat

---
 main/lib/idds/agents/common/plugins/messaging.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/main/lib/idds/agents/common/plugins/messaging.py b/main/lib/idds/agents/common/plugins/messaging.py
index 13bcd7c8..ace76953 100644
--- a/main/lib/idds/agents/common/plugins/messaging.py
+++ b/main/lib/idds/agents/common/plugins/messaging.py
@@ -149,7 +149,7 @@ def connect_to_messaging_brokers(self, sender=True):
         for broker, port in broker_addresses:
             conn = stomp.Connection12(host_and_ports=[(broker, port)],
                                       keepalive=True,
-                                      heartbeats=(30000, 0),        # one minute
+                                      heartbeats=(30000, 30000),    # 30 seconds; stomp heartbeat values are in milliseconds
                                       timeout=timeout)
             conns.append(conn)
         channel_conns[name] = conns
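Note on the tuple above: in stomp.py, heartbeats=(outgoing_ms, incoming_ms) means the client sends a heartbeat every 30 s and, now that the second value is non-zero, also expects one from the broker every 30 s, so a dead broker is detected instead of the connection idling forever. A minimal sketch of the semantics; the broker host, port, and credentials are placeholders:

    import stomp

    # (outgoing_ms, incoming_ms): send a heartbeat every 30 s and expect one
    # back every 30 s; (30000, 0) would disable the broker-to-client direction.
    conn = stomp.Connection12(host_and_ports=[('broker.example.com', 61613)],
                              keepalive=True,
                              heartbeats=(30000, 30000))
    conn.connect('user', 'pass', wait=True)
    conn.disconnect()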
From 2a0abe2a80b5cbbc5e7ee6714c778891f29bb084 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 26 Oct 2023 14:23:08 +0200
Subject: [PATCH 02/11] fix trigger workers

---
 main/lib/idds/agents/carrier/finisher.py | 8 ++++----
 main/lib/idds/agents/carrier/trigger.py  | 8 ++++----
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/main/lib/idds/agents/carrier/finisher.py b/main/lib/idds/agents/carrier/finisher.py
index 4825ef98..68d04d7d 100644
--- a/main/lib/idds/agents/carrier/finisher.py
+++ b/main/lib/idds/agents/carrier/finisher.py
@@ -31,12 +31,12 @@ class Finisher(Poller):
     Finisher works to submit and running tasks to WFMS.
     """

-    def __init__(self, num_threads=1, finisher_max_number_workers=3, max_number_workers=3, poll_time_period=10, retries=3, retrieve_bulk_size=2,
+    def __init__(self, num_threads=1, finisher_max_number_workers=None, max_number_workers=3, poll_time_period=10, retries=3, retrieve_bulk_size=2,
                  message_bulk_size=1000, **kwargs):
-        if finisher_max_number_workers > num_threads:
-            self.max_number_workers = finisher_max_number_workers
+        if finisher_max_number_workers:
+            self.max_number_workers = int(finisher_max_number_workers)
         else:
-            self.max_number_workers = max_number_workers
+            self.max_number_workers = int(max_number_workers)

         self.set_max_workers()
         num_threads = int(self.max_number_workers)
diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py
index 1cc33643..e1f3e86b 100644
--- a/main/lib/idds/agents/carrier/trigger.py
+++ b/main/lib/idds/agents/carrier/trigger.py
@@ -32,12 +32,12 @@ class Trigger(Poller):
     Trigger works to trigger to release jobs
     """

-    def __init__(self, num_threads=1, trigger_max_number_workers=3, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
+    def __init__(self, num_threads=1, trigger_max_number_workers=None, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
                  name='Trigger', message_bulk_size=1000, max_updates_per_round=2000, **kwargs):
-        if trigger_max_number_workers > num_threads:
-            self.max_number_workers = trigger_max_number_workers
+        if trigger_max_number_workers:
+            self.max_number_workers = int(trigger_max_number_workers)
         else:
-            self.max_number_workers = max_number_workers
+            self.max_number_workers = int(max_number_workers)

         self.set_max_workers()
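Why the old guard was broken: options read from idds.cfg can reach these constructors as strings, and on Python 3 comparing a string (or the new None default) against an int raises TypeError. A small illustration with made-up values; only the guard pattern comes from the patch:

    # "3" stands for a value as it might arrive from the config parser.
    finisher_max_number_workers = "3"
    num_threads = 1

    try:
        finisher_max_number_workers > num_threads       # old guard
    except TypeError as ex:
        print("old guard fails: %s" % ex)               # str vs int comparison

    # new guard: a truthiness test plus an explicit cast
    if finisher_max_number_workers:
        max_number_workers = int(finisher_max_number_workers)
    print(max_number_workers)                           # 3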
From 4fe8fc38dedd0c49da589fc7c8e0838bdc807e04 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 26 Oct 2023 14:39:33 +0200
Subject: [PATCH 03/11] heartbeat report availability

---
 common/lib/idds/common/utils.py                | 21 ++++++
 main/config_default/idds.cfg                   |  8 +--
 main/lib/idds/agents/carrier/utils.py          |  4 +-
 main/lib/idds/agents/common/baseagent.py       | 70 ++++++++++++++++++-
 .../idds/agents/coordinator/coordinator.py     | 11 ++-
 main/lib/idds/agents/main.py                   |  8 ++-
 6 files changed, 111 insertions(+), 11 deletions(-)

diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py
index 6090cf73..71627b2e 100644
--- a/common/lib/idds/common/utils.py
+++ b/common/lib/idds/common/utils.py
@@ -41,6 +41,12 @@
 DATE_FORMAT = '%a, %d %b %Y %H:%M:%S UTC'


+def get_log_dir():
+    if config_has_section('common') and config_has_option('common', 'logdir'):
+        return config_get('common', 'logdir')
+    return "/var/log/idds"
+
+
 def setup_logging(name, stream=None, loglevel=None):
     """
     Setup logging
@@ -588,3 +594,18 @@ def pid_exists(pid):
 def get_list_chunks(full_list, bulk_size=2000):
     chunks = [full_list[i:i + bulk_size] for i in range(0, len(full_list), bulk_size)]
     return chunks
+
+
+def report_availability(availability):
+    try:
+        log_dir = get_log_dir()
+        if log_dir:
+            filename = os.path.join(log_dir, 'idds_availability')
+            with open(filename, 'w') as f:
+                json.dump(availability, f)
+        else:
+            print("availability: %s" % str(availability))
+    except Exception as ex:
+        error = "Failed to report availability: %s" % str(ex)
+        print(error)
+        logging.debug(error)
diff --git a/main/config_default/idds.cfg b/main/config_default/idds.cfg
index afa1db6a..cce9fbd4 100755
--- a/main/config_default/idds.cfg
+++ b/main/config_default/idds.cfg
@@ -36,8 +36,8 @@ coordination_interval_delay = 300

 [clerk]
-num_threads = 16
-max_number_workers = 16
+num_threads = 4
+max_number_workers = 4
 poll_period = 300
 new_poll_period = 10
 update_poll_period = 300
@@ -66,8 +66,8 @@ domapandawork.num_retries = 0

 [carrier]
 num_threads = 8
 max_number_workers = 8
-trigger_max_number_workers = 8
-finisher_max_number_workers = 8
+trigger_max_number_workers = 3
+finisher_max_number_workers = 3
 receiver_num_threads = 8
 poll_period = 300
diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py
index 9f4546d8..62767acd 100644
--- a/main/lib/idds/agents/carrier/utils.py
+++ b/main/lib/idds/agents/carrier/utils.py
@@ -1154,6 +1154,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
             # contents_id_list.append(con['content_id'])
             new_contents_update_list_chunks = [new_contents_update_list[i:i + max_updates_per_round] for i in range(0, len(new_contents_update_list), max_updates_per_round)]
             for chunk in new_contents_update_list_chunks:
+                logger.debug(log_prefix + "new_contents_update chunk[:3](total: %s): %s" % (str(chunk[:3]), len(chunk)))
                 core_catalog.update_contents(chunk)
             # core_catalog.delete_contents_update(contents=contents_id_list)
             core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
@@ -1164,6 +1165,7 @@
             to_triggered_contents = core_catalog.get_update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
             to_triggered_contents_chunks = [to_triggered_contents[i:i + max_updates_per_round] for i in range(0, len(to_triggered_contents), max_updates_per_round)]
             for chunk in to_triggered_contents_chunks:
+                logger.debug(log_prefix + "update_contents_from_others_by_dep_id chunk[:3](total: %s): %s" % (str(chunk[:3]), len(chunk)))
                 core_catalog.update_contents(chunk)
             logger.debug(log_prefix + "update_contents_from_others_by_dep_id done")
@@ -1185,7 +1187,7 @@
     has_updates = False
     for updated_contents_ret in updated_contents_ret_chunks:
         updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret
-        logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] %s" % (updated_contents[:3]))
+        logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] (total: %s): %s" % (updated_contents[:3], len(updated_contents)))

         if updated_contents_full_input:
             # if the content is updated by receiver, here is the place to broadcast the messages
diff --git a/main/lib/idds/agents/common/baseagent.py b/main/lib/idds/agents/common/baseagent.py
index 271cd6a9..0f8730f3 100644
--- a/main/lib/idds/agents/common/baseagent.py
+++ b/main/lib/idds/agents/common/baseagent.py
@@ -21,7 +21,7 @@
                              ReturnCode)
 from idds.common.plugin.plugin_base import PluginBase
 from idds.common.plugin.plugin_utils import load_plugins, load_plugin_sequence
-from idds.common.utils import setup_logging, pid_exists, json_dumps
+from idds.common.utils import setup_logging, pid_exists, json_dumps, json_loads
 from idds.core import health as core_health, messages as core_messages
 from idds.agents.common.timerscheduler import TimerScheduler
 from idds.agents.common.eventbus.eventbus import EventBus
@@ -64,6 +64,12 @@ def __init__(self, num_threads=1, name=None, logger=None, **kwargs):
         else:
             self.event_interval_delay = int(self.event_interval_delay)

+        if not hasattr(self, 'max_worker_exec_time'):
+            self.max_worker_exec_time = 3600
+        else:
+            self.max_worker_exec_time = int(self.max_worker_exec_time)
+        self.num_hang_workers, self.num_active_workers = 0, 0
+
         self.plugins = {}
         self.plugin_sequence = []

@@ -130,6 +136,9 @@ def load_plugins(self):
                 raise AgentPluginError("Plugin %s is defined but it is not defined in plugin_sequence" % plugin_name)
         """

+    def get_num_hang_active_workers(self):
+        return self.num_hang_workers, self.num_active_workers
+
     def init_event_function_map(self):
         self.event_func_map = {}

@@ -138,6 +147,8 @@ def get_event_function_map(self):

     def execute_event_schedule(self):
         event_ids = list(self.event_futures.keys())
+        self.num_hang_workers, self.num_active_workers = 0, len(event_ids)
+
         for event_id in event_ids:
             event, future, start_time = self.event_futures[event_id]
             if future.done():
@@ -158,6 +169,9 @@
                         self.logger.warning("Corresponding resource is locked, put the event back again: %s" % json_dumps(event))
                         event.requeue()
                         self.event_bus.send(event)
+            else:
+                if time.time() - start_time > self.max_worker_exec_time:
+                    self.num_hang_workers += 1

         event_funcs = self.get_event_function_map()
         for event_type in event_funcs:
@@ -233,7 +247,8 @@ def is_self(self, health_item):
         return ret

     def get_health_payload(self):
-        return None
+        num_hang_workers, num_active_workers = self.get_num_hang_active_workers()
+        return {'num_hang_workers': num_hang_workers, 'num_active_workers': num_active_workers}

     def is_ready(self):
         return True
@@ -246,6 +261,8 @@ def health_heartbeat(self, heartbeat_delay=None):
         thread_id = self.get_thread_id()
         thread_name = self.get_thread_name()
         payload = self.get_health_payload()
+        if payload:
+            payload = json_dumps(payload)
         if self.is_ready():
             self.logger.debug("health heartbeat: agent %s, pid %s, thread %s, delay %s, payload %s" % (self.get_name(), pid, thread_name, self.heartbeat_delay, payload))
             core_health.add_health_item(agent=self.get_name(), hostname=hostname, pid=pid,
@@ -265,6 +282,55 @@ def health_heartbeat(self, heartbeat_delay=None):
         if pid_not_exists:
             core_health.clean_health(hostname=hostname, pids=pid_not_exists, older_than=None)

+    def get_health_items(self):
+        try:
+            hostname = socket.getfqdn()
+            core_health.clean_health(older_than=self.heartbeat_delay * 2)
+            health_items = core_health.retrieve_health_items()
+            pids, pid_not_exists = [], []
+            for health_item in health_items:
+                if health_item['hostname'] == hostname:
+                    pid = health_item['pid']
+                    if pid not in pids:
+                        pids.append(pid)
+            for pid in pids:
+                if not pid_exists(pid):
+                    pid_not_exists.append(pid)
+            if pid_not_exists:
+                core_health.clean_health(hostname=hostname, pids=pid_not_exists, older_than=None)
+
+            health_items = core_health.retrieve_health_items()
+            return health_items
+        except Exception as ex:
+            self.logger.warn("Failed to get health items: %s" % str(ex))
+
+        return []
+
+    def get_availability(self):
+        try:
+            availability = {}
+            health_items = self.get_health_items()
+            hostname = socket.getfqdn()
+            for item in health_items:
+                if item['hostname'] == hostname:
+                    if item['agent'] not in availability:
+                        availability[item['agent']] = {}
+                    payload = item['payload']
+                    num_hang_workers = 0
+                    num_active_workers = 0
+                    if payload:
+                        payload = json_loads(payload)
+                        num_hang_workers = payload.get('num_hang_workers', 0)
+                        num_active_workers = payload.get('num_active_workers', 0)
+
+                    availability[item['agent']]['num_hang_workers'] = num_hang_workers
+                    availability[item['agent']]['num_active_workers'] = num_active_workers
+
+            return availability
+        except Exception as ex:
+            self.logger.warn("Failed to get availability: %s" % str(ex))
+            return {}
+
     def add_default_tasks(self):
         task = self.create_task(task_func=self.health_heartbeat, task_output_queue=None,
                                 task_args=tuple(), task_kwargs={}, delay_time=self.heartbeat_delay,
diff --git a/main/lib/idds/agents/coordinator/coordinator.py b/main/lib/idds/agents/coordinator/coordinator.py
index a148aed6..1421828e 100644
--- a/main/lib/idds/agents/coordinator/coordinator.py
+++ b/main/lib/idds/agents/coordinator/coordinator.py
@@ -16,7 +16,7 @@
 from idds.common.constants import (Sections)
 from idds.common.exceptions import IDDSException
 from idds.common.event import EventPriority
-from idds.common.utils import setup_logging, get_logger, json_dumps, json_loads
+from idds.common.utils import setup_logging, get_logger, json_loads
 from idds.core import health as core_health
 from idds.agents.common.baseagent import BaseAgent
@@ -88,9 +88,14 @@ def is_ready(self):
         return True

     def get_health_payload(self):
+        payload = super(Coordinator, self).get_health_payload()
+
         manager = self.event_bus.get_manager()
-        payload = {'manager': manager}
-        payload = json_dumps(payload)
+        if payload:
+            payload['manager'] = manager
+        else:
+            payload = {'manager': manager}
+        # payload = json_dumps(payload)
         return payload

     def select_coordinator(self):
diff --git a/main/lib/idds/agents/main.py b/main/lib/idds/agents/main.py
index aff5ffb3..af82f8bb 100755
--- a/main/lib/idds/agents/main.py
+++ b/main/lib/idds/agents/main.py
@@ -20,7 +20,7 @@
 from idds.common.constants import Sections
 from idds.common.config import config_has_section, config_has_option, config_list_options, config_get
-from idds.common.utils import setup_logging
+from idds.common.utils import setup_logging, report_availability

 setup_logging('idds.log')
@@ -111,6 +111,12 @@ def run_agents():
             logging.critical("Exit main run loop.")
             break

+        # select one agent to get the health items
+        candidate = RUNNING_AGENTS[0]
+        availability = candidate.get_availability()
+        logging.debug("availability: %s" % availability)
+        report_availability(availability)
+

 def stop(signum=None, frame=None):
     global RUNNING_AGENTS
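The net effect of this patch is a small JSON snapshot that external monitoring can poll. A hedged sketch of a consumer: the path comes from [common]/logdir with /var/log/idds as the fallback, and the agent names and counts shown in the comment are hypothetical:

    import json
    import os

    # read the snapshot written by report_availability()
    with open(os.path.join('/var/log/idds', 'idds_availability')) as f:
        availability = json.load(f)

    # e.g. {"Clerk":   {"num_hang_workers": 0, "num_active_workers": 2},
    #       "Trigger": {"num_hang_workers": 1, "num_active_workers": 8}}
    for agent, counts in availability.items():
        print(agent, counts['num_hang_workers'], counts['num_active_workers'])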
logging.debug("availability: %s" % availability) + report_availability(availability) + def stop(signum=None, frame=None): global RUNNING_AGENTS From dc7088ffd2bfb761d15b7131b114420bfe57ec3b Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 26 Oct 2023 17:57:05 +0200 Subject: [PATCH 04/11] bulk submitter and poller --- main/lib/idds/agents/carrier/poller.py | 6 +- main/lib/idds/agents/carrier/submitter.py | 1 + main/lib/idds/agents/carrier/trigger.py | 2 +- main/lib/idds/agents/carrier/utils.py | 142 ++++++++++++++++------ 4 files changed, 112 insertions(+), 39 deletions(-) diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index 89bec25c..4bc07e48 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -35,7 +35,7 @@ class Poller(BaseAgent): """ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2, - name='Poller', message_bulk_size=1000, **kwargs): + max_updates_per_round=2000, name='Poller', message_bulk_size=1000, **kwargs): self.max_number_workers = max_number_workers if int(num_threads) < int(self.max_number_workers): num_threads = int(self.max_number_workers) @@ -83,6 +83,9 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries= else: self.max_number_workers = int(self.max_number_workers) + self.max_updates_per_round = max_updates_per_round + self.logger.info("max_updates_per_round: %s" % self.max_updates_per_round) + self.show_queue_size_time = None def is_ok_to_run_more_processings(self): @@ -279,6 +282,7 @@ def handle_update_processing(self, processing): log_prefix = self.get_log_prefix(processing) ret_handle_update_processing = handle_update_processing(processing, self.agent_attributes, + max_updates_per_round=self.max_updates_per_round, logger=self.logger, log_prefix=log_prefix) diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py index 2b6b18b2..817268df 100644 --- a/main/lib/idds/agents/carrier/submitter.py +++ b/main/lib/idds/agents/carrier/submitter.py @@ -109,6 +109,7 @@ def handle_new_processing(self, processing): ret_new_processing = handle_new_processing(processing, self.agent_attributes, func_site_to_cloud=self.get_site_to_cloud, + max_updates_per_round=self.max_updates_per_round, logger=self.logger, log_prefix=log_prefix) status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index e1f3e86b..2b31e63d 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -43,7 +43,7 @@ def __init__(self, num_threads=1, trigger_max_number_workers=None, max_number_wo num_threads = int(self.max_number_workers) super(Trigger, self).__init__(num_threads=num_threads, name=name, max_number_workers=self.max_number_workers, - retrieve_bulk_size=retrieve_bulk_size, **kwargs) + max_updates_per_round=max_updates_per_round, retrieve_bulk_size=retrieve_bulk_size, **kwargs) self.logger.info("num_threads: %s" % num_threads) self.max_updates_per_round = max_updates_per_round diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 62767acd..2a4fd04a 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -24,7 +24,7 @@ MessageStatus, MessageSource, MessageDestination, get_work_status_from_transform_processing_status) 
diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py
index 62767acd..2a4fd04a 100644
--- a/main/lib/idds/agents/carrier/utils.py
+++ b/main/lib/idds/agents/carrier/utils.py
@@ -24,7 +24,7 @@
                                    MessageStatus, MessageSource, MessageDestination,
                                    get_work_status_from_transform_processing_status)
-from idds.common.utils import setup_logging
+from idds.common.utils import setup_logging, get_list_chunks
 from idds.core import (transforms as core_transforms,
                        processings as core_processings,
                        catalog as core_catalog)
@@ -205,13 +205,14 @@ def get_ext_contents(transform_id, work):
     return contents_ids


-def get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, logger=None, log_prefix=''):
+def get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, max_updates_per_round=2000, logger=None, log_prefix=''):
     logger = get_logger(logger)
     logger.debug(log_prefix + "get_new_contents")

     new_input_contents, new_output_contents, new_log_contents = [], [], []
     new_input_dependency_contents = []
     new_input_dep_coll_ids = []
+    chunks = []
     for map_id in new_input_output_maps:
         inputs = new_input_output_maps[map_id]['inputs'] if 'inputs' in new_input_output_maps[map_id] else []
         inputs_dependency = new_input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in new_input_output_maps[map_id] else []
@@ -233,7 +234,21 @@
             content = get_new_content(request_id, transform_id, workload_id, map_id, log_content, content_relation_type=ContentRelationType.Log)
             new_log_contents.append(content)

-    return new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
+        total_num_updates = len(new_input_contents) + len(new_output_contents) + len(new_log_contents) + len(new_input_dependency_contents)
+        if total_num_updates > max_updates_per_round:
+            chunk = new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
+            chunks.append(chunk)
+
+            new_input_contents, new_output_contents, new_log_contents = [], [], []
+            new_input_dependency_contents = []
+
+    total_num_updates = len(new_input_contents) + len(new_output_contents) + len(new_log_contents) + len(new_input_dependency_contents)
+    if total_num_updates > 0:
+        chunk = new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
+        chunks.append(chunk)
+
+    # return new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents
+    return chunks


 def get_update_content(content):
@@ -502,19 +517,25 @@ def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None,
     request_id = processing['request_id']
     transform_id = processing['transform_id']
     workload_id = processing['workload_id']
-    ret_new_contents = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, logger=logger, log_prefix=log_prefix)
-    new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
-    # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
-    new_contents = new_input_contents + new_output_contents + new_log_contents
-
-    # not generate new messages
-    # if new_input_contents:
-    #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='input')
-    #     ret_msgs = ret_msgs + msgs
-    # if new_output_contents:
-    #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='output')
-    #     ret_msgs = ret_msgs + msgs
-    return True, processing, update_collections, new_contents, new_input_dependency_contents, ret_msgs, errors
+    ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, logger=logger, log_prefix=log_prefix)
+    for ret_new_contents in ret_new_contents_chunks:
+        new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
+        # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
+        new_contents = new_input_contents + new_output_contents + new_log_contents
+
+        # not generate new messages
+        # if new_input_contents:
+        #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='input')
+        #     ret_msgs = ret_msgs + msgs
+        # if new_output_contents:
+        #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='output')
+        #     ret_msgs = ret_msgs + msgs
+        core_processings.update_processing_contents(update_processing=None,
+                                                    new_contents=new_contents,
+                                                    new_input_dependency_contents=new_input_dependency_contents,
+                                                    messages=ret_msgs)
+    # return True, processing, update_collections, new_contents, new_input_dependency_contents, ret_msgs, errors
+    return True, processing, update_collections, [], [], ret_msgs, errors


 def get_updated_contents_by_request(request_id, transform_id, workload_id, work, terminated=False, input_output_maps=None,
@@ -951,9 +972,10 @@ def trigger_release_inputs(request_id, transform_id, workload_id, work, updated_
     return update_contents, update_input_contents_full, update_contents_status_name, update_contents_status


-def poll_missing_outputs(input_output_maps):
+def poll_missing_outputs(input_output_maps, max_updates_per_round=2000):
     content_updates_missing, updated_contents_full_missing = [], []

+    chunks = []
     for map_id in input_output_maps:
         inputs = input_output_maps[map_id]['inputs'] if 'inputs' in input_output_maps[map_id] else []
         inputs_dependency = input_output_maps[map_id]['inputs_dependency'] if 'inputs_dependency' in input_output_maps[map_id] else []
@@ -979,7 +1001,15 @@
                 content_updates_missing.append(u_content)
                 updated_contents_full_missing.append(content)

-    return content_updates_missing, updated_contents_full_missing
+        if len(content_updates_missing) > max_updates_per_round:
+            chunk = content_updates_missing, updated_contents_full_missing
+            chunks.append(chunk)
+            content_updates_missing, updated_contents_full_missing = [], []
+    if len(content_updates_missing) > 0:
+        chunk = content_updates_missing, updated_contents_full_missing
+        chunks.append(chunk)
+    # return content_updates_missing, updated_contents_full_missing
+    return chunks


 def has_external_content_id(input_output_maps):
@@ -1020,7 +1050,7 @@
     return update_contents


-def handle_update_processing(processing, agent_attributes, logger=None, log_prefix=''):
+def handle_update_processing(processing, agent_attributes, max_updates_per_round=2000, logger=None, log_prefix=''):
     logger = get_logger(logger)

     ret_msgs = []
@@ -1064,29 +1094,67 @@
     logger.debug(log_prefix + "poll_processing_updates new_input_output_maps1.keys[:3]: %s" % (list(new_input_output_maps1.keys())[:3]))
     logger.debug(log_prefix + "poll_processing_updates updated_contents_full[:3]: %s" % (updated_contents_full[:3]))

-    ret_new_contents = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps)
-    new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
-    # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
-    new_contents = new_input_contents + new_output_contents + new_log_contents
+    ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, max_updates_per_round=max_updates_per_round)
+    for ret_new_contents in ret_new_contents_chunks:
+        new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
+
+        ret_msgs = []
+        if new_input_contents:
+            msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
+                                     files=new_input_contents, relation_type='input')
+            ret_msgs = ret_msgs + msgs
+        if new_output_contents:
+            msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
+                                     files=new_output_contents, relation_type='output')
+            ret_msgs = ret_msgs + msgs

-    content_updates_missing, updated_contents_full_missing = poll_missing_outputs(input_output_maps)
+        # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
+        new_contents = new_input_contents + new_output_contents + new_log_contents

-    if new_input_contents:
-        msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
-                                 files=new_input_contents, relation_type='input')
-        ret_msgs = ret_msgs + msgs
-    if new_output_contents:
-        msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
-                                 files=new_output_contents, relation_type='output')
-        ret_msgs = ret_msgs + msgs
+        core_processings.update_processing_contents(update_processing=None,
+                                                    new_contents=new_contents,
+                                                    new_input_dependency_contents=new_input_dependency_contents,
+                                                    messages=ret_msgs)
+
+    ret_msgs = []
+    content_updates_missing_chunks = poll_missing_outputs(input_output_maps, max_updates_per_round=max_updates_per_round)
+    for content_updates_missing_chunk in content_updates_missing_chunks:
+        content_updates_missing, updated_contents_full_missing = content_updates_missing_chunk
+        msgs = []
+        if updated_contents_full_missing:
+            msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
+                                     files=updated_contents_full, relation_type='output')
+        core_processings.update_processing_contents(update_processing=None,
+                                                    update_contents=content_updates_missing,
+                                                    messages=msgs)

-    updated_contents_full = updated_contents_full + updated_contents_full_missing
     if updated_contents_full:
-        msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
-                                 files=updated_contents_full, relation_type='output')
-        ret_msgs = ret_msgs + msgs
+        updated_contents_full_chunks = get_list_chunks(updated_contents_full, bulk_size=max_updates_per_round)
+        for updated_contents_full_chunk in updated_contents_full_chunks:
+            msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
+                                     files=updated_contents_full_chunk, relation_type='output')
+            core_processings.update_processing_contents(update_processing=None,
+                                                        messages=msgs)
+
+    if new_contents_ext:
+        new_contents_ext_chunks = get_list_chunks(new_contents_ext, bulk_size=max_updates_per_round)
+        for new_contents_ext_chunk in new_contents_ext_chunks:
+            core_processings.update_processing_contents(update_processing=None,
+                                                        new_contents_ext=new_contents_ext_chunk)
+    if update_contents_ext:
+        update_contents_ext_chunks = get_list_chunks(update_contents_ext, bulk_size=max_updates_per_round)
+        for update_contents_ext_chunk in update_contents_ext_chunks:
+            core_processings.update_processing_contents(update_processing=None,
+                                                        update_contents_ext=update_contents_ext_chunk)
+
+    if content_updates:
+        content_updates_chunks = get_list_chunks(content_updates, bulk_size=max_updates_per_round)
+        for content_updates_chunk in content_updates_chunks:
+            core_processings.update_processing_contents(update_processing=None,
+                                                        update_contents=content_updates_chunk)

-    return process_status, new_contents, new_input_dependency_contents, ret_msgs, content_updates + content_updates_missing, parameters, new_contents_ext, update_contents_ext
+    # return process_status, new_contents, new_input_dependency_contents, ret_msgs, content_updates + content_updates_missing, parameters, new_contents_ext, update_contents_ext
+    return process_status, [], [], ret_msgs, [], parameters, [], []


 def get_transform_id_dependency_map(transform_id, logger=None, log_prefix=''):
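All of the bulking in this patch follows one pattern: slice a potentially huge update list into bounded chunks and commit each chunk in its own call, so a single transaction never carries more than max_updates_per_round rows. The helper behind it is get_list_chunks, added to common/lib/idds/common/utils.py in patch 03; a standalone illustration with toy data:

    # get_list_chunks as defined in patch 03, exercised with a made-up list.
    def get_list_chunks(full_list, bulk_size=2000):
        return [full_list[i:i + bulk_size] for i in range(0, len(full_list), bulk_size)]

    print(get_list_chunks(list(range(5)), bulk_size=2))   # [[0, 1], [2, 3], [4]]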
From a352a4e4e723e5cf6fc6dfee4431fabcb8a9d4ed Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 26 Oct 2023 18:05:24 +0200
Subject: [PATCH 05/11] bulk submitter and poller

---
 main/lib/idds/agents/carrier/utils.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py
index 2a4fd04a..32369a21 100644
--- a/main/lib/idds/agents/carrier/utils.py
+++ b/main/lib/idds/agents/carrier/utils.py
@@ -481,7 +481,7 @@ def generate_messages(request_id, transform_id, workload_id, work, msg_type='fil
     return msgs


-def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None, logger=None, log_prefix=''):
+def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None, max_updates_per_round=2000, logger=None, log_prefix=''):
     logger = get_logger(logger)

     proc = processing['processing_metadata']['processing']
@@ -517,7 +517,8 @@
     request_id = processing['request_id']
     transform_id = processing['transform_id']
     workload_id = processing['workload_id']
-    ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, logger=logger, log_prefix=log_prefix)
+    ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps,
+                                               max_updates_per_round=max_updates_per_round, logger=logger, log_prefix=log_prefix)
     for ret_new_contents in ret_new_contents_chunks:
         new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents
         # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents

From 73464eceb3b59a57ad7fbe355ea9f2afee3ff189 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 26 Oct 2023 20:56:24 +0200
Subject: [PATCH 06/11] update sls monitor svc

---
 main/lib/idds/core/processings.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/main/lib/idds/core/processings.py b/main/lib/idds/core/processings.py
index b336f48e..338eae84 100644
--- a/main/lib/idds/core/processings.py
+++ b/main/lib/idds/core/processings.py
@@ -314,7 +314,7 @@ def resolve_input_dependency_id(new_input_dependency_contents, session=None):


 @transactional_session
-def update_processing_contents(update_processing, update_contents, update_messages=None, new_contents=None,
+def update_processing_contents(update_processing, update_contents=None, update_messages=None, new_contents=None,
                                update_dep_contents=None, update_collections=None, messages=None,
                                new_update_contents=None, new_input_dependency_contents=None,
                                new_contents_ext=None, update_contents_ext=None,
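Making update_contents optional matters because the chunked writers introduced in patch 04 call update_processing_contents with only the piece they currently hold. A hedged usage sketch — the payload variables are empty placeholders, and a real call needs a live iDDS database behind the transactional session:

    from idds.core import processings as core_processings

    msgs, chunk = [], []   # placeholders for generated messages / one update chunk
    # messages only:
    core_processings.update_processing_contents(update_processing=None, messages=msgs)
    # one chunk of content updates only:
    core_processings.update_processing_contents(update_processing=None, update_contents=chunk)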
From 036da63ef1bc52ae6ac90efab6502cfc9720753a Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 26 Oct 2023 22:08:54 +0200
Subject: [PATCH 07/11] bulk submitter and poller

---
 main/lib/idds/agents/carrier/utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py
index 32369a21..1a37121b 100644
--- a/main/lib/idds/agents/carrier/utils.py
+++ b/main/lib/idds/agents/carrier/utils.py
@@ -1276,7 +1276,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
             core_processings.update_processing_contents(update_processing=None,
                                                         update_contents=updated_contents,
-                                                        new_update_contents=new_update_contents,
+                                                        # new_update_contents=new_update_contents,
                                                         messages=ret_msgs)
             updated_contents = []
             new_update_contents = []

From 71030ce94495379c1ee5a18247fabf6bd328b851 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 26 Oct 2023 22:19:21 +0200
Subject: [PATCH 08/11] bulk submitter and poller

---
 main/lib/idds/agents/carrier/utils.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py
index 1a37121b..1c0ae37c 100644
--- a/main/lib/idds/agents/carrier/utils.py
+++ b/main/lib/idds/agents/carrier/utils.py
@@ -531,6 +531,7 @@ def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None,
         # if new_output_contents:
         #     msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='output')
         #     ret_msgs = ret_msgs + msgs
+        logger.debug(log_prefix + "handle_new_processing: add %s new contents" % (len(new_contents)))
         core_processings.update_processing_contents(update_processing=None,
                                                     new_contents=new_contents,
                                                     new_input_dependency_contents=new_input_dependency_contents,
                                                     messages=ret_msgs)
@@ -1112,6 +1113,7 @@ def handle_update_processing(processing, agent_attributes, max_updates_per_round
         # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents
         new_contents = new_input_contents + new_output_contents + new_log_contents

+        logger.debug(log_prefix + "handle_update_processing: add %s new contents" % (len(new_contents)))
         core_processings.update_processing_contents(update_processing=None,
                                                     new_contents=new_contents,
                                                     new_input_dependency_contents=new_input_dependency_contents,
                                                     messages=ret_msgs)
@@ -1125,6 +1127,7 @@
         if updated_contents_full_missing:
             msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file',
                                      files=updated_contents_full, relation_type='output')
+        logger.debug(log_prefix + "handle_update_processing: update %s missing contents" % (len(content_updates_missing)))
         core_processings.update_processing_contents(update_processing=None,
                                                     update_contents=content_updates_missing,
                                                     messages=msgs)
@@ -1140,17 +1143,20 @@
     if new_contents_ext:
         new_contents_ext_chunks = get_list_chunks(new_contents_ext, bulk_size=max_updates_per_round)
         for new_contents_ext_chunk in new_contents_ext_chunks:
+            logger.debug(log_prefix + "handle_update_processing: add %s ext contents" % (len(new_contents_ext_chunk)))
             core_processings.update_processing_contents(update_processing=None,
                                                         new_contents_ext=new_contents_ext_chunk)
     if update_contents_ext:
         update_contents_ext_chunks = get_list_chunks(update_contents_ext, bulk_size=max_updates_per_round)
         for update_contents_ext_chunk in update_contents_ext_chunks:
+            logger.debug(log_prefix + "handle_update_processing: update %s ext contents" % (len(update_contents_ext_chunk)))
             core_processings.update_processing_contents(update_processing=None,
                                                         update_contents_ext=update_contents_ext_chunk)

     if content_updates:
         content_updates_chunks = get_list_chunks(content_updates, bulk_size=max_updates_per_round)
         for content_updates_chunk in content_updates_chunks:
+            logger.debug(log_prefix + "handle_update_processing: update %s contents" % (len(content_updates_chunk)))
             core_processings.update_processing_contents(update_processing=None,
                                                         update_contents=content_updates_chunk)
From 7c640627b3868c58e2dc17cf98758502b59ba216 Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 26 Oct 2023 22:28:48 +0200
Subject: [PATCH 09/11] bulk submitter and poller

---
 main/lib/idds/agents/carrier/utils.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py
index 1c0ae37c..f790472f 100644
--- a/main/lib/idds/agents/carrier/utils.py
+++ b/main/lib/idds/agents/carrier/utils.py
@@ -1229,7 +1229,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
             # contents_id_list.append(con['content_id'])
             new_contents_update_list_chunks = [new_contents_update_list[i:i + max_updates_per_round] for i in range(0, len(new_contents_update_list), max_updates_per_round)]
             for chunk in new_contents_update_list_chunks:
-                logger.debug(log_prefix + "new_contents_update chunk[:3](total: %s): %s" % (str(chunk[:3]), len(chunk)))
+                logger.debug(log_prefix + "new_contents_update chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])))
                 core_catalog.update_contents(chunk)
             # core_catalog.delete_contents_update(contents=contents_id_list)
             core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
@@ -1240,7 +1240,7 @@
             to_triggered_contents = core_catalog.get_update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
             to_triggered_contents_chunks = [to_triggered_contents[i:i + max_updates_per_round] for i in range(0, len(to_triggered_contents), max_updates_per_round)]
             for chunk in to_triggered_contents_chunks:
-                logger.debug(log_prefix + "update_contents_from_others_by_dep_id chunk[:3](total: %s): %s" % (str(chunk[:3]), len(chunk)))
+                logger.debug(log_prefix + "update_contents_from_others_by_dep_id chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])))
                 core_catalog.update_contents(chunk)
             logger.debug(log_prefix + "update_contents_from_others_by_dep_id done")
@@ -1262,7 +1262,7 @@
     has_updates = False
     for updated_contents_ret in updated_contents_ret_chunks:
         updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret
-        logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] (total: %s): %s" % (updated_contents[:3], len(updated_contents)))
+        logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] (total: %s): %s" % (len(updated_contents), updated_contents[:3]))

         if updated_contents_full_input:
             # if the content is updated by receiver, here is the place to broadcast the messages
From 788e325e82c98cdd9c9ac3af139171397438a6cb Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 26 Oct 2023 22:57:51 +0200
Subject: [PATCH 10/11] bulk submitter and poller

---
 main/lib/idds/agents/carrier/trigger.py | 9 ++++++---
 main/lib/idds/agents/carrier/utils.py   | 8 +++++---
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py
index 2b31e63d..83b5705e 100644
--- a/main/lib/idds/agents/carrier/trigger.py
+++ b/main/lib/idds/agents/carrier/trigger.py
@@ -103,7 +103,7 @@ def handle_trigger_processing(self, processing, trigger_new_updates=False):
                                                              max_updates_per_round=self.max_updates_per_round,
                                                              logger=self.logger,
                                                              log_prefix=log_prefix)
-            process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents, ret_update_transforms = ret_trigger_processing
+            process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents, ret_update_transforms, has_updates = ret_trigger_processing

             self.logger.debug(log_prefix + "handle_trigger_processing: ret_update_transforms: %s" % str(ret_update_transforms))
@@ -127,7 +127,8 @@
                    'new_update_contents': new_update_contents,
                    'update_transforms': ret_update_transforms,
                    'update_dep_contents': (processing['request_id'], update_dep_contents_status_name, update_dep_contents_status),
-                   'processing_status': new_process_status}
+                   'processing_status': new_process_status,
+                   'has_updates': has_updates}
         except exceptions.ProcessFormatNotSupported as ex:
             self.logger.error(ex)
             self.logger.error(traceback.format_exc())
@@ -199,6 +200,7 @@ def process_trigger_processing_real(self, event):
                 self.update_processing(ret, pr)

                 update_transforms = ret.get('update_transforms', None)
+                has_updates = ret.get('has_updates', None)
                 if update_transforms:
                     # self.logger.info(log_pre + "update_contents_to_others_by_dep_id")
                     # core_catalog.update_contents_to_others_by_dep_id(request_id=pr['request_id'], transform_id=pr['transform_id'])
@@ -231,7 +233,8 @@
                 if ((event._content and 'has_updates' in event._content and event._content['has_updates'])
                     or ('update_contents' in ret and ret['update_contents'])    # noqa W503
                     or ('new_contents' in ret and ret['new_contents'])          # noqa W503
-                    or ('messages' in ret and ret['messages'])):                # noqa E129
+                    or ('messages' in ret and ret['messages'])                  # noqa W503
+                    or has_updates):                                            # noqa E129
                     self.logger.info(log_pre + "SyncProcessingEvent(processing_id: %s)" % pr['processing_id'])
                     event = SyncProcessingEvent(publisher_id=self.id, processing_id=pr['processing_id'],
                                                 content=event._content,
diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py
index f790472f..507d45d9 100644
--- a/main/lib/idds/agents/carrier/utils.py
+++ b/main/lib/idds/agents/carrier/utils.py
@@ -1193,6 +1193,7 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None,
 def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=False, max_updates_per_round=2000, logger=None, log_prefix=''):
     logger = get_logger(logger)

+    has_updates = False
     ret_msgs = []
     content_updates = []
     ret_update_transforms = []
@@ -1207,7 +1208,7 @@
     work.set_agent_attributes(agent_attributes, processing)

     if (not work.use_dependency_to_release_jobs()) or workload_id is None:
-        return processing['substatus'], [], [], {}, {}, {}, [], []
+        return processing['substatus'], [], [], {}, {}, {}, [], [], has_updates
     else:
         if trigger_new_updates:
             # delete information in the contents_update table, to invoke the trigger.
@@ -1229,6 +1230,7 @@
             # contents_id_list.append(con['content_id'])
             new_contents_update_list_chunks = [new_contents_update_list[i:i + max_updates_per_round] for i in range(0, len(new_contents_update_list), max_updates_per_round)]
             for chunk in new_contents_update_list_chunks:
+                has_updates = True
                 logger.debug(log_prefix + "new_contents_update chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])))
                 core_catalog.update_contents(chunk)
             # core_catalog.delete_contents_update(contents=contents_id_list)
             core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
@@ -1240,6 +1242,7 @@
             to_triggered_contents = core_catalog.get_update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
             to_triggered_contents_chunks = [to_triggered_contents[i:i + max_updates_per_round] for i in range(0, len(to_triggered_contents), max_updates_per_round)]
             for chunk in to_triggered_contents_chunks:
+                has_updates = True
                 logger.debug(log_prefix + "update_contents_from_others_by_dep_id chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])))
                 core_catalog.update_contents(chunk)
             logger.debug(log_prefix + "update_contents_from_others_by_dep_id done")
@@ -1262,7 +1262,6 @@
                                                                        logger=logger,
                                                                        log_prefix=log_prefix)

-    has_updates = False
     for updated_contents_ret in updated_contents_ret_chunks:
         updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret
         logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] (total: %s): %s" % (len(updated_contents), updated_contents[:3]))
@@ -1304,7 +1306,7 @@
     # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms
     # return processing['substatus'], content_updates, ret_msgs, {}, update_dep_contents_status_name, update_dep_contents_status, [], ret_update_transforms
     # return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, [], ret_update_transforms
-    return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms
+    return processing['substatus'], content_updates, ret_msgs, {}, {}, {}, new_update_contents, ret_update_transforms, has_updates
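For anyone maintaining callers: handle_trigger_processing now returns a 9-tuple ending in has_updates, and every return path has to stay in sync. A condensed sketch of the consumer side, using the names from trigger.py above; the placeholder tuple and the pass body stand in for the surrounding agent logic:

    # placeholder result; in the agent this comes from handle_trigger_processing()
    ret_trigger_processing = ('triggering', [], [], {}, {}, {}, [], [], True)

    (process_status, update_contents, ret_msgs, parameters,
     update_dep_contents_status_name, update_dep_contents_status,
     new_update_contents, ret_update_transforms, has_updates) = ret_trigger_processing

    # re-fire a SyncProcessingEvent even when the DB writes happened inside
    # handle_trigger_processing and nothing came back in update_contents
    if update_contents or ret_msgs or has_updates:
        pass  # publish SyncProcessingEvent(processing_id=...) as trigger.py does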
From 0167c5536ddc3fd403ddb5893e950524199533ac Mon Sep 17 00:00:00 2001
From: Wen Guan
Date: Thu, 26 Oct 2023 23:07:16 +0200
Subject: [PATCH 11/11] bulk submitter and poller

---
 main/config_default/idds.cfg | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/main/config_default/idds.cfg b/main/config_default/idds.cfg
index cce9fbd4..564f9df8 100755
--- a/main/config_default/idds.cfg
+++ b/main/config_default/idds.cfg
@@ -66,8 +66,8 @@ domapandawork.num_retries = 0

 [carrier]
 num_threads = 8
 max_number_workers = 8
-trigger_max_number_workers = 3
-finisher_max_number_workers = 3
+trigger_max_number_workers = 8
+finisher_max_number_workers = 8
 receiver_num_threads = 8
 poll_period = 300