Skip to content

Commit

Permalink
Merge pull request #225 from HSF/dev
Browse files Browse the repository at this point in the history
func to set cloud from site name if the cloud is not set
  • Loading branch information
wguanicedew authored Oct 18, 2023
2 parents 0930457 + 74beb75 commit 351881e
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 5 deletions.
4 changes: 2 additions & 2 deletions client/lib/idds/client/clientmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def ping(self):
return status

@exception_handler
def submit(self, workflow, username=None, userdn=None, use_dataset_name=True):
def submit(self, workflow, username=None, userdn=None, use_dataset_name=False):
"""
Submit the workflow as a request to iDDS server.
Expand Down Expand Up @@ -454,7 +454,7 @@ def submit(self, workflow, username=None, userdn=None, use_dataset_name=True):
if self.auth_type == 'x509_proxy':
workflow.add_proxy()

if use_dataset_name:
if use_dataset_name or not workflow.name:
primary_init_work = workflow.get_primary_initial_collection()
if primary_init_work:
if type(primary_init_work) in [Collection, CollectionV1]:
Expand Down
15 changes: 15 additions & 0 deletions doma/lib/idds/doma/workflowv2/domapandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,17 @@ def create_processing(self, input_output_maps=[]):
self.active_processings.append(proc.internal_id)
return proc

def get_site_from_cloud(self, site):
    """
    Resolve the cloud for a site through the registered site-to-cloud function.

    Despite its name, this takes a site and returns a cloud: it calls the
    callable returned by ``get_func_site_to_cloud()`` with ``site``.

    :param site: site name passed unchanged to the lookup callable.

    :returns: the value returned by the lookup callable, or None when no
              callable is registered or the lookup raises.
    """
    resolved_cloud = None
    try:
        lookup = self.get_func_site_to_cloud()
        if lookup:
            resolved_cloud = lookup(site)
    except Exception as ex:
        # Best effort: a failing lookup is logged and reported as "no cloud".
        self.logger.error(ex)
    return resolved_cloud

def submit_panda_task(self, processing):
try:
from pandaclient import Client
Expand All @@ -584,6 +595,10 @@ def submit_panda_task(self, processing):
if 'new_retries' in processing and processing['new_retries']:
new_retries = int(processing['new_retries'])
task_param['taskName'] = task_param['taskName'] + "_" + str(new_retries)
cloud = self.get_site_from_cloud(task_param['PandaSite'])
if cloud:
task_param['cloud'] = cloud

if self.has_dependency():
parent_tid = None
self.logger.info("parent_workload_id: %s" % self.parent_workload_id)
Expand Down
24 changes: 24 additions & 0 deletions main/lib/idds/agents/carrier/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=

super(Submitter, self).__init__(num_threads=num_threads, max_number_workers=self.max_number_workers,
name=name, retrieve_bulk_size=retrieve_bulk_size, **kwargs)
self.site_to_cloud = None

def get_new_processings(self):
"""
Expand Down Expand Up @@ -76,6 +77,28 @@ def get_new_processings(self):
self.logger.error(traceback.format_exc())
return []

def get_site_to_cloud(self, site, log_prefix=''):
    """
    Return the cloud configured for a site.

    The mapping is read from
    ``self.agent_attributes['domapandawork']['site_to_cloud']``, a
    comma-separated list of ``site:cloud`` pairs. It is parsed on the first
    call and cached in ``self.site_to_cloud``. When a site is listed more
    than once, the first entry wins.

    Malformed entries (no colon, more than one colon, empty site or cloud)
    are skipped with a warning instead of aborting the parse, so one bad
    entry cannot hide the valid ones that follow it. Whitespace around
    sites and clouds is stripped.

    :param site: site name to look up.
    :param log_prefix: prefix prepended to the log messages.

    :returns: the cloud for ``site``, or None when the site is empty, is not
              configured, or an unexpected error occurs.
    """
    try:
        if self.site_to_cloud is None:
            self.logger.debug(log_prefix + " agent_attributes: %s" % str(self.agent_attributes))

            raw_mapping = None
            attrs = self.agent_attributes
            if attrs and 'domapandawork' in attrs and attrs['domapandawork']:
                panda_attrs = attrs['domapandawork']
                if 'site_to_cloud' in panda_attrs and panda_attrs['site_to_cloud']:
                    raw_mapping = panda_attrs['site_to_cloud']

            # Build the mapping locally and publish it in one assignment, so
            # a failure while parsing never leaves a half-filled cache behind.
            mapping = {}
            if raw_mapping:
                for entry in raw_mapping.split(","):
                    parts = [part.strip() for part in entry.split(':')]
                    if len(parts) != 2 or not parts[0] or not parts[1]:
                        # Empty entries (e.g. a trailing comma) are dropped quietly.
                        if entry.strip():
                            self.logger.warning(log_prefix + " ignoring malformed site_to_cloud entry: %r" % entry)
                        continue
                    local_site, cloud = parts
                    # First definition of a site wins.
                    mapping.setdefault(local_site, cloud)
                self.logger.debug(log_prefix + " site_to_cloud: %s" % mapping)
            self.site_to_cloud = mapping

        if site and self.site_to_cloud:
            cloud = self.site_to_cloud.get(site, None)
            self.logger.debug(log_prefix + "cloud for site(%s): %s" % (site, cloud))
            return cloud
    except Exception as ex:
        # Best effort: the caller treats None as "no cloud configured".
        self.logger.error(ex)
    return None

