diff --git a/Dockerfile b/Dockerfile index 92dd8348..3a94cb44 100644 --- a/Dockerfile +++ b/Dockerfile @@ -28,7 +28,7 @@ RUN yum-config-manager --enable crb # RUN yum install -y httpd.x86_64 conda gridsite mod_ssl.x86_64 httpd-devel.x86_64 gcc.x86_64 supervisor.noarch fetch-crl.noarch lcg-CA postgresql postgresql-contrib postgresql-static postgresql-libs postgresql-devel && \ # yum clean all && \ # rm -rf /var/cache/yum -RUN yum install -y httpd.x86_64 which conda gridsite mod_ssl.x86_64 httpd-devel.x86_64 gcc.x86_64 supervisor.noarch fetch-crl.noarch redis syslog-ng procps passwd which && \ +RUN yum install -y httpd.x86_64 which conda gridsite mod_ssl.x86_64 httpd-devel.x86_64 gcc.x86_64 supervisor.noarch fetch-crl.noarch redis syslog-ng procps passwd which systemd-udev && \ yum clean all && \ rm -rf /var/cache/yum @@ -45,6 +45,11 @@ RUN yum install -y fetch-crl.noarch ca-policy-egi-core && \ yum clean all && \ rm -rf /var/cache/yum +# update network limitations +# RUN echo 4096 > /proc/sys/net/core/somaxconn +# RUN sysctl -w net.core.somaxconn=4096 +RUN echo 'net.core.somaxconn=4096' >> /etc/sysctl.d/999-net.somax.conf + # setup env RUN adduser atlpan RUN groupadd zp diff --git a/Dockerfile.centos7 b/Dockerfile.centos7 index dfc76e3a..4c2a18ed 100644 --- a/Dockerfile.centos7 +++ b/Dockerfile.centos7 @@ -28,7 +28,7 @@ RUN yum upgrade -y && \ # RUN yum install -y httpd.x86_64 conda gridsite mod_ssl.x86_64 httpd-devel.x86_64 gcc.x86_64 supervisor.noarch fetch-crl.noarch lcg-CA postgresql postgresql-contrib postgresql-static postgresql-libs postgresql-devel && \ # yum clean all && \ # rm -rf /var/cache/yum -RUN yum install -y httpd.x86_64 which conda gridsite mod_ssl.x86_64 httpd-devel.x86_64 gcc.x86_64 supervisor.noarch fetch-crl.noarch lcg-CA redis syslog-ng && \ +RUN yum install -y httpd.x86_64 which conda gridsite mod_ssl.x86_64 httpd-devel.x86_64 gcc.x86_64 supervisor.noarch fetch-crl.noarch lcg-CA redis syslog-ng systemd-udev && \ yum clean all && \ rm -rf /var/cache/yum @@ -42,6 +42,11 @@ RUN yum install -y fetch-crl.noarch lcg-CA ca-policy-egi-core && \ yum clean all && \ rm -rf /var/cache/yum +# update network limitations +# RUN echo 4096 > /proc/sys/net/core/somaxconn +# RUN sysctl -w net.core.somaxconn=4096 +RUN echo 'net.core.somaxconn=4096' >> /etc/sysctl.d/999-net.somax.conf + # setup env RUN adduser atlpan RUN groupadd zp diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py index f361affe..34af0160 100644 --- a/common/lib/idds/common/utils.py +++ b/common/lib/idds/common/utils.py @@ -6,16 +6,19 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2023 +# - Wen Guan, , 2019 - 2024 +import base64 import errno import datetime +import importlib import logging import json import os import re import requests +import signal import subprocess import sys import tarfile @@ -27,6 +30,7 @@ from itertools import groupby from operator import itemgetter from packaging import version as packaging_version +from typing import Any, Callable from idds.common.config import (config_has_section, config_has_option, config_get, config_get_bool) @@ -234,15 +238,112 @@ def check_database(): return False -def run_process(cmd, stdout=None, stderr=None): +def kill_process_group(pgrp, nap=10): + """ + Kill the process group. + DO NOT MOVE TO PROCESSES.PY - will lead to circular import since execute() needs it as well. + :param pgrp: process group id (int). 
+ :param nap: napping time between kill signals in seconds (int) + :return: boolean (True if SIGTERM followed by SIGKILL signalling was successful) + """ + + status = False + _sleep = True + + # kill the process gracefully + print(f"killing group process {pgrp}") + try: + os.killpg(pgrp, signal.SIGTERM) + except Exception as error: + print(f"exception thrown when killing child group process under SIGTERM: {error}") + _sleep = False + else: + print(f"SIGTERM sent to process group {pgrp}") + + if _sleep: + print(f"sleeping {nap} s to allow processes to exit") + time.sleep(nap) + + try: + os.killpg(pgrp, signal.SIGKILL) + except Exception as error: + print(f"exception thrown when killing child group process with SIGKILL: {error}") + else: + print(f"SIGKILL sent to process group {pgrp}") + status = True + + return status + + +def kill_all(process: Any) -> str: + """ + Kill all processes after a time-out exception in process.communication(). + + :param process: process object + :return: stderr (str). + """ + + stderr = '' + try: + print('killing lingering subprocess and process group') + time.sleep(1) + # process.kill() + kill_process_group(os.getpgid(process.pid)) + except ProcessLookupError as exc: + stderr += f'\n(kill process group) ProcessLookupError={exc}' + except Exception as exc: + stderr += f'\n(kill_all 1) exception caught: {exc}' + try: + print('killing lingering process') + time.sleep(1) + os.kill(process.pid, signal.SIGTERM) + print('sleeping a bit before sending SIGKILL') + time.sleep(10) + os.kill(process.pid, signal.SIGKILL) + except ProcessLookupError as exc: + stderr += f'\n(kill process) ProcessLookupError={exc}' + except Exception as exc: + stderr += f'\n(kill_all 2) exception caught: {exc}' + print(f'sent soft kill signals - final stderr: {stderr}') + return stderr + + +def run_process(cmd, stdout=None, stderr=None, wait=False, timeout=7 * 24 * 3600): """ Runs a command in an out-of-procees shell. """ + print(f"To run command: {cmd}") if stdout and stderr: - process = subprocess.Popen(cmd, shell=True, stdout=stdout, stderr=stderr, preexec_fn=os.setsid) + process = subprocess.Popen(cmd, shell=True, stdout=stdout, stderr=stderr, preexec_fn=os.setsid, encoding='utf-8') + else: + process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setsid, encoding='utf-8') + if not wait: + return process + + try: + print(f'subprocess.communicate() will use timeout={timeout} s') + process.communicate(timeout=timeout) + except subprocess.TimeoutExpired as ex: + stderr = f'subprocess communicate sent TimeoutExpired: {ex}' + print(stderr) + stderr = kill_all(process) + print(f'Killing process: {stderr}') + exit_code = -1 + except Exception as ex: + stderr = f'subprocess has an exception: {ex}' + print(stderr) + stderr = kill_all(process) + print(f'Killing process: {stderr}') + exit_code = -1 else: - process = subprocess.Popen(cmd, shell=True) - return process + exit_code = process.poll() + + try: + process.wait(timeout=60) + except subprocess.TimeoutExpired: + print("process did not complete within the timeout of 60s - terminating") + process.terminate() + return exit_code def run_command(cmd): @@ -630,3 +731,148 @@ def group_list(input_list, key): update_groups[item_tuple] = {'keys': [], 'items': item} update_groups[item_tuple]['keys'].append(item_key) return update_groups + + +def import_fun(name: str) -> Callable[..., Any]: + """Returns a function from a dotted path name. Example: `path.to.module:func`. 
+ + When the attribute we look for is a staticmethod, module name in its + dotted path is not the last-before-end word + + E.g.: package_a.package_b.module_a:ClassA.my_static_method + + Thus we remove the bits from the end of the name until we can import it + + Args: + name (str): The name (reference) to the path. + + Raises: + ValueError: If no module is found or invalid attribute name. + + Returns: + Any: An attribute (normally a Callable) + """ + name_bits = name.split(':') + module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]] + module_name_bits = module_name_bits.split('.') + attribute_bits = attribute_bits.split('.') + module = None + while len(module_name_bits): + try: + module_name = '.'.join(module_name_bits) + module = importlib.import_module(module_name) + break + except ImportError: + attribute_bits.insert(0, module_name_bits.pop()) + + if module is None: + # maybe it's a builtin + try: + return __builtins__[name] + except KeyError: + raise ValueError('Invalid attribute name: %s' % name) + + attribute_name = '.'.join(attribute_bits) + if hasattr(module, attribute_name): + return getattr(module, attribute_name) + # staticmethods + attribute_name = attribute_bits.pop() + attribute_owner_name = '.'.join(attribute_bits) + try: + attribute_owner = getattr(module, attribute_owner_name) + except: # noqa + raise ValueError('Invalid attribute name: %s' % attribute_name) + + if not hasattr(attribute_owner, attribute_name): + raise ValueError('Invalid attribute name: %s' % name) + return getattr(attribute_owner, attribute_name) + + +def import_attribute(name: str) -> Callable[..., Any]: + """Returns an attribute from a dotted path name. Example: `path.to.func`. + + When the attribute we look for is a staticmethod, module name in its + dotted path is not the last-before-end word + + E.g.: package_a.package_b.module_a.ClassA.my_static_method + + Thus we remove the bits from the end of the name until we can import it + + Args: + name (str): The name (reference) to the path. + + Raises: + ValueError: If no module is found or invalid attribute name. 
+ + Returns: + Any: An attribute (normally a Callable) + """ + name_bits = name.split('.') + module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]] + module = None + while len(module_name_bits): + try: + module_name = '.'.join(module_name_bits) + module = importlib.import_module(module_name) + break + except ImportError: + attribute_bits.insert(0, module_name_bits.pop()) + + if module is None: + # maybe it's a builtin + try: + return __builtins__[name] + except KeyError: + raise ValueError('Invalid attribute name: %s' % name) + + attribute_name = '.'.join(attribute_bits) + if hasattr(module, attribute_name): + return getattr(module, attribute_name) + # staticmethods + attribute_name = attribute_bits.pop() + attribute_owner_name = '.'.join(attribute_bits) + try: + attribute_owner = getattr(module, attribute_owner_name) + except: # noqa + raise ValueError('Invalid attribute name: %s' % attribute_name) + + if not hasattr(attribute_owner, attribute_name): + raise ValueError('Invalid attribute name: %s' % name) + return getattr(attribute_owner, attribute_name) + + +def decode_base64(sb): + try: + if isinstance(sb, str): + sb_bytes = bytes(sb, 'ascii') + elif isinstance(sb, bytes): + sb_bytes = sb + else: + return sb + return base64.b64decode(sb_bytes).decode("utf-8") + except Exception as ex: + logging.error("decode_base64 %s: %s" % (sb, ex)) + return sb + + +def encode_base64(sb): + try: + if isinstance(sb, str): + sb_bytes = bytes(sb, 'ascii') + elif isinstance(sb, bytes): + sb_bytes = sb + return base64.b64encode(sb_bytes).decode("utf-8") + except Exception as ex: + logging.error("encode_base64 %s: %s" % (sb, ex)) + return sb + + +def create_archive_file(work_dir, archive_filename, files): + if not archive_filename.startswith("/"): + archive_filename = os.path.join(work_dir, archive_filename) + + with tarfile.open(archive_filename, "w:gz", dereference=True) as tar: + for local_file in files: + # base_name = os.path.basename(local_file) + tar.add(local_file, arcname=os.path.basename(local_file)) + return archive_filename diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 1ad9d4c0..81727ef3 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -15,6 +15,7 @@ except ImportError: import configparser as ConfigParser +import concurrent import datetime import os import time @@ -1165,36 +1166,78 @@ def poll_panda_events(self, event_ids, log_prefix=''): jobs_event_status.update(job_event_status) return jobs_event_status - def poll_panda_jobs(self, job_ids, log_prefix=''): + def poll_panda_jobs(self, job_ids, executors=None, log_prefix=''): job_status_info = {} self.logger.debug(log_prefix + "poll_panda_jobs, poll_panda_jobs_chunk_size: %s, job_ids[:10]: %s" % (self.poll_panda_jobs_chunk_size, str(job_ids[:10]))) chunksize = self.poll_panda_jobs_chunk_size chunks = [job_ids[i:i + chunksize] for i in range(0, len(job_ids), chunksize)] - for chunk in chunks: - # jobs_list = Client.getJobStatus(chunk, verbose=0)[1] - jobs_list = self.get_panda_job_status(chunk, log_prefix=log_prefix) - if jobs_list: - self.logger.debug(log_prefix + "poll_panda_jobs, input jobs: %s, output_jobs: %s" % (len(chunk), len(jobs_list))) - for job_info in jobs_list: - job_set_id = job_info.jobsetID - job_status = self.get_content_status_from_panda_status(job_info) - if job_info and job_info.Files and len(job_info.Files) > 0: - for job_file in job_info.Files: - # if job_file.type in ['log']: - 
if job_file.type not in ['pseudo_input']: - continue - if ':' in job_file.lfn: - pos = job_file.lfn.find(":") - input_file = job_file.lfn[pos + 1:] - # input_file = job_file.lfn.split(':')[1] - else: - input_file = job_file.lfn - # job_status_info[input_file] = {'panda_id': job_info.PandaID, 'status': job_status, 'job_info': job_info} - if input_file not in job_status_info: - job_status_info[input_file] = {'job_set_id': job_set_id, 'jobs': []} - job_status_info[input_file]['jobs'].append({'panda_id': job_info.PandaID, 'status': job_status, 'job_info': job_info}) - else: - self.logger.warn(log_prefix + "poll_panda_jobs, input jobs: %s, output_jobs: %s" % (len(chunk), jobs_list)) + if executors is None: + for chunk in chunks: + # jobs_list = Client.getJobStatus(chunk, verbose=0)[1] + jobs_list = self.get_panda_job_status(chunk, log_prefix=log_prefix) + if jobs_list: + self.logger.debug(log_prefix + "poll_panda_jobs, input jobs: %s, output_jobs: %s" % (len(chunk), len(jobs_list))) + for job_info in jobs_list: + job_set_id = job_info.jobsetID + job_status = self.get_content_status_from_panda_status(job_info) + if job_info and job_info.Files and len(job_info.Files) > 0: + for job_file in job_info.Files: + # if job_file.type in ['log']: + if job_file.type not in ['pseudo_input']: + continue + if ':' in job_file.lfn: + pos = job_file.lfn.find(":") + input_file = job_file.lfn[pos + 1:] + # input_file = job_file.lfn.split(':')[1] + else: + input_file = job_file.lfn + # job_status_info[input_file] = {'panda_id': job_info.PandaID, 'status': job_status, 'job_info': job_info} + if input_file not in job_status_info: + job_status_info[input_file] = {'job_set_id': job_set_id, 'jobs': []} + job_status_info[input_file]['jobs'].append({'panda_id': job_info.PandaID, 'status': job_status, 'job_info': job_info}) + else: + self.logger.warn(log_prefix + "poll_panda_jobs, input jobs: %s, output_jobs: %s" % (len(chunk), jobs_list)) + else: + ret_futures = set() + for chunk in chunks: + f = executors.submit(self.get_panda_job_status, chunk, log_prefix) + ret_futures.add(f) + # Wait for all subprocess to complete + steps = 0 + while True: + steps += 1 + # Wait for all subprocess to complete in 3 minutes + completed, _ = concurrent.futures.wait(ret_futures, timeout=180, return_when=concurrent.futures.ALL_COMPLETED) + for f in completed: + jobs_list = f.result() + if jobs_list: + self.logger.debug(log_prefix + "poll_panda_jobs thread, input jobs: %s, output_jobs: %s" % (len(chunk), len(jobs_list))) + for job_info in jobs_list: + job_set_id = job_info.jobsetID + job_status = self.get_content_status_from_panda_status(job_info) + if job_info and job_info.Files and len(job_info.Files) > 0: + for job_file in job_info.Files: + # if job_file.type in ['log']: + if job_file.type not in ['pseudo_input']: + continue + if ':' in job_file.lfn: + pos = job_file.lfn.find(":") + input_file = job_file.lfn[pos + 1:] + # input_file = job_file.lfn.split(':')[1] + else: + input_file = job_file.lfn + # job_status_info[input_file] = {'panda_id': job_info.PandaID, 'status': job_status, 'job_info': job_info} + if input_file not in job_status_info: + job_status_info[input_file] = {'job_set_id': job_set_id, 'jobs': []} + job_status_info[input_file]['jobs'].append({'panda_id': job_info.PandaID, 'status': job_status, 'job_info': job_info}) + else: + self.logger.warn(log_prefix + "poll_panda_jobs thread, input jobs: %s, output_jobs: %s" % (len(chunk), jobs_list)) + + ret_futures = ret_futures - completed + if len(ret_futures) > 0: + 
self.logger.debug(log_prefix + "poll_panda_jobs thread: %s threads has been running for more than %s minutes" % (len(ret_futures), steps * 3)) + else: + break if not self.es: for filename in job_status_info: @@ -1649,7 +1692,7 @@ def get_update_contents(self, unterminated_jobs_status, input_output_maps, conte return update_contents, update_contents_full, new_contents_ext, update_contents_ext - def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext=None, job_info_maps={}, log_prefix=''): + def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext=None, job_info_maps={}, executors=None, log_prefix=''): task_id = None try: from pandaclient import Client @@ -1679,7 +1722,7 @@ def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext= unterminated_jobs = self.get_unterminated_jobs(all_jobs_ids, input_output_maps, contents_ext) self.logger.debug(log_prefix + "poll_panda_task, task_id: %s, all jobs: %s, unterminated_jobs: %s" % (str(task_id), len(all_jobs_ids), len(unterminated_jobs))) - unterminated_jobs_status = self.poll_panda_jobs(unterminated_jobs, log_prefix=log_prefix) + unterminated_jobs_status = self.poll_panda_jobs(unterminated_jobs, executors=executors, log_prefix=log_prefix) self.logger.debug("unterminated_jobs_status: %s" % str(unterminated_jobs_status)) abort_status = False @@ -1784,7 +1827,7 @@ def get_external_content_ids(self, processing, log_prefix=''): return output return [] - def poll_processing_updates(self, processing, input_output_maps, contents_ext=None, job_info_maps={}, log_prefix=''): + def poll_processing_updates(self, processing, input_output_maps, contents_ext=None, job_info_maps={}, executors=None, log_prefix=''): """ *** Function called by Carrier agent. """ @@ -1799,6 +1842,7 @@ def poll_processing_updates(self, processing, input_output_maps, contents_ext=No input_output_maps=input_output_maps, contents_ext=contents_ext, job_info_maps=job_info_maps, + executors=executors, log_prefix=log_prefix) processing_status, update_contents, update_contents_full, new_contents_ext, update_contents_ext = ret_poll_panda_task diff --git a/main/config_default/httpd-idds-443-py39-cc7.conf b/main/config_default/httpd-idds-443-py39-cc7.conf index 4922fa16..8a980b74 100644 --- a/main/config_default/httpd-idds-443-py39-cc7.conf +++ b/main/config_default/httpd-idds-443-py39-cc7.conf @@ -32,7 +32,8 @@ MinSpareServers ${IDDS_SERVER_CONF_MIN_WORKERS} ServerLimit ${IDDS_SERVER_CONF_MAX_WORKERS} MaxSpareServers ${IDDS_SERVER_CONF_MAX_WORKERS} MaxClients ${IDDS_SERVER_CONF_MAX_WORKERS} -MaxRequestsPerChild 2000 +MaxRequestsPerChild 100 +ThreadsPerChild 100 @@ -41,14 +42,15 @@ MinSpareThreads ${IDDS_SERVER_CONF_MIN_WORKERS} ServerLimit ${IDDS_SERVER_CONF_MAX_WORKERS} MaxSpareThreads ${IDDS_SERVER_CONF_MAX_WORKERS} MaxRequestWorkers ${IDDS_SERVER_CONF_MAX_WORKERS} -MaxConnectionsPerChild 2000 +MaxConnectionsPerChild 100 +ThreadsPerChild 100 WSGIPythonHome /opt/idds WSGIPythonPath /opt/idds/lib/python3.9/site-packages - WSGIDaemonProcess idds_daemon processes=${IDDS_SERVER_CONF_NUM_WSGI} threads=2 request-timeout=600 queue-timeout=600 python-home=/opt/idds python-path=/opt/idds/lib/python3.9/site-packages python-path=/opt/idds python-path=/opt/idds/lib/python3.9/site-packages + WSGIDaemonProcess idds_daemon processes=${IDDS_SERVER_CONF_NUM_WSGI} threads=${IDDS_SERVER_CONF_NUM_WSGI_THREAD} request-timeout=600 queue-timeout=600 python-home=/opt/idds python-path=/opt/idds/lib/python3.9/site-packages python-path=/opt/idds 
python-path=/opt/idds/lib/python3.9/site-packages WSGIProcessGroup idds_daemon WSGIApplicationGroup %GLOBAL WSGIScriptAlias /idds /opt/idds/bin/idds.wsgi @@ -57,6 +59,8 @@ WSGIPythonPath /opt/idds/lib/python3.9/site-packages WSGIPassAuthorization On +ListenBackLog ${IDDS_SERVER_CONF_MAX_BACKLOG} + Listen 8443 Listen 8080 diff --git a/main/etc/idds/idds.cfg.template b/main/etc/idds/idds.cfg.template index d75942fb..f516f1b9 100755 --- a/main/etc/idds/idds.cfg.template +++ b/main/etc/idds/idds.cfg.template @@ -26,7 +26,7 @@ loglevel = DEBUG # idds atlas condor pool: aipanda101 # dev: aipanda104 # doma: aipanda105-107 -# +# idds-mon: aipanda108 [database] #default = mysql://idds:idds@pcuwvirt5.cern.ch/idds #default = mysql://idds:idds_passwd@aipanda182.cern.ch/idds diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql index e3dc6cf0..0da5cb11 100644 --- a/main/etc/sql/oracle_update.sql +++ b/main/etc/sql/oracle_update.sql @@ -463,7 +463,7 @@ CREATE TABLE meta_info created_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)), updated_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)), description VARCHAR2(1000), - metadata CLOB, + meta_info CLOB, CONSTRAINT METAINFO_PK PRIMARY KEY (meta_id), -- USING INDEX LOCAL, CONSTRAINT METAINFO_NAME_UQ UNIQUE (name) ); diff --git a/main/etc/sql/postgresql.sql b/main/etc/sql/postgresql.sql index 0804b940..c8d1f1d1 100644 --- a/main/etc/sql/postgresql.sql +++ b/main/etc/sql/postgresql.sql @@ -311,6 +311,7 @@ CREATE TABLE doma_idds.events_archive ( CONSTRAINT "EVENTS_AR_PK" PRIMARY KEY (event_id) ); +CREATE SEQUENCE doma_idds."THROTTLER_ID_SEQ" START WITH 1 CREATE TABLE doma_idds.throttlers ( throttler_id BIGSERIAL NOT NULL, diff --git a/main/lib/idds/agents/carrier/poller.py b/main/lib/idds/agents/carrier/poller.py index 30b7296d..40d6be3f 100644 --- a/main/lib/idds/agents/carrier/poller.py +++ b/main/lib/idds/agents/carrier/poller.py @@ -86,6 +86,15 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries= self.max_updates_per_round = max_updates_per_round self.logger.info("max_updates_per_round: %s" % self.max_updates_per_round) + if not hasattr(self, 'enable_executors') or not self.enable_executors: + self.enable_executors = False + else: + if str(self.enable_executors).lower() == 'true': + self.enable_executors = True + else: + self.enable_executors = False + self.logger.info("enable_executors: %s" % self.enable_executors) + self.show_queue_size_time = None def is_ok_to_run_more_processings(self): @@ -286,9 +295,14 @@ def update_processing(self, processing, processing_model, use_bulk_update_mappin def handle_update_processing(self, processing): try: log_prefix = self.get_log_prefix(processing) + executors = None + if self.enable_executors: + executors = self.executors + ret_handle_update_processing = handle_update_processing(processing, self.agent_attributes, max_updates_per_round=self.max_updates_per_round, + executors=executors, logger=self.logger, log_prefix=log_prefix) diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py index 118fd3ae..4a21e3c1 100644 --- a/main/lib/idds/agents/carrier/submitter.py +++ b/main/lib/idds/agents/carrier/submitter.py @@ -111,10 +111,14 @@ def handle_new_processing(self, processing): # transform_id = processing['transform_id'] # transform = core_transforms.get_transform(transform_id=transform_id) # work = transform['transform_metadata']['work'] + executors = None + if self.enable_executors: + executors = self.executors 
ret_new_processing = handle_new_processing(processing, self.agent_attributes, func_site_to_cloud=self.get_site_to_cloud, max_updates_per_round=self.max_updates_per_round, + executors=executors, logger=self.logger, log_prefix=log_prefix) status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing diff --git a/main/lib/idds/agents/carrier/trigger.py b/main/lib/idds/agents/carrier/trigger.py index 5667b132..0f19c551 100644 --- a/main/lib/idds/agents/carrier/trigger.py +++ b/main/lib/idds/agents/carrier/trigger.py @@ -102,10 +102,15 @@ def get_trigger_processings(self): def handle_trigger_processing(self, processing, trigger_new_updates=False): try: log_prefix = self.get_log_prefix(processing) + executors = None + if self.enable_executors: + executors = self.executors + ret_trigger_processing = handle_trigger_processing(processing, self.agent_attributes, trigger_new_updates=trigger_new_updates, max_updates_per_round=self.max_updates_per_round, + executors=executors, logger=self.logger, log_prefix=log_prefix) process_status, update_contents, ret_msgs, parameters, update_dep_contents_status_name, update_dep_contents_status, new_update_contents, ret_update_transforms, has_updates = ret_trigger_processing diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 8c56e461..308e2065 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -8,10 +8,12 @@ # Authors: # - Wen Guan, , 2022 - 2023 +import concurrent import json import logging import time import threading +import traceback from idds.common.constants import (ProcessingStatus, CollectionStatus, @@ -541,7 +543,36 @@ def generate_messages(request_id, transform_id, workload_id, work, msg_type='fil return msgs -def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None, max_updates_per_round=2000, logger=None, log_prefix=''): +def update_processing_contents_thread(logger, log_prefix, log_msg, kwargs): + try: + logger = get_logger(logger) + logger.debug(log_prefix + log_msg) + core_processings.update_processing_contents(**kwargs) + logger.debug(log_prefix + " end") + except Exception as ex: + logger.error(log_prefix + "update_processing_contents_thread: %s" % str(ex)) + except: + logger.error(traceback.format_exc()) + + +def wait_futures_finish(ret_futures, func_name, logger, log_prefix): + logger = get_logger(logger) + logger.debug(log_prefix + "%s: wait_futures_finish" % func_name) + # Wait for all subprocess to complete + steps = 0 + while True: + steps += 1 + # Wait for all subprocess to complete in 3 minutes + completed, _ = concurrent.futures.wait(ret_futures, timeout=180, return_when=concurrent.futures.ALL_COMPLETED) + ret_futures = ret_futures - completed + if len(ret_futures) > 0: + logger.debug(log_prefix + "%s thread: %s threads has been running for more than %s minutes" % (func_name, len(ret_futures), steps * 3)) + else: + break + logger.debug(log_prefix + "%s: wait_futures_finish end" % func_name) + + +def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None, max_updates_per_round=2000, executors=None, logger=None, log_prefix=''): logger = get_logger(logger) proc = processing['processing_metadata']['processing'] @@ -579,24 +610,40 @@ def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None, workload_id = processing['workload_id'] ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, 
max_updates_per_round=max_updates_per_round, logger=logger, log_prefix=log_prefix) - for ret_new_contents in ret_new_contents_chunks: - new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents - # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents - new_contents = new_input_contents + new_output_contents + new_log_contents - - # not generate new messages - # if new_input_contents: - # msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='input') - # ret_msgs = ret_msgs + msgs - # if new_output_contents: - # msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='output') - # ret_msgs = ret_msgs + msgs - logger.debug(log_prefix + "handle_new_processing: add %s new contents" % (len(new_contents))) - core_processings.update_processing_contents(update_processing=None, - request_id=request_id, - new_contents=new_contents, - new_input_dependency_contents=new_input_dependency_contents, - messages=ret_msgs) + if executors is None: + for ret_new_contents in ret_new_contents_chunks: + new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents + # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents + new_contents = new_input_contents + new_output_contents + new_log_contents + + # not generate new messages + # if new_input_contents: + # msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='input') + # ret_msgs = ret_msgs + msgs + # if new_output_contents: + # msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=new_input_contents, relation_type='output') + # ret_msgs = ret_msgs + msgs + logger.debug(log_prefix + "handle_new_processing: add %s new contents" % (len(new_contents))) + core_processings.update_processing_contents(update_processing=None, + request_id=request_id, + new_contents=new_contents, + new_input_dependency_contents=new_input_dependency_contents, + messages=ret_msgs) + else: + ret_futures = set() + for ret_new_contents in ret_new_contents_chunks: + new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents + new_contents = new_input_contents + new_output_contents + new_log_contents + log_msg = "handle_new_processing thread: add %s new contents" % (len(new_contents)) + kwargs = {'update_processing': None, + 'request_id': request_id, + 'new_contents': new_contents, + 'new_input_dependency_contents': new_input_dependency_contents, + 'messages': ret_msgs} + f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs) + ret_futures.add(f) + wait_futures_finish(ret_futures, "handle_new_processing", logger, log_prefix) + # return True, processing, update_collections, new_contents, new_input_dependency_contents, ret_msgs, errors return True, processing, update_collections, [], [], ret_msgs, errors @@ -1126,7 +1173,7 @@ def get_update_external_content_ids(input_output_maps, external_content_ids): return update_contents -def handle_update_processing(processing, agent_attributes, max_updates_per_round=2000, use_bulk_update_mappings=True, logger=None, log_prefix=''): +def handle_update_processing(processing, agent_attributes, max_updates_per_round=2000, 
use_bulk_update_mappings=True, executors=None, logger=None, log_prefix=''): logger = get_logger(logger) ret_msgs = [] @@ -1157,7 +1204,7 @@ def handle_update_processing(processing, agent_attributes, max_updates_per_round contents_ext = get_ext_contents(transform_id, work) job_info_maps = core_catalog.get_contents_ext_maps() ret_poll_processing = work.poll_processing_updates(processing, input_output_maps, contents_ext=contents_ext, - job_info_maps=job_info_maps, log_prefix=log_prefix) + job_info_maps=job_info_maps, executors=executors, log_prefix=log_prefix) process_status, content_updates, new_input_output_maps1, updated_contents_full, parameters, new_contents_ext, update_contents_ext = ret_poll_processing else: ret_poll_processing = work.poll_processing_updates(processing, input_output_maps, log_prefix=log_prefix) @@ -1170,6 +1217,8 @@ def handle_update_processing(processing, agent_attributes, max_updates_per_round logger.debug(log_prefix + "poll_processing_updates new_input_output_maps1.keys[:3]: %s" % (list(new_input_output_maps1.keys())[:3])) logger.debug(log_prefix + "poll_processing_updates updated_contents_full[:3]: %s" % (updated_contents_full[:3])) + ret_futures = set() + ret_new_contents_chunks = get_new_contents(request_id, transform_id, workload_id, new_input_output_maps, max_updates_per_round=max_updates_per_round) for ret_new_contents in ret_new_contents_chunks: new_input_contents, new_output_contents, new_log_contents, new_input_dependency_contents = ret_new_contents @@ -1187,14 +1236,25 @@ def handle_update_processing(processing, agent_attributes, max_updates_per_round # new_contents = new_input_contents + new_output_contents + new_log_contents + new_input_dependency_contents new_contents = new_input_contents + new_output_contents + new_log_contents - logger.debug(log_prefix + "handle_update_processing: add %s new contents" % (len(new_contents))) - core_processings.update_processing_contents(update_processing=None, - new_contents=new_contents, - new_input_dependency_contents=new_input_dependency_contents, - request_id=request_id, - # transform_id=transform_id, - use_bulk_update_mappings=use_bulk_update_mappings, - messages=ret_msgs) + if executors is None: + logger.debug(log_prefix + "handle_update_processing: add %s new contents" % (len(new_contents))) + core_processings.update_processing_contents(update_processing=None, + new_contents=new_contents, + new_input_dependency_contents=new_input_dependency_contents, + request_id=request_id, + # transform_id=transform_id, + use_bulk_update_mappings=use_bulk_update_mappings, + messages=ret_msgs) + else: + log_msg = "handle_update_processing thread: add %s new contents" % (len(new_contents)) + kwargs = {'update_processing': None, + 'request_id': request_id, + 'new_contents': new_contents, + 'new_input_dependency_contents': new_input_dependency_contents, + 'use_bulk_update_mappings': use_bulk_update_mappings, + 'messages': ret_msgs} + f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs) + ret_futures.add(f) ret_msgs = [] content_updates_missing_chunks = poll_missing_outputs(input_output_maps, max_updates_per_round=max_updates_per_round) @@ -1204,48 +1264,100 @@ def handle_update_processing(processing, agent_attributes, max_updates_per_round if updated_contents_full_missing: msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=updated_contents_full, relation_type='output') - logger.debug(log_prefix + "handle_update_processing: update %s missing contents" 
% (len(content_updates_missing))) - core_processings.update_processing_contents(update_processing=None, - update_contents=content_updates_missing, - request_id=request_id, - # transform_id=transform_id, - # use_bulk_update_mappings=use_bulk_update_mappings, - use_bulk_update_mappings=False, - messages=msgs) + if executors is None: + logger.debug(log_prefix + "handle_update_processing: update %s missing contents" % (len(content_updates_missing))) + core_processings.update_processing_contents(update_processing=None, + update_contents=content_updates_missing, + request_id=request_id, + # transform_id=transform_id, + # use_bulk_update_mappings=use_bulk_update_mappings, + use_bulk_update_mappings=False, + messages=msgs) + else: + log_msg = "handle_update_processing thread: update %s missing contents" % (len(content_updates_missing)) + kwargs = {'update_processing': None, + 'request_id': request_id, + 'update_contents': content_updates_missing, + 'use_bulk_update_mappings': False, + 'messages': msgs} + f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs) + ret_futures.add(f) if updated_contents_full: updated_contents_full_chunks = get_list_chunks(updated_contents_full, bulk_size=max_updates_per_round) for updated_contents_full_chunk in updated_contents_full_chunks: msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='file', files=updated_contents_full_chunk, relation_type='output') - core_processings.update_processing_contents(update_processing=None, - request_id=request_id, - messages=msgs) + if executors is None: + log_msg = "handle_update_processing: update %s messages" % (len(msgs)) + logger.debug(log_prefix + log_msg) + core_processings.update_processing_contents(update_processing=None, + request_id=request_id, + messages=msgs) + else: + log_msg = "handle_update_processing thread: update %s messages" % (len(msgs)) + kwargs = {'update_processing': None, + 'request_id': request_id, + 'messages': msgs} + f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs) + ret_futures.add(f) if new_contents_ext: new_contents_ext_chunks = get_list_chunks(new_contents_ext, bulk_size=max_updates_per_round) for new_contents_ext_chunk in new_contents_ext_chunks: - logger.debug(log_prefix + "handle_update_processing: add %s ext contents" % (len(new_contents_ext_chunk))) - core_processings.update_processing_contents(update_processing=None, - request_id=request_id, - new_contents_ext=new_contents_ext_chunk) + if executors is None: + log_msg = "handle_update_processing: add %s ext contents" % (len(new_contents_ext_chunk)) + logger.debug(log_prefix + log_msg) + core_processings.update_processing_contents(update_processing=None, + request_id=request_id, + new_contents_ext=new_contents_ext_chunk) + else: + log_msg = "handle_update_processing thread: add %s ext contents" % (len(new_contents_ext_chunk)) + kwargs = {'update_processing': None, + 'request_id': request_id, + 'new_contents_ext': new_contents_ext_chunk} + f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs) + ret_futures.add(f) + if update_contents_ext: update_contents_ext_chunks = get_list_chunks(update_contents_ext, bulk_size=max_updates_per_round) for update_contents_ext_chunk in update_contents_ext_chunks: - logger.debug(log_prefix + "handle_update_processing: update %s ext contents" % (len(update_contents_ext_chunk))) - core_processings.update_processing_contents(update_processing=None, - request_id=request_id, 
- update_contents_ext=update_contents_ext_chunk) + if executors is None: + log_msg = "handle_update_processing: update %s ext contents" % (len(update_contents_ext_chunk)) + logger.debug(log_prefix + log_msg) + core_processings.update_processing_contents(update_processing=None, + request_id=request_id, + update_contents_ext=update_contents_ext_chunk) + else: + log_msg = "handle_update_processing thread: update %s ext contents" % (len(update_contents_ext_chunk)) + kwargs = {'update_processing': None, + 'request_id': request_id, + 'update_contents_ext': update_contents_ext_chunk} + f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs) + ret_futures.add(f) if content_updates: content_updates_chunks = get_list_chunks(content_updates, bulk_size=max_updates_per_round) for content_updates_chunk in content_updates_chunks: - logger.debug(log_prefix + "handle_update_processing: update %s contents" % (len(content_updates_chunk))) - core_processings.update_processing_contents(update_processing=None, - request_id=request_id, - # transform_id=transform_id, - use_bulk_update_mappings=use_bulk_update_mappings, - update_contents=content_updates_chunk) + if executors is None: + log_msg = "handle_update_processing: update %s contents" % (len(content_updates_chunk)) + logger.debug(log_prefix + log_msg) + core_processings.update_processing_contents(update_processing=None, + request_id=request_id, + # transform_id=transform_id, + use_bulk_update_mappings=use_bulk_update_mappings, + update_contents=content_updates_chunk) + else: + log_msg = "handle_update_processing thread: update %s contents" % (len(content_updates_chunk)) + kwargs = {'update_processing': None, + 'request_id': request_id, + 'use_bulk_update_mappings': use_bulk_update_mappings, + 'update_contents': content_updates_chunk} + f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs) + ret_futures.add(f) + + if len(ret_futures) > 0: + wait_futures_finish(ret_futures, "handle_update_processing", logger, log_prefix) # return process_status, new_contents, new_input_dependency_contents, ret_msgs, content_updates + content_updates_missing, parameters, new_contents_ext, update_contents_ext return process_status, [], [], ret_msgs, [], parameters, [], [] @@ -1277,7 +1389,19 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None, return update_transforms -def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=False, max_updates_per_round=2000, logger=None, log_prefix=''): +def update_contents_thread(logger, log_prefix, log_msg, kwargs): + try: + logger = get_logger(logger) + logger.debug(log_prefix + log_msg) + core_catalog.update_contents(**kwargs) + logger.debug(log_prefix + " end") + except Exception as ex: + logger.error(log_prefix + "update_contents_thread: %s" % str(ex)) + except: + logger.error(traceback.format_exc()) + + +def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=False, max_updates_per_round=2000, executors=None, logger=None, log_prefix=''): logger = get_logger(logger) has_updates = False @@ -1320,11 +1444,24 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= new_contents_update_list.append(con_dict) # contents_id_list.append(con['content_id']) new_contents_update_list_chunks = [new_contents_update_list[i:i + max_updates_per_round] for i in range(0, len(new_contents_update_list), max_updates_per_round)] + ret_futures = set() for chunk in 
new_contents_update_list_chunks: has_updates = True - logger.debug(log_prefix + "new_contents_update chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3]))) - # core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=False) - core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=True) + if executors is None: + logger.debug(log_prefix + "new_contents_update chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3]))) + # core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=False) + core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=True) + else: + log_msg = "new_contents_update thread chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])) + kwargs = {'parameters': chunk, + 'request_id': request_id, + 'transform_id': transform_id, + 'use_bulk_update_mappings': True} + f = executors.submit(update_contents_thread, logger, log_prefix, log_msg, kwargs) + ret_futures.add(f) + if len(ret_futures) > 0: + wait_futures_finish(ret_futures, "new_contents_update", logger, log_prefix) + # core_catalog.delete_contents_update(contents=contents_id_list) core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id, fetch=True) logger.debug(log_prefix + "sync contents_update to contents done") @@ -1333,10 +1470,24 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= # core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) to_triggered_contents = core_catalog.get_update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id) to_triggered_contents_chunks = [to_triggered_contents[i:i + max_updates_per_round] for i in range(0, len(to_triggered_contents), max_updates_per_round)] + + ret_futures = set() for chunk in to_triggered_contents_chunks: has_updates = True - logger.debug(log_prefix + "update_contents_from_others_by_dep_id chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3]))) - core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=False) + if executors is None: + logger.debug(log_prefix + "update_contents_from_others_by_dep_id chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3]))) + core_catalog.update_contents(chunk, request_id=request_id, transform_id=transform_id, use_bulk_update_mappings=False) + else: + log_msg = "update_contents_from_others_by_dep_id thread chunk[:3](total: %s): %s" % (len(chunk), str(chunk[:3])) + kwargs = {'parameters': chunk, + 'request_id': request_id, + 'transform_id': transform_id, + 'use_bulk_update_mappings': False} + f = executors.submit(update_contents_thread, logger, log_prefix, log_msg, kwargs) + ret_futures.add(f) + if len(ret_futures) > 0: + wait_futures_finish(ret_futures, "update_contents_from_others_by_dep_id", logger, log_prefix) + logger.debug(log_prefix + "update_contents_from_others_by_dep_id done") input_output_maps = get_input_output_maps(transform_id, work) @@ -1354,9 +1505,9 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= logger=logger, log_prefix=log_prefix) + ret_futures = set() for updated_contents_ret in updated_contents_ret_chunks: updated_contents, updated_contents_full_input, updated_contents_full_output, updated_contents_full_input_deps, new_update_contents = updated_contents_ret - logger.debug(log_prefix 
+ "handle_trigger_processing: updated_contents[:3] (total: %s): %s" % (len(updated_contents), updated_contents[:3])) if updated_contents_full_input: # if the content is updated by receiver, here is the place to broadcast the messages @@ -1374,16 +1525,30 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= if updated_contents or new_update_contents: has_updates = True - core_processings.update_processing_contents(update_processing=None, - update_contents=updated_contents, - # new_update_contents=new_update_contents, - messages=ret_msgs, - request_id=request_id, - # transform_id=transform_id, - use_bulk_update_mappings=False) + if executors is None: + logger.debug(log_prefix + "handle_trigger_processing: updated_contents[:3] (total: %s): %s" % (len(updated_contents), updated_contents[:3])) + core_processings.update_processing_contents(update_processing=None, + update_contents=updated_contents, + # new_update_contents=new_update_contents, + messages=ret_msgs, + request_id=request_id, + # transform_id=transform_id, + use_bulk_update_mappings=False) + else: + log_msg = "handle_trigger_processing thread: updated_contents[:3] (total: %s): %s" % (len(updated_contents), updated_contents[:3]) + kwargs = {'update_processing': None, + 'request_id': request_id, + 'update_contents': updated_contents, + 'messages': ret_msgs, + 'use_bulk_update_mappings': False} + f = executors.submit(update_processing_contents_thread, logger, log_prefix, log_msg, kwargs) + ret_futures.add(f) + updated_contents = [] new_update_contents = [] ret_msgs = [] + if len(ret_futures) > 0: + wait_futures_finish(ret_futures, "handle_trigger_processing", logger, log_prefix) if has_updates: ret_update_transforms = get_updated_transforms_by_content_status(request_id=request_id, diff --git a/main/lib/idds/agents/common/baseagent.py b/main/lib/idds/agents/common/baseagent.py index 54275972..578b9721 100644 --- a/main/lib/idds/agents/common/baseagent.py +++ b/main/lib/idds/agents/common/baseagent.py @@ -229,7 +229,10 @@ def __call__(self): def stop(self): super(BaseAgent, self).stop() - self.event_bus.stop() + try: + self.event_bus.stop() + except Exception: + pass def terminate(self): self.stop() diff --git a/main/lib/idds/orm/base/alembic/versions/53d0af715dab_add_site_throttler.py b/main/lib/idds/orm/base/alembic/versions/53d0af715dab_add_site_throttler.py index 0557b6e7..44bb7a33 100644 --- a/main/lib/idds/orm/base/alembic/versions/53d0af715dab_add_site_throttler.py +++ b/main/lib/idds/orm/base/alembic/versions/53d0af715dab_add_site_throttler.py @@ -57,6 +57,8 @@ def upgrade() -> None: op.drop_constraint('THROTTLER_PK', table_name='throttlers', schema=schema) except: pass + # op.create_sequence(sa.Sequence('THROTTLER_ID_SEQ', schema=schema)) + op.execute(sa.schema.CreateSequence(sa.Sequence('THROTTLER_ID_SEQ', schema=schema))) op.create_table('throttlers', sa.Column('throttler_id', sa.BigInteger(), sa.Sequence('THROTTLER_ID_SEQ', schema=schema)), sa.Column('site', sa.String(50), nullable=False), @@ -92,3 +94,4 @@ def downgrade() -> None: op.drop_constraint('THROTTLER_SITE_UQ', table_name='throttlers', schema=schema) op.drop_constraint('THROTTLER_PK', table_name='throttlers', schema=schema) op.drop_table('throttlers', schema=schema) + op.drop_sequence('THROTTLER_ID_SEQ', schema=schema) diff --git a/main/lib/idds/tests/core_tests.py b/main/lib/idds/tests/core_tests.py index cd265c0e..c8e4770d 100644 --- a/main/lib/idds/tests/core_tests.py +++ b/main/lib/idds/tests/core_tests.py @@ -175,6 +175,7 @@ def 
print_workflow_template(workflow, layers=0): reqs = get_requests(request_id=479187, with_request=True, with_detail=False, with_metadata=True) reqs = get_requests(request_id=4498, with_request=True, with_detail=False, with_metadata=True) reqs = get_requests(request_id=3244, with_request=True, with_detail=False, with_metadata=True) +reqs = get_requests(request_id=6082, with_request=True, with_detail=False, with_metadata=True) # reqs = get_requests(request_id=589913, with_request=True, with_detail=False, with_metadata=True) for req in reqs: # print(req['request_id']) @@ -227,9 +228,9 @@ def print_workflow_template(workflow, layers=0): print(json_dumps(workflow.template, sort_keys=True, indent=4)) -""" sys.exit(0) +""" reqs = get_requests(request_id=28182323, with_request=False, with_detail=True, with_metadata=False) for req in reqs: print(json_dumps(req, sort_keys=True, indent=4)) diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 0a45a4da..a7788ece 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -10,8 +10,8 @@ os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda' os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda' -# os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' -# os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' +os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' +os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' from pandaclient import Client # noqa E402 @@ -49,6 +49,8 @@ task_ids = [i for i in range(166799, 167877)] task_ids = [i for i in range(167997, 168003)] task_ids = [688, 8686, 8695, 8696] +task_ids = [i for i in range(8958, 9634)] +task_ids = [i for i in range(8752, 8958)] for task_id in task_ids: print("Killing %s" % task_id) ret = Client.killTask(task_id, verbose=True) diff --git a/main/lib/idds/tests/test_domapanda_big.py b/main/lib/idds/tests/test_domapanda_big.py index a576af78..c9e78354 100644 --- a/main/lib/idds/tests/test_domapanda_big.py +++ b/main/lib/idds/tests/test_domapanda_big.py @@ -173,7 +173,7 @@ def setup_workflow(): taskN4.dependencies = [ {"name": "00004" + str(k), "dependencies": [], - "submitted": False} for k in range(10000) + "submitted": False} for k in range(100) ] taskN5 = PanDATask() @@ -182,7 +182,7 @@ def setup_workflow(): taskN5.dependencies = [ {"name": "00005" + str(k), "dependencies": [], - "submitted": False} for k in range(10000) + "submitted": False} for k in range(100) ] work1 = DomaPanDAWork(executable='echo; sleep 180', @@ -234,7 +234,7 @@ def setup_workflow(): work4 = DomaPanDAWork(executable='echo; sleep 180', primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'}, output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}], - log_collections=[], dependency_map=taskN1.dependencies, + log_collections=[], dependency_map=taskN4.dependencies, task_name=taskN4.name, task_queue=task_queue3, encode_command_line=True, task_priority=981, @@ -250,7 +250,7 @@ def setup_workflow(): work5 = DomaPanDAWork(executable='echo; sleep 180', primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'}, output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}], - log_collections=[], 
dependency_map=taskN1.dependencies, + log_collections=[], dependency_map=taskN5.dependencies, task_name=taskN5.name, task_queue=task_queue4, encode_command_line=True, task_priority=981, diff --git a/main/tools/env/setup_panda.sh b/main/tools/env/setup_panda.sh index bf38d973..37ef38ca 100644 --- a/main/tools/env/setup_panda.sh +++ b/main/tools/env/setup_panda.sh @@ -49,7 +49,7 @@ elif [ "$instance" == "usdf" ]; then export PANDA_URL=https://usdf-panda-server.slac.stanford.edu:8443/server/panda export PANDACACHE_URL=$PANDA_URL_SSL export PANDAMON_URL=https://usdf-panda-bigmon.slac.stanford.edu:8443/ - export PANDA_AUTH_VO=Rubin + export PANDA_AUTH_VO=Rubin:production export PANDACACHE_URL=$PANDA_URL_SSL export PANDA_SYS=/afs/cern.ch/user/w/wguan/workdisk/iDDS/.conda/iDDS/ diff --git a/start-daemon.sh b/start-daemon.sh index cadc96b6..9a8fe2bd 100755 --- a/start-daemon.sh +++ b/start-daemon.sh @@ -181,7 +181,7 @@ fi # min number of workers if [[ -z "${IDDS_SERVER_CONF_MIN_WORKERS}" ]]; then - export IDDS_SERVER_CONF_MIN_WORKERS=25 + export IDDS_SERVER_CONF_MIN_WORKERS=32 fi # max number of workers @@ -191,7 +191,17 @@ fi # max number of WSGI daemons if [[ -z "${IDDS_SERVER_CONF_NUM_WSGI}" ]]; then - export IDDS_SERVER_CONF_NUM_WSGI=25 + export IDDS_SERVER_CONF_NUM_WSGI=32 +fi + +# max number of WSGI daemons +if [[ -z "${IDDS_SERVER_CONF_MAX_BACKLOG}" ]]; then + export IDDS_SERVER_CONF_MAX_BACKLOG=511 +fi + +# max number of WSGI threads +if [[ -z "${IDDS_SERVER_CONF_NUM_WSGI_THREAD}" ]]; then + export IDDS_SERVER_CONF_NUM_WSGI_THREAD=32 fi # create database if not exists
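
A note on the net.core.somaxconn drop-in added to both Dockerfiles above: a file under /etc/sysctl.d only takes effect when sysctl settings are (re)applied, and inside a plain container the key is normally inherited from the host or set at run time (for example with docker run --sysctl), which is presumably why the direct RUN echo/sysctl lines are left commented out. The sketch below is not part of the patch; it simply reads the effective value so it can be compared with the Apache ListenBackLog that the httpd config change takes from IDDS_SERVER_CONF_MAX_BACKLOG.

    # Minimal sketch (not part of the patch): read the effective somaxconn so it
    # can be compared with the ListenBackLog exported as IDDS_SERVER_CONF_MAX_BACKLOG.
    import os

    with open("/proc/sys/net/core/somaxconn") as f:
        somaxconn = int(f.read().strip())

    backlog = int(os.environ.get("IDDS_SERVER_CONF_MAX_BACKLOG", "511"))
    # listen() backlogs larger than somaxconn are silently capped by the kernel
    print("somaxconn=%s ListenBackLog=%s capped=%s" % (somaxconn, backlog, backlog > somaxconn))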
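The extended run_process() in common/lib/idds/common/utils.py now optionally waits for the command and enforces a timeout, relying on preexec_fn=os.setsid so that kill_process_group()/kill_all() can signal the whole process group, including children spawned by the shell. Below is a rough usage sketch; the commands and the 30 s timeout are illustrative only, and only run_process itself comes from the patch.

    # Hedged usage sketch for the extended run_process(); commands are illustrative.
    from idds.common.utils import run_process

    # Default wait=False keeps the old behaviour: the Popen object is returned
    # immediately and the caller manages the child itself.
    proc = run_process("echo detached; sleep 2")
    print("detached pid:", proc.pid)
    proc.wait()

    # wait=True blocks in communicate(); if the timeout expires, the whole
    # process group created by preexec_fn=os.setsid is killed via SIGTERM/SIGKILL
    # and -1 is returned instead of the real exit code.
    exit_code = run_process("echo work; sleep 1", wait=True, timeout=30)
    print("exit code:", exit_code)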
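The new import_fun() and import_attribute() helpers resolve a callable from a dotted path, the former with a module:attribute spelling and the latter with dots only, peeling components off the end of the path until a module imports so that static methods such as the docstring's hypothetical package_a.package_b.module_a.ClassA.my_static_method can be reached. The sketch below exercises only import_attribute(), against standard-library names so it stays self-contained.

    # Illustrative calls for the new dotted-path resolver; standard-library
    # targets are used so the sketch needs nothing beyond the patched utils.
    from idds.common.utils import import_attribute

    dumps = import_attribute("json.dumps")    # module 'json', attribute 'dumps'
    print(dumps({"x": 1}))

    # Class attributes are found by the staticmethod fallback: 'json.decoder'
    # imports, then 'JSONDecoder.decode' is resolved attribute by attribute.
    decode = import_attribute("json.decoder.JSONDecoder.decode")
    print(decode)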
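encode_base64(), decode_base64() and create_archive_file() at the end of utils.py are small conveniences: the base64 pair assumes ASCII text or bytes and falls back to returning the input unchanged on error, and the archive helper stores files flat (arcname is the basename) while following symlinks. The round-trip below uses throwaway names purely for illustration.

    # Hedged round-trip sketch for the new helpers; paths and names are throwaway.
    import os
    import tempfile
    from idds.common.utils import encode_base64, decode_base64, create_archive_file

    token = encode_base64("cmd --opt value")
    assert decode_base64(token) == "cmd --opt value"

    work_dir = tempfile.mkdtemp()
    payload = os.path.join(work_dir, "job.json")
    with open(payload, "w") as f:
        f.write("{}")

    # A relative archive name is placed under work_dir; the absolute path is returned.
    archive = create_archive_file(work_dir, "inputs.tar.gz", [payload])
    print(archive)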
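Finally, the executors plumbing threaded through poller.py, submitter.py, trigger.py, carrier/utils.py and domapandawork.py follows a single pattern: when the new enable_executors agent attribute evaluates to true, self.executors is passed down, chunks of work are submitted to it, and the caller drains the futures in a loop with a 3-minute wait so that slow chunks are logged instead of blocking silently; with executors=None every call keeps its original sequential branch. A stripped-down, self-contained version of that submit-and-drain shape is sketched below with a dummy worker; it is not the iDDS code itself.

    # Self-contained sketch of the submit-and-drain pattern used by
    # wait_futures_finish()/poll_panda_jobs(); the worker and timings are dummies.
    import concurrent.futures
    import time


    def poll_chunk(chunk):
        # stand-in for get_panda_job_status()/update_processing_contents()
        time.sleep(0.1)
        return {job_id: "finished" for job_id in chunk}


    job_ids = list(range(100))
    chunks = [job_ids[i:i + 20] for i in range(0, len(job_ids), 20)]

    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executors:
        ret_futures = {executors.submit(poll_chunk, chunk) for chunk in chunks}
        steps = 0
        while ret_futures:
            steps += 1
            # wake up every 3 minutes to report on futures that are still running
            completed, _ = concurrent.futures.wait(ret_futures, timeout=180,
                                                   return_when=concurrent.futures.ALL_COMPLETED)
            for f in completed:
                results.update(f.result())
            ret_futures -= completed
            if ret_futures:
                print("%s chunks still running after %s minutes" % (len(ret_futures), steps * 3))

    print(len(results), "jobs polled")

The real wait_futures_finish() differs only in that it logs through the agent logger and reports how many minutes the outstanding futures have been running.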