Source code for homodyne.optimization.nlsq.core

"""NLSQ: Primary Optimization Method for Homodyne
==================================================

NLSQ package-based trust-region nonlinear least squares solver for the scaled
optimization process. This is the primary optimization method providing
fast, reliable parameter estimation for homodyne analysis.

Core Equation: c₂(φ,t₁,t₂) = 1 + contrast × [c₁(φ,t₁,t₂)]²

Key Features:
- NLSQ trust-region solver (TRF/Levenberg-Marquardt) for robust optimization
- JAX JIT compilation for high performance
- Intelligent error recovery with 3-attempt retry strategy (T022-T024)
- Compatible with existing ParameterSpace and FitResult classes
- HPC-optimized for 36/128-core CPU nodes
- CPU-only (no GPU support since v2.3.0)
- Dataset size-aware optimization strategies

Performance (Validated T036-T041):
- Parameter recovery accuracy: 2-14% on core parameters
- Sub-linear time scaling: ~1.5s for 500-9,375 point datasets
- Numerical stability: <4% deviation across initial conditions
- Throughput: 317-5,977 points/second
- 100% convergence rate across all validation tests

Production Status:
- Scientifically validated (7/7 tests passed)
- Production-ready with error recovery
- Approved for scientific research and deployment

Migration from Optimistix:
- Replaced Optimistix with NLSQ package (github.com/imewei/NLSQ)
- NLSQWrapper provides unified interface with error recovery
- Maintains backward API compatibility
- User-facing Optimistix references removed from public APIs

References:
- NLSQ Package: https://github.com/imewei/NLSQ
- Validation Report: SCIENTIFIC_VALIDATION_REPORT.md
- Production Report: PRODUCTION_READINESS_REPORT.md
"""

from __future__ import annotations

import types
from collections.abc import Callable
from typing import Any, TypeVar

import numpy as np

# JAX imports with fallback
F = TypeVar("F", bound=Callable[..., Any])

try:
    import jax.numpy as jnp
    from jax import grad, jit, vmap

    JAX_AVAILABLE = True
except ImportError:
    JAX_AVAILABLE = False
    jnp: types.ModuleType = np  # type: ignore[no-redef]

    def jit(f: F) -> F:  # type: ignore[no-redef]  # noqa: UP047
        return f

    def vmap(f: Any, **kwargs: Any) -> Any:  # type: ignore[misc]
        return f

    def grad(f: Callable[..., Any]) -> Callable[..., Any]:  # type: ignore[misc]
        return lambda x: np.zeros_like(x)


# Core homodyne imports
try:
    from homodyne.config.manager import ConfigManager
    from homodyne.core.fitting import ParameterSpace
    from homodyne.utils.logging import get_logger, log_performance

    HAS_CORE_MODULES = True
except ImportError:
    HAS_CORE_MODULES = False
    import logging
    from collections.abc import Mapping
    from logging import Logger, LoggerAdapter

    def get_logger(
        name: str | None = None, *, context: Mapping[str, Any] | None = None
    ) -> Logger | LoggerAdapter[Logger]:
        return logging.getLogger(name or __name__)

    def log_performance(
        logger: Logger | LoggerAdapter[Logger] | None = None,
        level: int = logging.INFO,
        threshold: float = 0.0,
    ) -> Callable[[F], F]:
        def decorator(func: F) -> F:
            return func

        return decorator


# Optional ParameterManager import (Phase 4.2)
try:
    from homodyne.config.parameter_manager import ParameterManager

    HAS_PARAMETER_MANAGER = True
except ImportError:
    HAS_PARAMETER_MANAGER = False
    ParameterManager = None  # type: ignore[assignment,misc]

# NLSQWrapper import for legacy implementation
try:
    from homodyne.optimization.nlsq.wrapper import NLSQWrapper
    from homodyne.optimization.nlsq.wrapper import (
        OptimizationResult as WrapperOptimizationResult,
    )

    HAS_NLSQ_WRAPPER = True
except ImportError:
    HAS_NLSQ_WRAPPER = False
    NLSQWrapper = None  # type: ignore[assignment,misc]
    WrapperOptimizationResult = None  # type: ignore[assignment,misc]

# Results module import (for return type)
try:
    from homodyne.optimization.nlsq.results import OptimizationResult

    HAS_RESULTS_MODULE = True
except ImportError:
    HAS_RESULTS_MODULE = False
    OptimizationResult = None  # type: ignore[assignment,misc]

# NLSQAdapter import for new CurveFit-based implementation (v2.11.0+)
try:
    from homodyne.optimization.nlsq.adapter import (
        AdapterConfig,
        NLSQAdapter,
        is_adapter_available,
    )

    HAS_NLSQ_ADAPTER = is_adapter_available()
except ImportError:
    HAS_NLSQ_ADAPTER = False
    NLSQAdapter = None  # type: ignore[assignment,misc]
    AdapterConfig = None  # type: ignore[assignment,misc]

# Multi-start optimization import (v2.6.0)
try:
    from homodyne.optimization.nlsq.multistart import (
        MultiStartConfig,
        MultiStartResult,
        SingleStartResult,
        run_multistart_nlsq,
    )

    HAS_MULTISTART = True
except ImportError:
    HAS_MULTISTART = False
    MultiStartConfig = None  # type: ignore[assignment,misc]
    MultiStartResult = None  # type: ignore[assignment,misc]
    SingleStartResult = None  # type: ignore[assignment,misc]
    run_multistart_nlsq = None  # type: ignore[assignment]

# CMA-ES global optimization import (v2.15.0 / NLSQ 0.6.4+)
try:
    from homodyne.optimization.nlsq.cmaes_wrapper import (
        CMAES_AVAILABLE,
        CMAESResult,
        CMAESWrapper,
        CMAESWrapperConfig,
        fit_with_cmaes,
    )

    HAS_CMAES = CMAES_AVAILABLE
except ImportError:
    HAS_CMAES = False
    CMAESWrapper = None  # type: ignore[assignment,misc]
    CMAESWrapperConfig = None  # type: ignore[assignment,misc]
    CMAESResult = None  # type: ignore[assignment,misc]
    fit_with_cmaes = None  # type: ignore[assignment]

# CPU threading configuration (FR-005, T026)
try:
    from homodyne.device.cpu import configure_cpu_threading

    HAS_CPU_CONFIG = True
except ImportError:
    HAS_CPU_CONFIG = False
    configure_cpu_threading = None  # type: ignore[assignment]

# Anti-degeneracy controller import (v2.16.1+)
try:
    from homodyne.optimization.nlsq.anti_degeneracy_controller import (
        AntiDegeneracyController,
    )

    HAS_ANTI_DEGENERACY = True
except ImportError:
    HAS_ANTI_DEGENERACY = False
    AntiDegeneracyController = None  # type: ignore[assignment,misc]

# Export NLSQ availability for tests and external code
NLSQ_AVAILABLE = HAS_NLSQ_WRAPPER and JAX_AVAILABLE

logger = get_logger(__name__)

# Default sigma used when experimental uncertainties are not provided.
# Applied at lines ~715 and ~1775 when creating placeholder sigma arrays.
# Used in auto-skip normalization to undo the 1/sigma^2 chi2 inflation.
_DEFAULT_SIGMA = 0.01


