# Source code for wizard._utils._loader.fsm

"""
_utils/_loader/fsm.py
=======================

.. module:: fsm
   :platform: Unix
   :synopsis: Provides reader functions for FSM files (a writer is still in progress).

Module Overview
---------------

This module includes functions for reading `.fsm` files, typically from Perkin Elmer instruments.
It processes various blocks of data contained within FSM files to extract relevant spectral information.

Functions
---------

.. autofunction:: _block_info
.. autofunction:: _decode_5100
.. autofunction:: _decode_5104
.. autofunction:: _decode_5105
.. autofunction:: _parse_fsm_file
.. autofunction:: _read_fsm

Credits
-------
This code was inspired by:
- Repository: specio
- Author/Organization: paris-saclay-cds
- Original repository: https://github.com/paris-saclay-cds/specio

"""

import struct
import numpy as np

from ._helper import to_cube
from ..._core import DataCube


def _block_info(data):
    """Retrieve the information of the next block."""
    if len(data) != 6:
        raise ValueError(f"'data' should be 6 bytes. Got {len(data)} instead.")
    return struct.unpack('<Hi', data)


def _decode_5100(data):
    """Read the block of data with ID 5100."""
    name_size = struct.unpack('<h', data[:2])[0]
    name = data[2:name_size + 2].decode('utf8')
    header_format = '<ddddddddddiiihBhBhBhB'
    header_data = struct.unpack(header_format, data[name_size + 2:name_size + 106])
    return {
        'name': name,
        'x_delta': header_data[0],
        'y_delta': header_data[1],
        'z_delta': header_data[2],
        'z_start': header_data[3],
        'z_end': header_data[4],
        'z_4d_start': header_data[5],
        'z_4d_end': header_data[6],
        'x_init': header_data[7],
        'y_init': header_data[8],
        'z_init': header_data[9],
        'n_x': header_data[10],
        'n_y': header_data[11],
        'n_z': header_data[12],
        'resolution': header_data[17],
        'transmission': header_data[19]
    }


def _decode_5104(data):
    """Read the block of data with ID 5104."""
    text = []
    start_byte = 0
    data_length = len(data)

    while start_byte + 2 <= data_length:
        tag = data[start_byte:start_byte + 2]
        start_byte += 2

        if tag == b'#u':
            text_size = struct.unpack('<h', data[start_byte:start_byte + 2])[0]
            start_byte += 2
            text.append(data[start_byte:start_byte + text_size].decode('utf8'))
            start_byte += text_size + 6  # Skip over text and padding
        elif tag == b'$u' or tag == b',u':
            text_size = struct.unpack('<h', data[start_byte:start_byte + 2])[0]
            start_byte += 2
            text.append(text_size)  # Assuming it's an integer, as in the original code
            start_byte += 6 if tag == b'$u' else 0
        else:
            start_byte -= 1  # Step back if the tag is not recognized

    # Map extracted texts to their corresponding keys
    keys = [
        'analyst', 'date', 'image_name', 'instrument_model',
        'instrument_serial_number', 'instrument_software_version',
        'accumulations', 'detector', 'source', 'beam_splitter',
        'apodization', 'spectrum_type', 'beam_type', 'phase_correction',
        'ir_accessory', 'igram_type', 'scan_direction', 'background_scans',
        'ir_laser_wave_number_unit'
    ]
    
    return {key: (text[i] if i < len(text) else None) for i, key in enumerate(keys)}


def _decode_5105(data):
    """Read the block of data with ID 5105."""
    return np.frombuffer(data, dtype=np.float32)


# Dispatch table: maps a block ID read from a block header to the function
# that decodes that block's payload.
FUNC_DECODE = {5100: _decode_5100, 5104: _decode_5104, 5105: _decode_5105}


