Skip to content

Commit

Permalink
Merge pull request #32 from gadorlhiac/ENH/ipc_ssh
Browse files Browse the repository at this point in the history
ENH IPC Improvements - Support TCP, ZMQ and Better multi-node handling
  • Loading branch information
valmar authored Aug 8, 2024
2 parents ab2681f + 94bc617 commit 2d37011
Show file tree
Hide file tree
Showing 11 changed files with 816 additions and 70 deletions.
4 changes: 4 additions & 0 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ TestSocket:
array_size: 8000 # Size of arrays to send. 8000 floats ~ 6.4e4
num_arrays: 10 # Number of arrays to send.

TestMultiNodeCommunication:
send_obj: "plot" # Either "plot" or "array". Type of object to send
arr_size: 5 # Size of the array if sending array

FindPeaksPyAlgos:
outdir: ""
n_events: 100
Expand Down
43 changes: 38 additions & 5 deletions docs/tutorial/new_task.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,17 @@ There are more examples of this pattern spread throughout the various `Task` mod

**Overview**

After a pydantic model has been created, the next required step is to define a **managed `Task`**. In the context of this library, a **managed `Task`** refers to the combination of an `Executor` and a `Task` to run. The `Executor` manages the process of `Task` submission and the execution environment, as well as performing an logging, eLog communication, etc. There are currently two types of `Executor` to choose from. For most cases you will use the first option for third-party `Task`s.
After a pydantic model has been created, the next required step is to define a **managed `Task`**. In the context of this library, a **managed `Task`** refers to the combination of an `Executor` and a `Task` to run. The `Executor` manages the process of `Task` submission and the execution environment, as well as performing any logging, eLog communication, etc. There are currently two types of `Executor` to choose from, **but only one is applicable to third-party code.** The second `Executor` is listed below for completeness only. If you need MPI see the note below.

1. `Executor`: This is the standard `Executor` and sufficient for most third-party uses cases.
1. `Executor`: This is the standard `Executor`. It should be used for third-party uses cases.
2. `MPIExecutor`: This performs all the same types of operations as the option above; however, it will submit your `Task` using MPI.
- The `MPIExecutor` will submit the `Task` using the number of available cores - 1. The number of cores is determined from the physical core/thread count on your local machine, or the number of cores allocated by SLURM when submitting on the batch nodes.

As mentioned, for most cases you can setup a third-party `Task` to use the first type of `Executor`. If, however, your third-party `Task` uses MPI, you can use either. When using the standard `Executor` for a `Task` requiring MPI, the `executable` in the pydantic model must be set to `mpirun`. For example, a third-party `Task` model, that uses MPI but can be run with the `Executor` may look like the following. We assume this `Task` runs a Python script using MPI.
**Using MPI with third-party `Task`s**

As mentioned, you should setup a third-party `Task` to use the first type of `Executor`. If, however, your third-party `Task` uses MPI this may seem non-intuitive. When using the `MPIExecutor` LUTE code is submitted with MPI. This includes the code that performs signalling to the `Executor` and `exec`s the third-party code you are interested in running. While it is possible to set this code up to run with MPI, it is more challenging in the case of third-party `Task`s because there is no `Task` code to modify directly! The `MPIExecutor` is provided mostly for first-party code. This is not an issue, however, since the standard `Executor` is easily configured to run with MPI in the case of third-party code.

When using the standard `Executor` for a `Task` requiring MPI, the `executable` in the pydantic model must be set to `mpirun`. For example, a third-party `Task` model, that uses MPI but is intended to be run with the `Executor` may look like the following. We assume this `Task` runs a Python script using MPI.

```py
class RunMPITaskParameters(ThirdPartyParameters):
Expand Down Expand Up @@ -603,8 +607,9 @@ In addition to the global configuration options there are a couple of ways to sp
### Writing the `Task`
You can write your analysis code (or whatever code to be executed) as long as it adheres to the limited rules below. You can create a new module for your `Task` in `lute.tasks` or add it to any existing module, if it makes sense for it to belong there. The `Task` itself is a single class constructed as:
1. Your analysis `Task` is a class named in a way that matches its Pydantic model. E.g. `RunTask` is the `Task`, and `RunTaskParameters` is the Pydantic model.
2. The class must inherit from the `Task` class (see template below).
2. The class must inherit from the `Task` class (see template below). **If you intend to use MPI see the following section.**
3. You must provide an implementation of a `_run` method. This is the method that will be executed when the `Task` is run. You can in addition write as many methods as you need. For fine-grained execution control you can also provide `_pre_run()` and `_post_run()` methods, but this is optional.
4. For all communication (including print statements) you should use the `_report_to_executor(msg: Message)` method. Since the `Task` is run as a subprocess this method will pass information to the controlling `Executor`. You can pass **any** type of object using this method, strings, plots, arrays, etc.
5. If you did not use the `set_result` configuration option in your parameters model, make sure to provide a result when finished. This is done by setting `self._result.payload = ...`. You can set the result to be any object. If you have written the result to a file, for example, please provide a path.
Expand Down Expand Up @@ -652,6 +657,34 @@ class RunTask(Task): # Inherit from Task
self._result.task_status = TaskStatus.COMPLETED
```

