diff --git a/xpysom/tests.py b/xpysom/tests.py
index e4d2d68..caf3405 100644
--- a/xpysom/tests.py
+++ b/xpysom/tests.py
@@ -12,6 +12,8 @@
 import pickle
 import os
 
+import mockmpi
+import sys
 
 class TestCupySom(unittest.TestCase):
     def setUp(self, xp=cp):
@@ -303,4 +305,66 @@ def test_bubble(self):
 
 class TestNumpySomHex(TestCupySomHex):
     def setUp(self):
-        TestCupySom.setUp(self, xp=np)
\ No newline at end of file
+        TestCupySom.setUp(self, xp=np)
+
+
+
+def core_mpi_init_weight(comm, xp):
+    xp = {'np': np, 'cp': cp}[xp]
+    som = XPySom(5, 5, 2, sigma=1.0, learning_rate=0.5, random_seed=1, xp=xp)
+    wcheck = som._weights.copy()
+    comm.Bcast(wcheck)
+
+    np.testing.assert_array_almost_equal(wcheck, som._weights)
+
+def core_mpi_train(comm, xp):
+    xp = {'np': np, 'cp': cp}[xp]
+    # train two equivalent SOMs, one in parallel.
+    sys.modules['mpi4py'] = mockmpi
+    sys.modules['mpi4py.MPI'] = mockmpi.comm
+    mockmpi.MPI = mockmpi.comm
+
+    nfeat = 5
+    ndata = 100
+
+    # data at root
+    rng = np.random.default_rng(12345)
+    # should ensure all rng values the same
+    data = rng.uniform(size=(ndata, nfeat))
+
+    # split data among processors
+    my_data = np.array_split(data, comm.size)[comm.rank]
+
+    som1 = XPySom(5, 5, nfeat, sigma=1.0, learning_rate=0.5, random_seed=7, xp=xp)
+
+    som1.train(my_data, 10, comm=comm)
+    comm.Barrier()
+
+    # results should be the same as a serial test using all the data
+    if comm.rank == 0:
+        som2 = XPySom(5, 5, nfeat, sigma=1.0, learning_rate=0.5, random_seed=7, xp=xp)
+        som2.train(data, 10, comm=None)
+        np.testing.assert_array_almost_equal(som1._weights, som2._weights)
+
+
+
+
+class TestMPINumpy(unittest.TestCase):
+    def setUp(self):
+        self.xp = 'np'
+    def test_mpi_init_weight(self):
+        mockmpi.mock_mpiexec(2, core_mpi_init_weight, self.xp)
+        mockmpi.mock_mpiexec(5, core_mpi_init_weight, self.xp)
+
+
+    def test_mpi_train(self):
+        mockmpi.mock_mpiexec(2, core_mpi_train, self.xp)
+        mockmpi.mock_mpiexec(5, core_mpi_train, self.xp)
+
+class TestMPICupy(TestMPINumpy):
+    def setUp(self):
+        self.xp = 'cp'
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/xpysom/xpysom.py b/xpysom/xpysom.py
index c91c46a..51e1f5b 100644
--- a/xpysom/xpysom.py
+++ b/xpysom/xpysom.py
@@ -31,6 +31,21 @@
 beginning = None
 sec_left = None
 
+
+def mpi_reduce(comm, data, xp):
+    import mpi4py.MPI
+    # TODO: bleeding edge mpi4py versions
+    # can directly access cupy arrays. Once
+    # that is mainstream then use it here
+
+    if xp.__name__ == 'cupy':
+        tmp = xp.asnumpy(data)
+        comm.Allreduce(mpi4py.MPI.IN_PLACE, tmp)
+        data[:] = xp.asarray(tmp)
+    else:
+        comm.Allreduce(mpi4py.MPI.IN_PLACE, data)
+
+
 def print_progress(t, T):
     digits = len(str(T))
@@ -415,11 +430,17 @@ def _update(self, x_gpu, wins, eta, sig):
         self._denominator_gpu += sum_g_gpu[:,:,self.xp.newaxis]
 
 
