Merge pull request #36 from robmarkcole/update-calkmeans
Update calkmeans
yotarazona authored Oct 17, 2024
2 parents 866acb6 + 0d71f8b commit 4ab2416
Showing 2 changed files with 195 additions and 141 deletions.
221 changes: 81 additions & 140 deletions scikeo/calkmeans.py
@@ -1,159 +1,100 @@
# -*- coding: utf-8 -*-
# +
from typing import Optional, Tuple, Dict, List
from sklearn.cluster import KMeans
import rasterio
import numpy as np

def calkmeans(image, k = None, algo = ("auto", "elkan"), max_iter = 300, n_iter = 10, nodata = -99999, **kwargs):

def calkmeans(
image: rasterio.io.DatasetReader,
k: Optional[int] = None,
algo: Tuple[str, ...] = ("lloyd", "elkan"),
max_iter: int = 300,
n_iter: int = 10,
nodata: float = -99999.0,
**kwargs
) -> Dict[str, List[float]]:
'''
Calibrating kmeans
This function allows calibrating the kmeans algorithm. It is possible to obtain
the best 'k' value and the best algorithm embedded in KMeans.
Calibrating KMeans Clustering Algorithm
This function calibrates the KMeans algorithm for satellite image classification.
It can either find the optimal number of clusters (k) by evaluating the inertia
over a range of cluster numbers or determine the best algorithm ('lloyd' or 'elkan')
for a given k.
Parameters:
image: Optical images. It must be a rasterio.io.DatasetReader with 3D data.
k: This argument is None when the objective is to obtain the best 'k' value.
If the objective is to select the best algorithm embedded in kmeans, please specify a 'k' value.
max_iter: The maximum number of iterations allowed. Strictly related to KMeans. Please see
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html
algo: It can be "auto" or 'elkan'. "auto" and "full" are deprecated and will be
removed in Scikit-Learn 1.3. They are both aliases for "lloyd".
Changed in version 1.1: Renamed “full” to “lloyd”, and deprecated “auto” and “full”.
Changed “auto” to use “lloyd” instead of “elkan”.
n_iter: Number of iterations to obtain the best 'k' value. 'n_iter' must be greater than the
number of classes expected to be obtained in the classification. Default is 10.
nodata: The NoData value to replace with -99999.
**kwargs: These will be passed to scikit-learn KMeans, please see full lists at:
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html.
Return:
Labels of classification as numpy object with 2d.
Note:
If the idea is to find the optimal value of 'k' (clusters or classes), k = None must be
passed as an argument, because the function finds the 'k' for which the intra-class
inertia stabilizes. If the 'k' value is known and the idea is to find the best algorithm
embedded in kmeans (the one that maximizes inter-class distances), pass k = n, where 'n'
is a specific number of classes greater than zero.
image (rasterio.io.DatasetReader): Optical image with 3D data.
k (Optional[int]): The number of clusters. If None, the function finds the optimal k.
algo (Tuple[str, ...]): Algorithms to evaluate ('lloyd', 'elkan').
max_iter (int): Maximum iterations for KMeans (default 300).
n_iter (int): Number of iterations or clusters to evaluate (default 10).
nodata (float): The NoData value to identify and handle in the data.
**kwargs: Additional arguments passed to sklearn.cluster.KMeans.
Returns:
Dict[str, List[float]]: A dictionary with algorithm names as keys and lists of
inertia values as values.
Notes:
- If k is None, the function evaluates inertia for cluster numbers from 1 to n_iter.
- If k is provided, the function runs KMeans n_iter times for each algorithm to
evaluate their performance.
- The function handles NoData values using fuzzy matching to account for floating-point precision.
'''

if not isinstance(image, (rasterio.io.DatasetReader)):
if not isinstance(image, rasterio.io.DatasetReader):
raise TypeError('"image" must be raster read by rasterio.open().')

bands = image.count

rows = image.height

cols = image.width

# stack images
#l = []
#for i in np.arange(1, bands+1): l.append(image.read(int(i)))
#st = np.stack(l)
st = image.read()

# data in [rows, cols, bands]
st_reorder = np.moveaxis(st, 0, -1)
# data in [rows*cols, bands]
arr = st_reorder.reshape((rows*cols, bands))

