Source code for wizard._core.datacube_ops

"""
_core/datacube_ops.py

.. module:: datacube_ops
   :platform: Unix
   :synopsis: DataCube Operations.

## Module Overview
This module contains operation function for processing datacubes.

## Functions
.. autofunction:: remove_spikes
.. autofunction:: resize
"""
import os
import cv2
import copy
import rembg
import random
import numpy as np
from PIL import Image
from joblib import Parallel, delayed
from scipy.signal import savgol_filter
from scipy.ndimage import gaussian_filter, uniform_filter
from skimage.transform import warp


from . import DataCube 
from .._processing.spectral import calculate_modified_z_score, spec_baseline_als
from .._utils.helper import _process_slice, feature_registration, RegistrationError, auto_canny, decompose_homography, normalize_polarity


[docs] def remove_spikes(dc: DataCube, threshold: int = 6500, window: int = 5) -> DataCube: """ Remove cosmic spikes from each pixel's spectral data. This function computes the modified z-score for each pixel's spectral vector, identifies spikes where the score exceeds the threshold, and replaces spike values with the local mean within a sliding window along the spectrum. Parameters ---------- dc : DataCube The input DataCube with shape (v, x, y), where v is the number of spectral bands. threshold : int, optional Threshold for spike detection via modified z-score, defaults to 6500. window : int, optional Window size (in spectral channels) for mean replacement of spikes, defaults to 5. Returns ------- DataCube A new DataCube instance with spikes removed per-pixel. Raises ------ ValueError If `window` is not in the range [1, number of spectral bands]. Notes ----- - The original DataCube is not modified in place; es wird eine Kopie zurückgegeben. - Die Modifizierte z-Score-Berechnung erwartet Input mit Form (n_samples, n_features). - Parallelisierung beschleunigt die Einzelpixel-Bearbeitung. Examples -------- >>> import wizard >>> dc = wizard.read("example.fsm") >>> dc.remove_spikes(threshold=6500, window=5) """ v, x, y = dc.cube.shape if not (1 <= window <= v): raise ValueError(f"window must be between 1 and {v}, got {window}") # reshape to (n_pixels, v) n_pixels = x * y flat_cube = dc.cube.reshape(v, n_pixels).T # shape: (n_pixels, v) # Berechne pro-pixel modifizierten z-score z_scores = calculate_modified_z_score(flat_cube) # (n_pixels, v) spikes = np.abs(z_scores) > threshold flat_out = flat_cube.copy() # Parallel auf jedes Pixel anwenden results = Parallel(n_jobs=-1)( delayed(_process_slice)(flat_out, spikes, idx, window) for idx in range(n_pixels) ) for idx, spec in results: flat_out[idx] = spec # zurück in (v, x, y) formen clean_cube = flat_out.T.reshape(v, x, y) # Kopie des DataCube mit dem bereinigten Cube dc.set_cube(clean_cube) return dc
[docs] def remove_background(dc: DataCube, threshold: int = 50, style: str = 'dark') -> DataCube: """ Remove background from images in a DataCube. Uses an external algorithm (rembg). The first image in the DataCube is processed to generate a mask, which is then applied to all images to remove the background. Parameters ---------- dc : DataCube DataCube containing the image stack. threshold : int, optional Threshold value to define the background from the alpha mask, defaults to 50. Pixels with alpha < threshold are considered background. style : str, optional Style of background removal, 'dark' or 'bright', defaults to 'dark'. If 'dark', background pixels are set to 0. If 'bright', background pixels are set to the max value of the cube.y Returns ------- DataCube DataCube with the background removed. Raises ------ ValueError If style is not 'dark' or 'bright'. Examples -------- >>> import wizard >>> dc = wizard.read("example.fsm") >>> dc.remove_background(threshold=50, style='bright') # or style 'dark' """ img = dc.cube[0] img = ((img - img.min()) / (img.max() - img.min()) * 255).astype('uint8') img = Image.fromarray(img) img_removed_bg = rembg.remove(img) mask = np.array(img_removed_bg.getchannel('A')) cube = dc.cube.copy() if style == 'dark': cube[:, mask < threshold] = 0 elif style == 'bright': cube[:, mask < threshold] = dc.cube.max() else: raise ValueError("Type must be 'dark' or 'bright'") dc.set_cube(cube) return dc
[docs] def resize(dc: DataCube, x_new: int, y_new: int, interpolation: str = 'linear') -> None: """ Resize the DataCube to new x and y dimensions. dc.shape is v,x,y Resizes each 2D slice (x, y) of the DataCube using the specified interpolation method. Interpolation methods: * ``linear``: Bilinear interpolation (ideal for enlarging). * ``nearest``: Nearest neighbor interpolation (fast but blocky). * ``area``: Pixel area interpolation (ideal for downscaling). * ``cubic``: Bicubic interpolation (high quality, slower). * ``lanczos``: Lanczos interpolation (highest quality, slowest). Parameters ---------- dc : DataCube The DataCube instance to be resized. x_new : int The new width (x-dimension). y_new : int The new height (y-dimension). interpolation : str, optional Interpolation method, defaults to 'linear'. Options: 'linear', 'nearest', 'area', 'cubic', 'lanczos'. Returns ------- None The DataCube is modified in-place. Raises ------ ValueError If the interpolation method is not recognized. Examples -------- >>> import wizard >>> dc = wizard.read("example.fsm") >>> dc.resize(x_new=500, y_new=500) """ mode = None shape = dc.cube.shape if shape[1] > x_new: print('\033[93mx_new is smaller than the existing cube, you will lose information\033[0m') if shape[2] > y_new: print('\033[93my_new is smaller than the existing cube, you will lose information\033[0m') if interpolation == 'linear': mode = cv2.INTER_LINEAR elif interpolation == 'nearest': mode = cv2.INTER_NEAREST elif interpolation == 'area': mode = cv2.INTER_AREA elif interpolation == 'cubic': mode = cv2.INTER_CUBIC elif interpolation == 'lanczos': mode = cv2.INTER_LANCZOS4 else: raise ValueError(f'Interpolation method `{interpolation}` not recognized.') _cube = np.empty(shape=(shape[0], x_new, y_new)) for idx, layer in enumerate(dc.cube): _cube[idx] = cv2.resize(layer, (y_new, x_new), interpolation=mode) dc.cube = _cube dc._set_cube_shape()
[docs] def baseline_als(dc: DataCube, lam: float = 1000000, p: float = 0.01, niter: int = 10) -> DataCube: """ Apply Adaptive Smoothness (ALS) baseline correction. Iterates through each pixel (spectrum) in the DataCube and subtracts the baseline calculated by the `spec_baseline_als` function. Parameters ---------- dc : DataCube The input DataCube. lam : float, optional The smoothness parameter for ALS, defaults to 1000000. Larger lambda makes the baseline smoother. p : float, optional The asymmetry parameter for ALS, defaults to 0.01. Value between 0 and 1. Controls how much the baseline is pushed towards the data (0 for minimal, 1 for maximal). niter : int, optional The number of iterations for the ALS algorithm, defaults to 10. Returns ------- DataCube The DataCube with baseline correction applied. Examples -------- >>> import wizard >>> dc = wizard.read("example.fsm") >>> dc.baseline_als(lam=1e6, p=.001, niter=10) """ for x in range(dc.shape[1]): for y in range(dc.shape[2]): dc.cube[:, x, y] -= spec_baseline_als( spectrum=dc.cube[:, x, y], lam=lam, p=p, niter=niter ) return dc
[docs] def merge_cubes(dc1: DataCube, dc2: DataCube, register: bool = False) -> DataCube: """ Merge two DataCubes into a single DataCube, with optional registration. If both datacubes are already registered and the `register` flag is True, the function will sample up to 10 random spectral layers from dc2, attempt to register each to the first layer of dc1, choose the transform with the lowest mean-squared-error alignment, then apply that best transform to all layers of dc2 before merging. Parameters ---------- dc1 : DataCube The first DataCube (used as reference). dc2 : DataCube The second DataCube to be merged into the first. register : bool, optional If True (default), registration will be attempted if both cubes are marked as registered. Returns ------- DataCube A new DataCube containing merged spatial and spectral data. Raises ------ NotImplementedError If the cubes have mismatched spatial dimensions and cannot be merged, or if wavelengths overlap without being purely indices. Examples -------- >>> import wizard >>> dc_a = wizard.read('example.fsm') >>> dc_b = wizard.read('another_file.csv') >>> dc_a.merge_cubes(dc_b) """ c1 = dc1.cube c2 = dc2.cube wave1 = dc1.wavelengths wave2 = dc2.wavelengths # Optional registration step with sampling if register and getattr(dc1, 'registered', False) and getattr(dc2, 'registered', False): print("Both datacubes registered. Sampling layers for alignment...") ref_img = c1[0] num_layers = c2.shape[0] sample_indices = random.sample(range(num_layers), min(10, num_layers)) best_score = np.inf best_transform = None # Try to register sampled layers and pick best for idx in sample_indices: try: aligned_slice, transform = feature_registration(ref_img, c2[idx]) # Compute alignment quality (mean squared error) mse = np.mean((ref_img - aligned_slice)**2) if mse < best_score: best_score = mse best_transform = transform except RegistrationError as e: print(f"Registration of sampled layer {idx} failed: {e}") if best_transform is not None: # Apply best transform to all layers of dc2 for i in range(num_layers): try: c2[i] = warp(c2[i], inverse_map=best_transform.inverse, preserve_range=True) except Exception as e: print(f"Failed to apply best transform to layer {i}: {e}") else: print("No successful sampled registration. Skipping registration.") # Spatial size check if c1.shape[1:] == c2.shape[1:]: c3 = np.concatenate([c1, c2], axis=0) else: raise NotImplementedError( 'Sorry - this function can only merge cubes with the same spatial dimensions.' ) # Handle wavelength merge if set(wave1) & set(wave2): # If wavelengths are index-based, just concatenate indices if set(wave1) <= set(range(c1.shape[0])) and set(wave2) <= set(range(c2.shape[0])): wave3 = list(range(c1.shape[0] + c2.shape[0])) else: raise NotImplementedError( 'Sorry - your wavelengths overlap and are not purely index-based.' ) else: wave3 = np.concatenate((wave1, wave2)) # Create new merged DataCube (modify dc1 in-place) dc1.set_cube(c3) dc1.set_wavelengths(wave3) return dc1
[docs] def inverse(dc: DataCube) -> DataCube: """ Invert the DataCube values. This operation is useful for converting between transmission and reflectance data, or similar inversions. The formula applied is: `tmp = cube * -1` `tmp += -tmp.min()` The data type of the cube is preserved if it's 'uint16' or 'uint8' after temporary conversion to 'float16' for calculation. Parameters ---------- dc : DataCube The DataCube to invert. Returns ------- DataCube The DataCube with inverted values. Examples -------- >>> import wizard >>> dc = wizard.read('example.fsm') >>> dc.inverse() """ dtype = dc.cube.dtype if dtype == np.uint16 or dtype == np.uint8: # Use np types for comparison cube = dc.cube.astype(np.float32) else: cube = dc.cube.copy() tmp = cube tmp *= -1 tmp += -tmp.min() dc.set_cube(tmp.astype(dtype)) return dc
[docs] def register_layers_simple(dc: DataCube, max_features: int = 5000, match_percent: float = 0.1) -> DataCube: """ Align images within a DataCube using simple feature-based registration. Each layer in the DataCube is aligned to the first layer (index 0) using ORB feature detection and homography estimation via `_feature_registration`. Parameters ---------- dc : DataCube The DataCube whose layers are to be registered. max_features : int, optional Maximum number of keypoint regions to detect, defaults to 5000. match_percent : float, optional Percentage of keypoint matches to consider for homography, defaults to 0.1 (10%). Returns ------- DataCube The DataCube with layers registered. Examples -------- >>> import wizard >>> dc = wizard.read('example.fsm') >>> dc.register_layers_simple() """ o_img = dc.cube[0, :, :] for i in range(dc.cube.shape[0]): if i > 0: a_img = copy.deepcopy(dc.cube[i, :, :]) try: _, h = feature_registration( o_img=o_img, a_img=a_img, max_features=max_features, match_percent=match_percent ) height, width = o_img.shape aligned_img = cv2.warpPerspective(a_img, h, (width, height)) dc.cube[i, :, :] = aligned_img except RegistrationError as e: print(f"Warning: Could not register layer {i} in simple registration: {e}") pass dc.registered = True return dc
[docs] def remove_vignetting_poly(dc: DataCube, axis: int = 1, slice_params: dict = None) -> DataCube: """ Remove vignetting using polynomial fitting along a specified axis. Calculates the mean along the specified axis (1 for rows, 2 for columns) for each spectral layer, fits a polynomial to this mean profile (after Savitzky-Golay smoothing), and subtracts this fitted profile from the corresponding rows/columns of the layer to correct for vignetting. Parameters ---------- dc : DataCube The DataCube instance to process. axis : int, optional The axis along which to calculate the mean and apply correction. 1 for correcting along rows (profile used for columns), 2 for correcting along columns (profile used for rows). Defaults to 1. slice_params : dict, optional Dictionary for slicing behavior before mean calculation. Keys: ``"start"`` (int), ``"end"`` (int), ``"step"`` (int). Defaults to full slice with step 1. Returns ------- DataCube The processed DataCube with vignetting removed. Raises ------ ValueError If the DataCube is empty or axis is not 1 or 2. Examples -------- >>> import wizard >>> dc = wizard.read('example.fsm') >>> params = {"start":25, "end":50} >>> dc.remove_vignetting_poly(slice_params=params, axis=2) """ if dc.cube is None: raise ValueError("The DataCube is empty. Please provide a valid cube.") if slice_params is None: slice_params = {"start": None, "end": None, "step": 1} start = slice_params.get("start", None) end = slice_params.get("end", None) step = slice_params.get("step", 1) if axis == 1: summed_data = np.mean(dc.cube[:, :, start:end:step], axis=2) elif axis == 2: summed_data = np.mean(dc.cube[:, start:end:step, :], axis=1) else: raise ValueError('Axis can only be 1 or 2.') corrected_cube = dc.cube.copy().astype(np.float32) for i, layer_profile in enumerate(summed_data): smoothed_layer_profile = savgol_filter(layer_profile, window_length=71, polyorder=1) if axis == 1: for j_col in range(dc.cube.shape[2]): corrected_cube[i, :, j_col] -= smoothed_layer_profile elif axis == 2: for j_row in range(dc.cube.shape[1]): corrected_cube[i, j_row, :] -= smoothed_layer_profile dc.set_cube(corrected_cube) return dc
def normalize(dc: DataCube) -> DataCube: """ Normalize spectral information in the data cube to the range [0, 1]. For each 2D spatial layer in the DataCube, the normalization is performed by: `layer = (layer - min_in_layer) / (max_in_layer - min_in_layer)` This scales the intensity values of each layer independently across its spatial dimensions. Parameters ---------- dc : DataCube The DataCube instance to normalize. Returns ------- DataCube The normalized DataCube. Examples -------- >>> import wizard >>> dc = wizard.read('example.fsm') >>> dc.normalize() """ cube = dc.cube.astype(np.float32) min_vals = cube.min(axis=(1, 2), keepdims=True) max_vals = cube.max(axis=(1, 2), keepdims=True) range_vals = max_vals - min_vals range_vals[range_vals == 0] = 1 cube = (cube - min_vals) / range_vals dc.set_cube(cube) return dc
[docs] def register_layers_best( dc: DataCube, ref_layer: int = 0, max_features: int = 5000, match_percent: float = 0.1, rot_thresh: float = 20.0, scale_thresh: float = 1.1 ) -> DataCube: """ Align DataCube layers with robust registration. Aligns each slice of `dc.cube` to a reference layer. It uses feature-based registration primarily. If feature-based registration yields a degenerate homography (based on rotation and scale thresholds) or fails, it falls back to Canny-based edge registration. Failed alignments are retried once. Parameters ---------- dc : DataCube The DataCube to process. ref_layer : int, optional Index of the reference layer, defaults to 0. max_features : int, optional Maximum features for ORB, defaults to 5000. match_percent : float, optional Match percentage for ORB, defaults to 0.1. rot_thresh : float, optional Rotation threshold (degrees) for homography validation, defaults to 20.0. scale_thresh : float, optional Scale threshold for homography validation, defaults to 1.1. Checks if max_scale <= scale_thresh and min_scale >= 1/scale_thresh. Returns ------- DataCube The DataCube with aligned layers. Raises ------ RuntimeError If alignment fails for a layer after retry. Examples -------- >>> import wizard >>> dc = wizard.read('example.fsm') >>> dc.register_layers_best() """ aligned_indices = {ref_layer} waitlist = set() n_layers, H_dim, W_dim = dc.cube.shape def try_align(layer_idx: int, current_aligned_indices: set) -> bool: # nonlocal dc a_img = dc.cube[layer_idx] best_alignment_img = None for ref_idx in current_aligned_indices: try: o_img = dc.cube[ref_idx] aligned_img_feat, H_ij = feature_registration( o_img, a_img, max_features, match_percent ) angle, S = decompose_homography(H_ij) s_ok = (S.max() <= scale_thresh and S.min() >= 1 / scale_thresh) if abs(angle) <= rot_thresh and s_ok: print(f"[Layer {layer_idx}] aligned to {ref_idx}: θ={angle:.1f}°, S={S.round(3)}") best_alignment_img = aligned_img_feat break else: print(f"[Layer {layer_idx}] reject vs {ref_idx}: θ={angle:.1f}°, S={S.round(3)}") except RegistrationError as e: print(f"[Layer {layer_idx}] registration to {ref_idx} failed: {e}") if best_alignment_img is None: print(f"[Layer {layer_idx}] edge-map fallback to reference layer {ref_layer}") try: ref_img_for_edge = normalize_polarity(dc.cube[ref_layer]) tgt_img_for_edge = normalize_polarity(a_img) edges_ref = auto_canny(ref_img_for_edge) edges_tgt = auto_canny(tgt_img_for_edge) aligned_img_edge, H_e = feature_registration( edges_ref.astype(float), edges_tgt.astype(float), max_features, match_percent ) h_orig, w_orig = a_img.shape best_alignment_img = cv2.warpPerspective(a_img, H_e, (w_orig, h_orig), flags=cv2.INTER_LINEAR) angle_e, S_e = decompose_homography(H_e) print(f"[Layer {layer_idx}] edges (vs layer {ref_layer}): θ={angle_e:.1f}°, S={S_e.round(3)}") except RegistrationError as e: print(f"[Layer {layer_idx}] edge registration failed: {e}") except Exception as e: print(f"[Layer {layer_idx}] unexpected error in edge registration: {e}") if best_alignment_img is not None: dc.cube[layer_idx] = best_alignment_img return True return False for i in range(n_layers): if i == ref_layer: continue if try_align(i, aligned_indices.copy()): aligned_indices.add(i) else: waitlist.add(i) if waitlist: print(f"\nRetrying layers: {list(waitlist)}\n") for i in list(waitlist): if try_align(i, aligned_indices.copy()): aligned_indices.add(i) waitlist.remove(i) else: raise RuntimeError(f"Layer {i}: alignment failed after retry.") dc.registered = True return dc
[docs] def remove_vignetting(dc: DataCube, sigma: float = 50, clip: bool = True, epsilon: float = 1e-6) -> DataCube: """ Remove vignetting from a hyperspectral DataCube. Corrects vignetting in each spectral band by estimating a smooth background using Gaussian blur and then performing flat-field correction. The background is normalized by its mean before correction. Parameters ---------- dc : DataCube The input DataCube (bands, height, width). sigma : float, optional Standard deviation for Gaussian blur, controlling smoothness. Larger sigma means coarser background estimation. Defaults to 50. clip : bool, optional If True and the original DataCube dtype is integer, clip output values to the valid range of that integer type. Defaults to True. epsilon : float, optional A small constant to add to the background before division to prevent division by zero errors. Defaults to 1e-6. Returns ------- DataCube The DataCube with vignetting corrected. The output cube has the same shape and dtype as the input. Examples -------- >>> import wizard >>> dc = wizard.read('example.fsm') >>> dc.remove_vignetting() """ corrected_cube = np.empty_like(dc.cube) orig_dtype = dc.cube.dtype is_int = np.issubdtype(orig_dtype, np.integer) for i in range(dc.cube.shape[0]): band = dc.cube[i].astype(np.float64) background = gaussian_filter(band, sigma=sigma) background = np.maximum(background, epsilon) background_mean = background.mean() if background_mean > epsilon: background /= background_mean else: background = np.ones_like(background, dtype=np.float64) corrected_band = band / background if is_int: info = np.iinfo(orig_dtype) corrected_band = np.round(corrected_band) corrected_band = np.clip(corrected_band, info.min, info.max) corrected_cube[i] = corrected_band.astype(orig_dtype) dc.set_cube(corrected_cube) return dc
[docs] def upscale_datacube_edsr(dc: DataCube, scale: int, model_path: str): """ Upscale the spatial dimensions of a DataCube using the EDSR super-resolution model. This function applies the EDSR (Enhanced Deep Super-Resolution) model from OpenCV’s dnn_superres module to each spectral slice of the given DataCube. Since EDSR expects a 3-channel input, each single-band slice is duplicated across three channels before processing. The output slices are then reassembled into a new DataCube with upscaled spatial dimensions. Parameters ---------- dc : DataCube An instance of the DataCube class. Must have attributes: - datacube.cube: numpy array of shape (v, x, y) - datacube.wavelength: list of length v scale : int The upscaling factor (e.g., 2, 3, or 4) supported by the EDSR model. model_path : str Filesystem path to the pretrained EDSR `.pb` model file (e.g., `"EDSR_x4.pb"`). Returns ------- DataCube A new DataCube instance whose `.cube` has shape (v, x * scale, y * scale) and the same `.wavelength` list. Raises ------ FileNotFoundError If the specified `model_path` does not exist or is unreadable. cv2.error If OpenCV fails to load the model or perform upsampling. ValueError If the `scale` is not one of the factors supported by the loaded model. Notes ----- - Requires `opencv-contrib-python>=4.3.0`. - Processing is done slice-by-slice and may be slow for large cubes. - Memory usage grows by approximately `scale^2`. Examples -------- >>> import wizard >>> dc = wizard.read("hyperspectral.fsm") >>> dc.upscale_datacube_edsr(scale=4, model_path="EDSR_x4.pb") >>> print(dc.cube.shape) (v, x*4, y*4) """ # Create EDSR super-res object if not os.path.isfile(model_path): raise FileNotFoundError(f"Model file not found: {model_path}") sr = cv2.dnn_superres.DnnSuperResImpl_create() sr.readModel(model_path) sr.setModel("edsr", scale) v, x, y = dc.shape new_x, new_y = x * scale, y * scale up_cube = np.empty((v, new_x, new_y), dtype=dc.cube.dtype) for i in range(v): # Extract single-band slice band = dc.cube[i, :, :] # Convert to 3-channel BGR by stacking bgr = cv2.merge([band, band, band]) # Upsample up_bgr = sr.upsample(bgr) # Take one channel (all channels are identical) up_band = up_bgr[:, :, 0] up_cube[i, :, :] = up_band # Build new DataCube dc.set_cube(up_cube) return dc
[docs] def upscale_datacube_espcn(dc: DataCube, scale: int, model_path: str): """ Upscale the spatial dimensions of a DataCube using the ESPCN super-resolution model. This function applies the ESPCN (Efficient Sub-Pixel Convolutional Neural Network) model from OpenCV’s dnn_superres module to each spectral slice of the given DataCube. Since ESPCN expects a 3-channel input, each single-band slice is duplicated across three channels before processing. The output slices are then reassembled into a new DataCube with upscaled spatial dimensions. Parameters ---------- dc : DataCube An instance of the DataCube class. Must have attributes: - .cube: numpy array of shape (v, x, y) - .wavelength: list of length v scale : int The upscaling factor (2, 3, or 4) supported by the ESPCN model. model_path : str Filesystem path to the pretrained ESPCN `.pb` model file (e.g., "ESPCN_x3.pb"). Returns ------- DataCube A new DataCube instance whose `.cube` has shape (v, x * scale, y * scale) and the same `.wavelength` list. Raises ------ FileNotFoundError If the specified `model_path` does not exist. ValueError If `scale` is not in {2, 3, 4}. cv2.error If OpenCV fails to load the model or perform upsampling. Notes ----- - Requires `opencv-contrib-python>=4.3.0`. - Processing is done slice-by-slice and may be slow for large cubes. - Memory usage grows by approximately `scale**2`. Examples -------- >>> import wizard >>> dc = wizard.read("test.fsm") >>> dc.upscale_datacube_espcn(scale=3, model_path="ESPCN_x3.pb") >>> print(dc.cube.shape) (v, x*3, y*3) """ if not os.path.isfile(model_path): raise FileNotFoundError(f"Model file not found: {model_path}") if scale not in (2, 3, 4): raise ValueError(f"ESPCN only supports scale factors 2, 3, or 4, got {scale}") # Initialize ESPCN super-resolver sr = cv2.dnn_superres.DnnSuperResImpl_create() sr.readModel(model_path) sr.setModel("espcn", scale) v, x, y = dc.shape new_x, new_y = x * scale, y * scale up_cube = np.empty((v, new_x, new_y), dtype=dc.cube.dtype) for i in range(v): # extract single-band slice band = dc.cube[i, :, :] # create 3-channel image bgr = cv2.merge([band, band, band]) # run ESPCN upsampling up_bgr = sr.upsample(bgr) # channels are identical: pick one up_band = up_bgr[:, :, 0] up_cube[i, :, :] = up_band # assemble new DataCube dc.set_cube(up_cube) return dc
def upscale_datacube_with_reference(dc: DataCube, reference_image: np.ndarray) -> 'DataCube': """ Upscales a DataCube to match the spatial resolution of a reference image using ESRGAN. Each spectral band of the DataCube is individually upscaled using the ESRGAN model by treating the single band as a pseudo-grayscale image. The resulting DataCube has the same spatial resolution as the reference image. Parameters ---------- dc : DataCube The input DataCube to be upscaled. Expected shape (v, x, y). reference_image : np.ndarray A high-resolution reference image (e.g., RGB) that defines the target (x, y) resolution. Returns ------- DataCube A new DataCube with the same number of bands, but upscaled to match the reference image's spatial resolution. Raises ------ ValueError If the reference image resolution is smaller than the datacube resolution. Notes ----- This method uses the ESRGAN model via TensorFlow Hub. The model is applied to each spectral band independently. Examples -------- >>> upscaled_cube = upscale_datacube_with_reference(cube, ref_image) """ import tensorflow as tf import tensorflow_hub as hub # Load ESRGAN model model_url = "https://tfhub.dev/captain-pool/esrgan-tf2/1" model = hub.load(model_url) # Target resolution from reference image target_x, target_y = reference_image.shape[:2] v, x, y = dc.shape if x >= target_x or y >= target_y: raise ValueError("Reference image must be larger than the DataCube in spatial dimensions.") upscaled_bands = [] for i in range(v): band = dc.cube[i, :, :] # Normalize and expand dims for ESRGAN img = (band / band.max() * 255).astype(np.uint8) img_rgb = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) img_rgb = tf.convert_to_tensor(img_rgb, dtype=tf.float32) # Ensure divisible by 4 imageSize = (tf.convert_to_tensor(img_rgb.shape[:-1]) // 4) * 4 cropped_image = tf.image.crop_to_bounding_box( img_rgb, 0, 0, imageSize[0], imageSize[1] ) preprocessed_image = tf.expand_dims(cropped_image, 0) # Run super-resolution sr_result = model(preprocessed_image) sr_image = tf.squeeze(sr_result).numpy() / 255.0 # Convert back to single band and resize to match reference sr_gray = cv2.cvtColor((sr_image * 255).astype(np.uint8), cv2.COLOR_RGB2GRAY) resized_band = cv2.resize(sr_gray.astype(np.float32), (target_y, target_x), interpolation=cv2.INTER_CUBIC) upscaled_bands.append(resized_band) upscaled_cube_array = np.stack(upscaled_bands, axis=0) dc.set_cube(upscaled_cube_array) return dc def upscale_datacube_fsrcnn(dc, scale, model_path): """ Upscale the spatial dimensions of a DataCube using the FSRCNN super-resolution model. This function applies the FSRCNN (Fast Super-Resolution Convolutional Neural Network) model from OpenCV’s dnn_superres module to each spectral slice of the given DataCube. Since FSRCNN expects a 3-channel input, each single-band slice is duplicated across three channels before processing. The output slices are then reassembled into a new DataCube with upscaled spatial dimensions. Parameters ---------- dc : DataCube An instance of the DataCube class. Must have attributes: - .cube: numpy array of shape (v, x, y) - .wavelength: list of length v scale : int The upscaling factor (2, 3, or 4) supported by the FSRCNN model. model_path : str Filesystem path to the pretrained FSRCNN `.pb` model file (e.g., "FSRCNN_x3.pb"). Returns ------- DataCube A new DataCube instance whose `.cube` has shape (v, x * scale, y * scale) and the same `.wavelength` list. Raises ------ FileNotFoundError If the specified `model_path` does not exist. ValueError If `scale` is not in {2, 3, 4}. cv2.error If OpenCV fails to load the model or perform upsampling. Notes ----- - Requires `opencv-contrib-python>=4.3.0`. - Processing is done slice-by-slice and may be slow for large cubes. - Memory usage grows by approximately `scale**2`. Examples -------- >>> dc = wizard.read("hyperspectral.npy", wavelengths=[...]) >>> up_dc = upscale_datacube_fsrcnn(dc, scale=3, model_path="FSRCNN_x3.pb") >>> print(up_dc.cube.shape) (v, x*3, y*3) """ # Validate model file if not os.path.isfile(model_path): raise FileNotFoundError(f"Model file not found: {model_path}") # Validate supported scale factors if scale not in (2, 3, 4): raise ValueError(f"FSRCNN only supports scale factors 2, 3, or 4, got {scale}") # Initialize FSRCNN super-resolver sr = cv2.dnn_superres.DnnSuperResImpl_create() sr.readModel(model_path) sr.setModel("fsrcnn", scale) # Prepare new cube v, x, y = dc.cube.shape new_x, new_y = x * scale, y * scale up_cube = np.empty((v, new_x, new_y), dtype=dc.cube.dtype) # Upscale each spectral band for i in range(v): band = dc.cube[i, :, :] bgr = cv2.merge([band, band, band]) up_bgr = sr.upsample(bgr) up_band = up_bgr[:, :, 0] up_cube[i, :, :] = up_band # Create and return new DataCube dc.set_cube(up_cube) return dc def remove_vignette(dc: DataCube, vignette_map: np.ndarray, flip: bool = False) -> DataCube: """ Subtract a vignette pattern from every spectral layer. Removes spatial vignetting by subtracting the provided vignette_map from each (x, y) layer in the data cube. If flip=True, the vignette pattern is inverted (dark center → bright center) before subtraction. Parameters ---------- dc: DataCube An instance of the DataCube class. Must have attributes: - .cube: numpy array of shape (v, x, y) - .wavelength: list of length v vignette_map : np.ndarray 2D array of shape (x, y) representing the vignette intensity to subtract. Values should be on the same scale as the cube’s pixel intensities. flip : bool, default=False If True, invert the vignette_map before subtraction. Returns ------- None Modifies the DataCube.cube in-place. Raises ------ ValueError If vignette_map.shape does not match the spatial dimensions of the cube. Notes ----- - After subtraction, any negative values in the cube are clipped to zero. - Assumes cube and vignette_map share the same intensity scale. Examples -------- >>> dc = DataCube(cube=np.random.rand(10, 256, 256), wavelength=list(np.linspace(400,700,10))) >>> # Remove standard vignette (darker at edges, brighter center) >>> dc.remove_vignette(vignette_map=my_vignette_image) >>> # Remove flipped vignette (darker center, brighter edges) >>> dc.remove_vignette(vignette_map=my_vignette_image, flip=True) """ # Check that the vignette map matches spatial dims if vignette_map.shape != dc.cube.shape[1:]: raise ValueError( f"vignette_map shape {vignette_map.shape} does not match cube spatial shape {dc.cube.shape[1:]}" ) cube = dc.cube.copy() # Optionally invert the vignette pattern if flip: vignette_map = vignette_map.max() - vignette_map # Subtract vignette from each layer # Using broadcasting: vignette_map has shape (x, y), expand to (1, x, y) cube -= vignette_map[np.newaxis, :, :] # Clip negative values to zero np.clip(cube, a_min=0, a_max=None, out=cube) dc.set_cube(cube) return dc def uniform_filter_dc(dc, size=3): """ Smooth each spectral band of a DataCube using a uniform spatial filter. Applies a uniform filter of the given window size to every slice (band) in the DataCube’s cube, reducing spatial noise by averaging within a local neighborhood. The operation modifies the DataCube in place. Parameters ---------- dc : DataCube The DataCube instance whose `cube` attribute (a numpy array of shape (v, x, y)) will be smoothed across the spatial dimensions for each spectral band. size : int, optional The size of the square window used by `scipy.ndimage.uniform_filter` for smoothing. Must be a positive odd integer. Defaults to 3. Returns ------- DataCube The same DataCube instance, with its `cube` attribute replaced by the smoothed data of shape (v, x, y). Raises ------ ValueError If `size` is not a positive integer. Notes ----- - Requires `scipy.ndimage.uniform_filter` to be imported. - Smoothing is performed independently on each spectral band. - This function updates `dc` in place; no new DataCube is created. """ if not isinstance(size, int) or size < 1: raise ValueError("`size` must be a positive integer") cube = dc.cube for i in range(dc.cube.shape[0]): cube[i] = uniform_filter(cube[i], size=size) dc.set_cube(cube) return dc