From 4d2ebfadf4359b3cdf881775d69b1592006ce58d Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 31 Jan 2024 17:28:34 +0100 Subject: [PATCH] fix to set the site --- client/lib/idds/client/clientmanager.py | 1 + doma/lib/idds/doma/workflowv2/domapandawork.py | 9 +++++++++ main/lib/idds/agents/clerk/clerk.py | 2 +- main/lib/idds/core/requests.py | 14 +++++++++++--- main/lib/idds/orm/requests.py | 8 ++++---- workflow/lib/idds/workflow/work.py | 3 +++ workflow/lib/idds/workflow/workflow.py | 14 ++++++++++++++ workflow/lib/idds/workflowv2/work.py | 3 +++ workflow/lib/idds/workflowv2/workflow.py | 14 ++++++++++++++ 9 files changed, 60 insertions(+), 8 deletions(-) diff --git a/client/lib/idds/client/clientmanager.py b/client/lib/idds/client/clientmanager.py index 8fbaf651..e8152ef9 100644 --- a/client/lib/idds/client/clientmanager.py +++ b/client/lib/idds/client/clientmanager.py @@ -441,6 +441,7 @@ def submit(self, workflow, username=None, userdn=None, use_dataset_name=False): 'transform_tag': 'workflow', 'status': RequestStatus.New, 'priority': 0, + 'site': workflow.get_site(), 'lifetime': workflow.lifetime, 'workload_id': workflow.get_workload_id(), 'request_metadata': {'version': release_version, 'workload_id': workflow.get_workload_id(), 'workflow': workflow} diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 730ff4b7..1ad9d4c0 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -146,6 +146,15 @@ def my_condition(self): return True return False + def get_site(self): + if self.task_site: + return self.task_site + if self.task_queue: + return self.task_queue + if self.queue: + return self.queue + return self.task_cloud + @property def dependency_map(self): return self._dependency_map diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index a77dc908..2a135c2b 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -356,7 +356,7 @@ def clean_min_request_id(self): min_request_id = old_min_request_id - 1000 BaseAgent.min_request_id = min_request_id else: - for req_id in BaseAgent.min_request_id_cache: + for req_id in list(BaseAgent.min_request_id_cache.keys()): time_stamp = BaseAgent.min_request_id_cache[req_id] if time_stamp < time.time() - 12 * 3600: # older than 12 hours del BaseAgent.min_request_id_cache[req_id] diff --git a/main/lib/idds/core/requests.py b/main/lib/idds/core/requests.py index 0c2f4077..d9b08aa8 100644 --- a/main/lib/idds/core/requests.py +++ b/main/lib/idds/core/requests.py @@ -33,7 +33,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None, username=None, userdn=None, transform_tag=None, status=RequestStatus.New, locking=RequestLocking.Idle, priority=0, lifetime=None, workload_id=None, request_metadata=None, - new_poll_period=1, update_poll_period=10, + new_poll_period=1, update_poll_period=10, site=None, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, processing_metadata=None): """ @@ -65,6 +65,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None, 'new_poll_period': new_poll_period, 'update_poll_period': update_poll_period, 'new_retries': new_retries, 'update_retries': update_retries, 'max_new_retries': max_new_retries, 'max_update_retries': max_update_retries, + 'site': site, 'request_metadata': request_metadata, 'processing_metadata': processing_metadata} return orm_requests.create_request(**kwargs) @@ -74,7 +75,7 @@ def add_request(scope=None, name=None, requester=None, request_type=None, username=None, userdn=None, transform_tag=None, status=RequestStatus.New, locking=RequestLocking.Idle, priority=0, lifetime=None, workload_id=None, request_metadata=None, - new_poll_period=1, update_poll_period=10, + new_poll_period=1, update_poll_period=10, site=None, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, processing_metadata=None, session=None): """ @@ -98,9 +99,16 @@ def add_request(scope=None, name=None, requester=None, request_type=None, if workload_id is None and request_metadata and 'workload_id' in request_metadata and request_metadata['workload_id']: workload_id = int(request_metadata['workload_id']) # request_metadata = convert_request_metadata_to_workflow(scope, name, workload_id, request_type, request_metadata) + if not site: + try: + if request_metadata and 'workflow' in request_metadata and request_metadata['workflow']: + w = request_metadata['workflow'] + site = w.get_site() + except Exception: + pass kwargs = {'scope': scope, 'name': name, 'requester': requester, 'request_type': request_type, - 'username': username, 'userdn': userdn, + 'username': username, 'userdn': userdn, 'site': site, 'transform_tag': transform_tag, 'status': status, 'locking': locking, 'priority': priority, 'lifetime': lifetime, 'workload_id': workload_id, 'new_poll_period': new_poll_period, 'update_poll_period': update_poll_period, diff --git a/main/lib/idds/orm/requests.py b/main/lib/idds/orm/requests.py index b4a6208e..e64a7251 100644 --- a/main/lib/idds/orm/requests.py +++ b/main/lib/idds/orm/requests.py @@ -31,7 +31,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None, username=None, userdn=None, transform_tag=None, status=RequestStatus.New, locking=RequestLocking.Idle, priority=0, lifetime=None, workload_id=None, request_metadata=None, - new_poll_period=1, update_poll_period=10, + new_poll_period=1, update_poll_period=10, site=None, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, processing_metadata=None): """ @@ -80,7 +80,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None, username=username, userdn=userdn, transform_tag=transform_tag, status=status, locking=locking, priority=priority, workload_id=workload_id, - expired_at=expired_at, + expired_at=expired_at, site=site, new_retries=new_retries, update_retries=update_retries, max_new_retries=max_new_retries, max_update_retries=max_update_retries, request_metadata=request_metadata, processing_metadata=processing_metadata) @@ -98,7 +98,7 @@ def add_request(scope=None, name=None, requester=None, request_type=None, username=None, userdn=None, transform_tag=None, status=RequestStatus.New, locking=RequestLocking.Idle, priority=0, lifetime=None, workload_id=None, request_metadata=None, - new_poll_period=1, update_poll_period=10, + new_poll_period=1, update_poll_period=10, site=None, new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0, processing_metadata=None, session=None): """ @@ -128,7 +128,7 @@ def add_request(scope=None, name=None, requester=None, request_type=None, username=username, userdn=userdn, transform_tag=transform_tag, status=status, locking=locking, priority=priority, workload_id=workload_id, lifetime=lifetime, - new_poll_period=new_poll_period, + new_poll_period=new_poll_period, site=site, update_poll_period=update_poll_period, new_retries=new_retries, update_retries=update_retries, max_new_retries=max_new_retries, max_update_retries=max_update_retries, diff --git a/workflow/lib/idds/workflow/work.py b/workflow/lib/idds/workflow/work.py index fbb826c3..a15f6b10 100644 --- a/workflow/lib/idds/workflow/work.py +++ b/workflow/lib/idds/workflow/work.py @@ -620,6 +620,9 @@ def get_template_work_id(self): def get_sequence_id(self): return self.sequence_id + def get_site(self): + return None + @property def internal_id(self): return self.get_metadata_item('internal_id') diff --git a/workflow/lib/idds/workflow/workflow.py b/workflow/lib/idds/workflow/workflow.py index d8539dc1..249c455f 100644 --- a/workflow/lib/idds/workflow/workflow.py +++ b/workflow/lib/idds/workflow/workflow.py @@ -1201,6 +1201,17 @@ def set_workload_id(self, workload_id): def get_workload_id(self): return self.workload_id + def get_site(self): + try: + work_id = self.primary_initial_work + if not work_id: + work_id = list(self.works.keys())[0] + work = self.works[work_id] + return work.get_site() + except Exception: + pass + return None + def add_initial_works(self, work): self.initial_works.append(work.get_internal_id()) if self.primary_initial_work is None: @@ -2110,6 +2121,9 @@ def get_workload_id(self): return self.runs[str(self.num_run)].workload_id return self.template.workload_id + def get_site(self): + return self.template.get_site() + def add_work(self, work, initial=False, primary=False): self.template.add_work(work, initial, primary) diff --git a/workflow/lib/idds/workflowv2/work.py b/workflow/lib/idds/workflowv2/work.py index c599b642..e3caa610 100644 --- a/workflow/lib/idds/workflowv2/work.py +++ b/workflow/lib/idds/workflowv2/work.py @@ -622,6 +622,9 @@ def get_template_work_id(self): def get_sequence_id(self): return self.sequence_id + def get_site(self): + return None + @property def internal_id(self): return self.get_metadata_item('internal_id') diff --git a/workflow/lib/idds/workflowv2/workflow.py b/workflow/lib/idds/workflowv2/workflow.py index a999db8f..fe6b49f3 100644 --- a/workflow/lib/idds/workflowv2/workflow.py +++ b/workflow/lib/idds/workflowv2/workflow.py @@ -1281,6 +1281,17 @@ def set_workload_id(self, workload_id): def get_workload_id(self): return self.workload_id + def get_site(self): + try: + work_id = self.primary_initial_work + if not work_id: + work_id = list(self.works.keys())[0] + work = self.works[work_id] + return work.get_site() + except Exception: + pass + return None + def add_initial_works(self, work): self.initial_works.append(work.get_internal_id()) if self.primary_initial_work is None: @@ -2270,6 +2281,9 @@ def get_workload_id(self): return self.runs[str(self.num_run)].workload_id return self.template.workload_id + def get_site(self): + return self.template.get_site() + def add_work(self, work, initial=False, primary=False): self.template.add_work(work, initial, primary)