Skip to content

Commit

Permalink
workflow prerun to check asyncresult is ok
Browse files Browse the repository at this point in the history
  • Loading branch information
wguanicedew committed Aug 6, 2024
1 parent 565c993 commit df95740
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 42 deletions.
37 changes: 21 additions & 16 deletions workflow/lib/idds/iworkflow/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ def setup(self):
ret = init_env
return ret

def get_clean_env(self):
    """
    Fetch the clean-up command from the workflow context held by this object.

    :returns: whatever ``self._workflow_context.get_clean_env()`` returns
              (presumably a shell command `str` or None -- confirm against the context class).
    """
    workflow_context = self._workflow_context
    return workflow_context.get_clean_env()


class Work(Base):

Expand Down Expand Up @@ -917,6 +920,12 @@ def setup(self):
"""
return self._context.setup()

def get_clean_env(self):
    """
    Look up the clean-up command for this work.

    The lookup is delegated to the attached context object.

    :returns command: the value of ``self._context.get_clean_env()``; when it is
                      not empty, ``get_runner`` appends it to the run command after "; ".
    """
    context = self._context
    return context.get_clean_env()

def load_func(self, func_name):
"""
Load the function from the source files.
Expand All @@ -934,28 +943,19 @@ def pre_run(self):
if workflow_context.distributed:
logging.info("Test AsyncResult")
a_ret = AsyncResult(workflow_context, wait_num=1, timeout=30)
a_ret.subscribe()

async_ret = AsyncResult(workflow_context, internal_id=a_ret.internal_id)
test_result = "AsyncResult test (request_id: %s, transform_id: %s)" % (workflow_context.request_id, workflow_context.transform_id)
logging.info("AsyncResult publish: %s" % test_result)
async_ret.publish(test_result)

ret_q = a_ret.wait_result(force_return_results=True)
logging.info("AsyncResult results: %s" % str(ret_q))
if ret_q and ret_q == test_result:
logging.info("AsyncResult test succeeded")
return True
else:
logging.info("AsyncResult test failed (published: %s, received: %s)" % (test_result, ret_q))
return False
ret = a_ret.is_ok()
logging.info(f"pre_run asyncresult test is_ok: {ret}")
return ret
return True

def run(self):
"""
Run the work.
"""
self.pre_run()
is_ok = self.pre_run()
if not is_ok:
logging.error(f"pre_run is_ok: {is_ok}, will exit.")
raise Exception("work pre_run failed")

func_name, pre_kwargs, args, kwargs = self._func_name_and_args
multi_jobs_kwargs_list = self.multi_jobs_kwargs_list
Expand Down Expand Up @@ -1074,6 +1074,11 @@ def get_runner(self):
cmd = cmd + " " + run_command
else:
cmd = run_command

clean_env = self.get_clean_env()
if clean_env:
cmd = cmd + "; " + clean_env

return cmd


Expand Down
65 changes: 39 additions & 26 deletions workflow/lib/idds/iworkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def get_current_workflow(cls):