def handle_new_processing(self, processing):
try:
log_prefix = self.get_log_prefix(processing)
Expand All @@ -85,6 +108,7 @@ def handle_new_processing(self, processing):
# work = transform['transform_metadata']['work']
ret_new_processing = handle_new_processing(processing,
self.agent_attributes,
func_site_to_cloud=self.get_site_to_cloud,
logger=self.logger,
log_prefix=log_prefix)
status, processing, update_colls, new_contents, new_input_dependency_contents, msgs, errors = ret_new_processing
Expand Down
4 changes: 3 additions & 1 deletion main/lib/idds/agents/carrier/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,14 +466,16 @@ def generate_messages(request_id, transform_id, workload_id, work, msg_type='fil
return msgs


def handle_new_processing(processing, agent_attributes, logger=None, log_prefix=''):
def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None, logger=None, log_prefix=''):
logger = get_logger(logger)

proc = processing['processing_metadata']['processing']
work = proc.work
work.set_agent_attributes(agent_attributes, processing)
transform_id = processing['transform_id']

if func_site_to_cloud:
work.set_func_site_to_cloud(func_site_to_cloud)
status, workload_id, errors = work.submit_processing(processing)
logger.info(log_prefix + "submit_processing (status: %s, workload_id: %s, errors: %s)" % (status, workload_id, errors))

Expand Down
1 change: 1 addition & 0 deletions main/lib/idds/tests/panda_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
conn = idds_api.get_api(idds_utils.json_dumps, idds_host=None, compress=True, manager=True)
reqid = 5460
ret = conn.get_requests(request_id=int(reqid), with_detail=True)
print(ret)
jtids = [task["transform_workload_id"] for task in ret[1][1] if task["transform_status"]["attributes"]["_name_"] != "Finished"]
print(jtids)

Expand Down
6 changes: 4 additions & 2 deletions main/lib/idds/tests/panda_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda'

os.environ['PANDA_BEHIND_REAL_LB'] = "1"
os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda'
os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda'
# os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda'
# os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda'

from pandaclient import Client # noqa E402

Expand All @@ -34,6 +34,8 @@
task_ids = [165124, 165130, 165135] + [i for i in range(165143, 165149)]
task_ids = [i for i in range(251, 282)]
task_ids = [282, 322, 323, 324, 325]
task_ids = [i for i in range(165243, 165277)]
task_ids = [165277]
for task_id in task_ids:
print("Killing %s" % task_id)
ret = Client.killTask(task_id, verbose=True)
Expand Down
2 changes: 2 additions & 0 deletions main/lib/idds/tests/test_domapanda.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
# task_queue = 'SLAC_Rubin_Merge'
# task_queue = 'SLAC_TEST'

task_cloud = None


def randStr(chars=string.ascii_lowercase + string.digits, N=10):
    """Return a string of N characters, each picked with random.choice from chars."""
    picked = []
    for _ in range(N):
        picked.append(random.choice(chars))
    return ''.join(picked)
Expand Down
8 changes: 8 additions & 0 deletions workflow/lib/idds/workflow/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None,

self.sliced_global_parameters = None

self.func_site_to_cloud = None

"""
self._running_data_names = []
for name in ['internal_id', 'template_work_id', 'initialized', 'sequence_id', 'parameters', 'work_id', 'transforming', 'workdir',
Expand Down Expand Up @@ -1097,6 +1099,12 @@ def set_work_name(self, work_name):
def get_work_name(self):
return self.work_name

def set_func_site_to_cloud(self, func):
    """
    Store the callable used to look up a cloud from a site name.

    :param func: callable to store; kept as-is in ``self.func_site_to_cloud``.
    """
    self.func_site_to_cloud = func

def get_func_site_to_cloud(self):
    """
    Return the stored site-to-cloud callable.

    :returns: the value of ``self.func_site_to_cloud``.
    """
    return self.func_site_to_cloud

def get_is_template(self):
    """
    Return the ``is_template`` attribute.

    The previous body evaluated ``self.is_template`` without returning it,
    so callers always received None.

    :returns: the value of ``self.is_template``.
    """
    return self.is_template

Expand Down
8 changes: 8 additions & 0 deletions workflow/lib/idds/workflowv2/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None,

self.is_build_work = False

self.func_site_to_cloud = None

"""
self._running_data_names = []
for name in ['internal_id', 'template_work_id', 'initialized', 'sequence_id', 'parameters', 'work_id', 'transforming', 'workdir',
Expand Down Expand Up @@ -1118,6 +1120,12 @@ def set_work_name(self, work_name):
def get_work_name(self):
return self.work_name

def set_func_site_to_cloud(self, func):
    """
    Store the callable used to look up a cloud from a site name.

    :param func: callable to store; kept as-is in ``self.func_site_to_cloud``.
    """
    self.func_site_to_cloud = func

def get_func_site_to_cloud(self):
    """
    Return the stored site-to-cloud callable.

    :returns: the value of ``self.func_site_to_cloud``.
    """
    return self.func_site_to_cloud

def get_is_template(self):
    """
    Return the ``is_template`` attribute.

    The previous body evaluated ``self.is_template`` without returning it,
    so callers always received None.

    :returns: the value of ``self.is_template``.
    """
    return self.is_template

Expand Down

0 comments on commit 351881e

Please sign in to comment.