-    def _merge_updates(self):
+    def _merge_updates(self, comm=None):
         """
         Divides the numerator accumulator by the denominator accumulator
         to compute the new weights. 
         """
+
+        if comm is not None:
+            comm.Barrier()
+            mpi_reduce(comm, self._denominator_gpu, self.xp)
+            mpi_reduce(comm, self._numerator_gpu, self.xp)
+
         self._weights_gpu = self.xp.where(
             self._denominator_gpu != 0,
             self._numerator_gpu / self._denominator_gpu,
@@ -427,7 +448,7 @@
         )
 
 
-    def train(self, data, num_epochs, iter_beg=0, iter_end=None, verbose=False):
+    def train(self, data, num_epochs, iter_beg=0, iter_end=None, verbose=False, comm=None):
         """Trains the SOM.
 
         Parameters
@@ -508,7 +529,7 @@ def train(self, data, num_epochs, iter_beg=0, iter_end=None, verbose=False):
                     num_epochs*len(data)
                 )
 
-            self._merge_updates()
+            self._merge_updates(comm=comm)
 
         # Copy back arrays to host
         if self.xp.__name__ == 'cupy':
@@ -654,20 +675,29 @@
         return (distance > 1.5).mean().item()
 
 
-    def random_weights_init(self, data):
+    def random_weights_init(self, data, comm=None):
         """Initializes the weights of the SOM
         picking random samples from data.
         TODO: unoptimized
         """
        self._check_input_len(data)
-        it = np.nditer(self._weights[:,:,0], flags=['multi_index'])
-        while not it.finished:
-            rand_i = self._random_generator.randint(len(data))
-            self._weights[it.multi_index] = data[rand_i]
-            it.iternext()
+        # Only the root process needs to do this
+        if (comm is None) or (comm.rank == 0):
+            it = np.nditer(self._weights[:,:,0], flags=['multi_index'])
+            while not it.finished:
+                rand_i = self._random_generator.randint(len(data))
+                self._weights[it.multi_index] = data[rand_i]
+                it.iternext()
+
+        # Give the weights from the root process to everyone else.
+        # These are host arrays so we do not need to move from device
+        if comm is not None:
+            import mpi4py.MPI
+            comm.Bcast(self._weights)
 
-    def pca_weights_init(self, data):
+
+    def pca_weights_init(self, data, comm=None):
         """Initializes the weights to span the first two principal
         components.
@@ -682,15 +712,21 @@
             msg = 'The data needs at least 2 features for pca initialization'
             raise ValueError(msg)
         self._check_input_len(data)
-        if len(self._neigx) == 1 or len(self._neigy) == 1:
-            msg = 'PCA initialization inappropriate:' + \
-                  'One of the dimensions of the map is 1.'
-            warn(msg)
-        pc_length, pc = np.linalg.eig(np.cov(np.transpose(data)))
-        pc_order = np.argsort(-pc_length)
-        for i, c1 in enumerate(np.linspace(-1, 1, len(self._neigx))):
-            for j, c2 in enumerate(np.linspace(-1, 1, len(self._neigy))):
-                self._weights[i, j] = c1*pc[pc_order[0]] + c2*pc[pc_order[1]]
+
+        if (comm is None) or (comm.rank == 0):
+            if len(self._neigx) == 1 or len(self._neigy) == 1:
+                msg = 'PCA initialization inappropriate:' + \
+                      'One of the dimensions of the map is 1.'
+                warn(msg)
+            pc_length, pc = np.linalg.eig(np.cov(np.transpose(data)))
+            pc_order = np.argsort(-pc_length)
+            for i, c1 in enumerate(np.linspace(-1, 1, len(self._neigx))):
+                for j, c2 in enumerate(np.linspace(-1, 1, len(self._neigy))):
+                    self._weights[i, j] = c1*pc[pc_order[0]] + c2*pc[pc_order[1]]
+
+        if comm is not None:
+            import mpi4py.MPI
+            comm.Bcast(self._weights)
 
     def distance_map(self):