class WorkflowContext(Context):
def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None, exclude_source_files=[]):
def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None):
super(WorkflowContext, self).__init__()
self._service = service # panda, idds, sharefs
self._request_id = None
Expand Down Expand Up @@ -110,6 +110,7 @@ def __init__(self, name=None, service='panda', source_dir=None, workflow_type=Wo
self.init_idds()

self._init_env = init_env
self._clean_env = clean_env

self._exclude_source_files = []
if exclude_source_files:
Expand Down Expand Up @@ -148,6 +149,16 @@ def init_env(self, value):
if self._init_env:
self._init_env = self._init_env + " "

@property
def clean_env(self):
    """
    Clean-up command stored on this context.

    :returns: the current value of ``self._clean_env``.
    """
    return self._clean_env

@clean_env.setter
def clean_env(self, value):
    """
    Store the clean-up command.

    :param value: the command to store; a truthy value is stored with one
                  trailing space appended, a falsy value (None, "") is stored unchanged.
    """
    self._clean_env = (value + " ") if value else value

@property
def vo(self):
return self._vo
Expand Down Expand Up @@ -421,6 +432,7 @@ def init_idds(self):
def init_panda(self):
if not self._panda_initialized:
self._panda_initialized = True
self._panda_env = self.get_panda_env()
if not self.site:
self.site = os.environ.get("PANDA_SITE", None)
if not self.queue:
Expand Down Expand Up @@ -478,6 +490,9 @@ def setup(self):
ret = init_env
return ret

def get_clean_env(self):
    """
    Get the clean-up command configured on this context.

    :returns: the current value of the ``clean_env`` property.
    """
    command = self.clean_env
    return command

def setup_source_files(self):
"""
Setup source files.
Expand Down Expand Up @@ -691,7 +706,7 @@ class Workflow(Base):
def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True,
pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, name=None,
init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None,
exclude_source_files=[]):
exclude_source_files=[], clean_env=None):
"""
Init a workflow.
"""
Expand Down Expand Up @@ -726,7 +741,7 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo
else:
self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir,
distributed=distributed, init_env=init_env, max_walltime=max_walltime,
exclude_source_files=exclude_source_files)
exclude_source_files=exclude_source_files, clean_env=clean_env)

@property
def service(self):
Expand Down Expand Up @@ -1102,6 +1117,12 @@ def setup(self):
"""
return self._context.setup()

def get_clean_env(self):
    """
    Look up the clean-up command for this workflow.

    :returns command: the value of ``self._context.get_clean_env()``; when it is
                      not empty, ``get_runner`` appends it to the run command after "; ".
    """
    workflow_context = self._context
    return workflow_context.get_clean_env()

def setup_source_files(self):
"""
Setup location of source files
Expand All @@ -1125,25 +1146,9 @@ def pre_run(self):
if workflow_context.distributed:
logging.info("Test AsyncResult")
a_ret = AsyncResult(workflow_context, wait_num=1, timeout=30)
a_ret.subscribe()

async_ret = AsyncResult(workflow_context, internal_id=a_ret.internal_id)
test_result = "AsyncResult test (request_id: %s)" % workflow_context.request_id
logging.info("AsyncResult publish: %s" % test_result)
async_ret.publish(test_result)

ret_q = a_ret.wait_result(force_return_results=True)
logging.info("AsyncResult results: %s" % str(ret_q))
if ret_q:
if ret_q == test_result:
logging.info("AsyncResult test succeeded")
return True
else:
logging.info("AsyncResult test failed (published: %s, received: %s)" % (test_result, ret_q))
return False
else:
logging.info("Not received results")
return False
ret = a_ret.is_ok()
logging.info(f"pre_run asyncresult test is_ok: {ret}")
return ret
return True

def run(self):
Expand All @@ -1152,7 +1157,10 @@ def run(self):
"""
# with self:
if True:
self.pre_run()
is_ok = self.pre_run()
if not is_ok:
logging.error(f"pre_run is_ok: {is_ok}, will exit.")
raise Exception("workflow pre_run failed")

func_name, pre_kwargs, args, kwargs = self._func_name_and_args
multi_jobs_kwargs_list = self.multi_jobs_kwargs_list
Expand Down Expand Up @@ -1205,6 +1213,11 @@ def get_runner(self):
cmd = cmd + " " + run_command
else:
cmd = run_command

clean_env = self.get_clean_env()
if clean_env:
cmd = cmd + "; " + clean_env

return cmd

def get_func_name(self):
Expand All @@ -1215,12 +1228,12 @@ def get_func_name(self):
# foo = workflow(arg)(foo)
def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None,
max_walltime=24 * 3600, distributed=True, init_env=None, pre_kwargs={}, return_workflow=False, no_wraps=False,
source_dir_parent_level=None, exclude_source_files=[]):
source_dir_parent_level=None, exclude_source_files=[], clean_env=None):
if func is None:
return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud,
max_walltime=max_walltime, distributed=distributed, init_env=init_env, pre_kwargs=pre_kwargs, no_wraps=no_wraps,
return_workflow=return_workflow, source_dir_parent_level=source_dir_parent_level,
exclude_source_files=exclude_source_files)
exclude_source_files=exclude_source_files, clean_env=clean_env)

if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ:
return func
Expand All @@ -1230,7 +1243,7 @@ def wrapper(*args, **kwargs):
try:
f = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime, distributed=distributed,
pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, init_env=init_env, source_dir_parent_level=source_dir_parent_level,
exclude_source_files=exclude_source_files)
exclude_source_files=exclude_source_files, clean_env=clean_env)

f.queue = queue
f.site = site
Expand Down

0 comments on commit df95740

Please sign in to comment.