# nodata
if np.isnan(np.sum(arr)):
arr[np.isnan(arr)] = self.nodata

if k is None:

if 'auto' in algo:

inertias_intra_lloyd_k = []

for i in range(1, n_iter + 1):

# Building and fitting the model
kmeanModel = KMeans(n_clusters = i, max_iter = max_iter, algorithm = 'auto', **kwargs)

kmeanModel.fit(arr)

inertias_intra_lloyd_k.append(kmeanModel.inertia_)

if 'elkan' in algo:

inertias_intra_elkan_k = []

for i in range(1, n_iter + 1):

# Building and fitting the model
kmeanModel = KMeans(n_clusters = i, max_iter = max_iter, algorithm = 'elkan', **kwargs)

kmeanModel.fit(arr)
bands: int = image.count
rows: int = image.height
cols: int = image.width

inertias_intra_elkan_k.append(kmeanModel.inertia_)
dic = {'inertias_intra_lloyd_k': inertias_intra_lloyd_k,
'inertias_intra_elkan_k': inertias_intra_elkan_k}
# Read and reshape the image data
st: np.ndarray = image.read() # Shape: [bands, rows, cols]
st_reorder: np.ndarray = np.moveaxis(st, 0, -1) # Shape: [rows, cols, bands]
arr: np.ndarray = st_reorder.reshape((rows * cols, bands)) # Shape: [rows*cols, bands]

all_algorithms = ("auto", "elkan")
# Handle nodata values with fuzzy matching
if np.isnan(nodata):
nodata_mask = np.isnan(arr).any(axis=1)
else:
nodata_mask = np.isclose(arr, nodata).any(axis=1)

# Extract valid data (rows without nodata)
valid_data: np.ndarray = arr[~nodata_mask]

for model in all_algorithms:
if model not in algo:
del dic[model]

find_k = dic

return find_k

# nc = range(1,10)
# kmeanlist = [KMeans(n_clusters = i) for i in Nc]
# variance = [kmeanlist[i].fit(datos).inertia_ for i in range(len(kmeanlist))]

elif isinstance(k, (int)):

if 'auto' in algo:

inertias_inter_lloyd = []

for i in range(n_iter):

kmeanModel = KMeans(n_clusters = k, max_iter = max_iter, algorithm = 'auto', **kwargs)

kmeanModel.fit(arr)
# Check if there is any valid data
if valid_data.size == 0:
raise ValueError("No valid data found to perform clustering.")

inertias_inter_lloyd.append(kmeanModel.score(arr))

if 'elkan' in algo:

inertias_inter_elkan = []

for i in range(n_iter):

kmeanModel = KMeans(n_clusters = k, max_iter = max_iter, algorithm = 'elkan', **kwargs)

kmeanModel.fit(arr)
# Validate algorithms
valid_algorithms = ("lloyd", "elkan")
for algorithm in algo:
if algorithm not in valid_algorithms:
raise ValueError(f"Invalid algorithm '{algorithm}'. Must be one of {valid_algorithms}.")

inertias_inter_elkan.append(kmeanModel.inertia_)

dic = {'inertias_inter_lloyd': inertias_inter_lloyd,
'inertias_inter_elkan': inertias_inter_elkan}
results: Dict[str, List[float]] = {}

if k is None:
# Finding the optimal number of clusters
for algorithm in algo:
inertias: List[float] = []
for n_clusters in range(1, n_iter + 1):
kmeans = KMeans(n_clusters=n_clusters, max_iter=max_iter, algorithm=algorithm, **kwargs)
kmeans.fit(valid_data)
inertias.append(kmeans.inertia_)
results[algorithm] = inertias
return results

all_algorithms = ("auto", "elkan")
elif isinstance(k, int) and k > 0:
# Evaluating algorithms for a given k
for algorithm in algo:
inertias: List[float] = []
for _ in range(n_iter):
kmeans = KMeans(n_clusters=k, max_iter=max_iter, algorithm=algorithm, **kwargs)
kmeans.fit(valid_data)
inertias.append(kmeans.inertia_)
results[algorithm] = inertias
return results

for model in all_algorithms:
if model not in algo:
del dic[model]

best_algorithm = dic

return best_algorithm

