"""
_processing/cluster.py
========================
.. module:: cluster
:platform: Unix
:synopsis: Initialization of the exploration package for hsi-wizard.
Module Overview
---------------
This module initializes the cluster functions of the hsi-wizard package.
Functions
---------
.. autofunction:: isodata
.. autofunction:: smooth_kmneas
Credits
-------
The Isodata code was inspired by:
- Repository: pyRadar
- Author/Organization: PyRadar
- Original repository: https://github.com/PyRadar/pyradar/
"""
import numpy as np
from scipy.cluster.vq import vq
from scipy.ndimage import uniform_filter, gaussian_filter
from typing import Tuple
from typing import Optional
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.metrics import pairwise_distances
from sklearn.decomposition import PCA
from sklearn.feature_extraction.image import grid_to_graph
from scipy.signal import convolve2d
def _quit_low_change_in_clusters(centers: np.ndarray, last_centers: np.ndarray, theta_o: float) -> bool:
"""
Determine if cluster update change is below the threshold to stop iteration.
Compares current and previous cluster centers and stops if all changes are below `theta_o`.
Parameters
----------
centers : np.ndarray
Current cluster centers.
last_centers : np.ndarray
Cluster centers from the previous iteration.
theta_o : float
Threshold for the percent change in cluster centers.
Returns
-------
bool
True if change is below the threshold and iteration should stop, False otherwise.
"""
qt = False
if centers.shape == last_centers.shape:
thresholds = np.abs((centers - last_centers) / (last_centers + 1))
if np.all(thresholds <= theta_o): # percent of change in [0:1]
qt = True
return qt
def _discard_clusters(img_class_flat: np.ndarray, centers: np.ndarray, clusters_list: np.ndarray, theta_m: int) -> Tuple[np.ndarray, np.ndarray, int]:
"""
Remove clusters with fewer members than the minimum threshold.
Parameters
----------
img_class_flat : np.ndarray
Flattened image class labels.
centers : np.ndarray
Cluster centers.
clusters_list : np.ndarray
List of cluster labels.
theta_m : int
Minimum number of pixels per cluster.
Returns
-------
Tuple[np.ndarray, np.ndarray, int]
Updated centers, cluster list, and number of clusters.
"""
k_ = centers.shape[0]
to_delete = np.array([])
assert centers.shape[0] == clusters_list.size, \
"ERROR: discard_cluster() centers and clusters_list size are different"
for cluster in range(k_):
indices = np.where(img_class_flat == clusters_list[cluster])[0]
total_per_cluster = indices.size
if total_per_cluster <= theta_m:
to_delete = np.append(to_delete, cluster)
if to_delete.size:
to_delete = np.array(to_delete, dtype=int)
new_centers = np.delete(centers, to_delete, axis=0)
new_clusters_list = np.delete(clusters_list, to_delete)
else:
new_centers = centers
new_clusters_list = clusters_list
# new_centers, new_clusters_list = sort_arrays_by_first(new_centers, new_clusters_list)
assert new_centers.shape[0] == new_clusters_list.size, \
"ERROR: discard_cluster() centers and clusters_list size are different"
return new_centers, new_clusters_list, k_
def _update_clusters(img_flat: np.ndarray, img_class_flat: np.ndarray, centers: np.ndarray, clusters_list: np.ndarray) -> Tuple[np.ndarray, np.ndarray, int]:
"""
Update cluster centers based on the current assignments.
Parameters
----------
img_flat : np.ndarray
Flattened image pixels.
img_class_flat : np.ndarray
Flattened image class labels.
centers : np.ndarray
Current cluster centers.
clusters_list : np.ndarray
List of cluster labels.
Returns
-------
Tuple[np.ndarray, np.ndarray, int]
Updated centers, cluster list, and number of clusters.
"""
k_ = centers.shape[0]
new_centers = np.zeros((k_, img_flat.shape[1]))
new_clusters_list = np.array([])
if centers.shape[0] != clusters_list.size:
raise ValueError(
"ERROR: update_clusters() centers and clusters_list size are different"
)
for cluster in range(k_):
indices = np.where(img_class_flat == clusters_list[cluster])[0]
# get whole cluster
cluster_values = img_flat[indices, :]
new_cluster = cluster_values.mean(axis=0)
new_centers[cluster, :] = new_cluster
new_clusters_list = np.append(new_clusters_list, cluster)
new_centers, new_clusters_list = _sort_arrays_by_first(new_centers, new_clusters_list)
if new_centers.shape[0] != new_clusters_list.size:
raise ValueError(
"ERROR: update_clusters() centers and clusters_list size are different after sorting"
)
return new_centers, new_clusters_list, k_
def _initial_clusters(img_flat: np.ndarray, k_: int, method: str = "linspace") -> np.ndarray | None:
"""
Generate initial cluster centers.
Parameters
----------
img_flat : np.ndarray
Flattened image pixels.
k_ : int
Number of clusters.
method : str, optional
Initialization method: 'linspace' or 'random'.
Returns
-------
np.ndarray | None
Initial cluster centers or None on error.
"""
methods_available = ["linspace", "random"]
v = img_flat.shape[1]
assert method in methods_available, f"ERROR: method {method} is not valid."
if method == "linspace":
maximum, minimum = img_flat.max(axis=0), img_flat.min(axis=0)
centers = np.array([np.linspace(minimum[i], maximum[i], k_) for i in range(v)]).T
elif method == "random":
start, end = 0, img_flat.shape[0]
indices = np.random.randint(start, end, k_)
centers = img_flat[indices]
else:
return None
return centers
def _sort_arrays_by_first(centers: np.ndarray, clusters_list: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Sort centers and cluster labels based on the first feature.
Parameters
----------
centers : np.ndarray
Cluster centers.
clusters_list : np.ndarray
Cluster label array.
Returns
-------
Tuple[np.ndarray, np.ndarray]
Sorted centers and cluster list.
"""
assert centers.shape[0] == clusters_list.size, \
"ERROR: sort_arrays_by_first centers and clusters_list size are not equal"
indices = np.argsort(centers[:, 0])
sorted_centers = centers[indices, :]
sorted_clusters_list = clusters_list[indices]
return sorted_centers, sorted_clusters_list
def _split_clusters(img_flat: np.ndarray, img_class_flat: np.ndarray, centers: np.ndarray, clusters_list: np.ndarray, theta_s: float, theta_m: int) -> Tuple[np.ndarray, np.ndarray, int]:
"""
Split a cluster if its standard deviation exceeds a threshold.
Parameters
----------
img_flat : np.ndarray
Flattened image pixels.
img_class_flat : np.ndarray
Flattened image class labels.
centers : np.ndarray
Cluster centers.
clusters_list : np.ndarray
Cluster label array.
theta_s : float
Standard deviation threshold for splitting.
theta_m : int
Minimum pixel count per cluster.
Returns
-------
Tuple[np.ndarray, np.ndarray, int]
Updated centers, cluster list, and number of clusters.
"""
assert centers.shape[0] == clusters_list.size, "ERROR: split() centers and clusters_list size are different"
delta = 10
k_ = centers.shape[0]
count_per_cluster = np.zeros(k_)
stddev = np.array([])
avg_dists_to_clusters, k_ = _compute_avg_distance(img_flat, img_class_flat, centers, clusters_list)
d, k_ = _compute_overall_distance(img_class_flat, avg_dists_to_clusters, clusters_list)
# compute all the standard deviation of the clusters
for cluster in range(k_):
indices = np.where(img_class_flat == clusters_list[cluster])[0]
count_per_cluster[cluster] = indices.size
value = ((img_flat[indices] - centers[cluster]) ** 2).sum()
value /= count_per_cluster[cluster]
value = np.sqrt(value)
stddev = np.append(stddev, value)
cluster = stddev.argmax()
max_stddev = stddev[cluster]
max_clusters_list = int(clusters_list.max())
if max_stddev > theta_s:
if avg_dists_to_clusters[cluster] >= d:
if count_per_cluster[cluster] > (2.0 * theta_m):
old_cluster = centers[cluster, :]
new_cluster_1 = old_cluster + delta
new_cluster_1 = new_cluster_1.reshape(1, -1)
new_cluster_2 = old_cluster - delta
new_cluster_2 = new_cluster_2.reshape(1, -1)
centers = np.delete(centers, cluster, axis=0)
clusters_list = np.delete(clusters_list, cluster)
centers = np.concatenate((centers, new_cluster_1), axis=0)
centers = np.concatenate((centers, new_cluster_2), axis=0)
clusters_list = np.append(clusters_list, max_clusters_list + 1)
clusters_list = np.append(clusters_list, max_clusters_list + 2)
centers, clusters_list = _sort_arrays_by_first(centers, clusters_list)
assert centers.shape[0] == clusters_list.size, \
"ERROR: split() centers and clusters_list size are different"
return centers, clusters_list, k_
def _compute_avg_distance(img_flat: np.ndarray, img_class_flat: np.ndarray, centers: np.ndarray, clusters_list: np.ndarray) -> Tuple[np.ndarray, int]:
"""
Compute average intra-cluster distance.
Parameters
----------
img_flat : np.ndarray
Flattened image pixels.
img_class_flat : np.ndarray
Flattened image class labels.
centers : np.ndarray
Cluster centers.
clusters_list : np.ndarray
Cluster label array.
Returns
-------
Tuple[np.ndarray, int]
Array of average distances and cluster count.
"""
k_ = centers.shape[0]
avg_dists_to_clusters = np.zeros(k_)
for cluster in range(k_):
indices = np.where(img_class_flat == clusters_list[cluster])[0]
cluster_points = img_flat[indices]
avg_dists_to_clusters[cluster] = np.mean(np.linalg.norm(cluster_points - centers[cluster], axis=1))
return avg_dists_to_clusters, k_
def _compute_overall_distance(img_class_flat: np.ndarray, avg_dists_to_clusters: np.ndarray, clusters_list: np.ndarray) -> Tuple[float, int]:
"""
Calculate overall weighted distance from cluster centers.
Parameters
----------
img_class_flat : np.ndarray
Flattened image class labels.
avg_dists_to_clusters : np.ndarray
Average distances per cluster.
clusters_list : np.ndarray
Cluster label array.
Returns
-------
Tuple[float, int]
Overall distance and number of clusters.
"""
k_ = avg_dists_to_clusters.size
total_count = 0
total_dist = 0
for cluster in range(k_):
nbr_points = len(np.where(img_class_flat == clusters_list[cluster])[0])
total_dist += avg_dists_to_clusters[cluster] * nbr_points
total_count += nbr_points
d = total_dist / total_count
return d, k_
def _merge_clusters(img_class_flat: np.ndarray, centers: np.ndarray, clusters_list: np.ndarray, p: int, theta_c: int, k_: int) -> Tuple[np.ndarray, np.ndarray, int]:
"""
Merge nearby clusters if their distance is below a threshold.
Parameters
----------
img_class_flat : np.ndarray
Flattened image class labels.
centers : np.ndarray
Cluster centers.
clusters_list : np.ndarray
Cluster label array.
p : int
Max number of merge candidates.
theta_c : int
Distance threshold for merging.
k_ : int
Current number of clusters.
Returns
-------
Tuple[np.ndarray, np.ndarray, int]
Updated centers, cluster list, and number of clusters.
"""
pair_dists = _compute_pairwise_distances(centers)
first_p_elements = pair_dists[:p]
below_threshold = [(c1, c2) for d, (c1, c2) in first_p_elements if d < theta_c]
if below_threshold:
k_ = centers.size
count_per_cluster = np.zeros(k_)
to_add = np.array([]) # new clusters to add
to_delete = np.array([]) # clusters to delete
for cluster in range(k_):
result = np.where(img_class_flat == clusters_list[cluster])
indices = result[0]
count_per_cluster[cluster] = indices.size
for c1, c2 in below_threshold:
c1_count = float(count_per_cluster[c1]) + 1
c2_count = float(count_per_cluster[c2])
factor = 1.0 / (c1_count + c2_count)
weight_c1 = c1_count * centers[c1]
weight_c2 = c2_count * centers[c2]
value = round(factor * (weight_c1 + weight_c2))
to_add = np.append(to_add, value)
to_delete = np.append(to_delete, [c1, c2])
# delete old clusters and their indices from the available array
centers = np.delete(centers, to_delete)
clusters_list = np.delete(clusters_list, to_delete)
# generate new indices for the new clusters
# starting from the max index 'to_add.size' times
start = int(clusters_list.max())
end = to_add.size + start
centers = np.append(centers, to_add)
clusters_list = np.append(clusters_list, range(start, end))
centers, clusters_list = _sort_arrays_by_first(centers, clusters_list)
return centers, clusters_list, k_
def _compute_pairwise_distances(centers: np.ndarray) -> list:
"""
Compute all pairwise distances between cluster centers.
Parameters
----------
centers : np.ndarray
Cluster centers.
Returns
-------
list
Sorted list of (distance, (cluster1, cluster2)) tuples.
"""
pair_dists = []
for i in range(centers.shape[0]):
for j in range(i):
# Compute the Euclidean distance using np.linalg.norm
d = np.linalg.norm(centers[i] - centers[j])
pair_dists.append((d, (i, j)))
# Sort by the computed distance (the first element in the tuple)
return sorted(pair_dists, key=lambda x: x[0])
[docs]
def isodata(dc, k: int = 10, it: int = 10, p: int = 2, theta_m: int = 10,
theta_s: float = 0.1, theta_c: int = 2, theta_o: float = 0.05,
k_: Optional[int] = None) -> np.ndarray:
"""
Classify hyperspectral image data using the ISODATA clustering algorithm.
Performs iterative clustering on a hyperspectral DataCube, adapting the number of clusters
by splitting and merging based on data distribution, until convergence or iteration limit is reached.
Parameters
----------
dc : DataCube
DataCube object containing the hyperspectral image with shape (v, x, y).
k : int, optional
Initial number of clusters to begin clustering. Default is 10.
it : int, optional
Maximum number of iterations. Default is 10.
p : int, optional
Maximum number of cluster pairs allowed to merge. Default is 2.
theta_m : int, optional
Minimum number of pixels required per cluster. Default is 10.
theta_s : float, optional
Threshold for standard deviation to trigger cluster splitting. Default is 0.1.
theta_c : int, optional
Distance threshold for merging clusters. Default is 2.
theta_o : float, optional
Threshold for minimum change in cluster centers to stop iteration. Default is 0.05.
k_ : int, optional
Alternative number of clusters to override initial `k`. Default is None.
Returns
-------
np.ndarray
2D array (x, y) with cluster labels assigned to each pixel.
Raises
------
ValueError
If intermediate clustering steps produce inconsistent dimensions or invalid data.
Notes
-----
This implementation adapts clustering during iterations by merging or splitting clusters,
based on the intra-cluster statistics and spatial constraints. The algorithm stops early
if cluster centers converge according to `theta_o`.
Examples
--------
>>> labels = isodata(dc, k=5, it=15)
"""
img = np.transpose(dc.cube, (1, 2, 0)) # Rearrange cube dimensions to (H, W, Channels)
if k_ is None:
k_ = k
x, y, _ = img.shape
img_flat = img.reshape(-1, img.shape[2]) # Flatten spatial dimensions
clusters_list = np.arange(k_)
centers = _initial_clusters(img_flat, k_, "linspace")
for i in range(it):
last_centers = centers.copy()
# Assign samples to the nearest cluster center
img_class_flat, _ = vq(img_flat, centers)
# Discard underpopulated clusters
centers, clusters_list, k_ = _discard_clusters(img_class_flat, centers, clusters_list, theta_m)
# Update cluster centers
centers, clusters_list, k_ = _update_clusters(img_flat, img_class_flat, centers, clusters_list)
# Handle excessive or insufficient clusters
if k_ <= (k / 2.0): # Too few clusters -> Split clusters
centers, clusters_list, k_ = _split_clusters(img_flat, img_class_flat, centers, clusters_list, theta_s, theta_m)
elif k_ > (k * 2.0): # Too many clusters -> Merge clusters
centers, clusters_list, k_ = _merge_clusters(img_class_flat, centers, clusters_list, p, theta_c, k_)
# Terminate early if cluster changes are minimal
if _quit_low_change_in_clusters(centers, last_centers, theta_o):
break
return img_class_flat.reshape(x, y)
def _generate_gaussian_kernel(size=3, sigma=1.0):
"""
Generate a 2D Gaussian kernel for image smoothing.
Creates a square kernel using the Gaussian function, which can be used
for spatial smoothing via convolution operations.
Parameters
----------
size : int, optional
Size of the kernel (must be odd). Default is 3.
sigma : float, optional
Standard deviation of the Gaussian distribution. Default is 1.0.
Returns
-------
np.ndarray
2D array representing the normalized Gaussian kernel.
Notes
-----
The kernel is normalized so that the sum of all elements equals 1. The
kernel is computed as the outer product of two 1D Gaussian vectors.
Examples
--------
>>> kernel = _generate_gaussian_kernel(size=5, sigma=1.5)
>>> print(kernel.shape)
(5, 5)
"""
ax = np.linspace(-(size // 2), size // 2, size)
gauss = np.exp(-0.5 * (ax / sigma) ** 2)
kernel = np.outer(gauss, gauss)
return kernel / kernel.sum()
def _optimal_clusters(pixels, max_clusters=5, threshold=0.1) -> int:
"""
Estimate the optimal number of KMeans clusters using centroid distance threshold.
Iteratively fits KMeans with increasing cluster counts and evaluates the minimum
distance between centroids. Clustering stops when the closest centroids are within
the specified distance threshold, indicating excessive overlap.
Parameters
----------
pixels : np.ndarray
2D array of shape (n_samples, n_features), representing the flattened image spectra.
max_clusters : int, optional
Maximum number of clusters to evaluate. Default is 5.
threshold : float, optional
Minimum acceptable distance between any pair of cluster centroids. Default is 0.1.
Returns
-------
int
Optimal number of clusters where centroid spacing satisfies the distance threshold.
Raises
------
ValueError
If input `pixels` is not a 2D array or has insufficient samples.
Notes
-----
Uses pairwise Euclidean distances to determine centroid separation.
Cluster count starts from 2 up to `max_clusters`.
Examples
--------
>>> pixels = dc.cube.reshape(dc.cube.shape[0], -1).T
>>> k = _optimal_clusters(pixels, max_clusters=6, threshold=0.2)
>>> print(k)
4
"""
best_k = 2
for k in range(2, max_clusters + 1):
kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
kmeans.fit(pixels)
centers = kmeans.cluster_centers_
dists = pairwise_distances(centers)
np.fill_diagonal(dists, np.inf) # Ignore self-distances
closest_dist = np.min(dists)
if closest_dist > threshold:
best_k = k
else:
break
return best_k
[docs]
def smooth_kmeans(dc, n_clusters=5, threshold=.1, mrf_iterations=5, kernel_size=12, sigma=1.0):
"""
Segment a hyperspectral DataCube using KMeans clustering with MRF-based spatial smoothing.
Performs initial clustering on spectral data using KMeans, followed by iterative Markov Random
Field (MRF)-based smoothing using Gaussian kernel convolution to enforce spatial consistency.
Parameters
----------
dc : DataCube
Hyperspectral data cube with shape (v, x, y), where `v` is spectral resolution.
n_clusters : int, optional
Maximum number of clusters for initial KMeans clustering. Default is 5.
threshold : float, optional
Minimum allowable distance between KMeans centroids to stop increasing cluster count. Default is 0.1.
mrf_iterations : int, optional
Number of spatial smoothing iterations using MRF regularization. Default is 5.
kernel_size : int, optional
Size of the Gaussian kernel used for smoothing. Must be an odd integer. Default is 12.
sigma : float, optional
Standard deviation of the Gaussian kernel. Default is 1.0.
Returns
-------
np.ndarray
2D array (x, y) of cluster labels after smoothing.
Raises
------
ValueError
If input DataCube is malformed or smoothing parameters are invalid.
Notes
-----
The function uses `optimal_clusters` to determine a suitable number of clusters
before applying KMeans. MRF smoothing is implemented by convolving binary masks
of each cluster with a Gaussian kernel and reassigning pixels based on weighted responses.
Examples
--------
>>> labels = smooth_kmeans(dc, n_clusters=6, mrf_iterations=3)
>>> plt.imshow(labels, cmap='viridis')
>>> plt.show()
"""
v, x, y = dc.shape
# Reshape cube for clustering
pixels = dc.cube.reshape(v, -1).T
# Determine optimal number of clusters
optimal_k = _optimal_clusters(pixels, max_clusters=n_clusters, threshold=threshold)
_kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10)
labels = _kmeans.fit_predict(pixels)
labels = labels.reshape(x, y)
# Generate dynamic Gaussian kernel
kernel = _generate_gaussian_kernel(size=kernel_size, sigma=sigma)
# Apply Markov Random Field-based spatial regularization
for _ in range(mrf_iterations):
smoothed_labels = np.zeros_like(labels, dtype=np.float64)
for cluster in range(optimal_k):
binary_mask = (labels == cluster).astype(np.float64)
smoothed_mask = convolve2d(binary_mask, kernel, mode='same', boundary='symm')
smoothed_labels += cluster * smoothed_mask
labels = np.round(smoothed_labels).astype(np.int32)
return labels
[docs]
def pca(dc, n_components=25):
"""
Perform principal component analysis to reduce the spectral dimensionality of a DataCube.
This function reshapes the DataCube’s underlying array from shape (v, x, y) to (x*y, v),
applies PCA to reduce the number of spectral bands to `n_components`, then reshapes
the result back to (n_components, x, y). It also updates the DataCube’s wavelengths
to a simple integer index for each new component.
Parameters
----------
dc : DataCube
The DataCube instance whose `cube` attribute (a numpy array of shape (v, x, y))
will be reduced in its spectral dimension.
n_components : int, optional
The number of principal components to retain. Defaults to 25.
Returns
-------
DataCube
The same DataCube instance, with its `cube` attribute replaced by the reduced
cube of shape (n_components, x, y) and its `wavelengths` attribute set to
integer indices from 0 to n_components-1.
Raises
------
ValueError
If `n_components` is greater than the original number of spectral bands (v).
Notes
-----
- Uses `sklearn.decomposition.PCA` under the hood.
- The new wavelengths are not actual physical wavelengths but simple indices.
- The transformation is done in-place on the provided DataCube.
Examples
--------
>>> import wizard
>>> dc = wizard.read("hyperspectral_data.cube")
>>> dc = pca(dc, n_components=10)
>>> print(dc.cube.shape)
(10, 512, 512)
"""
v, x, y = dc.cube.shape
cube = dc.cube
# Reshape to (n_pixels, v)
data = cube.reshape(v, -1).T # (x*y, v)
# PCA reduction
pca_model = PCA(n_components=n_components)
reduced = pca_model.fit_transform(data) # (x*y, n_components)
# Reshape back to (n_components, x, y)
reduced_cube = reduced.T.reshape(n_components, x, y)
wave = np.arange(n_components, dtype=int)
dc.set_cube(reduced_cube)
dc.set_wavelengths(wave)
return dc
[docs]
def spectral_spatial_kmeans(dc, n_clusters: int, spatial_radius: int) -> np.ndarray:
"""
Spectral–spatial K-Means clustering for hyperspectral images.
Applies K-Means to pixel spectra augmented by the mean spectrum of their local neighborhood.
Parameters
----------
dc : DataCube
Hyperspectral data cube with shape (v, x, y), where v is number of bands.
n_clusters : int
Number of clusters to form.
spatial_radius : int
Radius (in pixels) of the square neighborhood for local averaging.
Returns
-------
labels : np.ndarray of shape (x, y)
Integer cluster label for each pixel.
Raises
------
ValueError
If `spatial_radius < 0` or `n_clusters <= 0`.
Notes
-----
- Uses a uniform filter to compute the local mean spectrum for each pixel.
- Flattens the augmented spectral features and applies scikit-learn’s KMeans.
Examples
--------
>>> labels = spectral_spatial_kmeans(dc, n_clusters=5, spatial_radius=1)
"""
if spatial_radius < 0 or n_clusters <= 0:
raise ValueError("spatial_radius must be ≥0 and n_clusters >0")
# Compute local-mean cube
v, x, y = dc.cube.shape
local_cube = np.empty_like(dc.cube)
for band in range(v):
local_cube[band] = uniform_filter(dc.cube[band], size=2 * spatial_radius + 1, mode='reflect')
# Stack spectral + spatial-mean features
features = np.vstack([
dc.cube.reshape(v, -1),
local_cube.reshape(v, -1)
]).T # shape (x*y, 2v)
# Run KMeans
km = KMeans(n_clusters=n_clusters, random_state=0)
flat_labels = km.fit_predict(features)
return flat_labels.reshape(x, y)
[docs]
def spatial_agglomerative_clustering(dc, n_clusters: int) -> np.ndarray:
"""
Agglomerative clustering with a 4-connected grid graph enforcing spatial contiguity.
Flattens the spectral vectors and uses `grid_to_graph` for pixel connectivity,
so only spatial neighbors can merge.
Parameters
----------
dc : DataCube
Hyperspectral data cube (v, x, y).
n_clusters : int
Desired number of clusters.
Returns
-------
labels : np.ndarray of shape (x, y)
Connected clusters that respect spatial adjacency.
Notes
-----
- Uses `sklearn.feature_extraction.image.grid_to_graph` to build a sparse connectivity matrix over the x×y grid.
- May be more memory-intensive for large images.
Examples
--------
>>> labels = spatial_agglomerative_clustering(dc, n_clusters=8)
"""
v, x, y = dc.cube.shape
# Build connectivity graph on the 2D grid
connectivity = grid_to_graph(n_x=x, n_y=y)
# Flatten spectral data
data = dc.cube.reshape(v, -1).T # shape (x*y, v)
# Agglomerative clustering with spatial connectivity
agg = AgglomerativeClustering(n_clusters=n_clusters,
connectivity=connectivity,
linkage='ward')
flat_labels = agg.fit_predict(data)
return flat_labels.reshape(x, y)
[docs]
def smooth_cluster(img, sigma=1.0, n_iter=1):
"""
Smooth a cluster label image to remove mislabelled pixels.
Apply a Gaussian filter to the input cluster label image to reduce spurious
mislabelled pixels by smoothing label intensities, then round back to the
nearest integer labels.
Parameters
----------
img : numpy.ndarray
Integer label image of shape (H, W) or (H, W, ...).
sigma : float, optional
Standard deviation for Gaussian kernel. Default is 1.0.
n_iter: int, optional
Number of iterations to apply the Gaussian filter. Default is 1.
Returns
-------
numpy.ndarray
Smoothed label image with same shape and dtype as input.
Raises
------
TypeError
If img is not a numpy.ndarray.
ValueError
If img is empty.
Notes
-----
Internally, the image labels are converted to float, smoothed, then rounded
back to integer labels. This may remove small isolated noisy pixels.
Examples
--------
>>> import numpy as np
>>> img = np.array([[1,1,2],[1,2,2],[2,2,2]])
>>> smooth_cluster(img, sigma=0.5)
array([[1,1,2],
[1,2,2],
[2,2,2]])
"""
if not isinstance(img, np.ndarray):
raise TypeError("img must be a numpy.ndarray")
if img.size == 0:
raise ValueError("img must not be empty")
result = img.copy()
for _ in range(n_iter):
img_float = result.astype(np.float64)
smoothed = gaussian_filter(img_float, sigma=sigma)
result = np.rint(smoothed).astype(img.dtype)
return result
[docs]
def kmeans(dc, n_clusters=5, n_init=10):
"""
Perform KMeans clustering on a hyperspectral DataCube without spatial smoothing.
This function reshapes the spectral data into pixel vectors and applies KMeans to
segment the data into the specified number of clusters.
Parameters
----------
dc : DataCube
Hyperspectral data cube with shape (v, x, y), where `v` is the spectral resolution.
n_clusters : int
Number of clusters to form. Default is 5.
n_init : int
Number of time the k-means algorithm will be run with different centroid seeds.
Default is 10.
Returns
-------
np.ndarray
2D array of shape (x, y) containing cluster labels for each pixel.
Raises
------
ValueError
If the input DataCube is malformed or `n_clusters` or `n_init` are invalid.
Notes
-----
This function does not perform any spatial regularization or smoothing.
"""
# Validate inputs
if n_clusters < 1 or n_init < 1:
raise ValueError("`n_clusters` and `n_init` must be positive integers.")
# Reshape cube for clustering
v, x, y = dc.cube.shape
pixels = dc.cube.reshape(v, -1).T
# Apply KMeans clustering
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=n_init)
labels_flat = kmeans.fit_predict(pixels)
# Reshape back to image dimensions
return labels_flat.reshape(x, y)