Skip to content

Commit

Permalink
Merge pull request #275 from wguanicedew/dev
Browse files Browse the repository at this point in the history
fix to set the site
  • Loading branch information
wguanicedew authored Jan 31, 2024
2 parents ce0dd32 + 4d2ebfa commit bafc304
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 8 deletions.
1 change: 1 addition & 0 deletions client/lib/idds/client/clientmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ def submit(self, workflow, username=None, userdn=None, use_dataset_name=False):
'transform_tag': 'workflow',
'status': RequestStatus.New,
'priority': 0,
'site': workflow.get_site(),
'lifetime': workflow.lifetime,
'workload_id': workflow.get_workload_id(),
'request_metadata': {'version': release_version, 'workload_id': workflow.get_workload_id(), 'workflow': workflow}
Expand Down
9 changes: 9 additions & 0 deletions doma/lib/idds/doma/workflowv2/domapandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ def my_condition(self):
return True
return False

def get_site(self):
if self.task_site:
return self.task_site
if self.task_queue:
return self.task_queue
if self.queue:
return self.queue
return self.task_cloud

@property
def dependency_map(self):
return self._dependency_map
Expand Down
2 changes: 1 addition & 1 deletion main/lib/idds/agents/clerk/clerk.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def clean_min_request_id(self):
min_request_id = old_min_request_id - 1000
BaseAgent.min_request_id = min_request_id
else:
for req_id in BaseAgent.min_request_id_cache:
for req_id in list(BaseAgent.min_request_id_cache.keys()):
time_stamp = BaseAgent.min_request_id_cache[req_id]
if time_stamp < time.time() - 12 * 3600: # older than 12 hours
del BaseAgent.min_request_id_cache[req_id]
Expand Down
14 changes: 11 additions & 3 deletions main/lib/idds/core/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None,
username=None, userdn=None, transform_tag=None,
status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
lifetime=None, workload_id=None, request_metadata=None,
new_poll_period=1, update_poll_period=10,
new_poll_period=1, update_poll_period=10, site=None,
new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
processing_metadata=None):
"""
Expand Down Expand Up @@ -65,6 +65,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None,
'new_poll_period': new_poll_period, 'update_poll_period': update_poll_period,
'new_retries': new_retries, 'update_retries': update_retries,
'max_new_retries': max_new_retries, 'max_update_retries': max_update_retries,
'site': site,
'request_metadata': request_metadata, 'processing_metadata': processing_metadata}
return orm_requests.create_request(**kwargs)

Expand All @@ -74,7 +75,7 @@ def add_request(scope=None, name=None, requester=None, request_type=None,
username=None, userdn=None, transform_tag=None,
status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
lifetime=None, workload_id=None, request_metadata=None,
new_poll_period=1, update_poll_period=10,
new_poll_period=1, update_poll_period=10, site=None,
new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
processing_metadata=None, session=None):
"""
Expand All @@ -98,9 +99,16 @@ def add_request(scope=None, name=None, requester=None, request_type=None,
if workload_id is None and request_metadata and 'workload_id' in request_metadata and request_metadata['workload_id']:
workload_id = int(request_metadata['workload_id'])
# request_metadata = convert_request_metadata_to_workflow(scope, name, workload_id, request_type, request_metadata)
if not site:
try:
if request_metadata and 'workflow' in request_metadata and request_metadata['workflow']:
w = request_metadata['workflow']
site = w.get_site()
except Exception:
pass

kwargs = {'scope': scope, 'name': name, 'requester': requester, 'request_type': request_type,
'username': username, 'userdn': userdn,
'username': username, 'userdn': userdn, 'site': site,
'transform_tag': transform_tag, 'status': status, 'locking': locking,
'priority': priority, 'lifetime': lifetime, 'workload_id': workload_id,
'new_poll_period': new_poll_period, 'update_poll_period': update_poll_period,
Expand Down
8 changes: 4 additions & 4 deletions main/lib/idds/orm/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None,
username=None, userdn=None, transform_tag=None,
status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
lifetime=None, workload_id=None, request_metadata=None,
new_poll_period=1, update_poll_period=10,
new_poll_period=1, update_poll_period=10, site=None,
new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
processing_metadata=None):
"""
Expand Down Expand Up @@ -80,7 +80,7 @@ def create_request(scope=None, name=None, requester=None, request_type=None,
username=username, userdn=userdn,
transform_tag=transform_tag, status=status, locking=locking,
priority=priority, workload_id=workload_id,
expired_at=expired_at,
expired_at=expired_at, site=site,
new_retries=new_retries, update_retries=update_retries,
max_new_retries=max_new_retries, max_update_retries=max_update_retries,
request_metadata=request_metadata, processing_metadata=processing_metadata)
Expand All @@ -98,7 +98,7 @@ def add_request(scope=None, name=None, requester=None, request_type=None,
username=None, userdn=None, transform_tag=None,
status=RequestStatus.New, locking=RequestLocking.Idle, priority=0,
lifetime=None, workload_id=None, request_metadata=None,
new_poll_period=1, update_poll_period=10,
new_poll_period=1, update_poll_period=10, site=None,
new_retries=0, update_retries=0, max_new_retries=3, max_update_retries=0,
processing_metadata=None, session=None):
"""
Expand Down Expand Up @@ -128,7 +128,7 @@ def add_request(scope=None, name=None, requester=None, request_type=None,
username=username, userdn=userdn,
transform_tag=transform_tag, status=status, locking=locking,
priority=priority, workload_id=workload_id, lifetime=lifetime,
new_poll_period=new_poll_period,
new_poll_period=new_poll_period, site=site,
update_poll_period=update_poll_period,
new_retries=new_retries, update_retries=update_retries,
max_new_retries=max_new_retries, max_update_retries=max_update_retries,
Expand Down
3 changes: 3 additions & 0 deletions workflow/lib/idds/workflow/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,9 @@ def get_template_work_id(self):
def get_sequence_id(self):
return self.sequence_id

def get_site(self):
return None

@property
def internal_id(self):
return self.get_metadata_item('internal_id')
Expand Down
14 changes: 14 additions & 0 deletions workflow/lib/idds/workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,17 @@ def set_workload_id(self, workload_id):
def get_workload_id(self):
return self.workload_id

def get_site(self):
try:
work_id = self.primary_initial_work
if not work_id:
work_id = list(self.works.keys())[0]
work = self.works[work_id]
return work.get_site()
except Exception:
pass
return None

def add_initial_works(self, work):
self.initial_works.append(work.get_internal_id())
if self.primary_initial_work is None:
Expand Down Expand Up @@ -2110,6 +2121,9 @@ def get_workload_id(self):
return self.runs[str(self.num_run)].workload_id
return self.template.workload_id

def get_site(self):
return self.template.get_site()

def add_work(self, work, initial=False, primary=False):
self.template.add_work(work, initial, primary)

Expand Down
3 changes: 3 additions & 0 deletions workflow/lib/idds/workflowv2/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,9 @@ def get_template_work_id(self):
def get_sequence_id(self):
return self.sequence_id

def get_site(self):
return None

@property
def internal_id(self):
return self.get_metadata_item('internal_id')
Expand Down
14 changes: 14 additions & 0 deletions workflow/lib/idds/workflowv2/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,17 @@ def set_workload_id(self, workload_id):
def get_workload_id(self):
return self.workload_id

def get_site(self):
try:
work_id = self.primary_initial_work
if not work_id:
work_id = list(self.works.keys())[0]
work = self.works[work_id]
return work.get_site()
except Exception:
pass
return None

def add_initial_works(self, work):
self.initial_works.append(work.get_internal_id())
if self.primary_initial_work is None:
Expand Down Expand Up @@ -2270,6 +2281,9 @@ def get_workload_id(self):
return self.runs[str(self.num_run)].workload_id
return self.template.workload_id

def get_site(self):
return self.template.get_site()

def add_work(self, work, initial=False, primary=False):
self.template.add_work(work, initial, primary)

Expand Down

0 comments on commit bafc304

Please sign in to comment.