else:
raise TypeError(f'{k} must be None or positive integer. type = {type(k)}')
raise TypeError(f"'k' must be None or a positive integer. Got {k} of type {type(k)}.")
115 changes: 114 additions & 1 deletion tests/test_scikeo.py
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
import numpy as np
import pytest
import rasterio
from rasterio.io import MemoryFile
import pandas as pd
from scikeo.process import confintervalML
from scikeo.sma import sma
@@ -19,6 +20,7 @@
from dbfread import DBF
from scikeo.calmla import calmla
from scikeo.deeplearning import DL
from scikeo.calkmeans import calkmeans


def test_confintervalML():
@@ -158,4 +160,115 @@ def test_DeepLearning():
batch_size = 32,
training_split = 0.8)

assert fc.get('Overall_Accuracy') >= 0.7
assert fc.get('Overall_Accuracy') >= 0.6

def test_calkmeans():
# Create synthetic image data
width, height, bands = 100, 100, 3 # Image dimensions and number of bands
np.random.seed(42) # For reproducibility

# Generate synthetic image with three clusters
cluster_centers = np.array([
[50, 50, 50],
[150, 150, 150],
[250, 250, 250]
])

total_pixels = height * width # 10000 pixels
num_clusters = len(cluster_centers) # 3 clusters

# Calculate base size and remainder
base_size = total_pixels // num_clusters # 3333 pixels per cluster
remainder = total_pixels % num_clusters # 1 pixel remaining

# Initialize cluster sizes
cluster_sizes = [base_size] * num_clusters # [3333, 3333, 3333]

# Distribute the remainder
for i in range(remainder):
cluster_sizes[i] += 1 # Now cluster_sizes = [3334, 3333, 3333]

cluster_data = []
for center, size in zip(cluster_centers, cluster_sizes):
data = np.random.normal(loc=center, scale=10, size=(size, bands))
cluster_data.append(data)

# Combine and shuffle the data
image_data = np.vstack(cluster_data)
np.random.shuffle(image_data)
image_data = image_data.reshape((height, width, bands)).astype(np.float32)

# Introduce nodata values
nodata_value = -99999.0
image_data[0, 0, :] = nodata_value # Set the first pixel to nodata

# Create an in-memory raster dataset
transform = rasterio.transform.from_origin(0, 0, 1, 1) # Arbitrary transform
with MemoryFile() as memfile:
# Open the in-memory file in write mode
with memfile.open(
driver='GTiff',
height=height,
width=width,
count=bands,
dtype='float32',
transform=transform,
nodata=nodata_value
) as dataset_writer:
# Write data to the dataset
for i in range(bands):
dataset_writer.write(image_data[:, :, i], i + 1)
# Note: No need to call dataset_writer.close() explicitly within 'with' block

# Now reopen the dataset in read mode
with memfile.open() as dataset:
# dataset is a DatasetReader

# Run the calkmeans function
# Case 1: Find the optimal number of clusters
results_optimal_k = calkmeans(
image=dataset,
k=None,
algo=('lloyd', 'elkan'),
n_iter=5,
nodata=nodata_value
)

# Case 2: Evaluate algorithms for a given k
k_value = 3
results_given_k = calkmeans(
image=dataset,
k=k_value,
algo=('lloyd', 'elkan'),
n_iter=5,
nodata=nodata_value
)


# Assertions to check the outputs
# Case 1: Check if results contain expected keys and correct number of inertia values
assert isinstance(results_optimal_k, dict), "Results should be a dictionary."
for algorithm in ('lloyd', 'elkan'):
assert algorithm in results_optimal_k, f"Algorithm '{algorithm}' should be in results."
inertias = results_optimal_k[algorithm]
assert len(inertias) == 5, "Inertia list should have length equal to n_iter."
assert all(isinstance(val, float) for val in inertias), "Inertias should be floats."

# Case 2: Check if results contain expected keys and correct number of inertia values
assert isinstance(results_given_k, dict), "Results should be a dictionary."
for algorithm in ('lloyd', 'elkan'):
assert algorithm in results_given_k, f"Algorithm '{algorithm}' should be in results."
inertias = results_given_k[algorithm]
assert len(inertias) == 5, "Inertia list should have length equal to n_iter."
assert all(isinstance(val, float) for val in inertias), "Inertias should be floats."

print("All tests passed.")