[docs] class NLSQResult: """Result container for NLSQ optimization compatible with FitResult."""
[docs] def __init__( self, parameters: dict[str, float], parameter_errors: dict[str, float], chi_squared: float, reduced_chi_squared: float, success: bool, message: str, n_iterations: int, optimization_time: float, method: str = "nlsq", ): self.parameters = parameters self.parameter_errors = parameter_errors self.chi_squared = chi_squared self.reduced_chi_squared = reduced_chi_squared self.success = success self.message = message self.n_iterations = n_iterations self.optimization_time = optimization_time self.method = method
[docs] @log_performance(threshold=1.0) def fit_nlsq_jax( data: dict[str, Any], config: ConfigManager, initial_params: dict[str, float] | None = None, per_angle_scaling: bool = True, # REQUIRED: per-angle is physically correct use_adapter: bool = False, # Experimental: use NLSQAdapter (CurveFit) instead of NLSQWrapper _skip_global_selection: bool = False, # Internal: skip global opt check (for fallback) ) -> OptimizationResult: """NLSQ trust-region nonlinear least squares optimization with per-angle scaling. Uses NLSQ package (github.com/imewei/NLSQ) for trust-region optimization. v2.11.0+: Experimental NLSQAdapter with CurveFit class available for improved JIT caching and automatic workflow selection. Set use_adapter=True to enable (default is False, uses NLSQWrapper). Primary optimization method implementing the scaled optimization process: c₂(φ,t₁,t₂) = 1 + contrast × [c₁(φ,t₁,t₂)]² Parameters ---------- data : dict XPCS experimental data. Accepts two formats: **Format 1 (CLI/loader format)**: - 'phi_angles_list': phi angle array (mapped to 'phi') - 'c2_exp': experimental correlation data (n_phi, n_t1, n_t2) (mapped to 'g2') - 't1': first delay time array - 't2': second delay time array - 'wavevector_q_list': q-vector array (first element extracted as scalar 'q') - 'sigma': (optional) uncertainty array, defaults to 0.01 * ones_like(g2) - 'L': (optional) stator-rotor gap (rheology) or sample-detector distance (standard XPCS), defaults to config value or 2000000 Å (200 µm, typical rheology-XPCS gap) - 'dt': (optional) time step, defaults to config value or None **Format 2 (Direct format)**: - 'phi': phi angle array - 'g2': experimental correlation data (n_phi, n_t1, n_t2) - 't1': first delay time array - 't2': second delay time array - 'q': wavevector magnitude (scalar) - 'sigma': (optional) uncertainty array - 'L': (optional) stator-rotor gap or sample-detector distance [Å] - 'dt': (optional) time step [s] config : ConfigManager Configuration manager with optimization settings initial_params : dict, optional Initial parameter guesses. If None, uses defaults from config. per_angle_scaling : bool, default=True MUST be True. Per-angle contrast/offset parameters are physically correct as each scattering angle has different optical properties and detector responses. Legacy scalar mode (False) is no longer supported (removed Nov 2025). use_adapter : bool, default=False EXPERIMENTAL (v2.11.0+): If True, use NLSQAdapter with NLSQ's CurveFit class for improved JIT caching and automatic workflow selection. If False (default), use the stable NLSQWrapper implementation. Notes ----- **Global Optimization Selection (v2.15.0+):** This function serves as the unified entry point for NLSQ optimization. When called, it first checks for global optimization methods: 1. If ``cmaes.enable: true`` → delegates to ``fit_nlsq_cmaes()`` 2. If ``multi_start.enable: true`` → delegates to ``fit_nlsq_multistart()`` 3. Otherwise → runs local trust-region optimization The CMA-ES function will fall back to multi-start (if enabled) when the scale ratio is below the threshold, implementing the full fallback chain. Returns ------- OptimizationResult Optimization result with parameters, uncertainties, and diagnostics Raises ------ ImportError If NLSQ package is not available ValueError If data validation fails """ # Determine which backend to use _use_adapter = use_adapter and HAS_NLSQ_ADAPTER if _use_adapter: logger.debug("Using NLSQAdapter (CurveFit class) for optimization") else: if use_adapter and not HAS_NLSQ_ADAPTER: logger.warning( "NLSQAdapter requested but not available, falling back to NLSQWrapper" ) if not HAS_NLSQ_WRAPPER: raise ImportError( "NLSQWrapper is required for NLSQ optimization. " "Ensure homodyne.optimization.nlsq_wrapper is available.", ) logger.info("=" * 60) logger.info("NLSQ OPTIMIZATION") logger.info("=" * 60) # Track whether sigma was provided or auto-generated (affects chi-squared interpretation) _sigma_is_default = isinstance(data, dict) and "sigma" not in data # ========================================================================== # Global Optimization Selection (v2.15.0+) # Priority: CMA-ES > Multi-Start > Local Optimization # ========================================================================== if not _skip_global_selection: # Handle both ConfigManager objects and plain dicts config_dict: dict[str, Any] = ( config.config if hasattr(config, "config") else config ) # type: ignore[assignment] nlsq_dict = config_dict.get("optimization", {}).get("nlsq", {}) # CMA-ES has highest priority (for multi-scale problems) cmaes_dict = nlsq_dict.get("cmaes", {}) if cmaes_dict.get("enable", False): if HAS_CMAES: logger.info("CMA-ES enabled, delegating to fit_nlsq_cmaes") cmaes_result = fit_nlsq_cmaes( data=data, config=config, initial_params=initial_params, per_angle_scaling=per_angle_scaling, ) cmaes_result.sigma_is_default = _sigma_is_default return cmaes_result else: logger.warning( "[CMA-ES] Enabled in config but not available (evosax not installed). " "Install with: pip install nlsq[evosax]. " "Falling back to multi-start or local optimization." ) # Multi-start is second priority multi_start_dict = nlsq_dict.get("multi_start", {}) if multi_start_dict.get("enable", False): if HAS_MULTISTART: logger.info("Multi-start enabled, delegating to fit_nlsq_multistart") multistart_result = fit_nlsq_multistart( data=data, config=config, initial_params=initial_params, per_angle_scaling=per_angle_scaling, ) ms_result = multistart_result.to_optimization_result() ms_result.sigma_is_default = _sigma_is_default return ms_result else: logger.warning( "[Multi-Start] Enabled in config but not available. " "Falling back to local optimization." ) logger.debug("No global optimization enabled, using local optimization") # Performance Optimization (Spec 001 - FR-005, T026): Configure CPU threading for HPC if HAS_CPU_CONFIG: try: cpu_config = configure_cpu_threading() logger.debug( f"CPU threading configured: {cpu_config.get('threads_configured', 'auto')} threads" ) except (OSError, RuntimeError, AttributeError) as e: logger.debug(f"CPU threading configuration skipped: {e}") # Determine analysis mode analysis_mode = _get_analysis_mode(config) logger.info(f"Analysis mode: {analysis_mode}") logger.info(f"Per-angle scaling: {per_angle_scaling}") # Set up initial parameters per_angle_scaling_initial: dict[str, list[float]] | None = None if initial_params is None: # Try to load from config first (pass data for contrast/offset estimation) initial_params_temp, per_angle_scaling_initial_temp = ( _load_initial_params_from_config(config, analysis_mode, data) ) initial_params = initial_params_temp per_angle_scaling_initial = per_angle_scaling_initial_temp if initial_params is None: # Fallback to defaults (estimate contrast/offset from data if available) initial_params = _get_default_initial_params(analysis_mode) if data is not None: contrast_est, offset_est = _estimate_contrast_offset_from_data(data) initial_params["contrast"] = contrast_est initial_params["offset"] = offset_est logger.info("Using default initial parameters") else: logger.info("Using initial parameters from configuration") else: # Make a copy so we don't mutate caller-provided dict initial_params = initial_params.copy() per_angle_scaling_initial_pop = initial_params.pop("per_angle_scaling", None) if isinstance(per_angle_scaling_initial_pop, dict): per_angle_scaling_initial = per_angle_scaling_initial_pop else: per_angle_scaling_initial = None # Convert initial params dict to array x0 = _params_to_array(initial_params, analysis_mode) # Set up parameter bounds # FIX: Use ParameterManager to load bounds from config (including custom user bounds) if HAS_PARAMETER_MANAGER: # Handle both ConfigManager objects and plain dicts if hasattr(config, "config"): config_dict_for_pm: Any = config.config # ConfigManager object else: config_dict_for_pm = config # Already a dict # Use ParameterManager to get bounds from config (properly loads custom bounds) param_manager = ParameterManager( config_dict=config_dict_for_pm, analysis_mode=analysis_mode ) param_names = _get_param_names(analysis_mode) bounds_list = param_manager.get_parameter_bounds(param_names) # Convert ParameterManager format (list of dicts) to _bounds_to_arrays format (dict of tuples) bounds_dict = {b["name"]: (b["min"], b["max"]) for b in bounds_list} else: # Fallback to ParameterSpace with hardcoded defaults (for backward compatibility) param_space = ParameterSpace() bounds_dict = _get_parameter_bounds(analysis_mode, param_space) lower_bounds, upper_bounds = _bounds_to_arrays(bounds_dict, analysis_mode) bounds = (lower_bounds, upper_bounds) # Convert data dict to object if needed (NLSQWrapper expects object attributes) data = _normalize_data_to_object(data, config, logger) diagnostics_enabled = _is_nlsq_diagnostics_enabled(config) shear_transform_cfg = _extract_shear_transform_config(config) # ========================================================================== # T021-T024: Fallback mechanism from NLSQAdapter to NLSQWrapper # ========================================================================== adapter_error: Exception | None = None fallback_occurred = False result: Any # Will be OptimizationResult from either adapter or wrapper # Create optimizer and run optimization if _use_adapter: # T021: Try NLSQAdapter first with CurveFit class (v2.11.0+) try: adapter_config = AdapterConfig( enable_cache=True, enable_jit=True, enable_recovery=True, enable_stability=True, goal="quality", # XPCS requires precision ) adapter = NLSQAdapter(config=adapter_config) logger.debug("Attempting optimization with NLSQAdapter") result = adapter.fit( data=data, config=config, initial_params=x0, # type: ignore[arg-type] bounds=bounds, # type: ignore[arg-type] analysis_mode=analysis_mode, per_angle_scaling=per_angle_scaling, diagnostics_enabled=diagnostics_enabled, shear_transforms=shear_transform_cfg, per_angle_scaling_initial=per_angle_scaling_initial, ) # T023: Add fallback_occurred to device_info (adapter succeeded) result.device_info["fallback_occurred"] = False result.device_info["fallback_reason"] = None logger.info("NLSQAdapter optimization succeeded") except ( ValueError, RuntimeError, TypeError, AttributeError, OSError, MemoryError, ) as e: # T022: Log WARNING when fallback occurs adapter_error = e logger.warning("NLSQAdapter failed, falling back to NLSQWrapper: %s", e) fallback_occurred = True # Fall through to wrapper below # Use NLSQWrapper if: (1) use_adapter=False, or (2) adapter failed if not _use_adapter or fallback_occurred: try: # Use legacy NLSQWrapper # Note: enable_recovery=True provides automatic error recovery wrapper = NLSQWrapper(enable_large_dataset=True, enable_recovery=True) logger.debug("Attempting optimization with NLSQWrapper") result = wrapper.fit( data=data, config=config, initial_params=x0, # type: ignore[arg-type] bounds=bounds, # type: ignore[arg-type] analysis_mode=analysis_mode, per_angle_scaling=per_angle_scaling, diagnostics_enabled=diagnostics_enabled, shear_transforms=shear_transform_cfg, per_angle_scaling_initial=per_angle_scaling_initial, ) # T023: Add fallback info to device_info result.device_info["adapter"] = "NLSQWrapper" result.device_info["fallback_occurred"] = fallback_occurred result.device_info["fallback_reason"] = ( str(adapter_error) if adapter_error else None ) if fallback_occurred: logger.info("NLSQWrapper fallback optimization succeeded") else: logger.info("NLSQWrapper optimization succeeded") except ( ValueError, RuntimeError, TypeError, AttributeError, OSError, MemoryError, ) as wrapper_error: # T024: Both adapter and wrapper failed - return failed result logger.error( "Both NLSQAdapter and NLSQWrapper failed: adapter=%s, wrapper=%s", adapter_error, wrapper_error, ) n_params = len(x0) result = OptimizationResult( parameters=np.asarray(x0), uncertainties=np.zeros(n_params), covariance=np.eye(n_params), chi_squared=float("inf"), reduced_chi_squared=float("inf"), convergence_status="failed", iterations=0, execution_time=0.0, device_info={ "device": "cpu", "adapter": "NLSQWrapper", "fallback_occurred": fallback_occurred, "fallback_reason": str(adapter_error) if adapter_error else None, "wrapper_error": str(wrapper_error), }, recovery_actions=[], quality_flag="poor", ) result.sigma_is_default = _sigma_is_default _log_optimization_results(result, analysis_mode, per_angle_scaling, logger) return result
def _log_optimization_results( result: Any, analysis_mode: str, per_angle_scaling: bool, logger: Any, ) -> None: """Log optimization results including parameters and uncertainties. Pure logging function — reads from result but does not modify state. """ logger.info("=" * 60) logger.info("NLSQ OPTIMIZATION COMPLETE") logger.info("=" * 60) logger.info(f"Status: {'SUCCESS' if result.success else 'FAILED'}") logger.info(f"Iterations: {result.iterations}") logger.info(f"Execution time: {result.execution_time:.3f}s") logger.info(f"chi2 = {result.chi_squared:.6e}") logger.info(f"Reduced chi2 = {result.reduced_chi_squared:.6f}") if hasattr(result, "parameters") and result.parameters is not None: physical_param_names = _get_physical_param_names(analysis_mode) n_physical = len(physical_param_names) n_params = len(result.parameters) if per_angle_scaling and n_params > (2 + n_physical): n_angles = (n_params - n_physical) // 2 logger.info(f"Fitted parameters (per-angle scaling: {n_angles} angles):") logger.info(" Physical parameters:") physical_start_idx = 2 * n_angles unc_array = ( result.uncertainties if hasattr(result, "uncertainties") else None ) unc_size = len(unc_array) if unc_array is not None else 0 if unc_array is not None and unc_size != n_params: logger.warning( f"Uncertainty array size mismatch: expected {n_params}, got {unc_size}. " "This may occur when Fourier covariance transformation failed." ) unc_array = None for i, name in enumerate(physical_param_names): idx = physical_start_idx + i param_val = result.parameters[idx] unc_val = ( unc_array[idx] if unc_array is not None and idx < len(unc_array) else 0.0 ) logger.info(f" {name}: {param_val:.6g} +/- {unc_val:.6g}") contrast_vals = result.parameters[:n_angles] offset_vals = result.parameters[n_angles : 2 * n_angles] logger.info( f" Mean scaling: contrast={np.nanmean(contrast_vals):.4f}, " f"offset={np.nanmean(offset_vals):.4f}" ) else: param_names = _get_param_names(analysis_mode) n_display = min(len(param_names), n_params) logger.info("Fitted parameters:") for i in range(n_display): param_val = result.parameters[i] unc_val = ( result.uncertainties[i] if hasattr(result, "uncertainties") and result.uncertainties is not None and i < len(result.uncertainties) else 0.0 ) logger.info(f" {param_names[i]}: {param_val:.6g} +/- {unc_val:.6g}") logger.info("=" * 60) def _normalize_data_to_object(data: Any, config: Any, logger: Any) -> Any: """Normalize data dict to object with expected attributes for NLSQWrapper. Handles: key mapping (CLI→internal names), q extraction from wavevector_q_list, sigma generation, 2D→1D time vector extraction, L loading, dt loading. If data is already an object (not a dict), only validates sigma. """ def _ensure_positive_sigma(obj: Any) -> None: if not hasattr(obj, "sigma"): return sigma_array = np.asarray(obj.sigma, dtype=np.float64) if not np.all(np.isfinite(sigma_array)): raise ValueError("sigma values must be finite") if np.any(sigma_array <= 0): raise ValueError("sigma values must be strictly positive") obj.sigma = sigma_array if isinstance(data, dict): class DataObject: pass data_obj = DataObject() # Map CLI data structure keys to NLSQWrapper expected names key_mapping = { "phi_angles_list": "phi", "c2_exp": "g2", } for key, value in data.items(): mapped_key = key_mapping.get(key, key) setattr(data_obj, mapped_key, value) # Extract scalar q from wavevector_q_list if present if hasattr(data_obj, "wavevector_q_list"): q_list = np.asarray(data_obj.wavevector_q_list) if q_list.size > 0: data_obj.q = float(q_list[0]) logger.debug(f"Extracted q = {data_obj.q:.6f} from wavevector_q_list") # Generate default sigma (uncertainty) if missing if not hasattr(data_obj, "sigma") and hasattr(data_obj, "g2"): g2_array = np.asarray(data_obj.g2) # type: ignore[attr-defined] data_obj.sigma = _DEFAULT_SIGMA * np.ones_like(g2_array) # type: ignore[attr-defined] data_obj.sigma_is_default = True # type: ignore[attr-defined] logger.debug(f"Generated default sigma: shape {data_obj.sigma.shape}") # type: ignore[attr-defined] else: data_obj.sigma_is_default = False # type: ignore[attr-defined] _ensure_positive_sigma(data_obj) # Extract 1D time vectors from 2D meshgrids if needed if hasattr(data_obj, "t1"): t1 = np.asarray(data_obj.t1) if t1.ndim == 2: data_obj.t1 = t1[:, 0] logger.debug( f"Extracted 1D t1 vector from 2D meshgrid: {t1.shape} -> {data_obj.t1.shape}", ) elif t1.ndim != 1: raise ValueError(f"t1 must be 1D or 2D array, got shape {t1.shape}") if hasattr(data_obj, "t2"): t2 = np.asarray(data_obj.t2) if t2.ndim == 2: data_obj.t2 = t2[0, :] logger.debug( f"Extracted 1D t2 vector from 2D meshgrid: {t2.shape} -> {data_obj.t2.shape}", ) elif t2.ndim != 1: raise ValueError(f"t2 must be 1D or 2D array, got shape {t2.shape}") # Get characteristic length L from config if not hasattr(data_obj, "L"): try: analyzer_params = config.config.get("analyzer_parameters", {}) # type: ignore[union-attr] geometry = analyzer_params.get("geometry", {}) if "stator_rotor_gap" in geometry: data_obj.L = float(geometry["stator_rotor_gap"]) # type: ignore[attr-defined] logger.debug( f"Using stator_rotor_gap L = {data_obj.L:.1f} AA (from config.analyzer_parameters.geometry)", # type: ignore[attr-defined] ) else: exp_config = config.config.get("experimental_data", {}) # type: ignore[union-attr] exp_geometry = exp_config.get("geometry", {}) if "stator_rotor_gap" in exp_geometry: data_obj.L = float(exp_geometry["stator_rotor_gap"]) # type: ignore[attr-defined] logger.debug( f"Using stator_rotor_gap L = {data_obj.L:.1f} AA (from config.experimental_data.geometry)", # type: ignore[attr-defined] ) elif "sample_detector_distance" in exp_config: data_obj.L = float(exp_config["sample_detector_distance"]) # type: ignore[attr-defined] logger.debug( f"Using sample_detector_distance L = {data_obj.L:.1f} AA (from config.experimental_data)", # type: ignore[attr-defined] ) else: data_obj.L = 2000000.0 # type: ignore[attr-defined] logger.warning( f"No L parameter found in config, using default L = {data_obj.L:.1f} AA (200 um, typical rheology-XPCS gap)", # type: ignore[attr-defined] ) except (AttributeError, TypeError, ValueError) as e: data_obj.L = 2000000.0 # type: ignore[attr-defined] logger.warning( f"Error reading L from config: {e}, using default L = {data_obj.L:.1f} AA (200 um)", # type: ignore[attr-defined] ) # Get time step dt from config if available if not hasattr(data_obj, "dt"): try: analyzer_params = config.config.get("analyzer_parameters", {}) # type: ignore[union-attr] dt_value = analyzer_params.get("dt") if dt_value is None: exp_config = config.config.get("experimental_data", {}) # type: ignore[union-attr] dt_value = exp_config.get("dt") if dt_value is not None: data_obj.dt = float(dt_value) # type: ignore[attr-defined] logger.debug(f"Using time step dt = {data_obj.dt:.6f} s") # type: ignore[attr-defined] except (AttributeError, TypeError, ValueError) as e: logger.warning(f"Error reading dt from config: {e}") return data_obj else: _ensure_positive_sigma(data) return data def _validate_data(data: dict[str, Any]) -> None: """Validate experimental data structure.""" required_keys = ["wavevector_q_list", "phi_angles_list", "t1", "t2", "c2_exp"] for key in required_keys: if key not in data: raise ValueError(f"Missing required data key: {key}") if data["c2_exp"].shape[0] == 0: raise ValueError("Empty experimental data") def _get_analysis_mode(config: ConfigManager) -> str: """Determine analysis mode from configuration.""" if hasattr(config, "config") and config.config: return config.config.get("analysis_mode", "static_isotropic") return "static_isotropic" def _is_nlsq_diagnostics_enabled(config: ConfigManager | dict[str, Any]) -> bool: """Return True if optimization.nlsq.diagnostics.enabled is truthy.""" config_dict: dict[str, Any] | None = None if hasattr(config, "config") and config.config: config_dict = config.config elif isinstance(config, dict): config_dict = config if not config_dict: return False return bool( config_dict.get("optimization", {}) .get("nlsq", {}) .get("diagnostics", {}) .get("enabled", False) ) def _extract_shear_transform_config( config: ConfigManager | dict[str, Any], ) -> dict[str, Any]: config_dict: dict[str, Any] | None = None if hasattr(config, "config") and config.config: config_dict = config.config elif isinstance(config, dict): config_dict = config if not config_dict: return {} return ( config_dict.get("optimization", {}).get("nlsq", {}).get("shear_transforms", {}) ) def _load_initial_params_from_config( config: ConfigManager, analysis_mode: str, data: dict[str, Any] | None = None, ) -> tuple[dict[str, float] | None, dict[str, list[float]] | None]: """Load initial parameters from configuration file. Handles parameter name mapping between config format and code format. Estimates contrast/offset from experimental data if not provided in config. Parameters ---------- config : ConfigManager Configuration manager with initial_parameters section analysis_mode : str Analysis mode (static_isotropic or laminar_flow) data : dict, optional Experimental data used to estimate contrast/offset if not in config Returns ------- dict or None Dictionary of initial parameters, or None if not found in config """ if not hasattr(config, "config") or not config.config: return None, None config_dict = config.config if "initial_parameters" not in config_dict: return None, None init_params = config_dict["initial_parameters"] if "parameter_names" not in init_params or "values" not in init_params: logger.warning( "Initial parameters in config missing 'parameter_names' or 'values'", ) return None, None names = init_params["parameter_names"] values = init_params["values"] if len(names) != len(values): logger.warning( f"Parameter name/value count mismatch: {len(names)} names, {len(values)} values", ) return None, None # Map config parameter names to code parameter names NAME_MAP = { "gamma_dot_0": "gamma_dot_t0", "gamma_dot_offset": "gamma_dot_t_offset", "phi_0": "phi0", "D0": "D0", "alpha": "alpha", "D_offset": "D_offset", "beta": "beta", } # Build parameter dictionary with name mapping params = {} for name, value in zip(names, values, strict=False): mapped_name = NAME_MAP.get(name, name) params[mapped_name] = float(value) # Add scaling parameters if missing # (config typically only includes physical parameters) # FIX (Nov 14, 2025): Use physically reasonable defaults instead of data estimation # PROBLEM: Data estimation from diagonal-corrected g2 gives wrong values # - Estimated: contrast~0.055, offset~1.003 (from percentile + max) # - Actual fitted: contrast~0.26, offset~0.77 (from previous successful runs) # - Mismatch causes optimization to get stuck in wrong parameter space # SOLUTION: Use typical XPCS values as defaults if "contrast" not in params or "offset" not in params: # Use typical homodyne XPCS values (empirically validated) contrast_default = 0.3 # Typical range [0.1, 0.5] for homodyne detection offset_default = 0.8 # Typical range [0.5, 1.0] for baseline if "contrast" not in params: params["contrast"] = contrast_default logger.info( f"Using default contrast={contrast_default:.3f} (typical homodyne XPCS)" ) if "offset" not in params: params["offset"] = offset_default logger.info( f"Using default offset={offset_default:.3f} (typical homodyne XPCS)" ) # Validate parameter count matches analysis mode expected_count = 5 if "static" in analysis_mode.lower() else 9 if len(params) != expected_count: logger.warning( f"Parameter count mismatch for {analysis_mode}: " f"got {len(params)}, expected {expected_count}", ) # Don't return None - let validation/clipping handle it per_angle_scaling: dict[str, list[float]] | None = None per_angle_cfg = init_params.get("per_angle_scaling") if isinstance(per_angle_cfg, dict): contrast_vals = per_angle_cfg.get("contrast") offset_vals = per_angle_cfg.get("offset") try: contrast_array = ( [float(x) for x in contrast_vals] if isinstance(contrast_vals, (list, tuple)) else None ) offset_array = ( [float(x) for x in offset_vals] if isinstance(offset_vals, (list, tuple)) else None ) except (TypeError, ValueError): contrast_array = offset_array = None if contrast_array and offset_array and len(contrast_array) == len(offset_array): per_angle_scaling = { "contrast": contrast_array, "offset": offset_array, } elif contrast_array or offset_array: logger.warning( "per_angle_scaling in initial_parameters must provide equal-length contrast/offset arrays; ignoring overrides", ) logger.debug(f"Loaded {len(params)} parameters from config: {list(params.keys())}") return params, per_angle_scaling def _estimate_contrast_offset_from_data( data: Any, ) -> tuple[float, float]: """Estimate contrast and offset from experimental g2 data. For XPCS correlation function: c₂(φ,t₁,t₂) = offset + contrast × [c₁(φ,t₁,t₂)]² Parameters ---------- data : dict or object Experimental data with 'g2' or 'c2_exp' key/attribute containing correlation data Returns ------- contrast : float Estimated contrast parameter (amplitude of correlations) offset : float Estimated offset parameter (baseline of g2) """ # Extract g2 data (try multiple possible key names) # Note: Cannot use `or` operator with numpy arrays as it evaluates truth value # Support both dict-like and object-like data access if isinstance(data, dict): g2 = data.get("g2") if g2 is None: g2 = data.get("c2_exp") else: g2 = getattr(data, "g2", None) if g2 is None: g2 = getattr(data, "c2_exp", None) if g2 is None: logger.warning( "Could not estimate contrast/offset: no 'g2' or 'c2_exp' in data. " "Using generic defaults (0.5, 1.0)" ) return 0.5, 1.0 # Convert to numpy array if needed g2_array = np.asarray(g2) # Estimate offset from baseline (5th percentile to avoid outliers) offset_est = float(np.nanpercentile(g2_array, 5)) # Estimate contrast from amplitude (max - baseline) # For c2 = offset + contrast * [c1]^2, max occurs at c1^2=1 max_g2 = float(np.nanmax(g2_array)) contrast_est = max_g2 - offset_est # Sanity checks if contrast_est <= 0 or offset_est <= 0: logger.warning( f"Invalid estimated contrast={contrast_est:.3f} or offset={offset_est:.3f}. " f"Using generic defaults (0.5, 1.0)" ) return 0.5, 1.0 logger.info( f"Estimated scaling parameters from data: " f"contrast={contrast_est:.4f}, offset={offset_est:.4f} " f"(g2 range: [{np.nanmin(g2_array):.4f}, {np.nanmax(g2_array):.4f}])" ) return contrast_est, offset_est def _get_default_initial_params(analysis_mode: str) -> dict[str, float]: """Get default initial parameters for analysis mode. NOTE: This function provides generic physical parameter defaults. Contrast and offset should be estimated from experimental data using _estimate_contrast_offset_from_data() before calling this function. """ # Static isotropic mode (3 parameters) if "static" in analysis_mode.lower(): return { "contrast": 0.5, # Generic default - should be replaced with data estimate "offset": 1.0, # Generic default - should be replaced with data estimate "D0": 10000.0, "alpha": -1.5, "D_offset": 0.0, } # Laminar flow mode (7 parameters) else: return { "contrast": 0.5, # Generic default - should be replaced with data estimate "offset": 1.0, # Generic default - should be replaced with data estimate "D0": 10000.0, "alpha": -1.5, "D_offset": 0.0, "gamma_dot_t0": 0.001, "beta": 0.0, "gamma_dot_t_offset": 0.0, "phi0": 0.0, } def _get_parameter_bounds( analysis_mode: str, param_space: ParameterSpace, ) -> dict[str, tuple[float, float]]: """Get parameter bounds for analysis mode.""" bounds = { "contrast": param_space.contrast_bounds, "offset": param_space.offset_bounds, "D0": param_space.D0_bounds, "alpha": param_space.alpha_bounds, "D_offset": param_space.D_offset_bounds, } if "laminar" in analysis_mode.lower(): bounds.update( { "gamma_dot_t0": param_space.gamma_dot_t0_bounds, "beta": param_space.beta_bounds, "gamma_dot_t_offset": param_space.gamma_dot_t_offset_bounds, "phi0": param_space.phi0_bounds, }, ) return bounds def _get_param_names(analysis_mode: str) -> list[str]: """Get parameter names for a given analysis mode. Parameters ---------- analysis_mode : str Analysis mode (e.g., 'static', 'laminar_flow') Returns ------- list[str] List of parameter names in the order they appear in the parameter array """ if "static" in analysis_mode.lower(): return ["contrast", "offset", "D0", "alpha", "D_offset"] else: return [ "contrast", "offset", "D0", "alpha", "D_offset", "gamma_dot_t0", "beta", "gamma_dot_t_offset", "phi0", ] def _get_physical_param_names(analysis_mode: str) -> list[str]: """Get physical parameter names for a given analysis mode. Unlike _get_param_names, this excludes scaling parameters (contrast, offset) and returns only the physical parameters. Parameters ---------- analysis_mode : str Analysis mode (e.g., 'static', 'laminar_flow') Returns ------- list[str] List of physical parameter names """ if "static" in analysis_mode.lower(): return ["D0", "alpha", "D_offset"] else: return [ "D0", "alpha", "D_offset", "gamma_dot_t0", "beta", "gamma_dot_t_offset", "phi0", ] def _params_to_array(params: dict[str, float], analysis_mode: str) -> jnp.ndarray: """Convert parameter dictionary to array.""" if "static" in analysis_mode.lower(): return jnp.array( [ params["contrast"], params["offset"], params["D0"], params["alpha"], params["D_offset"], ], ) else: return jnp.array( [ params["contrast"], params["offset"], params["D0"], params["alpha"], params["D_offset"], params["gamma_dot_t0"], params["beta"], params["gamma_dot_t_offset"], params["phi0"], ], ) def _array_to_params(array: jnp.ndarray, analysis_mode: str) -> dict[str, Any]: """Convert parameter array to dictionary. Returns JAX arrays as-is to avoid tracing issues. Conversion to Python floats should only happen at the final step. """ if "static" in analysis_mode.lower(): return { "contrast": array[0], "offset": array[1], "D0": array[2], "alpha": array[3], "D_offset": array[4], } else: return { "contrast": array[0], "offset": array[1], "D0": array[2], "alpha": array[3], "D_offset": array[4], "gamma_dot_t0": array[5], "beta": array[6], "gamma_dot_t_offset": array[7], "phi0": array[8], } def _bounds_to_arrays( bounds: dict[str, tuple[float, float]], analysis_mode: str, ) -> tuple[jnp.ndarray, jnp.ndarray]: """Convert bounds dictionary to lower/upper bound arrays.""" if "static" in analysis_mode.lower(): param_order = ["contrast", "offset", "D0", "alpha", "D_offset"] else: param_order = [ "contrast", "offset", "D0", "alpha", "D_offset", "gamma_dot_t0", "beta", "gamma_dot_t_offset", "phi0", ] lower = jnp.array([bounds[key][0] for key in param_order]) upper = jnp.array([bounds[key][1] for key in param_order]) return lower, upper def _get_optimizer_config(config: ConfigManager) -> dict[str, Any]: """Get NLSQ optimizer configuration from config.""" default_config = { "method": "levenberg_marquardt", "max_iterations": 10000, "tolerance": 1e-8, "verbose": False, } if hasattr(config, "config") and config.config: lsq_config = config.config.get("optimization", {}).get("lsq", {}) default_config.update(lsq_config) return default_config def _check_convergence(result: Any) -> bool: """Check if NLSQ optimization converged.""" return getattr(result, "success", False) def _get_optimization_message(result: Any) -> str: """Get optimization status message from NLSQ result.""" if hasattr(result, "result_flag") and result.result_flag is not None: return str(result.result_flag) elif result.success: return "Optimization converged successfully" else: return "Optimization failed to converge" def _get_iteration_count(result: Any) -> int: """Get iteration count from NLSQ result.""" if hasattr(result, "stats") and "num_steps" in result.stats: return int(result.stats["num_steps"]) return 0 # ============================================================================= # Multi-Start Optimization Entry Point (v2.6.0) # ============================================================================= class _SingleFitWorker: """Picklable worker class for parallel multi-start optimization. This class encapsulates the state needed for single NLSQ fits, making it suitable for use with ProcessPoolExecutor. The key is storing only picklable data (dict, str, bool) rather than complex objects like ConfigManager. The ConfigManager is reconstructed in the worker process using config_override, which avoids pickle issues with loggers, file handles, and cached objects. Attributes ---------- config_dict : dict Raw configuration dictionary (picklable). config_file : str Path to the original config file. per_angle_scaling : bool Whether to use per-angle contrast/offset scaling. analysis_mode : str Analysis mode ("laminar_flow" or "static"). """ def __init__( self, config: Any, # ConfigManager per_angle_scaling: bool, analysis_mode: str, ) -> None: # Extract picklable data from ConfigManager # This avoids pickle issues with loggers, file handles, etc. self.config_dict = config.config.copy() # Raw dict is picklable self.config_file = config.config_file # str is picklable self.per_angle_scaling = per_angle_scaling self.analysis_mode = analysis_mode def __call__( self, fit_data: dict[str, Any], start_params: np.ndarray ) -> SingleStartResult: """Run a single NLSQ fit. Parameters ---------- fit_data : dict XPCS data dictionary. start_params : np.ndarray Starting parameter values as array. Returns ------- SingleStartResult Result from this optimization run. """ import time start_time = time.perf_counter() # Reconstruct ConfigManager in the worker process # Using config_override avoids file I/O and is faster from homodyne.config.manager import ConfigManager config = ConfigManager( config_file=self.config_file, config_override=self.config_dict, ) # Convert array to dict param_names = _get_param_names(self.analysis_mode) params_dict = { name: float(start_params[i]) for i, name in enumerate(param_names) } try: result = fit_nlsq_jax( data=fit_data, config=config, initial_params=params_dict, per_angle_scaling=self.per_angle_scaling, _skip_global_selection=True, # Prevent recursion from multistart ) return SingleStartResult( start_idx=0, initial_params=start_params, final_params=np.array(result.parameters), chi_squared=result.chi_squared, reduced_chi_squared=result.reduced_chi_squared, success=result.success, status=0, message=result.message, n_iterations=result.iterations, n_fev=result.iterations, wall_time=time.perf_counter() - start_time, covariance=result.covariance if hasattr(result, "covariance") else None, ) except (ValueError, RuntimeError, TypeError, OSError) as e: return SingleStartResult( start_idx=0, initial_params=start_params, final_params=start_params, chi_squared=np.inf, success=False, message=str(e), wall_time=time.perf_counter() - start_time, )
[docs] @log_performance(threshold=1.0) def fit_nlsq_multistart( data: dict[str, Any], config: ConfigManager, initial_params: dict[str, float] | None = None, per_angle_scaling: bool = True, ) -> MultiStartResult: """Multi-start NLSQ optimization with Latin Hypercube Sampling. This function explores the parameter space using Latin Hypercube Sampling to avoid local minima. FULL strategy is always used regardless of dataset size - numerical precision and reproducibility take priority over speed. NOTE: Subsampling is explicitly NOT supported per project requirements. Parameters ---------- data : dict[str, Any] XPCS experimental data with keys: - wavevector_q_list: Q-vector values - phi_angles_list: Azimuthal angles - t1, t2: Time coordinates - c2_exp: Experimental g2 correlation data - sigma (optional): Error weights config : ConfigManager Configuration manager with optimization.nlsq.multi_start settings. initial_params : dict[str, float], optional Initial parameter guess. If provided, included as one of the starts. per_angle_scaling : bool Whether to use per-angle contrast/offset scaling. Default: True. Returns ------- MultiStartResult Aggregated results including: - best: Best result by chi-squared - all_results: All optimization attempts - strategy_used: "full" (only supported strategy) - n_unique_basins: Number of distinct local minima found - degeneracy_detected: Whether parameter degeneracy was detected Raises ------ ImportError If multi-start module is not available. ValueError If multi-start is not enabled in configuration. Examples -------- >>> config = ConfigManager("config.yaml") >>> # Ensure multi_start.enable: true in config >>> result = fit_nlsq_multistart(data, config) >>> print(f"Best chi2: {result.best.chi_squared:.4g}") >>> print(f"Strategy used: {result.strategy_used}") >>> if result.degeneracy_detected: ... print(f"Warning: {result.n_unique_basins} distinct basins found") """ if not HAS_MULTISTART: raise ImportError( "Multi-start optimization requires homodyne.optimization.nlsq.multistart. " "Ensure the multistart module is properly installed." ) if not HAS_NLSQ_WRAPPER: raise ImportError("NLSQWrapper is required for multi-start optimization") # Extract multi-start config nlsq_dict = config.config.get("optimization", {}).get("nlsq", {}) multi_start_dict = nlsq_dict.get("multi_start", {}) if not multi_start_dict.get("enable", False): raise ValueError( "Multi-start optimization is not enabled. " "Set optimization.nlsq.multi_start.enable: true in config." ) from homodyne.optimization.nlsq.config import NLSQConfig nlsq_config = NLSQConfig.from_dict(nlsq_dict) ms_config = MultiStartConfig.from_nlsq_config(nlsq_config) # Validate data _validate_data(data) # Get analysis mode and parameter setup analysis_mode = _get_analysis_mode(config) param_space = ParameterSpace() if HAS_CORE_MODULES else None # Load initial_params from config if not provided as argument # CRITICAL: User's initial parameters must be included in multi-start to ensure # the known-good solution is explored, especially for laminar_flow mode where # LHS starting points may not converge to the correct physical parameters if initial_params is None: initial_params, _ = _load_initial_params_from_config( config, analysis_mode, data ) if initial_params is not None: logger.info( "Loaded initial parameters from configuration for multi-start optimization" ) # Get bounds if HAS_PARAMETER_MANAGER: param_manager = ParameterManager(config.config, analysis_mode=analysis_mode) bounds_list = param_manager.get_parameter_bounds() bounds_dict = {b["name"]: (b["min"], b["max"]) for b in bounds_list} else: bounds_dict = _get_parameter_bounds(analysis_mode, param_space) lower_bounds, upper_bounds = _bounds_to_arrays(bounds_dict, analysis_mode) bounds_array = np.column_stack([lower_bounds, upper_bounds]) # Create picklable single fit worker (replaces closure-based function) # This enables parallel execution with ProcessPoolExecutor single_fit_func = _SingleFitWorker( config=config, per_angle_scaling=per_angle_scaling, analysis_mode=analysis_mode, ) # Create cost function for screening def cost_func(params: np.ndarray) -> float: """Quick cost evaluation for screening. Uses a heuristic based on distance from bounds center rather than full residual evaluation for efficiency during screening phase. """ try: # Check if params are at bounds (return large cost) for i, (low, high) in enumerate( zip(lower_bounds, upper_bounds, strict=True) ): if params[i] <= low or params[i] >= high: return 1e20 # Approximate cost from parameter distance to center center = (lower_bounds + upper_bounds) / 2 scale = upper_bounds - lower_bounds normalized_dist = np.sum(((params - center) / scale) ** 2) return normalized_dist except (ValueError, IndexError, TypeError, FloatingPointError): return 1e20 # Prepare custom_starts with user's initial parameters (if provided) custom_starts = None if initial_params is not None: # Convert initial_params dict to array in correct order param_names = _get_param_names(analysis_mode) initial_array = np.array([initial_params[name] for name in param_names]) custom_starts = [initial_array.tolist()] logger.info("Including user-specified initial parameters as custom start point") # Run multi-start optimization logger.info( f"Starting multi-start NLSQ with {ms_config.n_starts} starts, " f"strategy will be auto-selected based on dataset size" ) result = run_multistart_nlsq( data=data, bounds=bounds_array, config=ms_config, single_fit_func=single_fit_func, cost_func=cost_func if ms_config.use_screening else None, custom_starts=custom_starts, ) logger.info( f"Multi-start complete: strategy={result.strategy_used}, " f"best chi2={result.best.chi_squared:.4g}, " f"basins={result.n_unique_basins}" ) return result
[docs] @log_performance(threshold=1.0) def fit_nlsq_cmaes( data: dict[str, Any], config: ConfigManager, initial_params: dict[str, float] | None = None, per_angle_scaling: bool = True, ) -> OptimizationResult: """CMA-ES global optimization for multi-scale parameter problems. Uses NLSQ's CMAESOptimizer with evosax backend for global optimization. Particularly beneficial for laminar_flow mode where parameters have vastly different scales (e.g., D₀ ~ 1e4 vs γ̇₀ ~ 1e-3, scale ratio > 1e7). Features: - Covariance Matrix Adaptation for multi-scale parameters - BIPOP restart strategy for robust convergence - Memory batching/streaming for large datasets - Optional L-M refinement of CMA-ES solution Parameters ---------- data : dict[str, Any] XPCS experimental data (same format as fit_nlsq_jax). config : ConfigManager Configuration manager with optimization.nlsq.cmaes settings. initial_params : dict[str, float], optional Initial parameter guess. Used as CMA-ES starting point. per_angle_scaling : bool Whether to use per-angle contrast/offset scaling. Default: True. Returns ------- OptimizationResult Optimization result with parameters, uncertainties, and diagnostics. Raises ------ ImportError If CMA-ES is not available (requires NLSQ 0.6.4+ with evosax). ValueError If CMA-ES is not enabled in configuration. Examples -------- >>> config = ConfigManager("config.yaml") >>> # Ensure cmaes.enable: true in config >>> result = fit_nlsq_cmaes(data, config) >>> print(f"Chi2: {result.chi_squared:.4e}") >>> print(f"Method: {result.device_info['method']}") """ import time if not HAS_CMAES: raise ImportError( "CMA-ES requires NLSQ 0.6.4+ with evosax backend. " "Install with: pip install nlsq[evosax]" ) # Extract CMA-ES config nlsq_dict = config.config.get("optimization", {}).get("nlsq", {}) cmaes_dict = nlsq_dict.get("cmaes", {}) if not cmaes_dict.get("enable", False): raise ValueError( "CMA-ES optimization is not enabled. " "Set optimization.nlsq.cmaes.enable: true in config." ) from homodyne.optimization.nlsq.config import NLSQConfig nlsq_config = NLSQConfig.from_dict(nlsq_dict) # Validate data _validate_data(data) # Get analysis mode and parameter setup analysis_mode = _get_analysis_mode(config) param_space = ParameterSpace() if HAS_CORE_MODULES else None logger.info("=" * 60) logger.info("CMA-ES GLOBAL OPTIMIZATION") logger.info("=" * 60) logger.info(f"Analysis mode: {analysis_mode}") logger.info(f"Preset: {nlsq_config.cmaes_preset}") # Set up initial parameters if initial_params is None: initial_params, _ = _load_initial_params_from_config( config, analysis_mode, data ) if initial_params is None: initial_params = _get_default_initial_params(analysis_mode) logger.info("Using default initial parameters") else: logger.info("Using initial parameters from configuration") # Convert initial params to array x0 = _params_to_array(initial_params, analysis_mode) # Get bounds if HAS_PARAMETER_MANAGER: param_manager = ParameterManager(config.config, analysis_mode=analysis_mode) bounds_list = param_manager.get_parameter_bounds() bounds_dict = {b["name"]: (b["min"], b["max"]) for b in bounds_list} else: bounds_dict = _get_parameter_bounds(analysis_mode, param_space) lower_bounds, upper_bounds = _bounds_to_arrays(bounds_dict, analysis_mode) bounds = (lower_bounds, upper_bounds) # Create CMA-ES wrapper config cmaes_config = CMAESWrapperConfig.from_nlsq_config(nlsq_config) # Create CMA-ES wrapper and check if we should use it wrapper = CMAESWrapper(cmaes_config) if nlsq_config.cmaes_auto_select: if not wrapper.should_use_cmaes(bounds, nlsq_config.cmaes_scale_threshold): # Scale ratio too low for CMA-ES, check fallback options # Fall back to multi-start if enabled and available, otherwise local optimization multi_start_dict = nlsq_dict.get("multi_start", {}) if multi_start_dict.get("enable", False) and HAS_MULTISTART: logger.info( f"[CMA-ES] Scale ratio < {nlsq_config.cmaes_scale_threshold}, " "falling back to multi-start optimization" ) result = fit_nlsq_multistart( data=data, config=config, initial_params=initial_params, per_angle_scaling=per_angle_scaling, ) return result.to_optimization_result() else: logger.info( f"[CMA-ES] Scale ratio < {nlsq_config.cmaes_scale_threshold}, " "falling back to local NLSQ optimization" ) # Use _skip_global_selection=True to avoid infinite loop return fit_nlsq_jax( data=data, config=config, initial_params=initial_params, per_angle_scaling=per_angle_scaling, _skip_global_selection=True, ) # Prepare data arrays for CMA-ES # Need to build model function and flatten data start_time = time.time() try: # Use adapter's model building infrastructure from homodyne.optimization.nlsq.adapter import get_or_create_model # Prepare phi angles phi_key = "phi_angles_list" if "phi_angles_list" in data else "phi" phi_angles = np.asarray(data[phi_key]) n_phi = len(phi_angles) # Get q value if "wavevector_q_list" in data: q = float(np.asarray(data["wavevector_q_list"])[0]) else: q = float(data.get("q", 0.01)) # Get model and function model, model_func, cache_hit = get_or_create_model( analysis_mode=analysis_mode, phi_angles=phi_angles, q=q, per_angle_scaling=per_angle_scaling, enable_jit=True, ) logger.debug(f"Model cache {'hit' if cache_hit else 'miss'}") # Prepare data arrays t1_key = "t1" t2_key = "t2" g2_key = "c2_exp" if "c2_exp" in data else "g2" t1 = np.asarray(data[t1_key]) t2 = np.asarray(data[t2_key]) g2 = np.asarray(data[g2_key]) # Handle 2D meshgrids if t1.ndim == 2: t1 = t1[:, 0] if t2.ndim == 2: t2 = t2[0, :] # Get sigma (uncertainty) _sigma_is_default = "sigma" not in data if not _sigma_is_default: sigma = np.asarray(data["sigma"]) else: sigma = _DEFAULT_SIGMA * np.ones_like(g2) # Flatten data for CMA-ES # Flatten g2 and sigma ydata = g2.flatten() sigma_flat = sigma.flatten() n_data = len(ydata) logger.info(f"Data points: {n_data:,}") # Number of physical parameters n_physical = len(_get_physical_param_names(analysis_mode)) # ========================================================================== # ANTI-DEGENERACY INTEGRATION (v2.18.0+) # ========================================================================== # For laminar_flow mode with per-angle scaling, the parameter space can be # degenerate: per-angle contrast/offset can absorb shear signals. # # Two modes: # - auto_averaged: Compute N quantile estimates, AVERAGE to 1 contrast + 1 offset, # OPTIMIZE these 2 along with 7 physical = 9 params # - fixed_constant: Compute N quantile estimates, use per-angle values DIRECTLY # as FIXED scaling, OPTIMIZE only 7 physical params # ========================================================================== use_constant_mode = False use_fixed_scaling = False use_averaged_scaling = False ad_controller = None is_laminar_flow = analysis_mode == "laminar_flow" fixed_contrast_per_angle = None fixed_offset_per_angle = None if HAS_ANTI_DEGENERACY and per_angle_scaling and is_laminar_flow: # Load anti-degeneracy config anti_degeneracy_config = nlsq_dict.get("anti_degeneracy", {}) if anti_degeneracy_config: phi_unique_rad = np.deg2rad(phi_angles) ad_controller = AntiDegeneracyController.from_config( config_dict=anti_degeneracy_config, n_phi=n_phi, phi_angles=phi_unique_rad, n_physical=n_physical, per_angle_scaling=per_angle_scaling, is_laminar_flow=is_laminar_flow, ) if ad_controller.is_enabled and ad_controller.use_constant: use_constant_mode = True # v2.18.0: Distinguish between fixed_constant and auto_averaged use_fixed_scaling = ad_controller.use_fixed_scaling use_averaged_scaling = ad_controller.use_averaged_scaling if use_fixed_scaling: logger.info("=" * 60) logger.info( "ANTI-DEGENERACY: Enabled for CMA-ES (Fixed Constant Mode)" ) logger.info( f" Quantile estimation: N={n_phi} contrast + N={n_phi} offset values" ) logger.info(" Per-angle values: FIXED (not optimized)") logger.info(" Total parameters: 7 physical only") logger.info("=" * 60) else: logger.info("=" * 60) logger.info( "ANTI-DEGENERACY: Enabled for CMA-ES (Auto Averaged Mode)" ) logger.info( f" Quantile estimation: N={n_phi} contrast + N={n_phi} offset values" ) logger.info(" Averaged to: 1 contrast + 1 offset (OPTIMIZED)") logger.info( " Total parameters: 7 physical + 2 averaged scaling = 9" ) logger.info("=" * 60) # Handle parameter expansion based on anti-degeneracy mode if per_angle_scaling and not use_constant_mode: # Standard behavior: expand to per-angle parameters (13 params for n_phi=3) if len(x0) == 2 + n_physical: from homodyne.optimization.nlsq.data_prep import ( expand_per_angle_parameters, ) expanded = expand_per_angle_parameters( x0, bounds, n_phi, n_physical, logger=logger, ) x0 = expanded.params bounds = expanded.bounds lower_bounds, upper_bounds = bounds effective_per_angle_scaling = True elif use_constant_mode: # CONSTANT MODE (v2.18.0+): Compute per-angle scaling from quantiles # - use_fixed_scaling: per-angle values FIXED, optimize 7 physical only # - use_averaged_scaling: average to 2 values, optimize 9 params from homodyne.optimization.nlsq.parameter_utils import ( compute_quantile_per_angle_scaling, ) logger.info("=" * 60) logger.info("CONSTANT MODE: Computing per-angle scaling from quantiles") logger.info("=" * 60) # Create stratified data structure for quantile computation # Build flat arrays for all phi angles t1_mesh_temp, t2_mesh_temp = np.meshgrid(t1, t2, indexing="ij") n_time_points_temp = t1_mesh_temp.size # Build arrays with phi information g2_flat_all = [] t1_flat_all = [] t2_flat_all = [] phi_flat_all = [] for i_phi, phi_val in enumerate(phi_angles): # g2 shape is (n_phi, n_t1, n_t2) g2_slice = g2[i_phi].flatten() g2_flat_all.append(g2_slice) t1_flat_all.append(t1_mesh_temp.flatten()) t2_flat_all.append(t2_mesh_temp.flatten()) phi_flat_all.append(np.full(n_time_points_temp, phi_val)) # Create simple data container for quantile estimation class SimpleStratifiedData: def __init__( self, g2_flat: Any, phi_flat: Any, t1_flat: Any, t2_flat: Any ) -> None: self.g2_flat = g2_flat self.phi_flat = phi_flat self.t1_flat = t1_flat self.t2_flat = t2_flat stratified_for_quantile = SimpleStratifiedData( g2_flat=np.concatenate(g2_flat_all), phi_flat=np.concatenate(phi_flat_all), t1_flat=np.concatenate(t1_flat_all), t2_flat=np.concatenate(t2_flat_all), ) # Get contrast/offset bounds from initial bounds contrast_bounds = (float(lower_bounds[0]), float(upper_bounds[0])) offset_bounds = (float(lower_bounds[1]), float(upper_bounds[1])) # Compute per-angle scaling from quantiles fixed_contrast_per_angle, fixed_offset_per_angle = ( compute_quantile_per_angle_scaling( stratified_data=stratified_for_quantile, contrast_bounds=contrast_bounds, offset_bounds=offset_bounds, logger=logger, ) ) logger.info( f"Per-angle scaling computed:\n" f" Contrast: mean={np.nanmean(fixed_contrast_per_angle):.4f}, " f"range=[{np.nanmin(fixed_contrast_per_angle):.4f}, {np.nanmax(fixed_contrast_per_angle):.4f}]\n" f" Offset: mean={np.nanmean(fixed_offset_per_angle):.4f}, " f"range=[{np.nanmin(fixed_offset_per_angle):.4f}, {np.nanmax(fixed_offset_per_angle):.4f}]" ) if use_fixed_scaling: # FIXED CONSTANT MODE: Use per-angle values DIRECTLY as FIXED # Optimize only 7 physical parameters logger.info("Fixed constant mode: per-angle scaling will be FIXED") # Extract physical parameters only from x0 if len(x0) == 2 + n_physical: # [contrast, offset, physical] format - extract physical only physical_params = x0[2:] x0 = physical_params.copy() logger.info(f"Reduced to physical params only: {len(x0)} params") elif len(x0) == 2 * n_phi + n_physical: # Per-angle format - extract physical only physical_params = x0[2 * n_phi :] x0 = physical_params.copy() logger.info(f"Reduced per-angle to physical only: {len(x0)} params") elif len(x0) == n_physical: # Already physical only logger.info(f"Already physical params only: {len(x0)} params") # Update bounds for 7-parameter format: [*physical] if len(lower_bounds) == 2 + n_physical: lower_bounds = lower_bounds[2:] upper_bounds = upper_bounds[2:] elif len(lower_bounds) == 2 * n_phi + n_physical: lower_bounds = lower_bounds[2 * n_phi :] upper_bounds = upper_bounds[2 * n_phi :] bounds = (lower_bounds, upper_bounds) logger.info( f"CMA-ES using fixed constant mode: {len(x0)} parameters (7 physical only)" ) else: # AUTO AVERAGED MODE: Average to 2 values, optimize 9 params avg_contrast = float(np.nanmean(fixed_contrast_per_angle)) avg_offset = float(np.nanmean(fixed_offset_per_angle)) logger.info( f"Auto averaged mode: scaling averaged to contrast={avg_contrast:.4f}, offset={avg_offset:.4f}" ) # Build 9-parameter initial guess: [contrast_avg, offset_avg, *physical] if len(x0) == 2 + n_physical: # Already in [contrast, offset, physical] format physical_params = x0[2:] x0 = np.concatenate([[avg_contrast], [avg_offset], physical_params]) logger.info( f"Using averaged quantile estimates for scaling: contrast={avg_contrast:.4f}, offset={avg_offset:.4f}" ) elif len(x0) == 2 * n_phi + n_physical: # Per-angle format: reduce to [contrast_avg, offset_avg, physical] physical_params = x0[2 * n_phi :] x0 = np.concatenate([[avg_contrast], [avg_offset], physical_params]) logger.info(f"Reduced per-angle to averaged: {len(x0)} params") elif len(x0) == n_physical: # Physical only: prepend averaged scaling x0 = np.concatenate([[avg_contrast], [avg_offset], x0]) logger.info(f"Prepended averaged scaling: {len(x0)} params") # Update bounds for 9-parameter format: [contrast, offset, *physical] if len(lower_bounds) == 2 + n_physical: # Already correct format pass elif len(lower_bounds) == 2 * n_phi + n_physical: # Per-angle format: reduce to single scaling bounds lower_bounds = np.concatenate( [ [lower_bounds[0]], # Single contrast bound [lower_bounds[n_phi]], # Single offset bound lower_bounds[2 * n_phi :], # Physical bounds ] ) upper_bounds = np.concatenate( [ [upper_bounds[0]], [upper_bounds[n_phi]], upper_bounds[2 * n_phi :], ] ) bounds = (lower_bounds, upper_bounds) logger.info( f"CMA-ES using auto averaged mode: {len(x0)} parameters (9 = 7 physical + 2 averaged)" ) effective_per_angle_scaling = False else: effective_per_angle_scaling = False # Create wrapped model function for CMA-ES # IMPORTANT: This function must be JAX-traceable for CMA-ES JIT compilation # Use JAX operations throughout to avoid TracerArrayConversionError t1_mesh, t2_mesh = np.meshgrid(t1, t2, indexing="ij") # Diagonal filtering for CMA-ES (v2.19.0: configurable) # At t1==t2, the experimental g2 has a diagonal correction applied at load # time but the CMA-ES theory function does not apply this correction, # creating systematic residual mismatch. Two approaches: # - "remove" (default): Filter diagonal points from data (clean, ~0.1% data loss) # - "none": Keep all points (matches stratified LS point count, but residual # mismatch at diagonal persists for theory values) diagonal_mode = cmaes_dict.get("diagonal_filtering", "remove") idx1, idx2 = np.meshgrid(np.arange(len(t1)), np.arange(len(t2)), indexing="ij") if diagonal_mode == "remove": non_diag_single = (idx1 != idx2).flatten() non_diag_all = np.tile(non_diag_single, n_phi) n_before_diag = len(ydata) ydata = ydata[non_diag_all] sigma_flat = sigma_flat[non_diag_all] n_data = len(ydata) n_diag_removed = n_before_diag - n_data logger.info( f"Diagonal filtering: removed {n_diag_removed:,} points " f"({100 * n_diag_removed / n_before_diag:.1f}%)" ) else: non_diag_single = np.ones(idx1.size, dtype=bool) non_diag_all = np.ones(len(ydata), dtype=bool) logger.info( f"Diagonal filtering: disabled (mode={diagonal_mode!r}), " f"keeping all {len(ydata):,} points" ) # Pre-compute data arrays as JAX arrays for efficiency (post diagonal filter) t1_flat_np = t1_mesh.flatten()[non_diag_single] t2_flat_np = t2_mesh.flatten()[non_diag_single] n_time_points = len(t1_flat_np) t1_flat = jnp.array(t1_flat_np) t2_flat = jnp.array(t2_flat_np) # Build phi indices: each time grid repeats for all phi angles # phi_indices[k] gives which phi angle point k belongs to phi_indices = jnp.repeat(jnp.arange(n_phi), n_time_points) # Build phi values for each point phi_values = jnp.array(phi_angles)[phi_indices] # Tile time arrays for all phi angles t1_all = jnp.tile(t1_flat, n_phi) t2_all = jnp.tile(t2_flat, n_phi) # Get dt and L from config if available config_dict = config.config if hasattr(config, "config") else config dt_val = config_dict.get("analyzer_parameters", {}).get("dt", 0.1) # Get L from config (stator_rotor_gap or default 200 µm) analyzer_params = config_dict.get("analyzer_parameters", {}) geometry = analyzer_params.get("geometry", {}) L_val = float(geometry.get("stator_rotor_gap", 2000000.0)) # Pre-compute physics factors (outside the traced function for efficiency) wavevector_q_squared_half_dt = 0.5 * (q**2) * dt_val sinc_prefactor = 0.5 / np.pi * q * L_val * dt_val # ====================================================================== # SHEAR-SENSITIVITY WEIGHTING FOR CMA-ES (v2.19.0, Fix #6) # ====================================================================== # Apply angle-dependent weighting to sigma to emphasize shear-sensitive # angles (parallel/antiparallel to flow). Unlike stratified LS, CMA-ES # is derivative-free so gradient cancellation is not the concern. # Instead, weighting helps CMA-ES prioritize fit quality at angles # that are most informative for shear parameters. # ====================================================================== if ( is_laminar_flow and ad_controller is not None and ad_controller.is_enabled and hasattr(ad_controller, "shear_weighter") and ad_controller.shear_weighter is not None ): # Get phi0 from current x0 (may be NLSQ warm-started) # phi0 is stored in degrees in the parameter array (same as config/bounds) physical_params = x0[2:] if len(x0) > n_physical else x0 phi0_idx = _get_physical_param_names(analysis_mode).index("phi0") phi0_current_deg = float(physical_params[phi0_idx]) # Compute per-angle shear weights shear_weights = ad_controller.shear_weighter.get_weights(phi0_current_deg) shear_weights_np = np.asarray(shear_weights) # Broadcast per-angle weights to per-point weights # Each angle's weight applies to all its time points per_point_weights = np.repeat(shear_weights_np, n_time_points) # Apply weighting: divide sigma by sqrt(weight) so that # higher-weighted angles have smaller effective sigma # (larger contribution to chi-squared) sigma_flat = sigma_flat / np.sqrt(np.maximum(per_point_weights, 0.01)) logger.info( f"[CMA-ES] Shear weighting applied: phi0={phi0_current_deg:.1f} deg, " f"weight range=[{np.min(shear_weights_np):.3f}, {np.max(shear_weights_np):.3f}]" ) # Import the core JAX computation function that supports element-wise mode from homodyne.core.jax_backend import _compute_g1_total_core # Note: In constant mode (v2.18.0+), we have two sub-modes: # - auto_averaged: 9 parameters [contrast_avg, offset_avg, *physical] # - fixed_constant: 7 parameters [*physical] (scaling from pre-computed arrays) # Convert fixed per-angle scaling to JAX arrays if using fixed scaling fixed_contrast_jax = None fixed_offset_jax = None if use_fixed_scaling and fixed_contrast_per_angle is not None: fixed_contrast_jax = jnp.asarray(fixed_contrast_per_angle) fixed_offset_jax = jnp.asarray(fixed_offset_per_angle) def model_for_cmaes(xdata_unused: Any, *params: Any) -> Any: """JAX-traceable model function wrapper for CMA-ES. Uses pure JAX operations to allow JIT compilation by NLSQ's CMAESOptimizer. Element-wise mode is triggered automatically when len(t1) > 2000. In constant mode (v2.18.0+): - auto_averaged: 9 parameters [contrast, offset, *physical] - fixed_constant: 7 parameters [*physical] (scaling from fixed arrays) """ params_array = jnp.asarray(params) # Extract per-angle scaling and physical params using JAX operations # Four modes: # 1. effective_per_angle_scaling=True: params = [contrast(n_phi), offset(n_phi), physical] # 2. use_fixed_scaling=True: params = [physical] (7 params, scaling from fixed arrays) # 3. use_averaged_scaling=True: params = [contrast, offset, physical] (9 params, broadcast) # 4. Neither: params = [contrast, offset, physical], broadcast to all angles if effective_per_angle_scaling: contrasts = params_array[:n_phi] offsets = params_array[n_phi : 2 * n_phi] physical = params_array[2 * n_phi :] elif use_fixed_scaling: # FIXED CONSTANT MODE (v2.18.0+): 7 physical params only # Use pre-computed fixed per-angle scaling arrays contrasts = fixed_contrast_jax offsets = fixed_offset_jax physical = params_array # All params are physical elif use_averaged_scaling: # AUTO AVERAGED MODE (v2.18.0+): 9 parameters [contrast, offset, *physical] # Broadcast single contrast/offset to all angles contrasts = jnp.full(n_phi, params_array[0]) offsets = jnp.full(n_phi, params_array[1]) physical = params_array[2:] else: # Fallback: broadcast single contrast/offset to all angles contrasts = jnp.full(n_phi, params_array[0]) offsets = jnp.full(n_phi, params_array[1]) physical = params_array[2:] # Map per-angle contrast/offset to each data point contrast_per_point = contrasts[phi_indices] offset_per_point = offsets[phi_indices] # Use element-wise g1 computation (triggered when len > 2000) # This is a pure JAX function that can be JIT-traced g1_all = _compute_g1_total_core( physical, t1_all, t2_all, phi_values, wavevector_q_squared_half_dt, sinc_prefactor, dt_val, ) # Compute g2 = offset + contrast * g1^2 g2_all = offset_per_point + contrast_per_point * g1_all**2 return g2_all # Create xdata placeholder (model_for_cmaes ignores it) # Use 1D array to match NLSQ curve_fit's expected shape for refinement xdata = np.zeros(n_data) # ====================================================================== # PHASE 1: NLSQ WARM-START (v2.19.0) # ====================================================================== # Run a quick NLSQ fit first to provide CMA-ES with an informed # starting point. This prevents CMA-ES from wasting generations # in poor regions of parameter space. # ====================================================================== nlsq_warmstart_chi2 = float("inf") nlsq_warmstart_params = None nlsq_warmstart_cov = None warmstart_enabled = cmaes_dict.get("nlsq_warmstart", True) if warmstart_enabled: logger.info("[CMA-ES] Phase 1: Running NLSQ warm-start...") try: warmstart_result = wrapper._run_nlsq_refinement( model_func=model_for_cmaes, xdata=xdata, ydata=ydata, p0=x0, bounds=bounds, sigma=sigma_flat, ) if ( warmstart_result["success"] and warmstart_result["chi_squared"] is not None ): nlsq_warmstart_chi2 = warmstart_result["chi_squared"] nlsq_warmstart_params = warmstart_result["popt"] nlsq_warmstart_cov = warmstart_result["pcov"] # Use NLSQ solution as CMA-ES starting point x0 = np.asarray(nlsq_warmstart_params) logger.info( f"[CMA-ES] NLSQ warm-start succeeded: chi2={nlsq_warmstart_chi2:.4e}, " f"using as CMA-ES starting point" ) else: logger.info( "[CMA-ES] NLSQ warm-start did not improve fit, " "using original starting point" ) except (ValueError, RuntimeError, TypeError, OSError, MemoryError) as e: logger.warning(f"[CMA-ES] NLSQ warm-start failed: {e}") # ====================================================================== # PHASE 2: CMA-ES GLOBAL SEARCH (with auto-skip, v2.20.0) # ====================================================================== # When warm-start achieves a good fit (reduced chi2 < threshold), skip # the expensive CMA-ES global search. CMA-ES with warm-start sigma is # a local refinement that rarely improves on a good NLSQ solution. skip_cmaes = False warmstart_skip_threshold = cmaes_dict.get( "warmstart_skip_threshold", getattr(nlsq_config, "cmaes_warmstart_skip_threshold", 5.0), ) warmstart_auto_skip = cmaes_dict.get( "warmstart_auto_skip", getattr(nlsq_config, "cmaes_warmstart_auto_skip", True), ) if ( warmstart_auto_skip and nlsq_warmstart_params is not None and nlsq_warmstart_chi2 < float("inf") ): # Compute reduced chi-squared for auto-skip decision # Guard: if DOF <= 0 (more params than data), never skip CMA-ES n_data_eff = len(ydata) - len(x0) if n_data_eff <= 0: warmstart_reduced_chi2 = float("inf") else: # When sigma is a default placeholder, chi2 is inflated by # 1/sigma^2 relative to unweighted residuals. Undo this # inflation using the known base sigma value directly, not the # post-shear-weighted sigma_flat. Shear weighting is intentional # physics that should remain in chi2 — only the arbitrary # default inflation needs to be removed. effective_chi2 = nlsq_warmstart_chi2 if _sigma_is_default: effective_chi2 = nlsq_warmstart_chi2 * _DEFAULT_SIGMA**2 warmstart_reduced_chi2 = effective_chi2 / n_data_eff if warmstart_reduced_chi2 < warmstart_skip_threshold: skip_cmaes = True logger.info( f"[CMA-ES] Auto-skip: NLSQ warm-start reduced chi2=" f"{warmstart_reduced_chi2:.4f} < threshold=" f"{warmstart_skip_threshold:.1f}. Skipping CMA-ES global search." f"{' (chi2 sigma-normalized for default sigma)' if _sigma_is_default else ''}" ) if skip_cmaes: # Build a CMAESResult directly from warm-start cmaes_result = CMAESResult( parameters=nlsq_warmstart_params, covariance=nlsq_warmstart_cov, chi_squared=nlsq_warmstart_chi2, success=True, diagnostics={ "selected": "nlsq_warmstart_auto_skip", "warmstart_reduced_chi2": warmstart_reduced_chi2, "warmstart_raw_chi2": nlsq_warmstart_chi2, "warmstart_skip_threshold": warmstart_skip_threshold, "sigma_is_default": _sigma_is_default, "cmaes_skipped": True, }, method_used="nlsq_warmstart", nlsq_refined=True, message=( f"CMA-ES skipped: warm-start reduced chi2=" f"{warmstart_reduced_chi2:.4f} < {warmstart_skip_threshold:.1f}" ), ) else: logger.info("[CMA-ES] Phase 2: Running CMA-ES global optimization...") cmaes_result = wrapper.fit( model_func=model_for_cmaes, xdata=xdata, ydata=ydata, p0=x0, bounds=bounds, sigma=sigma_flat, warmstart_chi2=nlsq_warmstart_chi2, ) # ====================================================================== # PHASE 3: COMPARE AND SELECT BEST RESULT (v2.19.0) # ====================================================================== # If NLSQ warm-start produced a better result than CMA-ES + refinement, # use the NLSQ result instead. This ensures CMA-ES never degrades # the solution quality compared to direct NLSQ. # ====================================================================== if ( nlsq_warmstart_params is not None and nlsq_warmstart_chi2 < cmaes_result.chi_squared ): logger.info( f"[CMA-ES] NLSQ warm-start result is better: " f"NLSQ chi2={nlsq_warmstart_chi2:.4e} < " f"CMA-ES chi2={cmaes_result.chi_squared:.4e}. " f"Using NLSQ solution." ) # Replace CMA-ES result with NLSQ warm-start result cmaes_result = CMAESResult( parameters=nlsq_warmstart_params, covariance=nlsq_warmstart_cov, chi_squared=nlsq_warmstart_chi2, success=True, diagnostics={ **cmaes_result.diagnostics, "selected": "nlsq_warmstart", "cmaes_chi_squared": cmaes_result.chi_squared, "nlsq_warmstart_chi_squared": nlsq_warmstart_chi2, }, method_used="cmaes", nlsq_refined=True, message="NLSQ warm-start selected over CMA-ES (lower chi-squared)", ) else: if nlsq_warmstart_params is not None: logger.info( f"[CMA-ES] CMA-ES result is better: " f"CMA-ES chi2={cmaes_result.chi_squared:.4e} <= " f"NLSQ chi2={nlsq_warmstart_chi2:.4e}" ) execution_time = time.time() - start_time # ========================================================================== # EXPAND CONSTANT MODE RESULTS (v2.17.0+) # ========================================================================== # If constant mode was used, expand 9 params [contrast, offset, *physical] # back to 2*n_phi + 7 for consistency with per_angle_scaling=True expectations. # The single contrast/offset are broadcast to all angles. # ========================================================================== final_params = np.asarray(cmaes_result.parameters) final_covariance = cmaes_result.covariance if use_constant_mode and per_angle_scaling: # Expand constant mode (9 params) to per-angle format from homodyne.optimization.nlsq.data_prep import ( expand_per_angle_parameters, ) n_before = len(final_params) expanded = expand_per_angle_parameters( final_params, None, n_phi, n_physical, ) final_params = expanded.params logger.info( f"Expanding constant mode results: {n_before} -> " f"{len(final_params)} parameters (broadcast contrast={final_params[0]:.4f}, offset={final_params[n_phi]:.4f})" ) # Expand covariance matrix if available # The covariance for single contrast/offset must be broadcast to per-angle if final_covariance is not None: _n_original = len(cmaes_result.parameters) # noqa: F841 n_expanded = 2 * n_phi + n_physical expanded_cov = np.zeros((n_expanded, n_expanded)) # Original layout: [contrast, offset, physical(7)] # Expanded layout: [contrast(n_phi), offset(n_phi), physical(7)] # Contrast block: all entries = contrast_var (perfectly correlated # since all angles share a single source parameter) contrast_var = final_covariance[0, 0] expanded_cov[:n_phi, :n_phi] = contrast_var # Offset block: all entries = offset_var (perfectly correlated) offset_var = final_covariance[1, 1] expanded_cov[n_phi : 2 * n_phi, n_phi : 2 * n_phi] = offset_var # Cross contrast-offset block contrast_offset_cov = final_covariance[0, 1] expanded_cov[:n_phi, n_phi : 2 * n_phi] = contrast_offset_cov expanded_cov[n_phi : 2 * n_phi, :n_phi] = contrast_offset_cov # Physical params block: direct slice copy # Original indices [2:2+n_physical] -> expanded indices [2*n_phi:] expanded_cov[2 * n_phi :, 2 * n_phi :] = final_covariance[ 2 : 2 + n_physical, 2 : 2 + n_physical ] # Cross contrast-physical and offset-physical covariance # Each physical param has one covariance value broadcast to all angles for i in range(n_physical): expanded_cov[:n_phi, 2 * n_phi + i] = final_covariance[0, 2 + i] expanded_cov[2 * n_phi + i, :n_phi] = final_covariance[0, 2 + i] expanded_cov[n_phi : 2 * n_phi, 2 * n_phi + i] = final_covariance[ 1, 2 + i ] expanded_cov[2 * n_phi + i, n_phi : 2 * n_phi] = final_covariance[ 1, 2 + i ] final_covariance = expanded_cov # Convert CMAESResult to OptimizationResult n_params = len(final_params) dof = max(1, n_data - n_params) reduced_chi_squared = cmaes_result.chi_squared / dof # Determine quality flag using reduced chi-squared thresholds # consistent with NLSQWrapper's 3-level system (wrapper.py:3577-3583) if reduced_chi_squared < 1.5: quality_flag = "good" elif reduced_chi_squared < 3.0: quality_flag = "marginal" else: quality_flag = "poor" result = OptimizationResult( parameters=final_params, uncertainties=( np.sqrt(np.diag(final_covariance)) if final_covariance is not None else np.zeros(n_params) ), covariance=( final_covariance if final_covariance is not None else np.eye(n_params) ), chi_squared=cmaes_result.chi_squared, reduced_chi_squared=reduced_chi_squared, convergence_status="converged" if cmaes_result.success else "failed", iterations=cmaes_result.diagnostics.get("generations", 0), execution_time=execution_time, device_info={ "device": "cpu", "method": "cmaes", "adapter": "CMAESWrapper", "preset": cmaes_config.preset, "nlsq_refined": cmaes_result.nlsq_refined, "restarts": cmaes_result.diagnostics.get("restarts", 0), "evaluations": cmaes_result.diagnostics.get("evaluations", 0), "anti_degeneracy_constant_mode": use_constant_mode, "fixed_per_angle_scaling": fixed_contrast_per_angle is not None, "cmaes_params": len(cmaes_result.parameters), "final_params": n_params, }, recovery_actions=[], quality_flag=quality_flag, ) logger.info("=" * 60) logger.info("CMA-ES OPTIMIZATION COMPLETE") logger.info("=" * 60) logger.info(f"Status: {'SUCCESS' if result.success else 'FAILED'}") logger.info(f"Generations: {result.iterations}") logger.info(f"Execution time: {execution_time:.3f}s") logger.info(f"chi2 = {result.chi_squared:.6e}") logger.info(f"Reduced chi2 = {result.reduced_chi_squared:.6f}") logger.info(f"L-M refined: {cmaes_result.nlsq_refined}") if use_constant_mode: logger.info( f"Anti-degeneracy: constant mode with fixed per-angle scaling " f"({len(cmaes_result.parameters)} physical -> {n_params} total params)" ) return result except (ValueError, RuntimeError, TypeError, OSError, MemoryError) as e: execution_time = time.time() - start_time logger.error(f"CMA-ES optimization failed: {e}") # Return failed result n_params = len(x0) return OptimizationResult( parameters=np.asarray(x0), uncertainties=np.zeros(n_params), covariance=np.eye(n_params), chi_squared=float("inf"), reduced_chi_squared=float("inf"), convergence_status="failed", iterations=0, execution_time=execution_time, device_info={ "device": "cpu", "method": "cmaes", "adapter": "CMAESWrapper", "error": str(e), }, recovery_actions=[], quality_flag="poor", )