#### Using MPI for your `Task`

In the case your `Task` is written to use `MPI` a slight modification to the template above is needed. Specifically, an additional keyword argument should be passed to the base class initializer: `use_mpi=True`. This tells the base class to adjust signalling/communication behaviour appropriately for a multi-rank MPI program. Doing this prevents tricky-to-track-down problems due to ranks starting, completing and sending messages at different times. The rest of your code can, as before, be written as you see fit. The use of this keyword argument will also synchronize the start of all ranks and wait until all ranks have finished to exit.

```py
"""Task which needs to run with MPI"""

__all__ = ["RunTask"]
__author__ = "" # Please include so we know who the SME is

# Include any imports you need here

from lute.execution.ipc import Message # Message for communication
from lute.io.models.base import * # For TaskParameters
from lute.tasks.task import * # For Task

# Only the init is shown
class RunMPITask(Task): # Inherit from Task
"""Task description goes here, or in __init__"""

# Signal the use of MPI!
def __init__(self, *, params: TaskParameters, use_mpi: bool = True) -> None:
super().__init__(params=params, use_mpi=use_mpi) # Sets up Task, parameters, etc.
# That's it.
```

#### Message signals

Signals in `Message` objects are strings and can be one of the following:

```py
Expand Down Expand Up @@ -684,4 +717,4 @@ def import_task(task_name: str) -> Type[Task]:
```

### Defining an `Executor`
The process of `Executor` definition is identical to the process as described for `ThirdPartyTask`s above.
The process of `Executor` definition is identical to the process as described for `ThirdPartyTask`s above. The one exception is if you defined the `Task` to use MPI as described in the section above (Using MPI for your `Task`), you will likely consider using the `MPIExecutor`.
10 changes: 9 additions & 1 deletion launch_scripts/submit_slurm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ LOG_FILE="${TASK}_${EXPERIMENT:-$EXP}_r${FORMAT_RUN}_$(date +'%Y-%m-%d_%H-%M-%S'
SLURM_ARGS+=" --output=${LOG_FILE}.out"
SLURM_ARGS+=" --error=${LOG_FILE}.out"

export LUTE_SOCKET="/tmp/lute_${RANDOM}.sock"
# If LUTE_USE_TCP is unset use TCP
if [[ -z ${LUTE_USE_TCP} || ${LUTE_USE_TCP} != 0 ]]; then
echo "Using TCP"
export LUTE_USE_TCP=1
else
echo "Using Unix sockets"
unset LUTE_USE_TCP
export LUTE_SOCKET="/tmp/lute_${RANDOM}.sock"
fi

# By default source the psana environment since most Tasks will use it.
source /sdf/group/lcls/ds/ana/sw/conda1/manage/bin/psconda.sh
Expand Down
37 changes: 25 additions & 12 deletions lute/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,16 @@ def _pre_task(self) -> None:
This method may or may not be used by subclasses. It may be useful
for logging etc.
"""
...
# This prevents the Executors in managed_tasks.py from all acquiring
# resources like sockets.
for communicator in self._communicators:
communicator.delayed_setup()
# Not great, but experience shows we need a bit of time to setup
# network.
time.sleep(0.1)
# Propagate any env vars setup by Communicators - only update LUTE_ vars
tmp: Dict[str, str] = {key: os.environ[key] for key in os.environ if "LUTE_" in key}
self._analysis_desc.task_env.update(tmp)

def _submit_task(self, cmd: str) -> subprocess.Popen:
proc: subprocess.Popen = subprocess.Popen(
Expand Down Expand Up @@ -299,6 +308,7 @@ def _submit_cmd(self, executable_path: str, params: str) -> str:

def execute_task(self) -> None:
"""Run the requested Task as a subprocess."""
self._pre_task()
lute_path: Optional[str] = os.getenv("LUTE_PATH")
if lute_path is None:
logger.debug("Absolute path to subprocess_task.py not found.")
Expand Down Expand Up @@ -596,17 +606,20 @@ def _task_loop(self, proc: subprocess.Popen) -> None:
that its finished.
"""
for communicator in self._communicators:
msg: Message = communicator.read(proc)
if msg.signal is not None and msg.signal.upper() in LUTE_SIGNALS:
hook: Callable[[Executor, Message], None] = getattr(
self.Hooks, msg.signal.lower()
)
hook(self, msg)
if msg.contents is not None:
if isinstance(msg.contents, str) and msg.contents != "":
logger.info(msg.contents)
elif not isinstance(msg.contents, str):
logger.info(msg.contents)
while True:
msg: Message = communicator.read(proc)
if msg.signal is not None and msg.signal.upper() in LUTE_SIGNALS:
hook: Callable[[Executor, Message], None] = getattr(
self.Hooks, msg.signal.lower()
)
hook(self, msg)
if msg.contents is not None:
if isinstance(msg.contents, str) and msg.contents != "":
logger.info(msg.contents)
elif not isinstance(msg.contents, str):
logger.info(msg.contents)
if not communicator.has_messages:
break

def _finalize_task(self, proc: subprocess.Popen) -> None:
"""Any actions to be performed after the Task has ended.
Expand Down
Loading

0 comments on commit 2d37011

Please sign in to comment.