"""
_utils/helper.py
=================
.. module:: helper
:platform: Unix
:synopsis: Helper functions for processing wave and cube values.
Module Overview
---------------
This module contains helper functions to assist in processing wave and cube values.
Functions
---------
.. autofunction:: find_nex_greater_wave
.. autofunction:: find_nex_smaller_wave
"""
import cv2
import warnings
import numpy as np
from skimage.feature import canny
class RegistrationError(Exception):
"""Custom exception for registration errors."""
pass
[docs]
def find_nex_greater_wave(waves, wave_1: int, maximum_deviation: int = 5) -> int:
"""
Find the next greater wave value within a specified deviation.
This function searches for the smallest wave value greater than or equal to `wave_1`
that exists in the `waves` list, within a deviation range up to `maximum_deviation`.
If no such wave is found, the function returns -1.
Parameters
----------
waves : list[int]
A list of integer wave values to search within.
wave_1 : int
The reference wave value to find the next greater wave after.
maximum_deviation : int, optional
The maximum positive offset from `wave_1` to consider (default is 5).
Returns
-------
int
The next greater wave value found within the range, or -1 if none exists.
Raises
------
None
Notes
-----
The function stops searching once it finds the first match within the allowed range.
Examples
--------
>>> find_nex_greater_wave([400, 405, 410, 415], 403)
405
>>> find_nex_greater_wave([400, 405, 410, 415], 416)
-1
"""
wave_next = -1
for n in range(maximum_deviation):
wave_n = wave_1 + n
if wave_n in waves:
wave_next = wave_n
break
return wave_next
[docs]
def find_nex_smaller_wave(waves, wave_1: int, maximum_deviation: int = 5) -> int:
"""
Find the next smaller wave value within a specified deviation.
This function searches for the largest wave value smaller than or equal to `wave_1`
that exists in the `waves` list, within a deviation range up to `maximum_deviation`.
If no such wave is found, the function returns -1.
Parameters
----------
waves : list[int]
A list of integer wave values to search within.
wave_1 : int
The reference wave value to find the next smaller wave before.
maximum_deviation : int, optional
The maximum negative offset from `wave_1` to consider (default is 5).
Returns
-------
int
The next smaller wave value found within the range, or -1 if none exists.
Raises
------
None
Notes
-----
The function stops searching once it finds the first match within the allowed range.
Examples
--------
>>> find_nex_smaller_wave([390, 395, 400, 405], 402)
400
>>> find_nex_smaller_wave([390, 395, 400, 405], 389)
-1
"""
wave_next = -1
for n in range(maximum_deviation):
wave_n = wave_1 - n
if wave_n in waves:
wave_next = wave_n
break
return wave_next
[docs]
def normalize_spec(spec):
"""
Normalize a spectrum to the range [0, 1].
This function scales the input spectral array so that its values lie between 0 and 1.
If the spectrum has constant values (i.e., no variation), it is returned unchanged.
The function ensures numerical stability using `np.clip`.
Parameters
----------
spec : np.ndarray
A one-dimensional NumPy array representing the spectrum to normalize.
Returns
-------
np.ndarray
A normalized NumPy array with values scaled to the [0, 1] range, or the original
array if it has no dynamic range.
Raises
------
None
Notes
-----
Normalization is only performed if the minimum and maximum values differ.
Examples
--------
>>> import numpy as np
>>> normalize_spec(np.array([2.0, 4.0, 6.0]))
array([0. , 0.5, 1. ])
>>> normalize_spec(np.array([5.0, 5.0, 5.0]))
array([5., 5., 5.])
"""
spec_min, spec_max = spec.min(), spec.max()
return np.clip((spec - spec_min) / (spec_max - spec_min), 0, 1) if spec_max > spec_min else spec
[docs]
def feature_regestration(o_img: np.ndarray, a_img: np.ndarray, max_features: int = 5000, match_percent: float = 0.1):
"""
Perform feature-based registration of two grayscale images.
This function aligns a moving image (`a_img`) to a reference image (`o_img`) using
ORB keypoints and brute-force Hamming descriptor matching. It returns the aligned image
and the computed homography transformation matrix.
Parameters
----------
o_img : np.ndarray
A 2D NumPy array representing the reference (original) grayscale image.
a_img : np.ndarray
A 2D NumPy array representing the image to be aligned (affine-transformed).
max_features : int, optional
The maximum number of ORB features to detect (default is 5000).
match_percent : float, optional
The fraction of best matches to use when computing the homography (default is 0.1).
Returns
-------
tuple[np.ndarray, np.ndarray]
A tuple containing:
- The aligned image (np.ndarray) warped to match the reference.
- The homography matrix (np.ndarray) used for the transformation.
Raises
------
ValueError
If feature matching fails or not enough good matches are found to compute homography.
Notes
-----
Images are automatically normalized and converted to 8-bit if needed. The function uses
RANSAC to compute a robust homography estimate from feature correspondences.
Examples
--------
>>> import numpy as np
>>> import cv2
>>> ref_img = cv2.imread("reference.png", cv2.IMREAD_GRAYSCALE)
>>> mov_img = cv2.imread("moving.png", cv2.IMREAD_GRAYSCALE)
>>> aligned, H = feature_regestration(ref_img, mov_img)
>>> print(H.shape)
(3, 3)
"""
orb = cv2.ORB_create(max_features)
if o_img.dtype != np.uint8:
o_img = (o_img - o_img.min()) / (o_img.max() - o_img.min())
o_img = (o_img * 255).astype(np.uint8)
if a_img.dtype != np.uint8:
a_img = (a_img - a_img.min()) / (a_img.max() - a_img.min())
a_img = (a_img * 255).astype(np.uint8)
a_img_key, a_img_descr = orb.detectAndCompute(a_img, None)
o_img_key, o_img_descr = orb.detectAndCompute(o_img, None)
# Match features.
matcher = cv2.DescriptorMatcher_create(cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_HAMMING)
matches = matcher.match(a_img_descr, o_img_descr, None)
matches = list(matches)
# Sort matches by score
matches = sorted(matches, key=lambda x: x.distance)
# Remove not so good matches
num_good_matches = int(len(matches) * match_percent)
matches = matches[: num_good_matches]
# Extract location of good matches
a_points = np.zeros((len(matches), 2), dtype=np.float32)
o_points = np.zeros((len(matches), 2), dtype=np.float32)
for i, match in enumerate(matches):
a_points[i, :] = a_img_key[match.queryIdx].pt
o_points[i, :] = o_img_key[match.trainIdx].pt
# Find homography
h, mask = cv2.findHomography(a_points, o_points, cv2.RANSAC)
# Use homography
height, width = o_img.shape
aligned_img = cv2.warpPerspective(a_img, h, (width, height))
return aligned_img, h
def feature_registration(o_img: np.ndarray, a_img: np.ndarray, max_features: int = 5000, match_percent: float = 0.1) -> tuple[np.ndarray, np.ndarray]:
"""
Perform ORB-based feature registration.
Aligns `a_img` to `o_img` using ORB features and RANSAC
to find the homography matrix.
Parameters
----------
o_img : numpy.ndarray
The target (reference) image.
a_img : numpy.ndarray
The source image to be aligned.
max_features : int, optional
Maximum number of ORB features to detect, defaults to 5000.
match_percent : float, optional
Percentage of best matches to use for homography, defaults to 0.1.
Returns
-------
aligned_img : numpy.ndarray
The `a_img` warped to align with `o_img`.
H : numpy.ndarray
The 3x3 homography matrix mapping points from `a_img` to `o_img`.
Raises
------
RegistrationError
If no descriptors are found, not enough good matches are found,
or homography estimation fails.
"""
def to_uint8(im: np.ndarray) -> np.ndarray:
"""Convert image to uint8, normalizing if necessary."""
if im.dtype != np.uint8:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=RuntimeWarning, message="invalid value encountered in true_divide")
warnings.filterwarnings("ignore", category=RuntimeWarning, message="invalid value encountered in divide")
min_val, max_val = im.min(), im.max()
if min_val == max_val:
im_norm = np.zeros_like(im, dtype=np.float32)
else:
im_norm = (im.astype(np.float32) - min_val) / (max_val - min_val)
im_norm = np.nan_to_num(im_norm, nan=0.0, posinf=1.0, neginf=0.0)
im = (im_norm * 255).astype(np.uint8)
return im
o8, a8 = to_uint8(o_img), to_uint8(a_img)
orb = cv2.ORB_create(max_features)
kp1, des1 = orb.detectAndCompute(a8, None)
kp2, des2 = orb.detectAndCompute(o8, None)
if des1 is None or des2 is None:
raise RegistrationError("No descriptors found")
matcher = cv2.DescriptorMatcher_create(cv2.DESCRIPTOR_MATCHER_BRUTEFORCE_HAMMING)
matches = matcher.match(des1, des2)
matches = sorted(matches, key=lambda m: m.distance)
n_good = max(4, int(len(matches) * match_percent))
matches = matches[:n_good]
if len(matches) < 4:
raise RegistrationError(f"Not enough good matches found ({len(matches)}/{n_good})")
ptsA = np.float32([kp1[m.queryIdx].pt for m in matches])
ptsO = np.float32([kp2[m.trainIdx].pt for m in matches])
H, mask = cv2.findHomography(ptsA, ptsO, cv2.RANSAC)
if H is None:
raise RegistrationError("Homography estimation failed")
H = H / H[2, 2]
h_o, w_o = o_img.shape
aligned = cv2.warpPerspective(a_img, H, (w_o, h_o), flags=cv2.INTER_LINEAR)
return aligned, H
def _process_slice(spec_out_flat: np.ndarray, spikes_flat: np.ndarray, idx: int, window: int) -> tuple:
"""
Process a single slice to remove spikes.
Replaces spikes with the mean of neighboring values within a given window.
Parameters
----------
spec_out_flat : numpy.ndarray
Flattened output spectrum data from the data cube.
spikes_flat : numpy.ndarray
Flattened boolean array indicating spike detections.
idx : int
Index of the current slice to process.
window : int
Size of the window for mean calculation.
Returns
-------
tuple
A tuple containing the index of the processed slice and the
modified spectrum slice.
"""
w_h = int(window / 2)
spike = spikes_flat[idx]
tmp = np.copy(spec_out_flat[idx])
for spk_idx in np.where(spike)[0]:
window_min = max(0, spk_idx - w_h)
window_max = min(len(tmp), spk_idx + w_h + 1)
if window_min == spk_idx:
window_data = tmp[spk_idx + 1:window_max]
elif window_max == spk_idx + 1:
window_data = tmp[window_min:spk_idx]
else:
window_data = np.concatenate((tmp[window_min:spk_idx], tmp[spk_idx + 1:window_max]))
if len(window_data) > 0:
tmp[spk_idx] = np.mean(window_data)
return idx, tmp
def decompose_homography(H: np.ndarray) -> tuple[float, np.ndarray]:
"""
Extract rotation angle and singular values from the 2x2 linear part of H.
Parameters
----------
H : numpy.ndarray
The 3x3 homography matrix.
Returns
-------
angle : float
The rotation angle in degrees.
S : numpy.ndarray
The singular values (scales) from the 2x2 affine part of H.
"""
A = H[:2, :2]
U, S, Vt = np.linalg.svd(A)
R = U.dot(Vt)
angle = np.degrees(np.arctan2(R[1, 0], R[0, 0]))
return angle, S
def auto_canny(img: np.ndarray, sigma: float = 0.33) -> np.ndarray:
"""
Automatic Canny edge detection using median-based thresholds.
Assumes img is a float in the range [0,1].
Parameters
----------
img : numpy.ndarray
Input image (float, range [0,1]).
sigma : float, optional
Sigma value for threshold calculation, defaults to 0.33.
Returns
-------
numpy.ndarray
Binary edge map from Canny detector.
"""
v = np.median(img)
lower = max(0.0, (1.0 - sigma) * v)
upper = min(1.0, (1.0 + sigma) * v)
return canny(img, low_threshold=lower, high_threshold=upper)
def normalize_polarity(img: np.ndarray) -> np.ndarray:
"""
Ensure features are dark-on-light by inverting if necessary.
If the image is mostly bright (mean pixel value > 0.5 after
normalization to [0,1]), it inverts the image.
Handles float or uint8 inputs transparently, returning a float32
image in the range [0,1].
Parameters
----------
img : numpy.ndarray
Input image.
Returns
-------
numpy.ndarray
Polarity-normalized image as float32 in [0,1].
"""
if img.dtype == np.uint8:
img_f = img.astype(np.float32) / 255.0
else:
min_val, max_val = img.min(), img.max()
if min_val < 0.0 or max_val > 1.0:
if max_val == min_val:
img_f = np.zeros_like(img, dtype=np.float32)
else:
img_f = (img.astype(np.float32) - min_val) / (max_val - min_val)
else:
img_f = (img.astype(np.float32) - min_val) / (max_val - min_val)
if np.mean(img_f) > 0.5:
img_f = 1.0 - img_f
return img_f