def _parse_fsm_file(fsm_file):
    """
    Read an FSM file and extract its spectra, spectral axis and metadata.

    The first 44 bytes are taken as the file header: a 4-byte signature
    followed by a 40-byte UTF-8 description. The rest of the file is walked
    as a sequence of blocks, each introduced by a 6-byte header (see
    ``_block_info``) and decoded by the matching entry of ``FUNC_DECODE``.
    Decoders that return a dict contribute to the metadata; any other result
    is collected as spectral data. Blocks with a size of zero are skipped.

    :param fsm_file: Path to the FSM file.
    :return: Tuple ``(spectrum, wavelength, meta)`` where ``spectrum`` is the
        squeezed array of all collected data blocks, ``wavelength`` is the
        axis built from ``z_start``, ``z_end`` and ``z_delta``, and ``meta``
        is the merged metadata dictionary.
    :raises ValueError: If a block header is truncated, a block ID is
        unknown, a block size is negative or exceeds the remaining file
        content, or no block provided the spectral-axis metadata.
    """
    with open(fsm_file, "rb") as file:
        content = file.read()

    content_size = len(content)
    meta = {
        'signature': content[:4],
        'description': content[4:44].decode('utf-8'),
        'filename': fsm_file
    }
    spectrum = []

    start_byte = 44
    while start_byte < content_size:
        block_id, block_size = _block_info(content[start_byte:start_byte + 6])
        start_byte += 6
        if block_size == 0:
            continue
        if block_id not in FUNC_DECODE:
            raise ValueError(
                f"Unexpected block ID {block_id} (size {block_size}), "
                "possible write error"
            )

        # A negative size would move the cursor backwards (and can loop
        # forever); an oversized one would silently decode a truncated block.
        block_end = start_byte + block_size
        if block_size < 0 or block_end > content_size:
            raise ValueError(
                f"Block {block_id} declares a size of {block_size} bytes, but "
                f"{content_size - start_byte} bytes remain in the file."
            )

        data_extracted = FUNC_DECODE[block_id](content[start_byte:block_end])
        start_byte = block_end
        if isinstance(data_extracted, dict):
            meta.update(data_extracted)
        else:
            spectrum.append(data_extracted)

    try:
        z_start, z_end, z_delta = meta['z_start'], meta['z_end'], meta['z_delta']
    except KeyError as err:
        # Without these keys the spectral axis cannot be built; report it as a
        # format error rather than leaking a bare KeyError.
        raise ValueError(
            f"FSM file does not define the spectral axis (missing {err})."
        ) from err

    wavelength = np.arange(z_start, z_end + z_delta, z_delta)
    return np.squeeze(spectrum), wavelength, meta


def _read_fsm(path: str) -> DataCube:
    """
    Load a Perkin Elmer FSM file into a DataCube.

    Tested with FTIR data.

    :param path: Path to the FSM file.
    :type path: str
    :return: DataCube containing the spectral data and wavelengths.
    :rtype: DataCube
    :raises FileNotFoundError: If the specified file does not exist.
    :raises ValueError: If the file format is incorrect.
    """
    spectra, wavenumbers, meta = _parse_fsm_file(path)

    # The spectral axis is stored as integers.
    wavenumbers = wavenumbers.astype('int')

    # Reshape the 2D spectra into a 3D cube using the dimensions from the metadata.
    cube = to_cube(data=spectra.T, len_x=meta['n_x'], len_y=meta['n_y'])

    return DataCube(
        cube,
        wavelengths=wavenumbers,
        name='.fsm',
        notation='cm-1',
        registered=True,
    )
"""
In Progress

def _write_fsm(data_cube, path):
    # Extract metadata
    fsm_data = data_cube.cube.T  # Convert back to original shape
    fsm_wave = data_cube.wavelengths.astype(float)  # Ensure proper dtype

    fsm_meta = {
        'signature': b'FSM\x00',
        'description': b'Generated FSM file' + b' ' * 23,  # Pad to 40 bytes
        'n_x': fsm_data.shape[1],
        'n_y': fsm_data.shape[2],
        'z_start': fsm_wave[0],
        'z_end': fsm_wave[-1],
        'z_delta': fsm_wave[1] - fsm_wave[0],
    }

    with open(path, "wb") as file:
        # Write file header
        file.write(fsm_meta['signature'])  # 4 bytes
        file.write(fsm_meta['description'])  # 40 bytes
        print("Writing File Header (44 bytes)")

        # ✅ Block 5100: Metadata
        name = b"Generated"
        block_5100_data = struct.pack(
            '<h dddddddddd iiih BhBhBhB',
            len(name),  # Name length
            fsm_meta['z_delta'], fsm_meta['z_delta'], fsm_meta['z_delta'],
            fsm_meta['z_start'], fsm_meta['z_end'], 0, 0, 0, 0, 0,
            fsm_meta['n_x'], fsm_meta['n_y'], fsm_data.shape[0],
            0, 0, 0, 0, 0, 0, 0, 0  # Placeholder values
        )
        block_size_5100 = len(name) + len(block_5100_data)
        print(f"Writing Block 5100 (size {block_size_5100})")
        file.write(struct.pack('<Hi', 5100, block_size_5100))  # Write Block Header
        file.write(name)  # Write name
        file.write(block_5100_data)  # Write metadata

        # ✅ Block 5105: Spectral data
        spectrum_data = fsm_data.astype(np.float32).tobytes()
        block_size_5105 = len(spectrum_data)
        print(f"Writing Block 5105 (size {block_size_5105})")
        file.write(struct.pack('<Hi', 5105, block_size_5105))  # Write Block Header
        file.write(spectrum_data)  # Write spectral data

    print(f"FSM file written to {path}")
"""