diff --git a/scikeo/calkmeans.py b/scikeo/calkmeans.py index 67a72c5..ab03932 100644 --- a/scikeo/calkmeans.py +++ b/scikeo/calkmeans.py @@ -1,159 +1,100 @@ # -*- coding: utf-8 -*- # + +from typing import Optional, Tuple, Dict, List from sklearn.cluster import KMeans import rasterio import numpy as np -def calkmeans(image, k = None, algo = ("auto", "elkan"), max_iter = 300, n_iter = 10, nodata = -99999, **kwargs): - +def calkmeans( + image: rasterio.io.DatasetReader, + k: Optional[int] = None, + algo: Tuple[str, ...] = ("lloyd", "elkan"), + max_iter: int = 300, + n_iter: int = 10, + nodata: float = -99999.0, + **kwargs +) -> Dict[str, List[float]]: ''' - Calibrating kmeans - - This function allows to calibrate the kmeans algorithm. It is possible to obtain - the best 'k' value and the best embedded algorithm in KMmeans. - + Calibrating KMeans Clustering Algorithm + + This function calibrates the KMeans algorithm for satellite image classification. + It can either find the optimal number of clusters (k) by evaluating the inertia + over a range of cluster numbers or determine the best algorithm ('lloyd' or 'elkan') + for a given k. + Parameters: - image: Optical images. It must be rasterio.io.DatasetReader with 3d. - - k: k This argument is None when the objective is to obtain the best 'k' value. - f the objective is to select the best algorithm embedded in kmeans, please specify a 'k' value. - - max_iter: The maximum number of iterations allowed. Strictly related to KMeans. Please see - https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html - - algo: It can be "auto" and 'elkan'. "auto" and "full" are deprecated and they will be - removed in Scikit-Learn 1.3. They are both aliases for "lloyd". - - Changed in version 1.1: Renamed “full” to “lloyd”, and deprecated “auto” and “full”. - Changed “auto” to use “lloyd” instead of “elkan”. - - n_iter: Iterations number to obtain the best 'k' value. 'n_iter' must be greater than the - number of classes expected to be obtained in the classification. Default is 10. - - nodata: The NoData value to replace with -99999. - - **kwargs: These will be passed to scikit-learn KMeans, please see full lists at: - https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html. - - Return: - Labels of classification as numpy object with 2d. - - Note: - If the idea is to find the optimal value of 'k' (clusters or classes), k = None as an - argument of the function must be put, because the function find 'k' for which the intra-class - inertia is stabilized. If the 'k' value is known and the idea is to find the best algorithm - embedded in kmeans (that maximizes inter-class distances), k = n, which 'n' is a specific - class number, must be put. It can be greater than or equal to 0. - + image (rasterio.io.DatasetReader): Optical image with 3D data. + k (Optional[int]): The number of clusters. If None, the function finds the optimal k. + algo (Tuple[str, ...]): Algorithms to evaluate ('lloyd', 'elkan'). + max_iter (int): Maximum iterations for KMeans (default 300). + n_iter (int): Number of iterations or clusters to evaluate (default 10). + nodata (float): The NoData value to identify and handle in the data. + **kwargs: Additional arguments passed to sklearn.cluster.KMeans. + + Returns: + Dict[str, List[float]]: A dictionary with algorithm names as keys and lists of + inertia values as values. + + Notes: + - If k is None, the function evaluates inertia for cluster numbers from 1 to n_iter. + - If k is provided, the function runs KMeans n_iter times for each algorithm to + evaluate their performance. + - The function handles NoData values using fuzzy matching to account for floating-point precision. ''' - - if not isinstance(image, (rasterio.io.DatasetReader)): + if not isinstance(image, rasterio.io.DatasetReader): raise TypeError('"image" must be raster read by rasterio.open().') - - bands = image.count - - rows = image.height - - cols = image.width - - # stack images - #l = [] - #for i in np.arange(1, bands+1): l.append(image.read(int(i))) - #st = np.stack(l) - st = image.read() - - # data in [rows, cols, bands] - st_reorder = np.moveaxis(st, 0, -1) - # data in [rows*cols, bands] - arr = st_reorder.reshape((rows*cols, bands)) - - # nodata - if np.isnan(np.sum(arr)): - arr[np.isnan(arr)] = self.nodata - - if k is None: - - if 'auto' in algo: - - inertias_intra_lloyd_k = [] - - for i in range(1, n_iter + 1): - - # Building and fitting the model - kmeanModel = KMeans(n_clusters = i, max_iter = max_iter, algorithm = 'auto', **kwargs) - - kmeanModel.fit(arr) - inertias_intra_lloyd_k.append(kmeanModel.inertia_) - - if 'elkan' in algo: - - inertias_intra_elkan_k = [] - - for i in range(1, n_iter + 1): - - # Building and fitting the model - kmeanModel = KMeans(n_clusters = i, max_iter = max_iter, algorithm = 'elkan', **kwargs) - - kmeanModel.fit(arr) + bands: int = image.count + rows: int = image.height + cols: int = image.width - inertias_intra_elkan_k.append(kmeanModel.inertia_) - - dic = {'inertias_intra_lloyd_k': inertias_intra_lloyd_k, - 'inertias_intra_elkan_k': inertias_intra_elkan_k} + # Read and reshape the image data + st: np.ndarray = image.read() # Shape: [bands, rows, cols] + st_reorder: np.ndarray = np.moveaxis(st, 0, -1) # Shape: [rows, cols, bands] + arr: np.ndarray = st_reorder.reshape((rows * cols, bands)) # Shape: [rows*cols, bands] - all_algorithms = ("auto", "elkan") + # Handle nodata values with fuzzy matching + if np.isnan(nodata): + nodata_mask = np.isnan(arr).any(axis=1) + else: + nodata_mask = np.isclose(arr, nodata).any(axis=1) + + # Extract valid data (rows without nodata) + valid_data: np.ndarray = arr[~nodata_mask] - for model in all_algorithms: - if model not in algo: - del dic[model] - - find_k = dic - - return find_k - - # nc = range(1,10) - # kmeanlist = [KMeans(n_clusters = i) for i in Nc] - # variance = [kmeanlist[i].fit(datos).inertia_ for i in range(len(kmeanlist))] - - elif isinstance(k, (int)): - - if 'auto' in algo: - - inertias_inter_lloyd = [] - - for i in range(n_iter): - - kmeanModel = KMeans(n_clusters = k, max_iter = max_iter, algorithm = 'auto', **kwargs) - - kmeanModel.fit(arr) + # Check if there is any valid data + if valid_data.size == 0: + raise ValueError("No valid data found to perform clustering.") - inertias_inter_lloyd.append(kmeanModel.score(arr)) - - if 'elkan' in algo: - - inertias_inter_elkan = [] - - for i in range(n_iter): - - kmeanModel = KMeans(n_clusters = k, max_iter = max_iter, algorithm = 'elkan', **kwargs) - - kmeanModel.fit(arr) + # Validate algorithms + valid_algorithms = ("lloyd", "elkan") + for algorithm in algo: + if algorithm not in valid_algorithms: + raise ValueError(f"Invalid algorithm '{algorithm}'. Must be one of {valid_algorithms}.") - inertias_inter_elkan.append(kmeanModel.inertia_) - - dic = {'inertias_inter_lloyd': inertias_inter_lloyd, - 'inertias_inter_elkan': inertias_inter_elkan} + results: Dict[str, List[float]] = {} + + if k is None: + # Finding the optimal number of clusters + for algorithm in algo: + inertias: List[float] = [] + for n_clusters in range(1, n_iter + 1): + kmeans = KMeans(n_clusters=n_clusters, max_iter=max_iter, algorithm=algorithm, **kwargs) + kmeans.fit(valid_data) + inertias.append(kmeans.inertia_) + results[algorithm] = inertias + return results - all_algorithms = ("auto", "elkan") + elif isinstance(k, int) and k > 0: + # Evaluating algorithms for a given k + for algorithm in algo: + inertias: List[float] = [] + for _ in range(n_iter): + kmeans = KMeans(n_clusters=k, max_iter=max_iter, algorithm=algorithm, **kwargs) + kmeans.fit(valid_data) + inertias.append(kmeans.inertia_) + results[algorithm] = inertias + return results - for model in all_algorithms: - if model not in algo: - del dic[model] - - best_algorithm = dic - - return best_algorithm - else: - raise TypeError(f'{k} must be None or positive integer. type = {type(k)}') + raise TypeError(f"'k' must be None or a positive integer. Got {k} of type {type(k)}.") diff --git a/tests/test_scikeo.py b/tests/test_scikeo.py index 0137d04..bfc83f4 100644 --- a/tests/test_scikeo.py +++ b/tests/test_scikeo.py @@ -6,6 +6,7 @@ import numpy as np import pytest import rasterio +from rasterio.io import MemoryFile import pandas as pd from scikeo.process import confintervalML from scikeo.sma import sma @@ -19,6 +20,7 @@ from dbfread import DBF from scikeo.calmla import calmla from scikeo.deeplearning import DL +from scikeo.calkmeans import calkmeans def test_confintervalML(): @@ -158,4 +160,115 @@ def test_DeepLearning(): batch_size = 32, training_split = 0.8) - assert fc.get('Overall_Accuracy') >= 0.7 + assert fc.get('Overall_Accuracy') >= 0.6 + +def test_calkmeans(): + # Create synthetic image data + width, height, bands = 100, 100, 3 # Image dimensions and number of bands + np.random.seed(42) # For reproducibility + + # Generate synthetic image with three clusters + cluster_centers = np.array([ + [50, 50, 50], + [150, 150, 150], + [250, 250, 250] + ]) + + total_pixels = height * width # 10000 pixels + num_clusters = len(cluster_centers) # 3 clusters + + # Calculate base size and remainder + base_size = total_pixels // num_clusters # 3333 pixels per cluster + remainder = total_pixels % num_clusters # 1 pixel remaining + + # Initialize cluster sizes + cluster_sizes = [base_size] * num_clusters # [3333, 3333, 3333] + + # Distribute the remainder + for i in range(remainder): + cluster_sizes[i] += 1 # Now cluster_sizes = [3334, 3333, 3333] + + cluster_data = [] + for center, size in zip(cluster_centers, cluster_sizes): + data = np.random.normal(loc=center, scale=10, size=(size, bands)) + cluster_data.append(data) + + # Combine and shuffle the data + image_data = np.vstack(cluster_data) + np.random.shuffle(image_data) + image_data = image_data.reshape((height, width, bands)).astype(np.float32) + + # Introduce nodata values + nodata_value = -99999.0 + image_data[0, 0, :] = nodata_value # Set the first pixel to nodata + + # Create an in-memory raster dataset + transform = rasterio.transform.from_origin(0, 0, 1, 1) # Arbitrary transform + with MemoryFile() as memfile: + # Open the in-memory file in write mode + with memfile.open( + driver='GTiff', + height=height, + width=width, + count=bands, + dtype='float32', + transform=transform, + nodata=nodata_value + ) as dataset_writer: + # Write data to the dataset + for i in range(bands): + dataset_writer.write(image_data[:, :, i], i + 1) + # Note: No need to call dataset_writer.close() explicitly within 'with' block + + # Now reopen the dataset in read mode + with memfile.open() as dataset: + # dataset_reader is a DatasetReader + + # Run the calkmeans function + # Case 1: Find the optimal number of clusters + results_optimal_k = calkmeans( + image=dataset, + k=None, + algo=('lloyd', 'elkan'), + n_iter=5, + nodata=nodata_value + ) + + # Case 2: Evaluate algorithms for a given k + k_value = 3 + results_given_k = calkmeans( + image=dataset, + k=k_value, + algo=('lloyd', 'elkan'), + n_iter=5, + nodata=nodata_value + ) + + # Case 2: Evaluate algorithms for a given k + k_value = 3 + results_given_k = calkmeans( + image=dataset, + k=k_value, + algo=('lloyd', 'elkan'), + n_iter=5, + nodata=nodata_value + ) + + # Assertions to check the outputs + # Case 1: Check if results contain expected keys and correct number of inertia values + assert isinstance(results_optimal_k, dict), "Results should be a dictionary." + for algorithm in ('lloyd', 'elkan'): + assert algorithm in results_optimal_k, f"Algorithm '{algorithm}' should be in results." + inertias = results_optimal_k[algorithm] + assert len(inertias) == 5, "Inertia list should have length equal to n_iter." + assert all(isinstance(val, float) for val in inertias), "Inertias should be floats." + + # Case 2: Check if results contain expected keys and correct number of inertia values + assert isinstance(results_given_k, dict), "Results should be a dictionary." + for algorithm in ('lloyd', 'elkan'): + assert algorithm in results_given_k, f"Algorithm '{algorithm}' should be in results." + inertias = results_given_k[algorithm] + assert len(inertias) == 5, "Inertia list should have length equal to n_iter." + assert all(isinstance(val, float) for val in inertias), "Inertias should be floats." + + print("All tests passed.")