diff --git a/examples/plot_estimate_gmm.py b/examples/plot_estimate_gmm.py
index 0d71317784..d5e5c4ed14 100644
--- a/examples/plot_estimate_gmm.py
+++ b/examples/plot_estimate_gmm.py
@@ -20,12 +20,12 @@
 n_samples = 300
 n_features = 2
 X = np.ndarray((n_samples, n_features))
-X[:n_samples / 3, :] = random_state.multivariate_normal(
-    [0.0, 1.0], [[0.5, -1.0], [-1.0, 5.0]], size=(n_samples / 3,))
-X[n_samples / 3:-n_samples / 3, :] = random_state.multivariate_normal(
-    [-2.0, -2.0], [[3.0, 1.0], [1.0, 1.0]], size=(n_samples / 3,))
-X[-n_samples / 3:, :] = random_state.multivariate_normal(
-    [3.0, 1.0], [[3.0, -1.0], [-1.0, 1.0]], size=(n_samples / 3,))
+X[:n_samples // 3, :] = random_state.multivariate_normal(
+    [0.0, 1.0], [[0.5, -1.0], [-1.0, 5.0]], size=(n_samples // 3,))
+X[n_samples // 3:-n_samples // 3, :] = random_state.multivariate_normal(
+    [-2.0, -2.0], [[3.0, 1.0], [1.0, 1.0]], size=(n_samples // 3,))
+X[-n_samples // 3:, :] = random_state.multivariate_normal(
+    [3.0, 1.0], [[3.0, -1.0], [-1.0, 1.0]], size=(n_samples // 3,))
 
 gmm = GMM(n_components=3, random_state=random_state)
 gmm.from_samples(X)
diff --git a/examples/plot_initializations.py b/examples/plot_initializations.py
new file mode 100644
index 0000000000..013b9c4fdd
--- /dev/null
+++ b/examples/plot_initializations.py
@@ -0,0 +1,86 @@
+"""
+==================================
+Compare Initialization Strategies
+==================================
+
+Expectation Maximization for Gaussian Mixture Models does not have a unique
+solution. The result depends on the initialization. It is particularly
+important to either normalize the training data or set the covariances
+accordingly. In addition, k-means++ initialization helps to find a good
+initial distribution of means.
+"""
+print(__doc__)
+
+import numpy as np
+import matplotlib.pyplot as plt
+from gmr.utils import check_random_state
+from gmr import GMM, plot_error_ellipses, kmeansplusplus_initialization, covariance_initialization
+
+
+random_state = check_random_state(0)
+
+n_samples = 300
+n_features = 2
+X = np.ndarray((n_samples, n_features))
+X[:n_samples // 3, :] = random_state.multivariate_normal(
+    [0.0, 1.0], [[0.5, -1.0], [-1.0, 5.0]], size=(n_samples // 3,))
+X[n_samples // 3:-n_samples // 3, :] = random_state.multivariate_normal(
+    [-2.0, -2.0], [[3.0, 1.0], [1.0, 1.0]], size=(n_samples // 3,))
+X[-n_samples // 3:, :] = random_state.multivariate_normal(
+    [3.0, 1.0], [[3.0, -1.0], [-1.0, 1.0]], size=(n_samples // 3,))
+
+# artificial scaling, makes standard implementation fail
+# either the initial covariances have to be adjusted or we have
+# to normalize the dataset
+X[:, 1] *= 100.0
+
+plt.figure(figsize=(10, 10))
+
+n_components = 3
+initial_covs = np.empty((n_components, n_features, n_features))
+initial_covs[:] = np.eye(n_features)
+gmm = GMM(n_components=n_components,
+          priors=np.ones(n_components, dtype=np.float) / n_components,
+          means=np.zeros((n_components, n_features)),
+          covariances=initial_covs, random_state=random_state)
+
+plt.subplot(2, 2, 1)
+plt.title("Default initialization")
+plt.xlim((-10, 10))
+plt.ylim((-1000, 1000))
+plot_error_ellipses(plt.gca(), gmm, colors=["r", "g", "b"], alpha=0.15)
+plt.scatter(X[:, 0], X[:, 1])
+
+gmm.from_samples(X)
+
+plt.subplot(2, 2, 2)
+plt.title("Trained Gaussian Mixture Model")
+plt.xlim((-10, 10))
+plt.ylim((-1000, 1000))
+plot_error_ellipses(plt.gca(), gmm, colors=["r", "g", "b"], alpha=0.15)
+plt.scatter(X[:, 0], X[:, 1])
+
+initial_means = kmeansplusplus_initialization(X, n_components, random_state)
+initial_covs = covariance_initialization(X, n_components)
+gmm = GMM(n_components=n_components,
+          priors=np.ones(n_components, dtype=np.float) / n_components,
+          means=np.copy(initial_means),
+          covariances=initial_covs, random_state=random_state)
+
+plt.subplot(2, 2, 3)
+plt.title("k-means++ and initial covariance scaling")
+plt.xlim((-10, 10))
+plt.ylim((-1000, 1000))
+plot_error_ellipses(plt.gca(), gmm, colors=["r", "g", "b"], alpha=0.15)
+plt.scatter(X[:, 0], X[:, 1])
+
+gmm.from_samples(X)
+
+plt.subplot(2, 2, 4)
+plt.title("Trained Gaussian Mixture Model")
+plt.xlim((-10, 10))
+plt.ylim((-1000, 1000))
+plot_error_ellipses(plt.gca(), gmm, colors=["r", "g", "b"], alpha=0.15)
+plt.scatter(X[:, 0], X[:, 1])
+
+plt.show()
diff --git a/examples/plot_regression.py b/examples/plot_regression.py
index 49a08a2711..f72e66b61a 100644
--- a/examples/plot_regression.py
+++ b/examples/plot_regression.py
@@ -20,7 +20,7 @@
 n_samples = 10
 X = np.ndarray((n_samples, 2))
 X[:, 0] = np.linspace(0, 2 * np.pi, n_samples)
-X[:, 1] = 1 - 3 * X[:, 0] + random_state.randn(n_samples)
+X[:, 1] = 100 * (1 - 3 * X[:, 0] + random_state.randn(n_samples))
 
 mvn = MVN(random_state=0)
 mvn.from_samples(X)
@@ -35,7 +35,7 @@
 plt.scatter(X[:, 0], X[:, 1])
 y = mean.ravel()
 s = covariance.ravel()
-plt.fill_between(X_test, y - s, y + s, alpha=0.2)
+plt.fill_between(X_test, y - np.sqrt(s), y + np.sqrt(s), alpha=0.2)
 plt.plot(X_test, y, lw=2)
 
 n_samples = 100
diff --git a/examples/plot_trajectories.py b/examples/plot_trajectories.py
new file mode 100644
index 0000000000..9d065405af
--- /dev/null
+++ b/examples/plot_trajectories.py
@@ -0,0 +1,152 @@
+import numpy as np
+import matplotlib.pyplot as plt
+from matplotlib.patches import Ellipse
+from itertools import cycle
+from gmr import GMM, plot_error_ellipses, kmeansplusplus_initialization, covariance_initialization, plot_error_ellipse
+from gmr.utils import check_random_state
+
+
+def make_demonstrations(n_demonstrations, n_steps, sigma=0.25, mu=0.5,
+                        start=np.zeros(2), goal=np.ones(2), random_state=None):
+    """Generates demonstrations that can be used to test imitation learning.
+
+    Parameters
+    ----------
+    n_demonstrations : int
+        Number of noisy demonstrations
+
+    n_steps : int
+        Number of time steps
+
+    sigma : float, optional (default: 0.25)
+        Standard deviation of noisy component
+
+    mu : float, optional (default: 0.5)
+        Mean of noisy component
+
+    start : array, shape (2,), optional (default: 0s)
+        Initial position
+
+    goal : array, shape (2,), optional (default: 1s)
+        Final position
+
+    random_state : int
+        Seed for random number generator
+
+    Returns
+    -------
+    X : array, shape (n_task_dims, n_steps, n_demonstrations)
+        Noisy demonstrated trajectories
+
+    ground_truth : array, shape (n_task_dims, n_steps)
+        Original trajectory
+    """
+    random_state = np.random.RandomState(random_state)
+
+    X = np.empty((2, n_steps, n_demonstrations))
+
+    # Generate ground-truth for plotting
+    ground_truth = np.empty((2, n_steps))
+    T = np.linspace(-0, 1, n_steps)
+    ground_truth[0] = T
+    ground_truth[1] = (T / 20 + 1 / (sigma * np.sqrt(2 * np.pi)) *
+                       np.exp(-0.5 * ((T - mu) / sigma) ** 2))
+
+    # Generate trajectories
+    for i in range(n_demonstrations):
+        noisy_sigma = sigma * random_state.normal(1.0, 0.1)
+        noisy_mu = mu * random_state.normal(1.0, 0.1)
+        X[0, :, i] = T
+        X[1, :, i] = T + (1 / (noisy_sigma * np.sqrt(2 * np.pi)) *
+                          np.exp(-0.5 * ((T - noisy_mu) /
+                                         noisy_sigma) ** 2))
+
+    # Spatial alignment
+    current_start = ground_truth[:, 0]
+    current_goal = ground_truth[:, -1]
+    current_amplitude = current_goal - current_start
+    amplitude = goal - start
+    ground_truth = ((ground_truth.T - current_start) * amplitude /
+                    current_amplitude + start).T
+
+    for demo_idx in range(n_demonstrations):
+        current_start = X[:, 0, demo_idx]
+        current_goal = X[:, -1, demo_idx]
+        current_amplitude = current_goal - current_start
+        X[:, :, demo_idx] = ((X[:, :, demo_idx].T - current_start) *
+                             amplitude / current_amplitude + start).T
+
+    return X, ground_truth
+
+
+plot_covariances = False
+X, _ = make_demonstrations(
+    n_demonstrations=500, n_steps=20, goal=np.array([1., 2.]),
+    random_state=0)
+X = X.transpose(2, 1, 0)
+n_demonstrations, n_steps, n_task_dims = X.shape
+X_train = np.empty((n_demonstrations, n_steps, n_task_dims + 1))
+X_train[:, :, 1:] = X
+t = np.linspace(0, 1, n_steps)
+X_train[:, :, 0] = t
+X_train = X_train.reshape(n_demonstrations * n_steps, n_task_dims + 1)
+
+random_state = check_random_state(0)
+n_components = 10
+initial_means = kmeansplusplus_initialization(X_train, n_components, random_state)
+initial_covs = covariance_initialization(X_train, n_components)
+from sklearn.mixture import BayesianGaussianMixture
+bgmm = BayesianGaussianMixture(n_components=n_components, max_iter=500).fit(X_train)
+gmm = GMM(
+    n_components=n_components,
+    priors=bgmm.weights_,
+    means=bgmm.means_,
+    #means=np.copy(initial_means),
+    covariances=bgmm.covariances_,
+    #covariances=initial_covs,
+    verbose=2,
+    random_state=random_state)
+#gmm.from_samples(
+#    X_train, n_iter=100,
+#    reinit_means=True, min_eff_sample=n_task_dims, max_eff_sample=0.8)
+
+plt.figure()
+plt.subplot(111)
+
+plt.plot(X[:, :, 0].T, X[:, :, 1].T, c="k", alpha=0.1)
+
+means_over_time = []
+y_stds = []
+for step in t:
+    conditional_gmm = gmm.condition([0], np.array([step]))
+    conditional_mvn = conditional_gmm.to_mvn()
+    means_over_time.append(conditional_mvn.mean)
+    # store the standard deviation so that 1.96 * y_stds below is a 95% band
+    y_stds.append(np.sqrt(conditional_mvn.covariance[1, 1]))
+    samples = conditional_gmm.sample(100)
+    plt.scatter(samples[:, 0], samples[:, 1], s=1)
+means_over_time = np.array(means_over_time)
+y_stds = np.array(y_stds)
+plt.plot(means_over_time[:, 0],
+         means_over_time[:, 1], c="r", lw=2)
+plt.fill_between(
+    means_over_time[:, 0],
+    means_over_time[:, 1] - 1.96 * y_stds,
+    means_over_time[:, 1] + 1.96 * y_stds,
+    color="r", alpha=0.5)
+
+if plot_covariances:
+    colors = cycle(["r", "g", "b"])
+    for factor in np.linspace(0.5, 4.0, 8):
+        new_gmm = GMM(
+            n_components=len(gmm.means), priors=gmm.priors,
+            means=gmm.means[:, 1:], covariances=gmm.covariances[:, 1:, 1:],
+            random_state=gmm.random_state)
+        for mean, (angle, width, height) in new_gmm.to_ellipses(factor):
+            ell = Ellipse(xy=mean, width=width, height=height,
+                          angle=np.degrees(angle))
+            ell.set_alpha(0.2)
+            ell.set_color(next(colors))
+            plt.gca().add_artist(ell)
+
+plt.xlabel("$x_1$")
+plt.ylabel("$x_2$")
+plt.show()
diff --git a/gmr/__init__.py b/gmr/__init__.py
index bbe405eaec..c15a601429 100644
--- a/gmr/__init__.py
+++ b/gmr/__init__.py
@@ -12,4 +12,5 @@
 __all__ = ['gmm', 'mvn', 'utils']
 
 from .mvn import MVN, plot_error_ellipse
-from .gmm import GMM, plot_error_ellipses
+from .gmm import (GMM, plot_error_ellipses, kmeansplusplus_initialization,
+                  covariance_initialization)
diff --git a/gmr/gmm.py b/gmr/gmm.py
index 7919f5d125..ca14b93bfb 100644
--- a/gmr/gmm.py
+++ b/gmr/gmm.py
@@ -1,8 +1,91 @@
 import numpy as np
+from scipy.spatial.distance import cdist, pdist, squareform
 from .utils import check_random_state
 from .mvn import MVN
+
+
+def kmeansplusplus_initialization(X, n_components, random_state=None):
+    """k-means++ initialization for centers of a GMM.
+
+    Initialization of GMM centers before expectation maximization (EM).
+    The first center is selected uniformly at random. Subsequent centers are
+    sampled from the data with probability proportional to the squared
+    distance to the closest center.
+
+    Parameters
+    ----------
+    X : array-like, shape (n_samples, n_features)
+        Samples from the true distribution.
+
+    n_components : int (> 0)
+        Number of MVNs that compose the GMM.
+
+    random_state : int or RandomState, optional (default: global random state)
+        If an integer is given, it fixes the seed. Defaults to the global numpy
+        random number generator.
+
+    Returns
+    -------
+    initial_means : array, shape (n_components, n_features)
+        Initial means
+    """
+    if n_components <= 0:
+        raise ValueError("Only n_components > 0 allowed.")
+    if n_components > len(X):
+        raise ValueError(
+            "More components (%d) than samples (%d) are not allowed."
+            % (n_components, len(X)))
+
+    random_state = check_random_state(random_state)
+
+    all_indices = np.arange(len(X))
+    selected_centers = [random_state.choice(all_indices, size=1).tolist()[0]]
+    while len(selected_centers) < n_components:
+        centers = np.atleast_2d(X[np.array(selected_centers, dtype=int)])
+        i = _select_next_center(X, centers, random_state, selected_centers, all_indices)
+        selected_centers.append(i)
+    return X[np.array(selected_centers, dtype=int)]
+
+
+def _select_next_center(X, centers, random_state, excluded_indices=[], all_indices=None):
+    squared_distances = cdist(X, centers, metric="sqeuclidean")
+    selection_probability = squared_distances.max(axis=1)
+    selection_probability[np.array(excluded_indices, dtype=int)] = 0.0
+    selection_probability /= np.sum(selection_probability)
+    if all_indices is None:
+        all_indices = np.arange(len(X))
+    return random_state.choice(all_indices, size=1, p=selection_probability)[0]
+
+
+def covariance_initialization(X, n_components):
+    """Initialize covariances.
+
+    The standard deviation in each dimension is set to the average Euclidean
+    distance of the training samples divided by the number of components.
+
+    Parameters
+    ----------
+    X : array-like, shape (n_samples, n_features)
+        Samples from the true distribution.
+
+    n_components : int (> 0)
+        Number of MVNs that compose the GMM.
+
+    Returns
+    -------
+    initial_covariances : array, shape (n_components, n_features, n_features)
+        Initial covariances
+    """
+    n_features = X.shape[1]
+    average_distances = np.empty(n_features)
+    for i in range(n_features):
+        average_distances[i] = np.mean(
+            pdist(X[:, i, np.newaxis], metric="euclidean"))
+    initial_covariances = np.empty((n_components, n_features, n_features))
+    initial_covariances[:] = np.eye(n_features) * (average_distances / n_components) ** 2
+    return initial_covariances
+
+
 class GMM(object):
     """Gaussian Mixture Model.
@@ -44,7 +127,8 @@ def _check_initialized(self):
         if self.covariances is None:
             raise ValueError("Covariances have not been initialized")
 
-    def from_samples(self, X, R_diff=1e-4, n_iter=100):
+    def from_samples(self, X, R_diff=1e-4, n_iter=100, reinit_means=False,
+                     min_eff_sample=0, max_eff_sample=1.0):
         """MLE of the mean and covariance.
 
         Expectation-maximization is used to infer the model parameters. The
@@ -54,7 +138,7 @@
         Parameters
         ----------
         X : array-like, shape (n_samples, n_features)
-            Samples from the true function.
+            Samples from the true distribution.
 
         R_diff : float
             Minimum allowed difference of responsibilities between successive
@@ -63,11 +147,34 @@
         n_iter : int
             Maximum number of iterations.
 
+        reinit_means : bool, optional (default: False)
+            Reinitialize degenerate means. Checks the distances between all
+            means and reinitializes one of them if two means are identical.
+
+        min_eff_sample : int, optional (default: 0)
+            Minimum number of effective samples required to update one
+            Gaussian; with fewer effective samples it will be reinitialized.
+            0 deactivates this check. The number of features (n_features) is
+            a good initial guess. Do not set too large values, otherwise
+            small clusters might not be covered at all.
+
+        max_eff_sample : float in [0, 1], optional (default: 1.0)
+            Maximum fraction of all samples that may be effective in the
+            update of one Gaussian. If this threshold is surpassed, the
+            Gaussian will be reinitialized. A value >= 1.0 will disable this
+            check. A value below 1 / n_components is not possible. A value
+            between 0.5 and 1 is recommended.
+
         Returns
         -------
         self : MVN
             This object.
         """
+        if max_eff_sample <= 1.0 / self.n_components:
+            raise ValueError(
+                "max_eff_sample is too small. It must be set to at least "
+                "1 / n_components.")
+
         n_samples, n_features = X.shape
 
         if self.priors is None:
@@ -75,7 +182,6 @@
                 dtype=np.float) / self.n_components
 
         if self.means is None:
-            # TODO k-means++
             indices = self.random_state.choice(
                 np.arange(n_samples), self.n_components)
             self.means = X[indices]
@@ -86,8 +192,12 @@
             for k in range(self.n_components):
                 self.covariances[k] = np.eye(n_features)
 
+        initial_covariances = np.copy(self.covariances)
+
         R = np.zeros((n_samples, self.n_components))
 
-        for _ in range(n_iter):
+        for it in range(n_iter):
+            if self.verbose >= 2:
+                print("Iteration #%d" % (it + 1))
+
             R_prev = R
 
             # Expectation
@@ -104,11 +214,80 @@
             self.priors = w / w.sum()
             self.means = R_n.T.dot(X)
             for k in range(self.n_components):
+                if self.verbose >= 2:
+                    print("Component #%d" % k)
+
+                # TODO
+                #max_variance = np.diag(self.covariances[k]).max()
+                #if max_variance <
+
                 Xm = X - self.means[k]
                 self.covariances[k] = (R_n[:, k, np.newaxis] * Xm).T.dot(Xm)
 
+                effective_samples = 1.0 / np.sum(R_n[:, k] ** 2)
+                if effective_samples < min_eff_sample:
+                    print("Not enough effective samples")
+                    self._reinitialize_gaussian(k, X, initial_covariances)
+                if effective_samples > int(max_eff_sample * n_samples):
+                    print("Too many effective samples")
+                    self._reinitialize_gaussian(k, X, initial_covariances)
+                if self.verbose >= 2:
+                    print("Effective samples %g" % effective_samples)
+
+                eigvals, _ = np.linalg.eigh(self.covariances[k])
+                eigvals[np.abs(eigvals) < np.finfo(R.dtype).eps] = 0.0
+                nonzero_eigvals = np.count_nonzero(eigvals)
+                if nonzero_eigvals < n_features:
+                    print("Not enough nonzero eigenvalues")
+                    #self.covariances[k] = initial_covariances[k]
+                if self.verbose >= 2:
+                    print("Nonzero eigenvalues %d" % nonzero_eigvals)
+                    #print(eigvals)
+                    #print(self.covariances[k])
+                rank = np.linalg.matrix_rank(self.covariances[k])
+                if self.verbose >= 2:
+                    print("Too low rank")
+                    #self.covariances[k] = initial_covariances[k]
+                if rank < n_features:
+                    print("Rank %d" % rank)
+
+            if reinit_means:
+                self._reinitialize_too_close_means(X, R, initial_covariances)
+
         return self
 
+    def _reinitialize_too_close_means(self, X, R, initial_covariances):
+        mean_distances = pdist(self.means)
+        too_close_means = np.any(mean_distances < np.finfo(R.dtype).eps)
+        if too_close_means:
+            print("Too close means")
+        mean_distances = squareform(mean_distances)
+        #if self.verbose >= 2:
+        #    print(mean_distances)
+        if too_close_means:
+            same_means = np.where(mean_distances + np.eye(self.n_components)
+                                  < np.finfo(R.dtype).eps)
+            # we only reset one mean at a time
+            i = same_means[0][0]
+
+            if self.verbose >= 2:
+                print("Resetting mean #%d" % i)
+
+            self._reinitialize_gaussian(i, X, initial_covariances)
+
+    def _reinitialize_gaussian(self, i, X, initial_covariances):
+        if i == 0:
+            centers = self.means[1:]
+        else:
+            centers = np.vstack((self.means[:i], self.means[i + 1:]))
+        n = _select_next_center(X, centers, self.random_state)
+        self.means[i] = np.copy(X[n])
+
+        self.covariances[i] = initial_covariances[i]
+
+        self.priors[i] = 1.0 / self.n_components
+        self.priors /= np.sum(self.priors)
+
     def sample(self, n_samples):
         """Sample from Gaussian mixture distribution.
 
@@ -270,8 +449,33 @@
             res.append((self.means[k], mvn.to_ellipse(factor)))
         return res
 
+    def to_mvn(self):
+        """Collapse to a single Gaussian.
 
-def plot_error_ellipses(ax, gmm, colors=None):
+        Returns
+        -------
+        mvn : MVN
+            Multivariate normal distribution.
+        """
+        self._check_initialized()
+
+        mean = np.sum(self.priors[:, np.newaxis] * self.means, 0)
+        assert len(self.covariances)
+        covariance = np.zeros_like(self.covariances[0])
+        covariance += np.sum(self.priors[:, np.newaxis, np.newaxis] * self.covariances, axis=0)
+        covariance += self.means.T.dot(np.diag(self.priors)).dot(self.means)
+        covariance -= np.outer(mean, mean)
+        # efficient version of:
+        #for k in range(self.n_components):
+        #    covariance += self.priors[k] * (
+        #        self.covariances[k] + np.outer(self.means[k], self.means[k]) -
+        #        np.outer(mean, mean))
+        return MVN(
+            mean=mean, covariance=covariance,
+            verbose=self.verbose, random_state=self.random_state)
+
+
+def plot_error_ellipses(ax, gmm, colors=None, alpha=0.25):
     """Plot error ellipses of GMM components.
 
     Parameters
@@ -281,6 +485,12 @@
     gmm : GMM
        Gaussian mixture model.
+
+    colors : list of str, optional (default: None)
+        Colors in which the ellipses should be plotted
+
+    alpha : float, optional (default: 0.25)
+        Alpha value for ellipses
     """
     from matplotlib.patches import Ellipse
     from itertools import cycle
@@ -290,7 +500,7 @@
         for mean, (angle, width, height) in gmm.to_ellipses(factor):
             ell = Ellipse(xy=mean, width=width, height=height,
                           angle=np.degrees(angle))
-            ell.set_alpha(0.25)
+            ell.set_alpha(alpha)
             if colors is not None:
                 ell.set_color(next(colors))
             ax.add_artist(ell)
diff --git a/gmr/mvn.py b/gmr/mvn.py
index 02170b008c..9e4a04bcb2 100644
--- a/gmr/mvn.py
+++ b/gmr/mvn.py
@@ -100,19 +100,29 @@ def to_probability_density(self, X):
         X = np.atleast_2d(X)
 
         n_features = X.shape[1]
-        C = self.covariance
         try:
-            L = sp.linalg.cholesky(C, lower=True)
+            L = sp.linalg.cholesky(self.covariance, lower=True)
         except np.linalg.LinAlgError:
-            C = self.covariance + 1e-6 * np.eye(n_features)
-            L = sp.linalg.cholesky(C, lower=True)
-        D = X - self.mean
-        cov_sol = sp.linalg.solve_triangular(L, D.T, lower=True).T
-        if self.norm is None:
-            self.norm = 0.5 / np.pi ** (0.5 * n_features) / sp.linalg.det(L)
+            # Degenerate covariance, try to add regularization
+            L = sp.linalg.cholesky(
+                self.covariance + 1e-3 * np.eye(n_features), lower=True)
+
+        X_minus_mean = X - self.mean
 
-        DpD = np.sum(cov_sol ** 2, axis=1)
-        return self.norm * np.exp(-0.5 * DpD)
+        if self.norm is None:
+            # Suppress a determinant of 0 to avoid numerical problems
+            L_det = max(sp.linalg.det(L), np.finfo(L.dtype).eps)
+            self.norm = 0.5 / np.pi ** (0.5 * n_features) / L_det
+
+        # Solve L x = (X - mean)^T for x with triangular L
+        # (LL^T = Sigma), that is, x = L^-1 (X - mean)^T.
+        # We can avoid covariance inversion when computing
+        # (X - mean) Sigma^-1 (X - mean)^T with this trick,
+        # since Sigma^-1 = L^T^-1 L^-1.
+        X_normalized = sp.linalg.solve_triangular(
+            L, X_minus_mean.T, lower=True).T
+
+        return self.norm * np.exp(-0.5 * np.sum(X_normalized ** 2, axis=1))
 
     def marginalize(self, indices):
         """Marginalize over everything except the given indices.
diff --git a/gmr/tests/test_gmm.py b/gmr/tests/test_gmm.py
index 2f9bc83406..d2c6f2b2cf 100644
--- a/gmr/tests/test_gmm.py
+++ b/gmr/tests/test_gmm.py
@@ -1,7 +1,7 @@
 import sys
 import numpy as np
 from gmr.utils import check_random_state
-from nose.tools import assert_equal, assert_less, assert_raises
+from nose.tools import assert_equal, assert_less, assert_raises, assert_in, assert_false, assert_true
 from nose.plugins.skip import SkipTest
 from numpy.testing import assert_array_almost_equal
 try:
@@ -10,7 +10,7 @@
 except ImportError:
     # Python 3
     from io import StringIO
 
-from gmr import GMM, plot_error_ellipses
+from gmr import GMM, plot_error_ellipses, kmeansplusplus_initialization
 
 from test_mvn import AxisStub
 
@@ -25,6 +25,56 @@
 X = np.vstack((X1, X2))
 
 
+def test_kmeanspp_too_few_centers():
+    X = np.array([[0.0, 1.0]])
+    assert_raises(ValueError, kmeansplusplus_initialization, X, 0, 0)
+
+
+def test_kmeanspp_too_many_centers():
+    X = np.array([[0.0, 1.0]])
+    assert_raises(ValueError, kmeansplusplus_initialization, X, 2, 0)
+
+
+def test_kmeanspp_one_sample():
+    X = np.array([[0.0, 1.0]])
+    centers = kmeansplusplus_initialization(X, 1, 0)
+    assert_array_almost_equal(X, centers)
+
+
+def test_kmeanspp_two_samples():
+    X = np.array([[0.0, 1.0], [1.0, 0.0]])
+    centers = kmeansplusplus_initialization(X, 1, 0)
+    assert_in(centers[0], X)
+
+
+def test_kmeanspp_two_samples_two_centers():
+    X = np.array([[0.0, 1.0], [1.0, 0.0]])
+    centers = kmeansplusplus_initialization(X, 2, 0)
+    assert_in(centers[0], X)
+    assert_in(centers[1], X)
+    assert_false(centers[0, 0] == centers[1, 0])
+
+
+def test_kmeanspp_six_samples_three_centers():
+    X = np.array([
+        [0.0, 1.0],
+        [1.0, 0.0],
+        [0.0, 0.0],
+        [1.0, 1.0],
+        [100.0, 0.0],
+        [0.0, 100.0]])
+    centers = kmeansplusplus_initialization(X, 3, 0)
+    assert_equal(len(centers), 3)
+    assert_in(np.array([100.0, 0.0]), centers)
+    assert_in(np.array([0.0, 100.0]), centers)
+    assert_true(
+        X[0] in centers or
+        X[1] in centers or
+        X[2] in centers or
+        X[3] in centers
+    )
+
+
 def test_estimate_moments():
     """Test moments estimated from samples and sampling from GMM."""
     global X
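Usage note (not part of the patch): a minimal sketch of how the helpers added in this diff are meant to be combined, namely k-means++ means and scaled covariances as the starting point for EM, and to_mvn() to collapse a conditional mixture into one Gaussian. The toy data, component count, and conditioning value below are illustrative assumptions and are not taken from the examples above.

import numpy as np
from gmr import GMM, kmeansplusplus_initialization, covariance_initialization
from gmr.utils import check_random_state

random_state = check_random_state(0)
# Badly scaled two-dimensional toy data (second feature is 100x larger).
X = random_state.randn(300, 2) * np.array([1.0, 100.0])

n_components = 3
# Helpers from this patch: spread-out means and covariances scaled to the data.
initial_means = kmeansplusplus_initialization(X, n_components, random_state)
initial_covs = covariance_initialization(X, n_components)
gmm = GMM(n_components=n_components,
          priors=np.ones(n_components) / n_components,
          means=initial_means, covariances=initial_covs,
          random_state=random_state)
gmm.from_samples(X)

# Condition on the first feature and collapse the conditional mixture to a
# single Gaussian with the new to_mvn() method.
conditional = gmm.condition([0], np.array([0.0]))
mvn = conditional.to_mvn()
print(mvn.mean, mvn.covariance)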