From e8de06e926c36c980bd830791d67515f1480bb6e Mon Sep 17 00:00:00 2001 From: Wen Guan <wguan.icedew@gmail.com> Date: Wed, 28 Aug 2024 11:52:04 +0200 Subject: [PATCH] add workflow operations to disable separate logs --- main/lib/idds/agents/carrier/plugins/base.py | 15 ++++++----- workflow/lib/idds/iworkflow/work.py | 16 ++++++++++++ workflow/lib/idds/iworkflow/workflow.py | 26 +++++++++++++++++--- 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/main/lib/idds/agents/carrier/plugins/base.py b/main/lib/idds/agents/carrier/plugins/base.py index 4994d165..84f10124 100644 --- a/main/lib/idds/agents/carrier/plugins/base.py +++ b/main/lib/idds/agents/carrier/plugins/base.py @@ -98,12 +98,15 @@ def get_task_params(self, work): task_param_map['maxAttempt'] = work.max_attempt if task_param_map['maxFailure'] < work.max_attempt: task_param_map['maxFailure'] = work.max_attempt - task_param_map['log'] = {"dataset": "PandaJob_iworkflow/", # "PandaJob_#{pandaid}/" - "destination": "local", - "param_type": "log", - "token": "local", - "type": "template", - "value": "log.tgz"} + + if work.enable_separate_log: + task_param_map['log'] = {"dataset": "PandaJob_iworkflow/", # "PandaJob_#{pandaid}/" + "destination": "local", + "param_type": "log", + "token": "local", + "type": "template", + "value": "log.tgz"} + task_param_map['jobParameters'] = [ {'type': 'constant', 'value': executable, # noqa: E501 diff --git a/workflow/lib/idds/iworkflow/work.py b/workflow/lib/idds/iworkflow/work.py index 4c14a46d..7d8e4d99 100644 --- a/workflow/lib/idds/iworkflow/work.py +++ b/workflow/lib/idds/iworkflow/work.py @@ -236,6 +236,14 @@ def processing_id(self): def processing_id(self, value): self._processing_id = value + @property + def enable_separate_log(self): + return self._workflow_context.enable_separate_log + + @enable_separate_log.setter + def enable_separate_log(self, value): + self._workflow_context.enable_separate_log = value + @property def brokers(self): return self._workflow_context.brokers @@ -574,6 +582,14 @@ def workload_id(self, value): def get_workload_id(self): return self.workload_id + @property + def enable_separate_log(self): + return self._context.enable_separate_log + + @enable_separate_log.setter + def enable_separate_log(self, value): + self._context.enable_separate_log = value + @property def token(self): return self._context.token diff --git a/workflow/lib/idds/iworkflow/workflow.py b/workflow/lib/idds/iworkflow/workflow.py index 4930870e..376b77f2 100644 --- a/workflow/lib/idds/iworkflow/workflow.py +++ b/workflow/lib/idds/iworkflow/workflow.py @@ -55,7 +55,8 @@ def get_current_workflow(cls): class WorkflowContext(Context): - def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None): + def __init__(self, name=None, service='panda', source_dir=None, workflow_type=WorkflowType.iWorkflow, distributed=True, + max_walltime=24 * 3600, init_env=None, exclude_source_files=[], clean_env=None, enable_separate_log=False): super(WorkflowContext, self).__init__() self._service = service # panda, idds, sharefs self._request_id = None @@ -119,6 +120,8 @@ def __init__(self, name=None, service='panda', source_dir=None, workflow_type=Wo else: self._exclude_source_files = [exclude_source_files] + self._enable_separate_log = enable_separate_log + @property def logger(self): return logging.getLogger(self.__class__.__name__) @@ -287,6 +290,14 @@ def workload_id(self): def workload_id(self, value): self._workload_id = value + @property + def enable_separate_log(self): + return self._enable_separate_log + + @enable_separate_log.setter + def enable_separate_log(self, value): + self._enable_separate_log = value + @property def brokers(self): return self._brokers @@ -719,7 +730,7 @@ class Workflow(Base): def __init__(self, func=None, service='panda', context=None, source_dir=None, local=False, distributed=True, pre_kwargs={}, args=None, kwargs={}, multi_jobs_kwargs_list=[], current_job_kwargs=None, name=None, init_env=None, is_unique_func_name=False, max_walltime=24 * 3600, source_dir_parent_level=None, - exclude_source_files=[], clean_env=None): + enable_separate_log=False, exclude_source_files=[], clean_env=None): """ Init a workflow. """ @@ -754,7 +765,8 @@ def __init__(self, func=None, service='panda', context=None, source_dir=None, lo else: self._context = WorkflowContext(name=self._name, service=service, workflow_type=workflow_type, source_dir=source_dir, distributed=distributed, init_env=init_env, max_walltime=max_walltime, - exclude_source_files=exclude_source_files, clean_env=clean_env) + exclude_source_files=exclude_source_files, clean_env=clean_env, + enable_separate_log=enable_separate_log) @property def service(self): @@ -929,6 +941,14 @@ def token(self): def token(self, value): self._context.token = value + @property + def enable_separate_log(self): + return self._context.enable_separate_log + + @enable_separate_log.setter + def enable_separate_log(self, value): + self._context.enable_separate_log = value + def get_work_tag(self): return self._context.workflow_type.name