The following steps will show you how to use MPI through IPython's ipyparallel interface.
Once you are logged in at CARC, run these commands:
cd /projects/systems/shared/ipython_cluster_profiles
cp -r profile_pbs ~/.ipython/
Then check that the files were copied:
cd ~/.ipython/profile_pbs
Now, on JupyterHub, go to the IPython Clusters tab (refresh the page if it is already open) and you should see a 'pbs' profile available to you. If you can't see the Clusters tab, click the JupyterHub icon in the upper left of your screen.
You can start a job by setting the number of engines in the 'pbs' cluster profile and clicking 'start' under 'actions'. For this example we will request 8 IPython compute engines.
[Optional] Since IPython's ipyparallel system requests compute nodes through the Torque PBS system, you will have to wait until the nodes are running before you can run code on them. Check that the job is running in a terminal with:
watch qstat -tn -u <username>
You should see something like the following:
Every 2.0s: qstat -t -n -u $USER                                                    Wed Oct 23 09:15:14 2019

wheeler-sn.alliance.unm.edu:
                                                                                  Req'd     Req'd       Elap
Job ID                  Username    Queue    Jobname          SessID  NDS    TSK   Memory    Time    S   Time
----------------------- ----------- -------- ---------------- ------ ----- ------ --------- --------- - ---------
258370.wheeler-sn.alli  mfricke     default  jupyterhub        21730     1      1       --   08:00:00 R  00:06:45
   wheeler291/1
258371.wheeler-sn.alli  mfricke     default  ipython_controll  22553     1      1       --   01:00:00 R  00:06:11
   wheeler291/2
258372.wheeler-sn.alli  mfricke     default  ipython_engine     3213     2     16       --   01:00:00 R  00:06:11
   wheeler176/0-7+wheeler175/0-7
Notice the IPython engines are running with status 'R'. You can also check whether the compute engines are ready from your Python notebook (see below).
To exit the watch command, press Control-C.
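For example, a minimal readiness check you can run in a notebook cell is to poll the client's engine list until all requested engines have registered. This is only a sketch: it assumes the controller job has already started, and the variable name rc and the 5-second poll interval are just illustrative.
import time
import ipyparallel as ipp

rc = ipp.Client(profile='pbs')   # connect to the controller for the 'pbs' profile
while len(rc.ids) < 8:           # 8 engines were requested above
    time.sleep(5)                # engines register once the PBS job starts running
print('engines ready:', rc.ids)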
To change the walltime of your profile, edit pbs.engine.template and pbs.controller.template in the ~/.ipython/profile_pbs directory to fit the requirements of your job. By editing these files you can also switch from the default queue to the debug queue while you are testing your program.
Now you can open a Jupyter notebook and follow the remainder of this tutorial.
Create a new file in your home directory and name it psum.py. Enter the following into psum.py and save the file.
from mpi4py import MPI
import numpy as np

def psum(a):
    # sum the local chunk held by this engine
    locsum = np.sum(a)
    # receive buffer for the global result (a 0-d double array)
    rcvBuf = np.array(0.0, 'd')
    # combine the per-rank sums so every rank receives the global total
    MPI.COMM_WORLD.Allreduce([locsum, MPI.DOUBLE],
                             [rcvBuf, MPI.DOUBLE],
                             op=MPI.SUM)
    return rcvBuf
This function performs a distributed sum across all ranks in the MPI communicator: each engine contributes the sum of its local chunk, and Allreduce combines them so that every rank receives the global total.
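If you want to sanity-check psum outside of ipyparallel, the same function can be exercised with a plain mpiexec launch. This is only a sketch; the file name test_psum.py and the launch command are illustrative and not part of the tutorial.
# test_psum.py -- run with, for example: mpiexec -n 4 python test_psum.py
from mpi4py import MPI
import numpy as np
from psum import psum

rank = MPI.COMM_WORLD.Get_rank()
# give each rank 4 consecutive values: rank 0 gets 0-3, rank 1 gets 4-7, ...
local = np.arange(rank * 4, (rank + 1) * 4, dtype='d')
# every rank prints the same global total: 0+1+...+15 = 120.0
print(rank, psum(local))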
Create a new Python 3 notebook in JupyterHub and name it mpi_test.ipynb. Enter the following into cells of your notebook. Many of the commands run on the MPI cluster and are therefore asynchronous. To check whether an operation has completed, we check its status with ".wait_interactive()". When the status reports "done" you can move on to the next step.
import ipyparallel as ipp
from mpi4py import MPI
import numpy as np
cluster = ipp.Client(profile='pbs')
Engines, in ipyparallel parlance, are the same as processes or workers in other parallel systems.
cluster.ids
[0, 1, 2, 3, 4, 5, 6, 7]
len(cluster[:])
8
view = cluster[:]
Enable IPython 'magics'. These are IPython helper functions invoked with %, such as the %px magic used below.
view.activate()
Check to see if the MPI communication world is of the expected size. It should be size 8 since we have 8 engines.
Note that we run the Get_size command on each engine to make sure they all see the same MPI communicator. %px simply executes the code that follows it on each compute engine in parallel.
status_mpi_size=%px size = MPI.COMM_WORLD.Get_size()
status_mpi_size.wait_interactive()
done
The output of viewing the size variable should be an array with the same number of entries as engines, and each entry should be the number of engines requested.
view['size']
[8, 8, 8, 8, 8, 8, 8, 8]
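If you prefer not to use the %px magic, the view's execute method does the same thing; here is a short sketch using the view defined above.
# equivalent to the %px line above: run the statement on every engine
status_mpi_size = view.execute('size = MPI.COMM_WORLD.Get_size()')
status_mpi_size.wait_interactive()
view['size']   # again returns [8, 8, 8, 8, 8, 8, 8, 8]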
Recall that psum.py just loads the MPI libraries and defines the distributed sum function, psum. We are not actually calling the psum function yet.
status_psum_run=view.run('psum.py')
status_psum_run.wait_interactive()
done
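If you want to confirm that the file really executed everywhere, one option (a sketch) is to check that the function is now callable in each engine's namespace:
# ask each engine whether psum is now callable (this errors on an engine where the file did not run)
check = view.execute('has_psum = callable(psum)')
check.wait_interactive()
view['has_psum']   # should be [True, True, True, True, True, True, True, True]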
The scatter command sends 32 values from 0 to 31 to the 8 compute engines. Each compute engine gets 32/8=4 values. This is the ipyparallel scatter command, not an MPI scatter command.
status_scatter=view.scatter('a',np.arange(32,dtype='float'))
status_scatter.wait_interactive()
done
We can view the variable 'a' on all the compute engines. The value of 'a' for each compute engine is an element of the return array. In this case each value is itself an array.
view['a']
[array([0., 1., 2., 3.]), array([4., 5., 6., 7.]), array([ 8., 9., 10., 11.]), array([12., 13., 14., 15.]), array([16., 17., 18., 19.]), array([20., 21., 22., 23.]), array([24., 25., 26., 27.]), array([28., 29., 30., 31.])]
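The inverse of scatter is the ipyparallel gather, which pulls the pieces back from the engines and reassembles them in engine order; a sketch using the same view:
# reassemble the scattered chunks into a single array on the notebook side
full = view.gather('a', block=True)
full   # the full array of 32 values, 0. through 31., back in order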
MPI code has to be executed on each compute engine so that each one can take part in the MPI reduce. This is accomplished by calling the psum function on all the compute engines simultaneously. MPI allows them to communicate with each other to calculate the sum.
status_psum_call=%px totalsum = psum(a)
status_psum_call.wait_interactive()
done
The total should be equal to 31(31+1)/2 = 496.
view['totalsum']
[array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.)]
Each compute engine calculated the sum of all the values. Since we ran this MPI function on all the compute engines they report the same value.
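As a quick local check of that number (no engines needed), the same total can be computed directly in the notebook:
n = 31
n * (n + 1) // 2                       # 496
np.sum(np.arange(32, dtype='float'))   # 496.0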
Rather than loading psum from a file, we can define it in the notebook using the ipyparallel view.remote function as a decorator (applied with Python's '@' syntax).
@view.remote(block = True)
def inlinesum():
    from mpi4py import MPI
    import numpy as np
    # 'a' is the chunk scattered to this engine earlier
    locsum = np.sum(a)
    rcvBuf = np.array(0.0, 'd')
    # combine the per-engine sums so every engine receives the global total
    MPI.COMM_WORLD.Allreduce([locsum, MPI.DOUBLE],
                             [rcvBuf, MPI.DOUBLE],
                             op=MPI.SUM)
    return rcvBuf
Now we can call inlinesum and it is automatically run on every compute engine. The call is made through ipyparallel, but the computation still uses MPI. Because we passed block = True to the decorator, the call is synchronous and returns the results directly.
inlinesum()
[array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.), array(496.)]
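A variation on this pattern (a sketch): if you pass block = False to the decorator instead, calling the function returns an asynchronous result that you can wait on and collect, just like the %px calls above. The name inlinesum_async is only illustrative.
@view.remote(block = False)
def inlinesum_async():
    from mpi4py import MPI
    import numpy as np
    locsum = np.sum(a)            # 'a' is still the scattered chunk on each engine
    rcvBuf = np.array(0.0, 'd')
    MPI.COMM_WORLD.Allreduce([locsum, MPI.DOUBLE],
                             [rcvBuf, MPI.DOUBLE],
                             op=MPI.SUM)
    return rcvBuf

status = inlinesum_async()
status.wait_interactive()
status.get()   # the same list of per-engine totals as before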