From df957404bef37e207ad1da20fc52d52949a6fa28 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Tue, 6 Aug 2024 10:24:40 +0200 Subject: [PATCH] workflow prerun to check asyncresult is ok --- workflow/lib/idds/iworkflow/work.py | 37 ++++++++------ workflow/lib/idds/iworkflow/workflow.py | 65 +++++++++++++++---------- 2 files changed, 60 insertions(+), 42 deletions(-) diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index c3710819..49d2471e 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -341,6 +341,9 @@ def setup(self): ret = init_env return ret + def get_clean_env(self): + return self._workflow_context.get_clean_env() + class Work(Base): @@ -917,6 +920,12 @@ def setup(self): """ return self._context.setup() + def get_clean_env(self): + """ + :returns command: `str` to clean the workflow. + """ + return self._context.get_clean_env() + def load_func(self, func_name): """ Load the function from the source files. @@ -934,28 +943,19 @@ def pre_run(self): if workflow_context.distributed: logging.info("Test AsyncResult") a_ret = AsyncResult(workflow_context, wait_num=1, timeout=30) - a_ret.subscribe() - - async_ret = AsyncResult(workflow_context, internal_id=a_ret.internal_id) - test_result = "AsyncResult test (request_id: %s, transform_id: %s)" % (workflow_context.request_id, workflow_context.transform_id) - logging.info("AsyncResult publish: %s" % test_result) - async_ret.publish(test_result) - - ret_q = a_ret.wait_result(force_return_results=True) - logging.info("AsyncResult results: %s" % str(ret_q)) - if ret_q and ret_q == test_result: - logging.info("AsyncResult test succeeded") - return True - else: - logging.info("AsyncResult test failed (published: %s, received: %s)" % (test_result, ret_q)) - return False + ret = a_ret.is_ok() + logging.info(f"pre_run asyncresult test is_ok: {ret}") + return ret return True def run(self): """ Run the work. """ - self.pre_run() + is_ok = self.pre_run() + if not is_ok: + logging.error(f"pre_run is_ok: {is_ok}, will exit.") + raise Exception("work pre_run failed") func_name, pre_kwargs, args, kwargs = self._func_name_and_args multi_jobs_kwargs_list = self.multi_jobs_kwargs_list @@ -1074,6 +1074,11 @@ def get_runner(self): cmd = cmd + " " + run_command else: cmd = run_command + + clean_env = self.get_clean_env() + if clean_env: + cmd = cmd + "; " + clean_env + return cmd diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 34df8c88..09210382 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -55,7 +55,7 @@ def get_current_workflow(cls): class WorkflowContext(Context): - def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None, exclude_source_files=[]): + def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None): super(WorkflowContext, self).__init__() self._service = service # panda, idds, sharefs self._request_id = None @@ -110,6 +110,7 @@ def __init__(self, name=None, service='panda', source_dir=None, workflow_type=Wo self.init_idds() self._init_env = init_env + self._clean_env = clean_env self._exclude_source_files = [] if exclude_source_files: @@ -148,6 +149,16 @@ def init_env(self, value): if self._init_env: self._init_env = self._init_env + " " + @property + def clean_env(self): + return self._clean_env + + @clean_env.setter + def clean_env(self, value): + self._clean_env = value + if self._clean_env: + self._clean_env = self._clean_env + " " + @property def vo(self): return self._vo @@ -421,6 +432,7 @@ def init_idds(self): def init_panda(self): if not self._panda_initialized: self._panda_initialized = True + self._panda_env = self.get_panda_env() if not self.site: self.site = os.environ.get("PANDA_SITE", None) if not self.queue: @@ -478,6 +490,9 @@ def setup(self): ret = init_env return ret + def get_clean_env(self): + return self.clean_env + def setup_source_files(self): """ Setup source files. @@ -691,7 +706,7 @@ class Workflow(Base): def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True, pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, name=None, init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None, - exclude_source_files=[]): + exclude_source_files=[], clean_env=None): """ Init a workflow. """ @@ -726,7 +741,7 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo else: self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir, distributed=distributed, init_env=init_env, max_walltime=max_walltime, - exclude_source_files=exclude_source_files) + exclude_source_files=exclude_source_files, clean_env=clean_env) @property def service(self): @@ -1102,6 +1117,12 @@ def setup(self): """ return self._context.setup() + def get_clean_env(self): + """ + :returns command: `str` to clean the workflow. + """ + return self._context.get_clean_env() + def setup_source_files(self): """ Setup location of source files @@ -1125,25 +1146,9 @@ def pre_run(self): if workflow_context.distributed: logging.info("Test AsyncResult") a_ret = AsyncResult(workflow_context, wait_num=1, timeout=30) - a_ret.subscribe() - - async_ret = AsyncResult(workflow_context, internal_id=a_ret.internal_id) - test_result = "AsyncResult test (request_id: %s)" % workflow_context.request_id - logging.info("AsyncResult publish: %s" % test_result) - async_ret.publish(test_result) - - ret_q = a_ret.wait_result(force_return_results=True) - logging.info("AsyncResult results: %s" % str(ret_q)) - if ret_q: - if ret_q == test_result: - logging.info("AsyncResult test succeeded") - return True - else: - logging.info("AsyncResult test failed (published: %s, received: %s)" % (test_result, ret_q)) - return False - else: - logging.info("Not received results") - return False + ret = a_ret.is_ok() + logging.info(f"pre_run asyncresult test is_ok: {ret}") + return ret return True def run(self): @@ -1152,7 +1157,10 @@ def run(self): """ # with self: if True: - self.pre_run() + is_ok = self.pre_run() + if not is_ok: + logging.error(f"pre_run is_ok: {is_ok}, will exit.") + raise Exception("workflow pre_run failed") func_name, pre_kwargs, args, kwargs = self._func_name_and_args multi_jobs_kwargs_list = self.multi_jobs_kwargs_list @@ -1205,6 +1213,11 @@ def get_runner(self): cmd = cmd + " " + run_command else: cmd = run_command + + clean_env = self.get_clean_env() + if clean_env: + cmd = cmd + "; " + clean_env + return cmd def get_func_name(self): @@ -1215,12 +1228,12 @@ def get_func_name(self): # foo = workflow(arg)(foo) def workflow(func=None, *, local=False, service='idds', source_dir=None, primary=False, queue=None, site=None, cloud=None, max_walltime=24 * 3600, distributed=True, init_env=None, pre_kwargs={}, return_workflow=False, no_wraps=False, - source_dir_parent_level=None, exclude_source_files=[]): + source_dir_parent_level=None, exclude_source_files=[], clean_env=None): if func is None: return functools.partial(workflow, local=local, service=service, source_dir=source_dir, primary=primary, queue=queue, site=site, cloud=cloud, max_walltime=max_walltime, distributed=distributed, init_env=init_env, pre_kwargs=pre_kwargs, no_wraps=no_wraps, return_workflow=return_workflow, source_dir_parent_level=source_dir_parent_level, - exclude_source_files=exclude_source_files) + exclude_source_files=exclude_source_files, clean_env=clean_env) if 'IDDS_IGNORE_WORKFLOW_DECORATOR' in os.environ: return func @@ -1230,7 +1243,7 @@ def wrapper(*args, **kwargs): try: f = Workflow(func, service=service, source_dir=source_dir, local=local, max_walltime=max_walltime, distributed=distributed, pre_kwargs=pre_kwargs, args=args, kwargs=kwargs, init_env=init_env, source_dir_parent_level=source_dir_parent_level, - exclude_source_files=exclude_source_files) + exclude_source_files=exclude_source_files, clean_env=clean_env) f.queue = queue f.site = site