From b1511e539346bc6df9800be04fb7bc8240db02da Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 18 Oct 2024 09:36:08 +0200 Subject: [PATCH 1/2] fix to retrieve messages without request_id --- main/lib/idds/agents/conductor/conductor.py | 1 + main/lib/idds/orm/messages.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/main/lib/idds/agents/conductor/conductor.py b/main/lib/idds/agents/conductor/conductor.py index 619941ac..3d9f9a7b 100644 --- a/main/lib/idds/agents/conductor/conductor.py +++ b/main/lib/idds/agents/conductor/conductor.py @@ -331,6 +331,7 @@ def run(self): output_messages = self.get_output_messages() self.clean_messages(output_messages) + time.sleep(1) except IDDSException as error: self.logger.error("Main thread IDDSException: %s" % str(error)) self.logger.error(traceback.format_exc()) diff --git a/main/lib/idds/orm/messages.py b/main/lib/idds/orm/messages.py index 8cfd1060..dca4566b 100644 --- a/main/lib/idds/orm/messages.py +++ b/main/lib/idds/orm/messages.py @@ -117,7 +117,8 @@ def update_messages(messages, bulk_size=1000, use_bulk_update_mappings=False, re query = query.filter(models.Message.request_id == request_id) else: if min_request_id: - query = query.filter(models.Message.request_id >= min_request_id) + query = query.filter(or_(models.Message.request_id >= min_request_id, + models.Message.request_id.is_(None))) if transform_id: query = query.filter(models.Message.transform_id == transform_id) query = query.filter(models.Message.msg_id.in_(keys))\ From 8e080bb0dfc653caf5194ed055317ca6fde672fc Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Fri, 18 Oct 2024 09:41:11 +0200 Subject: [PATCH 2/2] fix to handle cloud for sites --- doma/lib/idds/doma/workflowv2/domapandawork.py | 7 ++++--- main/lib/idds/agents/carrier/submitter.py | 7 ++++++- main/lib/idds/tests/panda_test.py | 10 ++++++---- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 6c958109..f4ce8d55 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -792,9 +792,10 @@ def submit_panda_task(self, processing): if 'new_retries' in processing and processing['new_retries']: new_retries = int(processing['new_retries']) task_param['taskName'] = task_param['taskName'] + "_" + str(new_retries) - cloud = self.get_site_from_cloud(task_param['PandaSite']) - if cloud: - task_param['cloud'] = cloud + if not task_param['cloud']: + cloud = self.get_site_from_cloud(task_param['PandaSite']) + if cloud: + task_param['cloud'] = cloud if self.has_dependency(): parent_tid = None diff --git a/main/lib/idds/agents/carrier/submitter.py b/main/lib/idds/agents/carrier/submitter.py index dfb258a2..6be26fee 100644 --- a/main/lib/idds/agents/carrier/submitter.py +++ b/main/lib/idds/agents/carrier/submitter.py @@ -102,7 +102,12 @@ def get_site_to_cloud(self, site, log_prefix=''): if site and self.site_to_cloud: cloud = self.site_to_cloud.get(site, None) - self.logger.debug(log_prefix + "cloud for site(%s): %s" % (site, cloud)) + if cloud: + self.logger.debug(log_prefix + "cloud for site(%s): %s" % (site, cloud)) + return cloud + if 'default' in self.site_to_cloud: + cloud = self.site_to_cloud.get('default', None) + self.logger.debug(log_prefix + "cloud for default site(%s): %s" % (site, cloud)) return cloud except Exception as ex: self.logger.error(ex) diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index d29ad1b5..d2cde25d 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -8,14 +8,14 @@ os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:443/server/panda' os.environ['PANDA_BEHIND_REAL_LB'] = "1" -# os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda' -# os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda' +os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda' +os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda' # os.environ['PANDA_URL_SSL'] = 'https://panda-doma-k8s-panda.cern.ch/server/panda' # os.environ['PANDA_URL'] = 'http://panda-doma-k8s-panda.cern.ch:25080/server/panda' -os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' -os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' +# os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' +# os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda' # os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda' # os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda' @@ -81,6 +81,8 @@ task_ids = [124, 68, 75, 78, 79] task_ids = [19654] task_ids = [16700, 16704, 17055, 17646, 17792, 18509, 19754, 21666, 21714, 21739, 16148, 16149, 16150] +task_ids = [473, 472] + [i for i in range(325, 345)] +task_ids = [476, 477, 478] for task_id in task_ids: print("Killing %s" % task_id) ret = Client.killTask(task_id, verbose=True)