From f26cf9f19acf715cab38a60d397d42c05f19ca9e Mon Sep 17 00:00:00 2001
From: gadorlhiac
Date: Tue, 23 Apr 2024 07:50:47 -0700
Subject: [PATCH 1/4] ENH Give option to JIDSlurmOperator to override max node
 number. Do not make active yet.

---
 workflows/airflow/operators/jidoperators.py | 41 +++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py
index 6367d698..a494ca93 100644
--- a/workflows/airflow/operators/jidoperators.py
+++ b/workflows/airflow/operators/jidoperators.py
@@ -121,6 +121,7 @@ def __init__(
         user: str = getpass.getuser(),
         poke_interval: float = 30.0,
         max_cores: Optional[int] = None,
+        max_nodes: Optional[int] = None,
         *args,
         **kwargs,
     ) -> None:
@@ -129,6 +130,46 @@
         self.user: str = user
         self.poke_interval: float = poke_interval
         self.max_cores: Optional[int] = max_cores
+        self.max_nodes: Optional[int] = max_nodes
+
+    def _sub_overridable_arguments(self, slurm_param_str: str) -> str:
+        """Overrides certain SLURM arguments given instance options.
+
+        Since the same SLURM arguments are used by default for the entire DAG,
+        individual Operator instances can override some important ones if they
+        are passed at instantiation.
+
+        ASSUMES `=` is used with SLURM arguments! E.g. --ntasks=12, --nodes=0-4
+
+        Args:
+            slurm_param_str (str): Constructed string of DAG SLURM arguments
+                without modification
+        Returns:
+            slurm_param_str (str): Modified SLURM argument string.
+        """
+        # Cap max cores used by a managed Task if that is requested
+        # Only search for part after `=` since this will always be passed
+        pattern: str = r"(?<=\bntasks=)\d+"
+        ntasks: int
+        try:
+            ntasks = int(re.findall(pattern, slurm_param_str)[0])
+        except IndexError as err:  # If `ntasks` not passed - 1 is default
+            ntasks = 1
+        if self.max_cores is not None and ntasks > self.max_cores:
+            slurm_param_str = re.sub(pattern, f"{self.max_cores}", slurm_param_str)
+
+        # Cap max nodes. Unlike above, search for the whole argument; if not present, add it.
+        pattern = r"nodes=\S+"
+        nnodes_str: str
+        try:
+            nnodes_str = re.findall(pattern, slurm_param_str)[0]
+            slurm_param_str = re.sub(
+                pattern, f"nodes=0-{self.max_nodes}", slurm_param_str
+            )
+        except IndexError as err:  # `--nodes` not present
+            slurm_param_str = f"{slurm_param_str} --nodes=0-{self.max_nodes}"
+
+        return slurm_param_str
 
     def create_control_doc(
         self, context: Dict[str, Any]
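To make the behaviour of the new method concrete, here is a minimal, self-contained sketch of the same substitution logic (the standalone function and the sample `--partition=milano` argument string are illustrative only; the regex patterns are the ones used in the patch):

```python
import re
from typing import Optional


def sub_overridable_arguments(
    slurm_param_str: str, max_cores: Optional[int], max_nodes: Optional[int]
) -> str:
    """Standalone sketch of JIDSlurmOperator._sub_overridable_arguments."""
    # Cap cores: the lookbehind replaces only the digits after `ntasks=`
    pattern: str = r"(?<=\bntasks=)\d+"
    try:
        ntasks = int(re.findall(pattern, slurm_param_str)[0])
    except IndexError:  # `--ntasks` absent - SLURM defaults to 1 task
        ntasks = 1
    if max_cores is not None and ntasks > max_cores:
        slurm_param_str = re.sub(pattern, f"{max_cores}", slurm_param_str)

    # Cap nodes: replace the whole `nodes=...` argument, or append it
    pattern = r"nodes=\S+"
    if re.search(pattern, slurm_param_str):
        slurm_param_str = re.sub(pattern, f"nodes=0-{max_nodes}", slurm_param_str)
    else:
        slurm_param_str = f"{slurm_param_str} --nodes=0-{max_nodes}"
    return slurm_param_str


print(sub_overridable_arguments("--partition=milano --ntasks=100", 12, 1))
# --partition=milano --ntasks=12 --nodes=0-1
```

The lookbehind pattern `(?<=\bntasks=)\d+` rewrites only the digits of an existing `--ntasks` flag, while the broader `nodes=\S+` pattern captures the entire node range so a new one can be written in its place.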
From caf7aca5d7c8682a00d364ce7d64b9f1cd3146aa Mon Sep 17 00:00:00 2001
From: gadorlhiac
Date: Tue, 23 Apr 2024 10:12:39 -0700
Subject: [PATCH 2/4] DOC Add information to workflows tutorial on the option

---
 docs/tutorial/creating_workflows.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/docs/tutorial/creating_workflows.md b/docs/tutorial/creating_workflows.md
index acd4a5c9..2770b0c0 100644
--- a/docs/tutorial/creating_workflows.md
+++ b/docs/tutorial/creating_workflows.md
@@ -82,7 +82,8 @@ Currently, the following `Operator`s are maintained:
 ### `JIDSlurmOperator` arguments
 
 - `task_id`: This is nominally the name of the task on the Airflow side. However, for simplicity this is used 1-1 to match the name of a **managed** Task defined in LUTE's `managed_tasks.py` module. I.e., it should be the name of an `Executor("Task")` object which will run the specific Task of interest. This **must** match the name of a defined managed Task.
-- `max_cores`: Used to cap the maximum number of cores which should be requested of SLURM. By default all jobs will run with the same number of cores, which should be specified when running the `launch_airflow.py` script (either from the ARP, or by hand). This behaviour was chosen because in general we want to increase or decrease the core-count for all Tasks uniformly, and we don't want to have to specify core number arguments for each job individually. Nonetheless, on occasion it may be necessary to cap the number of cores a specific job will use. E.g. if the default value specified when launching the Airflow DAG is multiple cores, and one job is single threaded, the core count can be capped for that single job to 1, while the rest run with multiple cores.
+- `max_cores`: Used to cap the maximum number of cores which should be requested of SLURM. By default all jobs will run with the same number of cores, which should be specified when running the `launch_airflow.py` script (either from the ARP, or by hand). This behaviour was chosen because in general we want to increase or decrease the core-count for all `Task`s uniformly, and we don't want to have to specify core number arguments for each job individually. Nonetheless, on occasion it may be necessary to cap the number of cores a specific job will use. E.g. if the default value specified when launching the Airflow DAG is multiple cores, and one job is single threaded, the core count can be capped for that single job to 1, while the rest run with multiple cores.
+- `max_nodes`: Similar to the above. This ensures the `Task` is distributed across no more than the specified maximum number of nodes. This is useful for, e.g., multi-threaded software which does not make use of tools like `MPI`: the `Task` can run on multiple cores, but only within a single node.
 
 
 # Creating a new workflow
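For reference, a DAG definition using the two arguments documented above might look like the sketch below. The single-threaded task name and the import path are hypothetical; `CrystFELIndexer` and the operator signature match the later patches in this series:

```python
from datetime import datetime

from airflow import DAG
from operators.jidoperators import JIDSlurmOperator  # assumed import path

dag: DAG = DAG("example_workflow", start_date=datetime(2024, 1, 1))

# A single-threaded job: cap it at 1 core while the rest of the DAG uses
# the core count passed to launch_airflow.py
converter: JIDSlurmOperator = JIDSlurmOperator(
    max_cores=1, task_id="SomeSingleThreadedTask", dag=dag  # hypothetical Task
)

# Multi-threaded (non-MPI) software: many cores, but confined to one node
indexer: JIDSlurmOperator = JIDSlurmOperator(
    max_cores=120, max_nodes=1, task_id="CrystFELIndexer", dag=dag
)

converter >> indexer
```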
- pattern = r"nodes=\S+" - nnodes_str: str - try: - nnodes_str = re.findall(pattern, slurm_param_str)[0] - slurm_param_str = re.sub( - pattern, f"nodes=0-{self.max_nodes}", slurm_param_str - ) - except IndexError as err: # `--nodes` not present - slurm_param_str = f"{slurm_param_str} --nodes=0-{self.max_nodes}" + if self.max_nodes is not None: + pattern = r"nodes=\S+" + nnodes_str: str + try: + nnodes_str = re.findall(pattern, slurm_param_str)[0] + # Check if present with above. Below does nothing but does not + # throw error if pattern not present. + slurm_param_str = re.sub( + pattern, f"nodes=0-{self.max_nodes}", slurm_param_str + ) + except IndexError: # `--nodes` not present + slurm_param_str = f"{slurm_param_str} --nodes=0-{self.max_nodes}" return slurm_param_str From d492b03bcd6f677287736a9862096a817821f073 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Tue, 30 Apr 2024 10:52:05 -0700 Subject: [PATCH 4/4] MNT Add max_nodes option to Airflow operators running CrystFEL Tasks --- workflows/airflow/find_peaks_index.py | 2 +- workflows/airflow/operators/jidoperators.py | 4 +++- workflows/airflow/psocake_sfx_phasing.py | 8 ++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/workflows/airflow/find_peaks_index.py b/workflows/airflow/find_peaks_index.py index 2335c090..3c512474 100644 --- a/workflows/airflow/find_peaks_index.py +++ b/workflows/airflow/find_peaks_index.py @@ -31,7 +31,7 @@ peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPsocake", dag=dag) indexer: JIDSlurmOperator = JIDSlurmOperator( - max_cores=120, task_id="CrystFELIndexer", dag=dag + max_cores=120, max_nodes=1, task_id="CrystFELIndexer", dag=dag ) peak_finder >> indexer diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 40e0ed68..01e05c3b 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -155,7 +155,9 @@ def _sub_overridable_arguments(self, slurm_param_str: str) -> str: try: ntasks = int(re.findall(pattern, slurm_param_str)[0]) if ntasks > self.max_cores: - slurm_param_str = re.sub(pattern, f"{self.max_cores}", slurm_param_str) + slurm_param_str = re.sub( + pattern, f"{self.max_cores}", slurm_param_str + ) except IndexError: # If `ntasks` not passed - 1 is default ntasks = 1 slurm_param_str = f"{slurm_param_str} --ntasks={ntasks}" diff --git a/workflows/airflow/psocake_sfx_phasing.py b/workflows/airflow/psocake_sfx_phasing.py index 433778a9..7bc62bf4 100644 --- a/workflows/airflow/psocake_sfx_phasing.py +++ b/workflows/airflow/psocake_sfx_phasing.py @@ -35,7 +35,7 @@ peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPsocake", dag=dag) indexer: JIDSlurmOperator = JIDSlurmOperator( - max_cores=120, task_id="CrystFELIndexer", dag=dag + max_cores=120, max_nodes=1, task_id="CrystFELIndexer", dag=dag ) # Concatenate stream files from all previous runs with same tag @@ -45,17 +45,17 @@ # Merge merger: JIDSlurmOperator = JIDSlurmOperator( - max_cores=120, task_id="PartialatorMerger", dag=dag + max_cores=120, max_nodes=1, task_id="PartialatorMerger", dag=dag ) # Figures of merit hkl_comparer: JIDSlurmOperator = JIDSlurmOperator( - max_cores=8, task_id="HKLComparer", dag=dag + max_cores=8, max_nodes=1, task_id="HKLComparer", dag=dag ) # HKL conversions hkl_manipulator: JIDSlurmOperator = JIDSlurmOperator( - max_cores=8, task_id="HKLManipulator", dag=dag + max_cores=8, max_nodes=1, task_id="HKLManipulator", dag=dag ) # SHELX Tasks