Source code for homodyne.optimization.nlsq.wrapper

"""NLSQ Wrapper for Homodyne Optimization.

Role and When to Use (v2.11.0+)
-------------------------------

**NLSQWrapper** (this module) is the **stable fallback adapter** for:
- Complex optimizations requiring full anti-degeneracy integration
- laminar_flow mode with many phi angles (> 6)
- Large datasets (> 100M points) requiring streaming/chunking strategies
- Custom transforms or advanced recovery mechanisms
- Production stability where reliability is critical

Use **NLSQAdapter** instead for:
- Standard optimizations (static_isotropic mode)
- Small to medium datasets (< 10M points)
- Multi-start optimization (model caching provides 3-5× speedup)
- Performance-critical workflows requiring JIT compilation

**Key Differences:**

* Model caching: NLSQWrapper=None, NLSQAdapter=Built-in
* JIT compilation: NLSQWrapper=Manual, NLSQAdapter=Auto
* Workflow auto-select: NLSQWrapper=Custom, NLSQAdapter=Via NLSQ
* Anti-degeneracy layers: NLSQWrapper=Full, NLSQAdapter=Via fit()
* Recovery system: NLSQWrapper=3-attempt, NLSQAdapter=NLSQ native
* Streaming support: NLSQWrapper=Full custom, NLSQAdapter=Via NLSQ

**Decision Guide:**

1. If you need robust streaming for 100M+ points: Use NLSQWrapper
2. If you need full anti-degeneracy control: Use NLSQWrapper
3. If you need maximum speed for multi-start optimization: Use NLSQAdapter
4. Default recommendation: NLSQAdapter with automatic fallback to NLSQWrapper

This module provides an adapter layer between homodyne's optimization API
and the NLSQ package's trust-region nonlinear least squares interface.

The NLSQWrapper class implements the Adapter pattern to translate:
- Homodyne's multi-dimensional XPCS data → NLSQ's flattened array format
- Homodyne's parameter bounds tuple → NLSQ's (lower, upper) format
- NLSQ's (popt, pcov) output → Homodyne's OptimizationResult dataclass

Key Features:
- Automatic dataset size detection and strategy selection
- Angle-stratified chunking for per-angle parameter compatibility (v2.2+)
- Intelligent error recovery with 3-attempt retry strategy (T022-T024)
- Actionable error diagnostics with 5 error categories
- CPU-optimized execution through JAX
- Progress logging and convergence diagnostics
- Scientifically validated (7/7 validation tests passed, T036-T041)
- Serves as fallback when NLSQAdapter fails

Per-Angle Scaling Fix (v2.2):
- Fixes silent optimization failures with per-angle parameters on large datasets
- Applies angle-stratified chunking when: per_angle_scaling=True AND n_points>100k
- Ensures every NLSQ chunk contains all phi angles → gradients always well-defined
- <1% performance overhead (0.15s for 3M points)
- Reference: ultra-think-20251106-012247

Production Status:
- Production-ready with comprehensive error recovery
- Scientifically validated (100% test pass rate)
- Parameter recovery accuracy: 2-14% on core parameters
- Sub-linear performance scaling with dataset size
- Per-angle scaling compatible with large datasets (v2.2+)

References:
- NLSQ Package: https://github.com/imewei/NLSQ
- Validation: See tests/validation/test_scientific_validation.py (T036-T041)
- Documentation: See CHANGELOG.md and CLAUDE.md for detailed status
"""

from collections.abc import Callable
from typing import Any, cast

import jax
import jax.numpy as jnp
import numpy as np

# ruff: noqa: I001
# Import order is INTENTIONAL: nlsq must be imported BEFORE JAX
# This enables automatic x64 (double precision) configuration per NLSQ best practices
# Reference: https://nlsq.readthedocs.io/en/latest/guides/advanced_features.html
from nlsq import curve_fit, curve_fit_large

# Try importing AdaptiveHybridStreamingOptimizer (available in NLSQ >= 0.3.2)
# This is the preferred streaming optimizer - the old StreamingOptimizer was removed in NLSQ 0.4.0
# Fixes: 1) Shear-term weak gradients, 2) Slow convergence, 3) Crude covariance
try:
    from nlsq import AdaptiveHybridStreamingOptimizer, HybridStreamingConfig

    STREAMING_AVAILABLE = True  # For backwards compatibility
    HYBRID_STREAMING_AVAILABLE = True
except ImportError:
    STREAMING_AVAILABLE = False
    HYBRID_STREAMING_AVAILABLE = False
    AdaptiveHybridStreamingOptimizer = None
    HybridStreamingConfig = None

import logging

from homodyne.utils.logging import get_logger

from homodyne.optimization.batch_statistics import BatchStatistics
from homodyne.optimization.nlsq.adapter_base import NLSQAdapterBase
from homodyne.optimization.nlsq.results import (
    FunctionEvaluationCounter,
    OptimizationResult,
    UseSequentialOptimization,
)
from homodyne.optimization.nlsq.strategies.chunking import (
    StratificationDiagnostics,
    analyze_angle_distribution,
    compute_stratification_diagnostics,
    create_angle_stratified_data,
    create_angle_stratified_indices,
    estimate_stratification_memory,
    format_diagnostics_report,
    should_use_stratification,
)
from homodyne.optimization.nlsq.strategies.residual import (
    StratifiedResidualFunction,
    create_stratified_residual_function,
)
from homodyne.optimization.nlsq.strategies.residual_jit import (
    StratifiedResidualFunctionJIT,
)

# Fallback chain logic (extracted to fallback_chain.py)
from homodyne.optimization.nlsq.fallback_chain import (
    OptimizationStrategy,
    execute_optimization_with_fallback,
    get_fallback_strategy,
    handle_nlsq_result,
)
from homodyne.optimization.nlsq.recovery import (
    diagnose_error,
    execute_with_recovery,
    safe_uncertainties_from_pcov,
)
from homodyne.optimization.nlsq.strategies.out_of_core import (
    fit_with_out_of_core_accumulation,
)
from homodyne.optimization.nlsq.strategies.stratified_ls import (
    create_stratified_chunks,
    fit_with_stratified_least_squares,
)
from homodyne.optimization.nlsq.strategies.hybrid_streaming import (
    estimate_memory_for_stratified_ls,
    fit_with_hybrid_streaming_optimizer,
    fit_with_stratified_hybrid_streaming,
    fit_with_streaming_optimizer_deprecated,
    fit_with_streaming_optimizer_stratified_deprecated,
    should_use_streaming,
)


from homodyne.optimization.nlsq.strategies.sequential import (  # noqa: E402
    JAC_SAMPLE_SIZE,
    optimize_per_angle_sequential,
)
from homodyne.core.physics_nlsq import compute_g2_scaled  # noqa: E402
from homodyne.core.physics_utils import apply_diagonal_correction  # noqa: E402
from homodyne.optimization.nlsq.transforms import (  # noqa: E402
    adjust_covariance_for_transforms,
    apply_forward_shear_transforms_to_bounds,
    apply_forward_shear_transforms_to_vector,
    apply_inverse_shear_transforms_to_vector,
    build_per_parameter_x_scale,
    build_physical_index_map,
    format_x_scale_for_log,
    normalize_x_scale_map,
    parse_shear_transform_config,
    wrap_model_function_with_transforms,
    wrap_stratified_function_with_transforms,
)
from homodyne.optimization.numerical_validation import NumericalValidator  # noqa: E402
from homodyne.optimization.recovery_strategies import (  # noqa: E402
    RecoveryStrategyApplicator,
)

# Anti-Degeneracy Defense System v2.9.0

# Memory management utilities (extracted to memory.py for reduced complexity)
from homodyne.optimization.nlsq.memory import (  # noqa: E402
    get_adaptive_memory_threshold,
    NLSQStrategy,
    select_nlsq_strategy,
)

# Parameter utilities (extracted to parameter_utils.py for reduced complexity)
from homodyne.optimization.nlsq.parameter_utils import (  # noqa: E402
    build_parameter_labels as _build_parameter_labels,
    classify_parameter_status as _classify_parameter_status,
    sample_xdata as _sample_xdata,
    compute_jacobian_stats as _compute_jacobian_stats,
    compute_consistent_per_angle_init as _compute_consistent_per_angle_init,
)

# Module-level logger
_memory_logger = get_logger(__name__)


def _extract_n_points(data: Any) -> int:
    """Extract number of data points from various data formats.

    Handles XPCSData objects, numpy arrays, lists, and other iterables.

    Parameters
    ----------
    data : Any
        Data object with g2 attribute or array-like

    Returns
    -------
    int
        Number of data points (0 if cannot determine)
    """
    # Try g2 attribute (XPCSData)
    if hasattr(data, "g2"):
        g2 = data.g2
        if hasattr(g2, "size"):
            return int(g2.size)
        if hasattr(g2, "__len__"):
            return len(g2)
    # Try direct array-like
    if hasattr(data, "size"):
        return int(data.size)
    if hasattr(data, "__len__"):
        return len(data)
    return 0


[docs] def create_multistart_warmup_func( model_func: Callable[..., np.ndarray], xdata: np.ndarray, ydata: np.ndarray, bounds: tuple[np.ndarray, np.ndarray] | None = None, warmup_learning_rate: float = 0.001, normalize: bool = True, chunk_size: int = 50_000, ) -> Callable[[dict[str, Any], np.ndarray, int], Any]: """Create a warmup-only fit function for multi-start Phase 1 strategy. This function creates a warmup_fit_func compatible with the multi-start optimization module Phase 1 strategy. It uses the L-BFGS warmup phase from the NLSQ AdaptiveHybridStreamingOptimizer to quickly explore the parameter space without full Gauss-Newton refinement. Parameters ---------- model_func : Callable Model function with signature: ``func(x, *params) -> predictions`` xdata : np.ndarray Independent variable data ydata : np.ndarray Dependent variable data (observations) bounds : tuple[np.ndarray, np.ndarray] | None, optional Parameter bounds as (lower, upper) warmup_learning_rate : float, default=0.001 L-BFGS line search scale for warmup phase normalize : bool, default=True Whether to use parameter normalization (recommended for scale imbalance) chunk_size : int, default=50000 Points per chunk for streaming computation Returns ------- warmup_fit_func : Callable Function with signature: (data, initial_params, n_iterations) -> SingleStartResult Compatible with run_multistart_nlsq() warmup_fit_func parameter. Raises ------ RuntimeError If AdaptiveHybridStreamingOptimizer is not available (NLSQ < 0.3.2) Examples -------- >>> from homodyne.optimization.nlsq.wrapper import create_multistart_warmup_func >>> from homodyne.optimization.nlsq.multistart import run_multistart_nlsq >>> >>> # Create warmup function >>> warmup_func = create_multistart_warmup_func( ... model_func=my_model, ... xdata=x_data, ... ydata=y_data, ... bounds=(lower, upper), ... ) >>> >>> # Use with multi-start >>> result = run_multistart_nlsq( ... data=my_data, ... bounds=bounds, ... config=config, ... single_fit_func=full_fit_func, ... warmup_fit_func=warmup_func, # For Phase 1 strategy ... ) Notes ----- This function integrates with the Phase 1 multi-start strategy which: 1. Runs parallel L-BFGS warmup from multiple starting points 2. Selects the best warmup result 3. Performs full Gauss-Newton refinement from the best starting point This approach is memory-efficient for very large datasets (>100M points) and provides good exploration of the parameter space. See Also -------- homodyne.optimization.nlsq.multistart.run_multistart_nlsq : Main multi-start function homodyne.optimization.nlsq.multistart._run_phase1_strategy : Phase 1 strategy implementation """ from homodyne.optimization.nlsq.multistart import SingleStartResult if not HYBRID_STREAMING_AVAILABLE: raise RuntimeError( "AdaptiveHybridStreamingOptimizer not available. " "Please upgrade NLSQ to version >= 0.3.2: pip install --upgrade nlsq" ) def warmup_fit_func( data: dict[str, Any], initial_params: np.ndarray, n_iterations: int, ) -> SingleStartResult: """Run warmup-only optimization from a starting point. Parameters ---------- data : dict Data dictionary (not used directly; uses captured xdata/ydata) initial_params : np.ndarray Initial parameter values n_iterations : int Number of L-BFGS warmup iterations Returns ------- SingleStartResult Optimization result with warmup parameters and cost """ import time start_time = time.perf_counter() try: # Configure for warmup-only: skip Gauss-Newton phase config = HybridStreamingConfig( normalize=normalize, normalization_strategy="bounds", warmup_iterations=n_iterations, max_warmup_iterations=n_iterations, # Force stop at n_iterations warmup_learning_rate=warmup_learning_rate, gauss_newton_max_iterations=0, # Skip GN phase gauss_newton_tol=1e-8, chunk_size=chunk_size, validate_numerics=True, ) optimizer = AdaptiveHybridStreamingOptimizer(config) # Run warmup-only optimization result = optimizer.fit( data_source=(xdata, ydata), func=model_func, p0=initial_params, bounds=bounds, verbose=0, # Quiet mode ) # Extract results final_params = np.asarray(result["x"]) # Compute chi-squared from final cost # The optimizer returns cost as 0.5 * sum(residuals^2) diagnostics = result.get("streaming_diagnostics", {}) warmup_diag = diagnostics.get("warmup_diagnostics", {}) final_loss = warmup_diag.get("final_loss", float("inf")) # Convert loss to chi-squared (loss = 0.5 * chi_sq for LSQ) chi_squared = ( 2.0 * final_loss if final_loss != float("inf") else float("inf") ) wall_time = time.perf_counter() - start_time return SingleStartResult( start_idx=0, initial_params=initial_params, final_params=final_params, chi_squared=chi_squared, success=result.get("success", False), n_iterations=n_iterations, wall_time=wall_time, message="L-BFGS warmup completed", ) except (ValueError, RuntimeError, OSError) as e: wall_time = time.perf_counter() - start_time return SingleStartResult( start_idx=0, initial_params=initial_params, final_params=initial_params, chi_squared=float("inf"), success=False, n_iterations=0, wall_time=wall_time, message=f"Warmup failed: {str(e)}", ) return warmup_fit_func
def _safe_uncertainties_from_pcov(pcov: np.ndarray, n_params: int) -> np.ndarray: """Extract uncertainties with diagonal regularization for singular pcov.""" return safe_uncertainties_from_pcov(pcov, n_params)
[docs] class NLSQWrapper(NLSQAdapterBase): """Adapter class for NLSQ package integration with homodyne optimization. This class translates between homodyne's optimization API and the NLSQ package's curve_fit interface, handling: - Data format transformations - Parameter validation and bounds checking - Automatic strategy selection for large datasets - Hybrid error handling and recovery Usage: wrapper = NLSQWrapper(enable_large_dataset=True) result = wrapper.fit(data, config, initial_params, bounds, analysis_mode) """
[docs] def __init__( self, enable_large_dataset: bool = True, enable_recovery: bool = True, enable_numerical_validation: bool = True, max_retries: int = 2, fast_mode: bool = False, ) -> None: """Initialize NLSQWrapper. Args: enable_large_dataset: Use curve_fit_large for datasets >1M points enable_recovery: Enable automatic error recovery strategies enable_numerical_validation: Enable NaN/Inf validation at 3 critical points max_retries: Maximum retry attempts per batch (default: 2) fast_mode: Disable non-essential checks for < 1% overhead (Task 5.5) """ self.enable_large_dataset = enable_large_dataset self.enable_recovery = enable_recovery self.enable_numerical_validation = enable_numerical_validation and not fast_mode self.max_retries = max_retries self.fast_mode = fast_mode # Initialize streaming optimization components self.batch_statistics = BatchStatistics(max_size=100) self.recovery_applicator = RecoveryStrategyApplicator(max_retries=max_retries) self.numerical_validator = NumericalValidator( enable_validation=enable_numerical_validation and not fast_mode ) # Best parameter tracking self.best_params = None self.best_loss = float("inf") self.best_batch_idx = -1
@staticmethod def _get_physical_param_names(analysis_mode: str) -> list[str]: """Get physical parameter names for a given analysis mode. Args: analysis_mode: 'static_isotropic' or 'laminar_flow' Returns: List of physical parameter names (excludes scaling parameters) Raises: ValueError: If analysis_mode is not recognized """ normalized_mode = analysis_mode.lower() if normalized_mode in {"static", "static_isotropic"}: return ["D0", "alpha", "D_offset"] elif normalized_mode == "laminar_flow": return [ "D0", "alpha", "D_offset", "gamma_dot_t0", # Canonical name (was gamma_dot_0) "beta", "gamma_dot_t_offset", # Canonical name (was gamma_dot_offset) "phi0", ] else: raise ValueError( f"Unknown analysis_mode: '{analysis_mode}'. " f"Expected 'static_isotropic'/'static' or 'laminar_flow'" ) @staticmethod def _extract_nlsq_settings(config: Any) -> dict[str, Any]: """Return NLSQ-specific settings from the config tree (if present).""" config_dict = None if hasattr(config, "config") and isinstance(config.config, dict): config_dict = config.config elif isinstance(config, dict): config_dict = config if not config_dict: return {} nlsq_settings = config_dict.get("optimization", {}).get("nlsq", {}) return cast(dict[str, Any], nlsq_settings) @staticmethod def _handle_nlsq_result( result: Any, strategy: OptimizationStrategy, ) -> tuple[np.ndarray, np.ndarray, dict]: """Normalize NLSQ return values to consistent format. Delegates to fallback_chain.handle_nlsq_result(). """ return handle_nlsq_result(result, strategy) def _get_fallback_strategy( self, current_strategy: OptimizationStrategy ) -> OptimizationStrategy | None: """Get fallback strategy when current strategy fails. Delegates to fallback_chain.get_fallback_strategy(). """ return get_fallback_strategy(current_strategy)
[docs] def fit( self, data: Any, config: Any, initial_params: np.ndarray | None = None, bounds: tuple[np.ndarray, np.ndarray] | None = None, analysis_mode: str = "static_isotropic", per_angle_scaling: bool = True, # REQUIRED: per-angle is physically correct diagnostics_enabled: bool = False, shear_transforms: dict[str, Any] | None = None, per_angle_scaling_initial: dict[str, list[float]] | None = None, ) -> OptimizationResult: """Execute NLSQ optimization with automatic strategy selection and per-angle scaling. Args: data: XPCS experimental data config: Configuration manager with optimization settings initial_params: Initial parameter guess (auto-loaded if None) bounds: Parameter bounds as (lower, upper) tuple analysis_mode: 'static_isotropic' or 'laminar_flow' per_angle_scaling: MUST be True. Per-angle contrast/offset parameters are physically correct as each scattering angle has different optical properties and detector responses. Legacy scalar mode (False) is no longer supported. Returns: OptimizationResult with converged parameters and diagnostics Raises: ValueError: If bounds are invalid (lower > upper) or if per_angle_scaling=False """ import time # nlsq imported at module level (line 36) for automatic x64 configuration logger = get_logger(__name__) # BREAKING CHANGE (Nov 2025): Validate per-angle scaling is enabled # Legacy scalar contrast/offset mode is not physically meaningful if not per_angle_scaling: logger.error( "Legacy scalar contrast/offset mode (per_angle_scaling=False) is no longer supported. " "Single contrast/offset parameters are not physically meaningful as each scattering " "angle has different optical properties and detector responses. " "Per-angle scaling is required for physically correct NLSQ optimization." ) raise ValueError( "per_angle_scaling=False is deprecated and removed. " "Use per_angle_scaling=True (default) for physically correct behavior." ) # Start timing start_time = time.time() physical_param_names = self._get_physical_param_names(analysis_mode) nlsq_settings = self._extract_nlsq_settings(config) loss_name = nlsq_settings.get("loss", "soft_l1") trust_region_scale = float(nlsq_settings.get("trust_region_scale", 1.0)) if trust_region_scale <= 0: trust_region_scale = 1.0 x_scale_override = nlsq_settings.get("x_scale") x_scale_value = x_scale_override if x_scale_override is not None else "jac" x_scale_map_config = normalize_x_scale_map(nlsq_settings.get("x_scale_map")) diagnostics_cfg = nlsq_settings.get("diagnostics", {}) diagnostics_enabled = diagnostics_enabled or bool( diagnostics_cfg.get("enable", False), ) diagnostics_sample_size = int(diagnostics_cfg.get("sample_size", 2048)) diagnostics_payload = ( {"solver_settings": {"loss": loss_name}} if diagnostics_enabled else None ) transform_cfg = parse_shear_transform_config(shear_transforms) # Step 0.5: Unified Memory-Based Strategy Selection (v2.13.0) # Uses pure memory estimation - no legacy point thresholds. n_est_points = _extract_n_points(data) n_params = len(initial_params) if initial_params is not None else 0 strategy_decision = select_nlsq_strategy(n_est_points, n_params) logger.info( f"Strategy selection: {strategy_decision.strategy.value} " f"({strategy_decision.reason})" ) # Handle HYBRID_STREAMING (extreme scale - index array > 75% RAM) if strategy_decision.strategy == NLSQStrategy.HYBRID_STREAMING: if not HYBRID_STREAMING_AVAILABLE: logger.critical( "AdaptiveHybridStreamingOptimizer required for extreme-scale " f"dataset ({n_est_points:,} points) but not available." ) raise MemoryError( f"Dataset too large for RAM (index={strategy_decision.index_memory_gb:.1f} GB > " f"threshold={strategy_decision.threshold_gb:.1f} GB) and Streaming unavailable." ) # Streaming path continues below (handled by existing streaming logic) logger.warning( f"Extreme-scale dataset: {strategy_decision.reason}. " "Proceeding with Adaptive Hybrid Streaming." ) # Handle OUT_OF_CORE (large scale - peak memory > 75% RAM) elif strategy_decision.strategy == NLSQStrategy.OUT_OF_CORE: if initial_params is None: raise ValueError("initial_params required for out-of-core optimization") validated_params = self._validate_initial_params(initial_params, bounds) nlsq_bounds = self._convert_bounds(bounds) # Default to False (User requirement: Never subsample data) use_fast_mode = self.fast_mode or config.config.get("optimization", {}).get( "fast_chi2_mode", False ) # Extract anti-degeneracy config (will warn that it's not supported for out-of-core) ooc_anti_degeneracy_config = None if config is not None and hasattr(config, "config"): ooc_nlsq_config = config.config.get("optimization", {}).get("nlsq", {}) ooc_anti_degeneracy_config = ooc_nlsq_config.get("anti_degeneracy", {}) popt, pcov, info = self._fit_with_out_of_core_accumulation( stratified_data=None, data=data, per_angle_scaling=per_angle_scaling, physical_param_names=physical_param_names, initial_params=validated_params, bounds=nlsq_bounds, logger=logger, config=config, fast_chi2_mode=use_fast_mode, anti_degeneracy_config=ooc_anti_degeneracy_config, ) execution_time = time.time() - start_time uncertainties = _safe_uncertainties_from_pcov(pcov, len(popt)) # Effective DOF for reduced chi-squared: in auto_averaged mode the # optimizer works on a compressed param vector but the true model # DOF is 2*n_phi + n_physical (one contrast+offset per angle). _ooc_init_n_params_effective: int | None = None if per_angle_scaling and config is not None and hasattr(config, "config"): _ooc_init_ad = ( config.config.get("optimization", {}) .get("nlsq", {}) .get("anti_degeneracy", {}) ) _ooc_init_mode = _ooc_init_ad.get("per_angle_mode", "auto") _ooc_init_thresh = _ooc_init_ad.get("constant_scaling_threshold", 3) # Use data.phi to count unique angles — reading n_phi from len(popt) # is incorrect in auto_averaged mode where popt has compressed length # (e.g. 9 for laminar_flow: 7 physical + 2 averaged) rather than # the expanded length (2*n_phi + n_physical = 53 for 23 angles). # Inferring (len(popt) - n_physical) // 2 gives 1, not 23, so the # threshold check 1 >= 3 never fires and the DOF fix is silently skipped. _ooc_init_n_phi = len(np.unique(np.asarray(data.phi))) _ooc_init_n_physical = len(physical_param_names) if _ooc_init_mode == "auto" and _ooc_init_n_phi >= _ooc_init_thresh: _ooc_init_n_params_effective = ( 2 * _ooc_init_n_phi + _ooc_init_n_physical ) elif _ooc_init_mode == "constant": _ooc_init_n_params_effective = _ooc_init_n_physical _ooc_init_dof = ( _ooc_init_n_params_effective if _ooc_init_n_params_effective is not None else len(popt) ) reduced_chi2 = info.get("chi_squared", 0.0) / max( 1, n_est_points - _ooc_init_dof ) # Derive quality_flag from reduced chi-squared (same thresholds # as _create_fit_result) instead of hardcoding "good". if reduced_chi2 < 1.5: _ooc_init_quality = "good" elif reduced_chi2 < 3.0: _ooc_init_quality = "marginal" else: _ooc_init_quality = "poor" return OptimizationResult( parameters=popt, uncertainties=uncertainties, covariance=pcov, chi_squared=info.get("chi_squared", 0.0), reduced_chi_squared=reduced_chi2, convergence_status=info.get("convergence_status", "unknown"), iterations=info.get("iterations", 0), execution_time=execution_time, device_info={ "device": "cpu_accumulated", "strategy": "out_of_core", "fast_mode": use_fast_mode, "decision": strategy_decision.reason, }, recovery_actions=["out_of_core_delegation"], quality_flag=_ooc_init_quality, ) # STANDARD strategy falls through to existing optimization path # Step 1: Apply angle-stratified chunking if needed (BEFORE data preparation) # This fixes per-angle parameter incompatibility with NLSQ chunking (ultra-think-20251106-012247) stratified_data = self._apply_stratification_if_needed( data, per_angle_scaling, config, logger ) # Extract stratification diagnostics if available stratification_diagnostics = None if hasattr(stratified_data, "stratification_diagnostics"): stratification_diagnostics = stratified_data.stratification_diagnostics # Check if sequential optimization is required transform_state: dict[str, Any] | None = None if isinstance(stratified_data, UseSequentialOptimization): logger.info( f"Using sequential per-angle optimization: {stratified_data.reason}" ) return self._run_sequential_optimization( stratified_data.data, config, initial_params, bounds, analysis_mode, per_angle_scaling, logger, start_time, x_scale_value=x_scale_value, transform_cfg=transform_cfg, physical_param_names=physical_param_names, per_angle_scaling_initial=per_angle_scaling_initial, ) # NEW: Check if stratified least_squares should be used (v2.2.0 double-chunking fix) # Conditions: # 1. Stratified data was created (has phi_flat attribute) # 2. Per-angle scaling is enabled # 3. Dataset is large enough to benefit (>1M points) # # FIXED (Nov 13, 2025): Use JIT-compatible StratifiedResidualFunctionJIT # Solution: Padded vmap implementation with static shapes # - Pads chunks to uniform size (enables JIT compilation) # - Uses jax.vmap for parallel chunk processing (no Python loops) # - Masks padded values in final residuals # Performance: ~1% memory overhead, 10-100x speedup from vectorization use_stratified_least_squares = ( hasattr(stratified_data, "phi_flat") and per_angle_scaling and hasattr(stratified_data, "g2_flat") and len(stratified_data.g2_flat) >= 1_000_000 ) if use_stratified_least_squares: logger.info("=" * 80) logger.info("STRATIFIED LEAST-SQUARES PATH ACTIVATED (v2.2.1)") logger.info("Solving double-chunking problem with NLSQ's least_squares()") logger.info("=" * 80) # Validate initial parameters if initial_params is None: raise ValueError("initial_params must be provided") validated_params = self._validate_initial_params(initial_params, bounds) # Convert bounds nlsq_bounds = self._convert_bounds(bounds) # Validate bounds consistency if nlsq_bounds is not None: lower, upper = nlsq_bounds if np.any(lower > upper): invalid_indices = np.where(lower > upper)[0] raise ValueError( f"Invalid bounds at indices {invalid_indices}: " f"lower > upper. Lower: {lower[invalid_indices]}, Upper: {upper[invalid_indices]}" ) # Get physical parameter names for this analysis mode physical_param_names = self._get_physical_param_names(analysis_mode) logger.info( f"Physical parameters for {analysis_mode}: {physical_param_names}" ) # FIX: Expand scaling parameters for per-angle scaling # When per_angle_scaling=True with N angles, we need: # - All physical parameters (7 for laminar_flow, 3 for static) # - N contrast parameters (one per angle) # - N offset parameters (one per angle) # Total: n_physical + 2*N parameters # # Config provides: n_physical + 2 parameters (single contrast, single offset) # We must expand: [contrast, offset] → [c0, c1, ..., cN-1, o0, o1, ..., oN-1] if per_angle_scaling: # Determine number of angles from stratified data n_angles = len(np.unique(stratified_data.phi_flat)) n_physical = len(physical_param_names) logger.info("Expanding scaling parameters for per-angle scaling:") logger.info(f" Angles: {n_angles}") logger.info(f" Physical parameters: {n_physical}") logger.info( f" Input parameters: {len(validated_params)} (expected: {n_physical + 2})" ) # Validate input parameter count expected_input = ( n_physical + 2 ) # Physical params + single contrast + single offset if len(validated_params) != expected_input: raise ValueError( f"Parameter count mismatch for per-angle scaling: " f"got {len(validated_params)}, expected {expected_input} " f"({n_physical} physical + 2 scaling). " f"For {n_angles} angles, will expand to {n_physical + 2 * n_angles} parameters." ) # Expand compact [contrast, offset, physical...] to per-angle format # matching StratifiedResidualFunction order: # [contrast_per_angle, offset_per_angle, physical_params] from homodyne.optimization.nlsq.data_prep import ( expand_per_angle_parameters, ) expanded = expand_per_angle_parameters( validated_params, nlsq_bounds, n_angles, n_physical, logger=logger, ) validated_params = expanded.params nlsq_bounds = expanded.bounds # Parameter count validation (CRITICAL) # Per-angle scaling is always enabled (legacy mode removed Nov 2025) n_physical = len(physical_param_names) n_angles = len(np.unique(stratified_data.phi_flat)) expected_params = n_physical + 2 * n_angles if len(validated_params) != expected_params: raise ValueError( f"Parameter count mismatch: got {len(validated_params)}, " f"expected {expected_params} " f"(physical={n_physical}, per_angle_scaling=True, " f"n_angles={n_angles})" ) logger.info( f"Parameter validation passed: {len(validated_params)} parameters" ) # Step: Re-run unified strategy selection with EFFECTIVE parameter count # (v2.14.0, v2.22.0 fix: anti-degeneracy pre-check) # # The expanded param count (e.g. 53 for 23 angles individual) may be much # larger than the effective count after anti-degeneracy mode selection # (e.g. 9 for auto_averaged). Using the expanded count for memory estimation # can unnecessarily trigger out-of-core routing, which bypasses the # anti-degeneracy defense system entirely — causing parameter absorption # degeneracy and false convergence. # # Fix: Pre-check what anti-degeneracy would select, and use the effective # param count for memory routing. The actual anti-degeneracy transformation # still happens inside _fit_with_stratified_least_squares(). n_total_points = len(stratified_data.g2_flat) actual_n_params = len(validated_params) effective_n_params = actual_n_params # Default: no reduction if per_angle_scaling and config is not None and hasattr(config, "config"): nlsq_cfg = config.config.get("optimization", {}).get("nlsq", {}) ad_cfg = nlsq_cfg.get("anti_degeneracy", {}) ad_per_angle_mode = ad_cfg.get("per_angle_mode", "auto") ad_threshold = ad_cfg.get("constant_scaling_threshold", 3) n_angles_check = len(np.unique(stratified_data.phi_flat)) if ad_per_angle_mode == "auto" and n_angles_check >= ad_threshold: # auto_averaged: 2 averaged scaling params replace 2*n_angles effective_n_params = n_physical + 2 logger.info( f"Anti-Degeneracy pre-check: auto -> auto_averaged " f"(n_phi={n_angles_check} >= threshold={ad_threshold}). " f"Effective params: {effective_n_params} " f"(expanded: {actual_n_params})" ) elif ad_per_angle_mode == "constant": # constant: scaling fixed, only physical params optimized effective_n_params = n_physical logger.info( f"Anti-Degeneracy pre-check: constant mode. " f"Effective params: {effective_n_params} " f"(expanded: {actual_n_params})" ) strategy_recheck = select_nlsq_strategy(n_total_points, effective_n_params) logger.info( f"Strategy re-check (with {effective_n_params} effective params, " f"{actual_n_params} expanded): " f"{strategy_recheck.strategy.value} ({strategy_recheck.reason})" ) # Route to OUT_OF_CORE if peak memory exceeds threshold if strategy_recheck.strategy == NLSQStrategy.OUT_OF_CORE: # Safety check: warn if anti-degeneracy would have prevented this if effective_n_params < actual_n_params: logger.warning( f"Out-of-core triggered with {actual_n_params} expanded params, " f"but anti-degeneracy would reduce to {effective_n_params}. " f"This should not happen - the pre-check should have used " f"effective params for memory estimation. Check routing logic." ) logger.info("=" * 80) logger.info("OUT-OF-CORE ACCUMULATION MODE (Re-check)") logger.info( f"Peak memory ({strategy_recheck.peak_memory_gb:.1f} GB) exceeds " f"threshold ({strategy_recheck.threshold_gb:.1f} GB)" ) logger.info("Using chunk-wise J^T J accumulation for memory efficiency") logger.info("=" * 80) # Default to False (User requirement: Never subsample data) use_fast_mode = self.fast_mode or ( config.config.get("optimization", {}).get("fast_chi2_mode", False) if config is not None and hasattr(config, "config") else False ) # Extract anti-degeneracy config (will warn that it's not supported for out-of-core) recheck_anti_degeneracy_config = None if config is not None and hasattr(config, "config"): recheck_nlsq_config = config.config.get("optimization", {}).get( "nlsq", {} ) recheck_anti_degeneracy_config = recheck_nlsq_config.get( "anti_degeneracy", {} ) popt, pcov, info = self._fit_with_out_of_core_accumulation( stratified_data=stratified_data, data=data, per_angle_scaling=per_angle_scaling, physical_param_names=physical_param_names, initial_params=validated_params, bounds=nlsq_bounds, logger=logger, config=config, fast_chi2_mode=use_fast_mode, anti_degeneracy_config=recheck_anti_degeneracy_config, ) execution_time = time.time() - start_time uncertainties = _safe_uncertainties_from_pcov(pcov, len(popt)) # Effective DOF for reduced chi-squared: in auto_averaged mode the # optimizer works on a compressed param vector but the true model # DOF is 2*n_phi + n_physical (one contrast+offset per angle). _ooc_n_params_effective: int | None = None if ( per_angle_scaling and config is not None and hasattr(config, "config") ): _ooc_ad = ( config.config.get("optimization", {}) .get("nlsq", {}) .get("anti_degeneracy", {}) ) _ooc_mode = _ooc_ad.get("per_angle_mode", "auto") _ooc_thresh = _ooc_ad.get("constant_scaling_threshold", 3) if _ooc_mode == "auto" and n_angles_check >= _ooc_thresh: _ooc_n_params_effective = 2 * n_angles_check + n_physical elif _ooc_mode == "constant": _ooc_n_params_effective = n_physical _ooc_dof = ( _ooc_n_params_effective if _ooc_n_params_effective is not None else len(popt) ) reduced_chi2 = info.get("chi_squared", 0.0) / max( 1, n_total_points - _ooc_dof ) # Derive quality_flag from reduced chi-squared (same thresholds # as _create_fit_result) instead of hardcoding "good". if reduced_chi2 < 1.5: _ooc_recheck_quality = "good" elif reduced_chi2 < 3.0: _ooc_recheck_quality = "marginal" else: _ooc_recheck_quality = "poor" return OptimizationResult( parameters=popt, uncertainties=uncertainties, covariance=pcov, chi_squared=info.get("chi_squared", 0.0), reduced_chi_squared=reduced_chi2, convergence_status=info.get("convergence_status", "unknown"), iterations=info.get("iterations", 0), execution_time=execution_time, device_info={ "device": "cpu_accumulated", "strategy": "out_of_core", "fast_mode": use_fast_mode, "decision": strategy_recheck.reason, }, recovery_actions=["out_of_core_recheck_delegation"], quality_flag=_ooc_recheck_quality, ) # Route to HYBRID_STREAMING if index array exceeds threshold (extreme scale) if strategy_recheck.strategy == NLSQStrategy.HYBRID_STREAMING: if not HYBRID_STREAMING_AVAILABLE: logger.critical( "AdaptiveHybridStreamingOptimizer required for extreme-scale " f"dataset ({n_total_points:,} points) but not available." ) raise MemoryError( f"Dataset too large for RAM (index={strategy_recheck.index_memory_gb:.1f} GB > " f"threshold={strategy_recheck.threshold_gb:.1f} GB) and Streaming unavailable." ) logger.warning( f"Extreme-scale dataset: {strategy_recheck.reason}. " "Proceeding with Adaptive Hybrid Streaming." ) # Fall through to streaming path below (use_streaming_mode will be set) # Extract target chunk size from config target_chunk_size = 100_000 # Default hybrid_streaming_config = None use_streaming_mode = False use_hybrid_streaming = False # Compute adaptive memory threshold (v2.7.0+) # Default: 75% of total system memory instead of fixed 16 GB memory_fraction: float | None = None # Will use default or env var memory_threshold_gb: float | None = None # Will be computed adaptively if config is not None and hasattr(config, "config"): strat_config = config.config.get("optimization", {}).get( "stratification", {} ) target_chunk_size = strat_config.get("target_chunk_size", 100_000) # Extract streaming configuration nlsq_config = config.config.get("optimization", {}).get("nlsq", {}) hybrid_streaming_config = nlsq_config.get("hybrid_streaming", {}) # Support for explicit memory_threshold_gb (backwards compatible) # or memory_fraction (new adaptive approach) if "memory_threshold_gb" in nlsq_config: memory_threshold_gb = nlsq_config["memory_threshold_gb"] if "memory_fraction" in nlsq_config: memory_fraction = nlsq_config["memory_fraction"] # Compute adaptive threshold if not explicitly set if memory_threshold_gb is None: memory_threshold_gb, threshold_info = get_adaptive_memory_threshold( memory_fraction=memory_fraction ) logger.debug( f"Using adaptive memory threshold: {memory_threshold_gb:.1f} GB " f"(fraction={threshold_info['memory_fraction']}, " f"total={threshold_info['total_memory_gb']:.1f} GB, " f"source={threshold_info['source']})" ) else: logger.debug( f"Using explicit memory threshold from config: {memory_threshold_gb:.1f} GB" ) # Check for hybrid streaming mode (preferred for large datasets) if hybrid_streaming_config is not None: use_hybrid_streaming = hybrid_streaming_config.get("enable", False) # Check for forced streaming mode from config # Also set from strategy_recheck if it returned HYBRID_STREAMING if config is not None and hasattr(config, "config"): nlsq_config = config.config.get("optimization", {}).get("nlsq", {}) use_streaming_mode = nlsq_config.get("use_streaming", False) # Set streaming mode if strategy_recheck returned HYBRID_STREAMING (extreme scale) # This unified decision replaces the legacy _should_use_streaming() check if strategy_recheck.strategy == NLSQStrategy.HYBRID_STREAMING: logger.info("=" * 80) logger.info("HYBRID STREAMING MODE (Strategy Re-check)") logger.info( f"Index array ({strategy_recheck.index_memory_gb:.1f} GB) exceeds " f"threshold ({strategy_recheck.threshold_gb:.1f} GB)" ) logger.info("=" * 80) use_streaming_mode = True # Log strategy decision for STANDARD (in-memory) path if not use_streaming_mode: logger.info( f"Memory check: {strategy_recheck.reason}. " "Proceeding with in-memory stratified least-squares." ) # Use streaming optimizer if needed if use_streaming_mode: # Prefer AdaptiveHybridStreamingOptimizer when available # It fixes shear-term gradients, convergence, and covariance issues # Use hybrid if: (1) explicitly enabled, OR (2) basic streaming unavailable use_hybrid = HYBRID_STREAMING_AVAILABLE and ( use_hybrid_streaming or not STREAMING_AVAILABLE ) if use_hybrid: logger.info("=" * 80) logger.info("ADAPTIVE HYBRID STREAMING MODE (Preferred)") logger.info( "Using NLSQ AdaptiveHybridStreamingOptimizer for better " "convergence and parameter estimation" ) logger.info("=" * 80) # Extract anti-degeneracy config for defense system v2.9.0 anti_degeneracy_config = nlsq_config.get("anti_degeneracy", {}) try: popt, pcov, info = self._fit_with_stratified_hybrid_streaming( stratified_data=stratified_data, per_angle_scaling=per_angle_scaling, physical_param_names=physical_param_names, initial_params=validated_params, bounds=nlsq_bounds, logger=logger, hybrid_config=hybrid_streaming_config, anti_degeneracy_config=anti_degeneracy_config, ) # Compute final residuals for result creation chunked_data = self._create_stratified_chunks( stratified_data, target_chunk_size ) residual_fn = create_stratified_residual_function( stratified_data=chunked_data, per_angle_scaling=per_angle_scaling, physical_param_names=physical_param_names, logger=cast(logging.Logger | None, logger), validate=False, ) final_residuals = residual_fn(popt) n_data = len(final_residuals) # Get execution time execution_time = time.time() - start_time # Compute effective DOF for reduced_chi_squared. # In auto_averaged mode, popt has compressed length (e.g. 9), # but the true model DOF is 2*n_phi + n_physical (e.g. 53). _hs_n_params_effective: int | None = None if per_angle_scaling and anti_degeneracy_config: _hs_ad_mode = anti_degeneracy_config.get( "per_angle_mode", "auto" ) _hs_thresh = anti_degeneracy_config.get( "constant_scaling_threshold", 3 ) if _hs_ad_mode == "auto" and n_angles_check >= _hs_thresh: _hs_n_params_effective = 2 * n_angles_check + n_physical elif _hs_ad_mode == "constant": _hs_n_params_effective = n_physical # Create result result = self._create_fit_result( popt=popt, pcov=pcov, residuals=final_residuals, n_data=n_data, iterations=info.get("nit", 0), execution_time=execution_time, convergence_status=( "converged" if info.get("success", True) else "failed" ), recovery_actions=["hybrid_streaming_optimizer_method"], streaming_diagnostics=info.get( "hybrid_streaming_diagnostics" ), stratification_diagnostics=stratification_diagnostics, diagnostics_payload=None, n_params_effective=_hs_n_params_effective, ) logger.info("=" * 80) logger.info("HYBRID STREAMING OPTIMIZATION COMPLETE") logger.info( f"Final chi2: {result.chi_squared:.4e}, " f"Reduced chi2: {result.reduced_chi_squared:.4f}" ) logger.info("=" * 80) return result except (ValueError, RuntimeError, MemoryError, OSError) as e: logger.warning( f"Hybrid streaming optimization failed: {e}\n" f"Falling back to stratified least-squares..." ) # Fall through to stratified least-squares if not STREAMING_AVAILABLE: # AdaptiveHybridStreamingOptimizer not available (NLSQ < 0.3.2) logger.error( "Streaming mode requested but AdaptiveHybridStreamingOptimizer " "not available. Upgrade NLSQ to >= 0.3.2. " "Falling back to stratified least-squares." ) # Fall through to stratified least-squares # Extract NLSQ config dict for tolerance propagation and anti-degeneracy nlsq_config_dict = None anti_degeneracy_config = None if config is not None and hasattr(config, "config"): nlsq_config_dict = config.config.get("optimization", {}).get("nlsq", {}) anti_degeneracy_config = nlsq_config_dict.get("anti_degeneracy", {}) if anti_degeneracy_config: logger.info( f"Anti-Degeneracy config loaded: per_angle_mode=" f"{anti_degeneracy_config.get('per_angle_mode', 'auto')}" ) # Call stratified least_squares optimization try: popt, pcov, info = self._fit_with_stratified_least_squares( stratified_data=stratified_data, per_angle_scaling=per_angle_scaling, physical_param_names=physical_param_names, initial_params=validated_params, bounds=nlsq_bounds, logger=logger, target_chunk_size=target_chunk_size, anti_degeneracy_config=anti_degeneracy_config, nlsq_config_dict=nlsq_config_dict, ) # Compute final residuals for result creation # We need to recreate the residual function to compute final residuals chunked_data = self._create_stratified_chunks( stratified_data, target_chunk_size ) residual_fn = create_stratified_residual_function( stratified_data=chunked_data, per_angle_scaling=per_angle_scaling, physical_param_names=physical_param_names, logger=cast(logging.Logger | None, logger), validate=False, # Already validated ) final_residuals = residual_fn(popt) n_data = len(final_residuals) # Get execution time execution_time = time.time() - start_time # Compute effective DOF for reduced_chi_squared. # In auto_averaged mode, popt has compressed length (e.g. 9), # but the true model DOF is 2*n_phi + n_physical (e.g. 53). _sls_n_params_effective: int | None = None if per_angle_scaling and anti_degeneracy_config: _sls_ad_mode = anti_degeneracy_config.get("per_angle_mode", "auto") _sls_thresh = anti_degeneracy_config.get( "constant_scaling_threshold", 3 ) if _sls_ad_mode == "auto" and n_angles_check >= _sls_thresh: _sls_n_params_effective = 2 * n_angles_check + n_physical elif _sls_ad_mode == "constant": _sls_n_params_effective = n_physical # Create result result = self._create_fit_result( popt=popt, pcov=pcov, residuals=final_residuals, n_data=n_data, iterations=info.get("nit", 0), execution_time=execution_time, convergence_status=( "converged" if info.get("success", True) else "failed" ), recovery_actions=["stratified_least_squares_method"], streaming_diagnostics=None, stratification_diagnostics=stratification_diagnostics, diagnostics_payload=None, n_params_effective=_sls_n_params_effective, ) logger.info("=" * 80) logger.info("STRATIFIED LEAST-SQUARES COMPLETE") logger.info( f"Final chi2: {result.chi_squared:.4e}, Reduced chi2: {result.reduced_chi_squared:.4f}" ) logger.info("=" * 80) return result except (ValueError, RuntimeError, MemoryError, OSError) as e: logger.error( f"Stratified least_squares failed: {e}\n" f"Falling back to standard curve_fit_large path..." ) # Fall through to standard optimization path below # Step 2: Prepare data logger.info(f"Preparing data for {analysis_mode} optimization...") xdata, ydata = self._prepare_data(stratified_data) n_data = len(ydata) logger.info(f"Data prepared: {n_data} points") # Note: Memory estimation is deferred to NLSQ's estimate_memory_requirements() # which provides accurate Jacobian sizing based on actual parameter count. if n_data > 10_000_000: logger.warning( f"Very large dataset: {n_data:,} points. " f"NLSQ will use memory-efficient strategies automatically." ) elif n_data > 1_000_000: logger.info( f"Large dataset: {n_data:,} points. Memory managed automatically." ) # Step 3: Validate initial parameters if initial_params is None: raise ValueError( "initial_params must be provided (auto-loading not yet implemented)", ) validated_params = self._validate_initial_params(initial_params, bounds) # Step 4: Convert bounds nlsq_bounds = self._convert_bounds(bounds) # Step 5: Validate bounds consistency (FR-006) if nlsq_bounds is not None: lower, upper = nlsq_bounds if np.any(lower > upper): invalid_indices = np.where(lower > upper)[0] raise ValueError( f"Invalid bounds at indices {invalid_indices}: " f"lower > upper. Bounds must satisfy lower <= upper elementwise. " f"Lower: {lower[invalid_indices]}, Upper: {upper[invalid_indices]}", ) # Step 6: Create residual function with per-angle scaling logger.info( f"Creating residual function (per_angle_scaling={per_angle_scaling})..." ) residual_fn = self._create_residual_function( stratified_data, analysis_mode, per_angle_scaling ) base_residual_fn = residual_fn physical_param_names = self._get_physical_param_names(analysis_mode) phi_values = np.asarray(stratified_data.phi) n_phi_unique = len(np.unique(phi_values)) if phi_values.size else 0 per_angle_contrast_override: np.ndarray | None = None per_angle_offset_override: np.ndarray | None = None if per_angle_scaling_initial: contrast_override = per_angle_scaling_initial.get("contrast") if contrast_override is not None: try: arr = np.asarray(contrast_override, dtype=np.float64) if arr.size == n_phi_unique: per_angle_contrast_override = arr.copy() else: logger.warning( "per_angle_scaling contrast override has %d entries (expected %d); ignoring override", arr.size, n_phi_unique, ) except (TypeError, ValueError): logger.warning("Invalid per-angle contrast override; ignoring") offset_override = per_angle_scaling_initial.get("offset") if offset_override is not None: try: arr = np.asarray(offset_override, dtype=np.float64) if arr.size == n_phi_unique: per_angle_offset_override = arr.copy() else: logger.warning( "per_angle_scaling offset override has %d entries (expected %d); ignoring override", arr.size, n_phi_unique, ) except (TypeError, ValueError): logger.warning("Invalid per-angle offset override; ignoring") # Step 6.5: Expand parameters for per-angle scaling if needed # This is CRITICAL: the residual function expects per-angle parameters, # but validated_params is still in compact form [contrast, offset, *physical] if per_angle_scaling: n_phi = n_phi_unique # Expand parameters from compact to per-angle form # Input: [contrast, offset, *physical] (e.g., 5 params) # Output: [contrast_0, ..., contrast_{n-1}, offset_0, ..., offset_{n-1}, *physical] # (e.g., 2*n_phi + 3 params for static_isotropic with n_phi angles) contrast_single = validated_params[0] offset_single = validated_params[1] physical_params = validated_params[2:] # Replicate contrast and offset for each angle # CRITICAL FIX (v2.7.1): For laminar_flow mode, use consistent initialization # to prevent per-angle params from absorbing the shear signal is_laminar_flow = "gamma_dot_t0" in physical_param_names use_consistent_init = ( is_laminar_flow and per_angle_contrast_override is None and per_angle_offset_override is None and n_phi > 3 # Only for many angles where absorption is a problem ) if use_consistent_init: logger.info( "Computing consistent per-angle initialization for laminar_flow mode..." ) try: contrast_per_angle, offset_per_angle = ( _compute_consistent_per_angle_init( stratified_data=stratified_data, physical_params=physical_params, physical_param_names=physical_param_names, default_contrast=contrast_single, default_offset=offset_single, logger=logger, ) ) except ( ValueError, RuntimeError, TypeError, AttributeError, np.linalg.LinAlgError, ) as e: logger.warning( f"Failed to compute consistent per-angle init: {e}\n" "Falling back to uniform replication." ) contrast_per_angle = np.full(n_phi, contrast_single) offset_per_angle = np.full(n_phi, offset_single) else: if per_angle_contrast_override is not None: contrast_per_angle = per_angle_contrast_override else: contrast_per_angle = np.full(n_phi, contrast_single) if per_angle_offset_override is not None: offset_per_angle = per_angle_offset_override else: offset_per_angle = np.full(n_phi, offset_single) # Concatenate: [contrasts, offsets, physical] validated_params = np.concatenate( [contrast_per_angle, offset_per_angle, physical_params] ) logger.info( f"Expanded parameters for per-angle scaling:\n" f" {n_phi} phi angles detected\n" f" Parameters: compact {2 + len(physical_params)} -> per-angle {len(validated_params)}\n" f" Structure: [{n_phi} contrasts, {n_phi} offsets, {len(physical_params)} physical]" ) # Also expand bounds if they exist if nlsq_bounds is not None: lower, upper = nlsq_bounds # Extract compact bounds contrast_lower, offset_lower = lower[0], lower[1] contrast_upper, offset_upper = upper[0], upper[1] physical_lower = lower[2:] physical_upper = upper[2:] # Expand to per-angle bounds contrast_lower_per_angle = np.full(n_phi, contrast_lower) contrast_upper_per_angle = np.full(n_phi, contrast_upper) offset_lower_per_angle = np.full(n_phi, offset_lower) offset_upper_per_angle = np.full(n_phi, offset_upper) # Concatenate expanded bounds expanded_lower = np.concatenate( [contrast_lower_per_angle, offset_lower_per_angle, physical_lower] ) expanded_upper = np.concatenate( [contrast_upper_per_angle, offset_upper_per_angle, physical_upper] ) nlsq_bounds = (expanded_lower, expanded_upper) logger.info( f"Expanded bounds for per-angle scaling:\n" f" Bounds: compact {2 + len(physical_lower)} -> per-angle {len(expanded_lower)}" ) n_angles_for_map = n_phi_unique if per_angle_scaling else 1 physical_index_map = build_physical_index_map( per_angle_scaling, n_angles_for_map, physical_param_names, ) validated_params, transform_state = apply_forward_shear_transforms_to_vector( validated_params, physical_index_map, transform_cfg, ) if transform_state: nlsq_bounds = apply_forward_shear_transforms_to_bounds( nlsq_bounds, transform_state, ) solver_residual_fn = base_residual_fn if transform_state: if isinstance(base_residual_fn, StratifiedResidualFunction): solver_residual_fn = wrap_stratified_function_with_transforms( base_residual_fn, transform_state, ) else: solver_residual_fn = wrap_model_function_with_transforms( base_residual_fn, transform_state, ) param_labels = _build_parameter_labels( per_angle_scaling, n_phi_unique if per_angle_scaling else 0, physical_param_names, ) per_param_x_scale = build_per_parameter_x_scale( per_angle_scaling, n_phi_unique if per_angle_scaling else 0, physical_param_names, analysis_mode, x_scale_map_config, ) if per_param_x_scale is not None: x_scale_value = per_param_x_scale if diagnostics_enabled: diagnostics_payload = diagnostics_payload or { "solver_settings": {"loss": loss_name} } solver_settings = diagnostics_payload.setdefault( "solver_settings", {"loss": loss_name} ) solver_settings["x_scale"] = ( x_scale_value.tolist() if isinstance(x_scale_value, np.ndarray) else x_scale_value ) logger.info( "Diagnostics enabled: loss=%s, x_scale=%s, sample_size=%d", loss_name, format_x_scale_for_log(x_scale_value), diagnostics_sample_size, ) diagnostics_sample_x: np.ndarray | None = None sample_scaling = 1.0 if diagnostics_enabled: diagnostics_payload = diagnostics_payload or {} diagnostics_sample_x = _sample_xdata(xdata, diagnostics_sample_size) if diagnostics_sample_x.size == 0: diagnostics_sample_x = xdata sample_scaling = max(1.0, xdata.size / max(diagnostics_sample_x.size, 1)) initial_jtj, initial_norms = _compute_jacobian_stats( solver_residual_fn, diagnostics_sample_x, validated_params, sample_scaling, ) if initial_norms is not None: diagnostics_payload.setdefault("initial_jacobian_norms", {}) diagnostics_payload["initial_jacobian_norms"] = dict( zip(param_labels, initial_norms.tolist(), strict=False), ) logger.info( "Initial Jacobian column norms: %s", ", ".join( f"{label}={norm:.3e}" for label, norm in diagnostics_payload[ "initial_jacobian_norms" ].items() ), ) residual_counter: FunctionEvaluationCounter | None = None wrapped_residual_fn: StratifiedResidualFunction | FunctionEvaluationCounter if diagnostics_enabled: residual_counter = FunctionEvaluationCounter(solver_residual_fn) wrapped_residual_fn = residual_counter else: wrapped_residual_fn = solver_residual_fn # Step 7: Select optimization strategy using memory-based selection (v2.13.0) # Uses unified select_nlsq_strategy() instead of deprecated DatasetSizeStrategy n_parameters = len(validated_params) # Map NLSQStrategy to local OptimizationStrategy for fallback chain from homodyne.optimization.nlsq.strategies.chunking import ( estimate_nlsq_optimization_memory, ) memory_stats = estimate_nlsq_optimization_memory(n_data, n_parameters) logger.info( f"Memory estimate: {memory_stats['peak_gb']:.2f} GB peak, " f"{memory_stats.get('available_gb', 0):.2f} GB available" ) if not memory_stats.get("is_safe", True): logger.warning( f"Memory usage may be high ({memory_stats['peak_gb']:.2f} GB). " f"Using memory-efficient strategy." ) # Check for strategy override in config strategy_override = None if config is not None and hasattr(config, "config"): perf_config = config.config.get("performance", {}) strategy_override = perf_config.get("strategy_override") # Select strategy: use override if provided, else use memory-based selection if strategy_override: try: strategy = OptimizationStrategy(strategy_override) logger.info(f"Using overridden strategy: {strategy.value}") except ValueError: logger.warning( f"Invalid strategy override '{strategy_override}', using auto" ) strategy_override = None if not strategy_override: # Map memory-based decision to OptimizationStrategy for fallback chain decision = select_nlsq_strategy(n_data, n_parameters) if decision.strategy == NLSQStrategy.HYBRID_STREAMING: strategy = OptimizationStrategy.STREAMING elif decision.strategy == NLSQStrategy.OUT_OF_CORE: strategy = OptimizationStrategy.CHUNKED else: # STANDARD: use size-based selection for STANDARD/LARGE distinction if n_data < 1_000_000: strategy = OptimizationStrategy.STANDARD elif n_data < 10_000_000: strategy = OptimizationStrategy.LARGE else: strategy = OptimizationStrategy.CHUNKED logger.info( f"Selected {strategy.value} strategy for {n_data:,} points " f"(peak memory: {memory_stats['peak_gb']:.2f} GB)" ) # Step 8: Execute optimization with strategy fallback popt, pcov, info, recovery_actions, convergence_status = ( self._execute_optimization_with_fallback( strategy=strategy, wrapped_residual_fn=wrapped_residual_fn, xdata=xdata, ydata=ydata, validated_params=validated_params, nlsq_bounds=nlsq_bounds, loss_name=loss_name, x_scale_value=x_scale_value, config=config, start_time=start_time, logger=logger, ) ) # Compute effective DOF for the diagnostics covariance scaling (s²). # In auto_averaged mode the optimizer works on a compressed 9-param vector # (contrast_avg, offset_avg, physical×7), but the physics model consumes # 2*n_phi + n_physical effective degrees of freedom (one contrast+offset per # angle, constrained to an averaged value). Using the compressed count (9) # would underestimate s² and produce artificially tight diagnostic pcov. n_physical = len(physical_param_names) n_dof_effective: int | None = None if per_angle_scaling and config is not None and hasattr(config, "config"): _nlsq_cfg = config.config.get("optimization", {}).get("nlsq", {}) _ad_cfg = _nlsq_cfg.get("anti_degeneracy", {}) _ad_mode = _ad_cfg.get("per_angle_mode", "auto") _ad_threshold = _ad_cfg.get("constant_scaling_threshold", 3) if _ad_mode == "auto" and n_phi_unique >= _ad_threshold: # auto_averaged: expanded DOF = 2*n_phi + n_physical (e.g. 53) n_dof_effective = 2 * n_phi_unique + n_physical elif _ad_mode == "constant": # constant: only physical params are optimised; scaling is fixed n_dof_effective = n_physical return self._post_process_results( popt=popt, pcov=pcov, info=info, transform_state=transform_state, validated_params=validated_params, residual_counter=residual_counter, base_residual_fn=base_residual_fn, xdata=xdata, n_data=n_data, start_time=start_time, nlsq_bounds=nlsq_bounds, convergence_status=convergence_status, recovery_actions=recovery_actions, stratification_diagnostics=stratification_diagnostics, diagnostics_state={ "enabled": diagnostics_enabled, "payload": diagnostics_payload, "sample_x": diagnostics_sample_x, "solver_residual_fn": solver_residual_fn, "sample_scaling": sample_scaling, "param_labels": param_labels, }, logger=logger, n_dof_effective=n_dof_effective, )
def _execute_optimization_with_fallback( self, strategy: OptimizationStrategy, wrapped_residual_fn: Callable[..., np.ndarray], xdata: np.ndarray, ydata: np.ndarray, validated_params: np.ndarray, nlsq_bounds: tuple[np.ndarray, np.ndarray] | None, loss_name: str, x_scale_value: float | str, config: Any, start_time: float, logger: logging.Logger | logging.LoggerAdapter[logging.Logger], ) -> tuple[np.ndarray, np.ndarray | None, dict[str, Any], list[str], str]: """Execute optimization with strategy fallback. Delegates to fallback_chain.execute_optimization_with_fallback(). """ return execute_optimization_with_fallback( strategy=strategy, wrapped_residual_fn=wrapped_residual_fn, xdata=xdata, ydata=ydata, validated_params=validated_params, nlsq_bounds=nlsq_bounds, loss_name=loss_name, x_scale_value=x_scale_value, config=config, start_time=start_time, log=logger, enable_recovery=self.enable_recovery, execute_with_recovery_fn=self._execute_with_recovery, fit_with_hybrid_streaming_fn=self._fit_with_hybrid_streaming_optimizer, streaming_available=STREAMING_AVAILABLE, curve_fit_fn=curve_fit, curve_fit_large_fn=curve_fit_large, fast_mode=self.fast_mode, ) def _post_process_results( self, popt: np.ndarray, pcov: np.ndarray | None, info: dict[str, Any], transform_state: Any, validated_params: np.ndarray, residual_counter: Any, base_residual_fn: Callable[..., np.ndarray], xdata: np.ndarray, n_data: int, start_time: float, nlsq_bounds: tuple[np.ndarray, np.ndarray] | None, convergence_status: str, recovery_actions: list[str], stratification_diagnostics: Any, diagnostics_state: dict[str, Any], logger: logging.Logger | logging.LoggerAdapter[logging.Logger], n_dof_effective: int | None = None, ) -> OptimizationResult: """Post-process optimization outputs into final result. Applies inverse transforms, computes final residuals and costs, runs optional diagnostics, determines success, and creates result. """ import time # Unpack diagnostics state diagnostics_enabled = diagnostics_state.get("enabled", False) diagnostics_payload = diagnostics_state.get("payload") diagnostics_sample_x = diagnostics_state.get("sample_x") solver_residual_fn = diagnostics_state.get("solver_residual_fn") sample_scaling = diagnostics_state.get("sample_scaling") param_labels = diagnostics_state.get("param_labels") # Apply inverse shear transforms solver_params = np.asarray(popt, dtype=float) if transform_state: physical_params = apply_inverse_shear_transforms_to_vector( solver_params, transform_state, ) popt = physical_params if pcov is not None: pcov = adjust_covariance_for_transforms( np.asarray(pcov, dtype=float), solver_params, physical_params, transform_state, ) else: popt = np.asarray(popt, dtype=float) # Count function evaluations reported_nfev: int = cast(int, info.get("nfev", -1)) corrected_nfev = ( residual_counter.count if residual_counter is not None else reported_nfev ) if diagnostics_enabled: diagnostics_payload = diagnostics_payload or {} diagnostics_payload["nfev_reported"] = reported_nfev diagnostics_payload["nfev_actual"] = corrected_nfev logger.info( "Diagnostics: nfev reported=%s actual=%s", reported_nfev, corrected_nfev, ) # Compute final residuals using the base function (avoid counter side-effects). # StratifiedResidualFunction/JIT takes (params), not (xdata, *params). if isinstance( base_residual_fn, (StratifiedResidualFunction, StratifiedResidualFunctionJIT), ): final_residuals = base_residual_fn(popt) else: final_residuals = base_residual_fn(xdata, *popt) reported_iterations = -1 if isinstance(info, dict): reported_iterations = info.get("nit", info.get("nfev", -1)) iterations = max(0, corrected_nfev) if reported_iterations == -1: logger.debug( "Iteration count not available from NLSQ (curve_fit_large does not return this info)" ) execution_time = time.time() - start_time # Optional diagnostics: Jacobian stats, parameter status, covariance refinement if diagnostics_enabled and diagnostics_sample_x is not None: assert diagnostics_payload is not None final_jtj, final_norms = _compute_jacobian_stats( solver_residual_fn, diagnostics_sample_x, solver_params, sample_scaling, ) if final_norms is not None: diagnostics_payload["final_jacobian_norms"] = dict( zip(param_labels, final_norms.tolist(), strict=False), ) logger.info( "Final Jacobian column norms: %s", ", ".join( f"{label}={norm:.3e}" for label, norm in diagnostics_payload[ "final_jacobian_norms" ].items() ), ) if nlsq_bounds is not None: statuses = _classify_parameter_status( popt, nlsq_bounds[0], nlsq_bounds[1], ) diagnostics_payload["parameter_status"] = dict( zip(param_labels, statuses, strict=False), ) clips = [ label for label, st in diagnostics_payload["parameter_status"].items() if st != "active" ] if clips: logger.warning( "Diagnostics: parameters at bounds -> %s", ", ".join(clips), ) if final_jtj is not None: n_diag_data = len(final_residuals) # Use n_dof_effective when provided (e.g. auto_averaged mode where # the compressed optimizer vector has fewer entries than the true # model DOF: 2*n_phi + n_physical >> len(popt)). Falling back to # len(popt) would underestimate s² and produce artificially tight # diagnostic covariances. n_diag_params = ( n_dof_effective if n_dof_effective is not None else len(popt) ) s2_diag = float(np.sum(final_residuals**2)) / max( n_diag_data - n_diag_params, 1 ) pcov = s2_diag * np.linalg.pinv(final_jtj, rcond=1e-10) diagnostics_payload["jtj_condition"] = ( float(np.linalg.cond(final_jtj)) if final_jtj.size > 0 else None ) # Determine optimization success initial_cost = info.get("initial_cost", 0) if isinstance(info, dict) else 0 final_cost = np.sum(final_residuals**2) function_evals = iterations cost_reduction = ( (initial_cost - final_cost) / initial_cost if initial_cost > 0 else 0 ) params_changed = not np.allclose(popt, validated_params, rtol=1e-8) optimization_ran = function_evals > 10 or params_changed optimization_improved = cost_reduction > 0.05 if optimization_ran and optimization_improved: status_indicator = "SUCCESS" status_msg = "Optimization succeeded" elif optimization_ran and not optimization_improved: status_indicator = "MARGINAL" status_msg = "Optimization ran but minimal improvement" else: status_indicator = "FAILED" status_msg = "Optimization failed (0 iterations, no cost reduction)" logger.info( f"{status_indicator}: {status_msg} in {execution_time:.2f}s\n" f" Function evaluations: {function_evals}\n" f" Cost: {initial_cost:.4e} -> {final_cost:.4e} ({cost_reduction * 100:+.1f}%)\n" f" Iterations reported: {reported_iterations} (NLSQ may report 0)" ) if recovery_actions: logger.info(f"Recovery actions applied: {len(recovery_actions)}") # Extract streaming diagnostics streaming_diagnostics = None if "batch_statistics" in info: streaming_diagnostics = info["batch_statistics"] elif "streaming_diagnostics" in info: streaming_diagnostics = info["streaming_diagnostics"] # Create result result = self._create_fit_result( popt=popt, pcov=pcov, residuals=final_residuals, n_data=n_data, iterations=iterations, execution_time=execution_time, convergence_status=convergence_status, recovery_actions=recovery_actions, streaming_diagnostics=streaming_diagnostics, stratification_diagnostics=stratification_diagnostics, diagnostics_payload=diagnostics_payload if diagnostics_enabled else None, n_params_effective=n_dof_effective, ) logger.info( f"Final chi-squared: {result.chi_squared:.4e}, " f"reduced chi-squared: {result.reduced_chi_squared:.4f}", ) return result def _execute_with_recovery( self, residual_fn: Callable[[np.ndarray], np.ndarray], xdata: np.ndarray, ydata: np.ndarray, initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray] | None, strategy: OptimizationStrategy, logger: logging.Logger | logging.LoggerAdapter[logging.Logger], loss_name: str, x_scale_value: float | str, ) -> tuple[np.ndarray, np.ndarray, dict, list[str], str]: """Execute optimization with automatic error recovery (T022-T024).""" return execute_with_recovery( residual_fn=residual_fn, xdata=xdata, ydata=ydata, initial_params=initial_params, bounds=bounds, strategy=strategy, log=logger, loss_name=loss_name, x_scale_value=x_scale_value, handle_nlsq_result_fn=self._handle_nlsq_result, curve_fit_fn=curve_fit, curve_fit_large_fn=curve_fit_large, ) def _diagnose_error( self, error: Exception, params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray] | None, attempt: int, ) -> dict[str, Any]: """Diagnose optimization error and provide actionable recovery strategy (T023).""" return diagnose_error( error=error, params=params, bounds=bounds, attempt=attempt, ) def _prepare_data(self, data: Any) -> tuple[np.ndarray, np.ndarray]: """Transform multi-dimensional XPCS data to flattened 1D arrays. Args: data: XPCSData with shape (n_phi, n_t1, n_t2) OR StratifiedData (already flattened) Returns: (xdata, ydata): Flattened independent variables and observations """ # Validate data has required attributes if ( not hasattr(data, "phi") or not hasattr(data, "t1") or not hasattr(data, "t2") or not hasattr(data, "g2") ): raise ValueError("Data must have 'phi', 't1', 't2', and 'g2' attributes") # Check if this is already stratified data (has phi_flat attribute) if hasattr(data, "phi_flat"): # Stratified data is already flattened - use directly g2_flat = np.asarray(data.g2_flat, dtype=np.float64) # Use int64 to avoid int32 overflow for large datasets # (n_phi * n_t1 * n_t2 can exceed 2.147B for 100+ angles × 5000+ time points). xdata = np.arange(len(g2_flat), dtype=np.int64) ydata = g2_flat return xdata, ydata # Original data path: needs meshgrid and flattening # Get dimensions phi = np.asarray(data.phi) t1 = np.asarray(data.t1) t2 = np.asarray(data.t2) g2 = np.asarray(data.g2) # CRITICAL FIX (Nov 14, 2025): Extract 1D arrays from 2D meshgrids if needed # Same issue as in _apply_stratification_if_needed - cache loader returns 2D # but meshgrid expects 1D inputs if t1.ndim == 2: t1 = t1[:, 0] if t1.size > 0 else np.array([]) if t2.ndim == 2: t2 = t2[0, :] if t2.size > 0 else np.array([]) # Validate non-empty arrays if phi.size == 0 or t1.size == 0 or t2.size == 0: raise ValueError("Data arrays cannot be empty") # Create meshgrid with indexing='ij' to preserve correct ordering # This ensures phi varies slowest, t2 varies fastest phi_grid, t1_grid, t2_grid = np.meshgrid(phi, t1, t2, indexing="ij") # Flatten all arrays to 1D # For NLSQ curve_fit interface, xdata is typically just indices # Use int64 to avoid int32 overflow for large datasets # (n_phi * n_t1 * n_t2 can exceed 2.147B for 100+ angles × 5000+ time points). xdata = np.arange(g2.size, dtype=np.int64) # Flatten observations ydata = g2.flatten().astype(np.float64, copy=False) return xdata, ydata def _apply_stratification_if_needed( self, data: Any, per_angle_scaling: bool, config: Any, logger: Any, ) -> Any: """Apply angle-stratified chunking if conditions require it. This method fixes the per-angle scaling + NLSQ chunking incompatibility identified in ultra-think-20251106-012247. When per-angle parameters are used (contrast[i], offset[i] for each phi angle), NLSQ's arbitrary chunking can create chunks without certain angles, resulting in zero gradients and silent optimization failures. Solution: Reorganize data so every chunk contains all phi angles, ensuring gradients are always well-defined. Args: data: XPCSData object with phi, t1, t2, g2 attributes per_angle_scaling: Whether per-angle parameters are enabled config: Configuration manager with stratification settings logger: Logger instance for diagnostics Returns: Data object (original or stratified copy) ready for optimization Notes: - No-op if conditions don't require stratification - Creates temporary 2x memory overhead during reorganization - <1% performance overhead (0.15s for 3M points) - Respects configuration overrides in optimization.stratification """ # Extract stratification configuration with defaults strat_config = {} if config is not None and hasattr(config, "config"): opt_config = config.config.get("optimization", {}) strat_config = opt_config.get("stratification", {}) # Configuration defaults (matching YAML template) enabled = strat_config.get("enabled", "auto") # "auto", true, false target_chunk_size = strat_config.get("target_chunk_size", 100_000) max_imbalance_ratio = strat_config.get("max_imbalance_ratio", 5.0) force_sequential = strat_config.get("force_sequential_fallback", False) check_memory = strat_config.get("check_memory_safety", True) use_index_based = strat_config.get("use_index_based", False) collect_diagnostics = strat_config.get("collect_diagnostics", False) log_diagnostics = strat_config.get("log_diagnostics", False) # Check if explicitly disabled if enabled is False or ( isinstance(enabled, str) and enabled.lower() == "false" ): logger.info("Stratification disabled via configuration") return data # Check if we should fallback to sequential if force_sequential: logger.info("Sequential per-angle fallback forced via configuration") return UseSequentialOptimization( data=data, reason="force_sequential_fallback=true in configuration" ) # Get data dimensions # Note: Per-angle scaling is always enabled (legacy mode removed Nov 2025) phi = np.asarray(data.phi) t1 = np.asarray(data.t1) t2 = np.asarray(data.t2) g2 = np.asarray(data.g2) # CRITICAL FIX (Nov 14, 2025): Extract 1D arrays from 2D meshgrids if needed # ROOT CAUSE: After commit e5ac926, cache loader returns 2D meshgrids (600, 600) # but np.meshgrid() at line 2428 expects 1D input arrays (600,) # Calling meshgrid on already-meshgridded data produces wrong structure! # This was breaking alpha parameter gradient computation. if t1.ndim == 2: # t1_2d[i, j] = time[i] (constant along j), extract first column if t1.size > 0: t1 = t1[:, 0] logger.debug( f"Extracted 1D t1 array from 2D meshgrid: shape {t1.shape}" ) else: t1 = np.array([]) logger.debug("Empty 2D t1 array converted to empty 1D array") if t2.ndim == 2: # t2_2d[i, j] = time[j] (constant along i), extract first row if t2.size > 0: t2 = t2[0, :] logger.debug( f"Extracted 1D t2 array from 2D meshgrid: shape {t2.shape}" ) else: t2 = np.array([]) logger.debug("Empty 2D t2 array converted to empty 1D array") # Calculate total points (meshgrid creates n_phi × n_t1 × n_t2 points) n_points = len(phi) * len(t1) * len(t2) # Analyze angle distribution stats = analyze_angle_distribution(phi) # Decision logic (use configured max_imbalance_ratio) # Override the imbalance check with configured value should_stratify_auto, reason = should_use_stratification( n_points=n_points, n_angles=stats.n_angles, per_angle_scaling=per_angle_scaling, imbalance_ratio=stats.imbalance_ratio, ) # Override with configuration if imbalance exceeds configured threshold if stats.imbalance_ratio > max_imbalance_ratio: # Extreme imbalance - use sequential optimization logger.info( f"Extreme angle imbalance detected ({stats.imbalance_ratio:.1f} > {max_imbalance_ratio:.1f})" ) return UseSequentialOptimization( data=data, reason=f"Angle imbalance ratio ({stats.imbalance_ratio:.1f}) exceeds threshold ({max_imbalance_ratio:.1f})", ) # Handle "auto" mode if enabled == "auto" or ( isinstance(enabled, str) and enabled.lower() == "auto" ): should_stratify = should_stratify_auto else: # enabled is True (force on) should_stratify = True reason = "Stratification forced via configuration (enabled=true)" if not should_stratify: logger.info(f"Stratification skipped: {reason}") return data # Apply stratification logger.info( f"Applying angle-stratified chunking: {reason}\n" f" Angles: {stats.n_angles}, Imbalance ratio: {stats.imbalance_ratio:.2f}\n" f" Total points: {n_points:,}\n" f" Target chunk size: {target_chunk_size:,}\n" f" Use index-based: {use_index_based}" ) # Check memory safety (if enabled in config) if check_memory: mem_stats = estimate_stratification_memory( n_points, use_index_based=use_index_based ) if not mem_stats["is_safe"]: logger.warning( f"Stratification may use significant memory: " f"{mem_stats['peak_memory_mb']:.1f} MB peak. " f"Consider: (1) setting use_index_based=true, or " f"(2) setting force_sequential_fallback=true" ) # Reorganize data arrays # Need to expand to full meshgrid first, then stratify phi_grid, t1_grid, t2_grid = np.meshgrid(phi, t1, t2, indexing="ij") phi_flat = phi_grid.flatten() t1_flat = t1_grid.flatten() t2_flat = t2_grid.flatten() g2_flat = g2.flatten() # Measure stratification execution time import time stratification_start = time.perf_counter() # Apply stratification based on mode if use_index_based: # Index-based stratification (zero-copy, ~1% memory overhead) logger.info("Using index-based stratification (zero-copy)") indices, chunk_sizes = create_angle_stratified_indices( phi_flat, target_chunk_size=target_chunk_size ) # Apply indices to get stratified data phi_stratified = phi_flat[indices] t1_stratified = t1_flat[indices] t2_stratified = t2_flat[indices] g2_stratified = g2_flat[indices] # CRITICAL FIX (Jan 2026): For index-based stratification, # chunk_sizes are now explicitly returned. else: # Full copy stratification (2x memory overhead) logger.info("Using full-copy stratification") # Convert to JAX arrays for stratification phi_jax = jnp.array(phi_flat) t1_jax = jnp.array(t1_flat) t2_jax = jnp.array(t2_flat) g2_jax = jnp.array(g2_flat) # Apply stratification (use configured target_chunk_size) # CRITICAL FIX (Nov 10, 2025): Now returns chunk_sizes as 5th value # to preserve stratification boundaries during re-chunking ( phi_stratified, t1_stratified, t2_stratified, g2_stratified, chunk_sizes, ) = create_angle_stratified_data( phi_jax, t1_jax, t2_jax, g2_jax, target_chunk_size=target_chunk_size ) # Convert back to numpy phi_stratified = np.array(phi_stratified) t1_stratified = np.array(t1_stratified) t2_stratified = np.array(t2_stratified) g2_stratified = np.array(g2_stratified) # Measure execution time stratification_time_ms = (time.perf_counter() - stratification_start) * 1000.0 # Compute diagnostics if requested diagnostics = None if collect_diagnostics: diagnostics = compute_stratification_diagnostics( phi_original=phi_flat, phi_stratified=phi_stratified, execution_time_ms=stratification_time_ms, use_index_based=use_index_based, target_chunk_size=target_chunk_size, chunk_sizes=chunk_sizes, # Pass actual chunk boundaries ) # Optionally log diagnostic report if log_diagnostics and diagnostics is not None: report = format_diagnostics_report(diagnostics) logger.info(f"\n{report}") # Create stratified data object (modify in-place or create copy) # We need to "unflatten" back to original shape for _prepare_data to work # Actually, we can't easily unflatten to 3D grid, so instead we'll create # a modified data object that stores the flattened stratified arrays # Create a simple namespace object to hold stratified data class StratifiedData: def __init__( self, phi: np.ndarray, t1: np.ndarray, t2: np.ndarray, g2: np.ndarray, original_data: Any, diagnostics: Any = None, chunk_sizes: Any = None, ) -> None: # Store flattened stratified arrays self.phi_flat = phi self.t1_flat = t1 self.t2_flat = t2 self.g2_flat = g2 # Also store unique values for backwards compatibility self.phi = np.unique(phi) self.t1 = np.unique(t1) self.t2 = np.unique(t2) # Store as 1D array (already flattened and stratified) self.g2 = g2 # Copy critical metadata attributes from original data # These are required for residual function computation self.sigma = original_data.sigma # Uncertainty/error bars (CRITICAL) self.q = original_data.q # Wavevector magnitude (CRITICAL) self.L = original_data.L # Sample-detector distance (CRITICAL) # Copy optional dt if present (time step) if hasattr(original_data, "dt"): self.dt = original_data.dt # Store diagnostics if available self.stratification_diagnostics = diagnostics # CRITICAL FIX (Nov 10, 2025): Store original chunk sizes # to preserve stratification boundaries during re-chunking self.chunk_sizes = chunk_sizes # CRITICAL FIX (Dec 2025): Pre-shuffle stratified data before returning # This prevents the hybrid streaming optimizer from seeing angle-sequential data # during L-BFGS warmup, which would cause local minimum traps (gamma_dot_t0 -> 0) # The shuffle must happen HERE, not in _fit_with_stratified_hybrid_streaming, # because other code paths may also use the stratified data. # Fixed seed for reproducible stratified shuffling. # Not user-configurable — this ensures deterministic data ordering # for consistent NLSQ convergence across runs. shuffle_seed = 42 rng = np.random.RandomState(shuffle_seed) # noqa: NPY002 — keep for reproducibility perm = rng.permutation(len(phi_stratified)) phi_stratified = phi_stratified[perm] t1_stratified = t1_stratified[perm] t2_stratified = t2_stratified[perm] g2_stratified = g2_stratified[perm] logger.info( f"Pre-shuffled stratified data (seed={shuffle_seed}) to prevent local minimum traps" ) stratified_data = StratifiedData( phi_stratified, t1_stratified, t2_stratified, g2_stratified, data, # Pass original data to copy metadata attributes diagnostics, chunk_sizes, # CRITICAL FIX: Pass chunk sizes for boundary-aware re-chunking ) logger.info( f"Stratification complete: {len(g2_stratified):,} points reorganized" ) return stratified_data def _run_sequential_optimization( self, data: Any, config: Any, initial_params: np.ndarray | None, bounds: tuple[np.ndarray, np.ndarray] | None, analysis_mode: str, per_angle_scaling: bool, logger: Any, start_time: float, x_scale_value: Any, transform_cfg: dict[str, Any], physical_param_names: list[str], per_angle_scaling_initial: dict[str, list[float]] | None = None, ) -> OptimizationResult: """Run sequential per-angle optimization as a fallback strategy. This method optimizes each phi angle independently and combines results using inverse variance weighting. It's used when: - Angle imbalance ratio exceeds threshold (>5.0 by default) - force_sequential_fallback=true in configuration - Stratification cannot be applied Args: data: Original XPCS data object config: Configuration manager initial_params: Initial parameter guess bounds: Parameter bounds (lower, upper) analysis_mode: 'static_isotropic' or 'laminar_flow' per_angle_scaling: Whether per-angle parameters enabled logger: Logger instance start_time: Start time for execution timing Returns: OptimizationResult with combined parameters from all angles Raises: RuntimeError: If too few angles converge (<50% by default) """ import time logger.info("=" * 80) logger.info("SEQUENTIAL PER-ANGLE OPTIMIZATION") logger.info("=" * 80) # Prepare data arrays phi = np.asarray(data.phi) t1 = np.asarray(data.t1) t2 = np.asarray(data.t2) g2 = np.asarray(data.g2) # Create full meshgrid phi_grid, t1_grid, t2_grid = np.meshgrid(phi, t1, t2, indexing="ij") phi_flat = phi_grid.flatten() t1_flat = t1_grid.flatten() t2_flat = t2_grid.flatten() g2_flat = g2.flatten() from homodyne.config.parameter_manager import ParameterManager param_manager = ParameterManager(config.config, analysis_mode) base_param_names = param_manager.get_all_parameter_names() config_lower_bounds, config_upper_bounds = param_manager.get_bounds_as_arrays( base_param_names ) config_lower_bounds = np.asarray(config_lower_bounds, dtype=float) config_upper_bounds = np.asarray(config_upper_bounds, dtype=float) # Load initial parameters if not provided if initial_params is None: initial_params = param_manager.get_initial_values() logger.info( f"Loaded initial parameters from config: {len(initial_params)} parameters" ) # Load bounds if not provided if bounds is None: bounds = param_manager.get_parameter_bounds(base_param_names) logger.info("Loaded parameter bounds from config") if initial_params is not None: initial_params = np.asarray(initial_params, dtype=np.float64) if bounds is not None: bounds = ( np.asarray(bounds[0], dtype=np.float64), np.asarray(bounds[1], dtype=np.float64), ) try: logger.debug( "Sequential bounds dtype: lower=%s upper=%s", getattr(bounds[0], "dtype", type(bounds[0])), getattr(bounds[1], "dtype", type(bounds[1])), ) logger.debug( "Sequential bounds values: lower=%s upper=%s", np.array2string(bounds[0], precision=3), np.array2string(bounds[1], precision=3), ) except ( ValueError, TypeError, AttributeError, ) as exc: # pragma: no cover - logging safeguard logger.debug(f"Sequential bounds dtype logging failed: {exc}") # Create residual function using physics kernels # (apply_diagonal_correction and compute_g2_scaled imported at module level) phi_unique_all = np.unique(np.round(phi_flat, decimals=6)) t1_unique_all = np.unique(np.asarray(t1)) t2_unique_all = np.unique(np.asarray(t2)) n_phi_total = len(phi_unique_all) per_angle_contrast_override: np.ndarray | None = None per_angle_offset_override: np.ndarray | None = None if per_angle_scaling_initial: contrast_override = per_angle_scaling_initial.get("contrast") if contrast_override is not None: try: arr = np.asarray(contrast_override, dtype=np.float64) if arr.size == n_phi_total: per_angle_contrast_override = arr.copy() else: logger.warning( "Sequential per-angle contrast override has %d entries (expected %d); ignoring override", arr.size, n_phi_total, ) except (TypeError, ValueError): logger.warning( "Invalid sequential per-angle contrast override; ignoring" ) offset_override = per_angle_scaling_initial.get("offset") if offset_override is not None: try: arr = np.asarray(offset_override, dtype=np.float64) if arr.size == n_phi_total: per_angle_offset_override = arr.copy() else: logger.warning( "Sequential per-angle offset override has %d entries (expected %d); ignoring override", arr.size, n_phi_total, ) except (TypeError, ValueError): logger.warning( "Invalid sequential per-angle offset override; ignoring" ) scalar_layout_len = len(physical_param_names) + 2 expected_per_angle_len = 2 * n_phi_total + len(physical_param_names) def _expand_compact_layout(vector: np.ndarray) -> np.ndarray: """Replicate scalar contrast/offset entries across all angles.""" arr = np.asarray(vector, dtype=np.float64) if ( expected_per_angle_len == scalar_layout_len or arr.size == expected_per_angle_len ): return arr if n_phi_total == 0 or arr.size != scalar_layout_len: return arr contrast_val = arr[0] offset_val = arr[1] physical_vals = arr[2:] contrast_block = np.full(n_phi_total, contrast_val, dtype=np.float64) offset_block = np.full(n_phi_total, offset_val, dtype=np.float64) return np.concatenate([contrast_block, offset_block, physical_vals]) solver_initial_params = initial_params.copy() solver_per_angle_scaling = False solver_per_angle_expanded = False if per_angle_scaling: if solver_initial_params.size == expected_per_angle_len: solver_per_angle_scaling = True elif ( expected_per_angle_len > scalar_layout_len and solver_initial_params.size == scalar_layout_len ): solver_initial_params = _expand_compact_layout(solver_initial_params) solver_per_angle_scaling = True solver_per_angle_expanded = True logger.info( "Expanded scalar contrast/offset to per-angle layout for sequential solver (%d angles)", n_phi_total, ) if bounds is not None and bounds[0].size == scalar_layout_len: bounds = ( _expand_compact_layout(bounds[0]), _expand_compact_layout(bounds[1]), ) else: logger.warning( "Per-angle scaling requested but parameter vector has %d entries (expected %d); " "sequential solver will operate with scalar scaling", solver_initial_params.size, expected_per_angle_len, ) if ( solver_per_angle_scaling and solver_initial_params.size == expected_per_angle_len ): if per_angle_contrast_override is not None: solver_initial_params[:n_phi_total] = per_angle_contrast_override if per_angle_offset_override is not None: solver_initial_params[n_phi_total : 2 * n_phi_total] = ( per_angle_offset_override ) param_lower_bounds = config_lower_bounds.copy() param_upper_bounds = config_upper_bounds.copy() if solver_per_angle_scaling and expected_per_angle_len > scalar_layout_len: param_lower_bounds = _expand_compact_layout(param_lower_bounds) param_upper_bounds = _expand_compact_layout(param_upper_bounds) if solver_per_angle_scaling: param_names = _build_parameter_labels( True, n_phi_total, physical_param_names, ) else: param_names = base_param_names def _maybe_expand_x_scale(value: Any) -> Any: if value is None or not solver_per_angle_scaling: return value if isinstance(value, (str, bytes, dict)): return value try: array = np.asarray(value, dtype=np.float64) except (TypeError, ValueError): return value if array.ndim == 0: if expected_per_angle_len > 0: return np.full( expected_per_angle_len, float(array), dtype=np.float64, ) return float(array) if array.size == expected_per_angle_len: return array if ( expected_per_angle_len > scalar_layout_len and array.size == scalar_layout_len ): return _expand_compact_layout(array) return value x_scale_value = _maybe_expand_x_scale(x_scale_value) sigma_source = getattr(data, "sigma", None) if sigma_source is None: sigma_array = np.ones( (n_phi_total, len(t1_unique_all), len(t2_unique_all)), dtype=np.float64, ) else: sigma_array = np.asarray(sigma_source, dtype=np.float64) if not np.all(np.isfinite(sigma_array)): raise ValueError( "sigma values must be finite; received NaN/inf entries" ) if np.any(sigma_array <= 0): non_positive = float(np.count_nonzero(sigma_array <= 0)) raise ValueError( "sigma values must be strictly positive for least-squares " "weighting; found " f"{non_positive:.0f} non-positive entries" ) q_value = float(getattr(data, "q", 1.0)) L_value = float(getattr(data, "L", 1.0)) dt_attr = getattr(data, "dt", None) dt_value = float(dt_attr) if dt_attr is not None else None t1_unique_jnp = jnp.asarray(t1_unique_all) t2_unique_jnp = jnp.asarray(t2_unique_all) physical_index_map = build_physical_index_map( solver_per_angle_scaling, n_phi_total if solver_per_angle_scaling else 0, physical_param_names, ) transform_state: dict[str, Any] = {} if transform_cfg: solver_initial_params, transform_state = ( apply_forward_shear_transforms_to_vector( solver_initial_params, physical_index_map, transform_cfg, ) ) if bounds is not None: bounds = apply_forward_shear_transforms_to_bounds( bounds, transform_state, ) def _compute_g2_grid_for_phi( phi_index: int, physical_params: np.ndarray, contrast_params: np.ndarray | float, offset_params: np.ndarray | float, ) -> np.ndarray: phi_val = float(phi_unique_all[phi_index]) if solver_per_angle_scaling: contrast_val = float(contrast_params[phi_index]) offset_val = float(offset_params[phi_index]) else: contrast_val = float(contrast_params) offset_val = float(offset_params) g2_grid = compute_g2_scaled( params=jnp.asarray(physical_params), t1=t1_unique_jnp, t2=t2_unique_jnp, phi=phi_val, q=q_value, L=L_value, contrast=contrast_val, offset=offset_val, dt=dt_value, ) g2_grid = jnp.squeeze(g2_grid, axis=0) g2_grid = apply_diagonal_correction(g2_grid) return np.asarray(g2_grid, dtype=np.float64) residual_debug_logged = False def residual_func( params: np.ndarray, phi_vals: np.ndarray, t1_vals: np.ndarray, t2_vals: np.ndarray, g2_vals: np.ndarray, ) -> np.ndarray: """Residual function compatible with sequential optimization.""" params_np = np.asarray(params, dtype=np.float64) if transform_state: params_np = apply_inverse_shear_transforms_to_vector( params_np, transform_state, ) phi_section = np.asarray(phi_vals, dtype=np.float64) t1_section = np.asarray(t1_vals, dtype=np.float64) t2_section = np.asarray(t2_vals, dtype=np.float64) g2_section = np.asarray(g2_vals, dtype=np.float64) if solver_per_angle_scaling: contrast_params = params_np[:n_phi_total] offset_params = params_np[n_phi_total : 2 * n_phi_total] physical_params = params_np[2 * n_phi_total :] else: contrast_params = float(params_np[0]) offset_params = float(params_np[1]) physical_params = params_np[2:] # Note: clip removed - sequential residual data comes from same source as # unique arrays (phi_flat, t1, t2), so all values are guaranteed to be in range. # The clip was causing optimization to converge to wrong local minima. # See: stratified LS fix in residual.py (D0=91342 vs 19253 issue). phi_indices = np.searchsorted( phi_unique_all, np.round(phi_section, decimals=6) ) t1_indices = np.searchsorted(t1_unique_all, t1_section) t2_indices = np.searchsorted(t2_unique_all, t2_section) g2_model = np.empty_like(g2_section, dtype=np.float64) sigma_vals = np.empty_like(g2_section, dtype=np.float64) nonlocal residual_debug_logged if not residual_debug_logged: logger.debug( "Sequential residual call: params_shape=%s, phi_unique=%d", params_np.shape, len(np.unique(phi_section)), ) residual_debug_logged = True for phi_idx in np.unique(phi_indices): mask = phi_indices == phi_idx g2_grid = _compute_g2_grid_for_phi( phi_idx, physical_params, contrast_params, offset_params ) g2_model[mask] = g2_grid[t1_indices[mask], t2_indices[mask]] sigma_slice = sigma_array[phi_idx] sigma_vals[mask] = sigma_slice[t1_indices[mask], t2_indices[mask]] residuals = (g2_section - g2_model) / (sigma_vals + 1e-10) return residuals # Get optimizer configuration opt_config = config.config.get("optimization", {}) nlsq_config = opt_config.get("nlsq", {}) # Sequential-specific config seq_config = opt_config.get("sequential", {}) min_success_rate = seq_config.get("min_success_rate", 0.5) weighting = seq_config.get("weighting", "inverse_variance") # Run sequential optimization logger.info( f"Starting per-angle optimization with {len(np.unique(phi_flat))} angles..." ) least_squares_kwargs: dict[str, Any] = { "max_nfev": nlsq_config.get("max_iterations", 1000), "ftol": nlsq_config.get("tolerance", 1e-8), } if "diff_step" in nlsq_config: least_squares_kwargs["diff_step"] = nlsq_config["diff_step"] if "f_scale" in nlsq_config: least_squares_kwargs["f_scale"] = nlsq_config["f_scale"] if x_scale_value is not None: least_squares_kwargs["x_scale"] = x_scale_value sequential_result = optimize_per_angle_sequential( phi=phi_flat, t1=t1_flat, t2=t2_flat, g2_exp=g2_flat, residual_func=residual_func, initial_params=solver_initial_params, bounds=bounds, weighting=weighting, min_success_rate=min_success_rate, parameter_names=param_names, **least_squares_kwargs, ) # Convert SequentialResult to OptimizationResult execution_time = time.time() - start_time # Get device info device_info = { "type": "CPU", # Sequential uses scipy.optimize.least_squares (CPU only) "backend": "scipy.optimize.least_squares", "strategy": "sequential_per_angle", } # Compute chi-squared combined_solver = sequential_result.combined_parameters.copy() combined_physical = combined_solver.copy() if transform_state: combined_physical = apply_inverse_shear_transforms_to_vector( combined_physical, transform_state, ) final_residuals = residual_func( combined_physical, phi_flat, t1_flat, t2_flat, g2_flat ) chi_squared = float(np.sum(final_residuals**2)) n_data = len(phi_flat) n_params = len(sequential_result.combined_parameters) reduced_chi_squared = chi_squared / (n_data - n_params) # Diagnostics payload param_status = {} for idx, name in enumerate(param_names): value = combined_physical[idx] if np.isclose(value, param_lower_bounds[idx]): status = "at_lower_bound" elif np.isclose(value, param_upper_bounds[idx]): status = "at_upper_bound" else: status = "active" param_status[name] = status def _norm_array_to_dict(array: np.ndarray | None) -> dict[str, float] | None: if array is None: return None return {name: float(array[idx]) for idx, name in enumerate(param_names)} per_angle_jac = {} for angle_result in sequential_result.per_angle_results: angle_label = f"phi_{angle_result['phi_angle']:.2f}" per_angle_jac[angle_label] = { "initial": None, "final": None, } if angle_result.get("jac_initial_norms") is not None: per_angle_jac[angle_label]["initial"] = { name: float(angle_result["jac_initial_norms"][idx]) for idx, name in enumerate(param_names) } if angle_result.get("jac_final_norms") is not None: per_angle_jac[angle_label]["final"] = { name: float(angle_result["jac_final_norms"][idx]) for idx, name in enumerate(param_names) } total_nfev = sum( r.get("n_iterations", 0) for r in sequential_result.per_angle_results ) diagnostics_payload = { "solver_settings": { "loss": nlsq_config.get("loss", "linear"), "x_scale": nlsq_config.get("x_scale", "jac"), "strategy": "sequential_per_angle", "jac_sample_size": JAC_SAMPLE_SIZE, }, "nfev_reported": total_nfev, "nfev_actual": total_nfev, "parameter_status": param_status, "initial_jacobian_norms": _norm_array_to_dict( sequential_result.initial_jacobian_norms ), "final_jacobian_norms": _norm_array_to_dict( sequential_result.final_jacobian_norms ), "per_angle_jacobian_norms": per_angle_jac, "solver_per_angle_expanded": solver_per_angle_expanded, "chi_squared": chi_squared, "reduced_chi_squared": reduced_chi_squared, } # Determine convergence status if sequential_result.success_rate >= min_success_rate: convergence_status = "converged" quality_flag = ( "good" if sequential_result.success_rate > 0.8 else "marginal" ) else: convergence_status = "failed" quality_flag = "poor" # Compute uncertainties from covariance uncertainties = np.sqrt(np.diag(sequential_result.combined_covariance)) # Summary logging logger.info("=" * 80) logger.info("SEQUENTIAL OPTIMIZATION COMPLETE") logger.info(f" Success rate: {sequential_result.success_rate:.1%}") logger.info( f" Angles optimized: {sequential_result.n_angles_optimized}/{sequential_result.n_angles_optimized + sequential_result.n_angles_failed}" ) logger.info(f" Combined cost: {sequential_result.total_cost:.4f}") logger.info(f" Reduced chi2: {reduced_chi_squared:.4f}") logger.info(f" Execution time: {execution_time:.2f}s") logger.info(f" Weighting: {weighting}") logger.info("=" * 80) return OptimizationResult( parameters=combined_physical, uncertainties=uncertainties, covariance=sequential_result.combined_covariance, chi_squared=chi_squared, reduced_chi_squared=reduced_chi_squared, convergence_status=convergence_status, iterations=sum( r["n_iterations"] for r in sequential_result.per_angle_results ), execution_time=execution_time, device_info=device_info, recovery_actions=[ f"Sequential per-angle optimization: {sequential_result.n_angles_optimized} angles converged" ], quality_flag=quality_flag, nlsq_diagnostics=diagnostics_payload, ) def _validate_initial_params( self, params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray] | None, ) -> np.ndarray: """Validate initial parameters are within bounds, clip if necessary. Args: params: Initial parameter guess bounds: (lower, upper) bounds tuple or None Returns: Validated/clipped parameter array Raises: ValueError: If params shape doesn't match bounds """ params = np.asarray(params) # If no bounds, return params as-is if bounds is None: return params lower, upper = bounds lower = np.asarray(lower) upper = np.asarray(upper) # Validate parameter count matches bounds if params.shape != lower.shape or params.shape != upper.shape: raise ValueError( f"Parameter shape mismatch: params={params.shape}, " f"lower={lower.shape}, upper={upper.shape}", ) # Clip parameters to bounds clipped_params = np.clip(params, lower, upper) # Warn if any parameters were clipped if not np.allclose(params, clipped_params): clipped_indices = np.where(~np.isclose(params, clipped_params))[0] logger = get_logger(__name__) logger.warning( f"Initial parameters clipped to bounds at indices {clipped_indices}", ) return clipped_params def _convert_bounds( self, homodyne_bounds: tuple[np.ndarray, np.ndarray] | None, ) -> tuple[np.ndarray, np.ndarray] | None: """Convert homodyne bounds format to NLSQ format. Args: homodyne_bounds: (lower_array, upper_array) tuple or None Returns: NLSQ-compatible bounds tuple or None for unbounded optimization Raises: ValueError: If bounds are invalid (lower > upper) """ # Handle None bounds (unbounded optimization) if homodyne_bounds is None: return None # Extract lower and upper bounds lower, upper = homodyne_bounds # Convert to numpy arrays if not already lower = np.asarray(lower) upper = np.asarray(upper) # Validate bounds: lower <= upper elementwise (allow equal for fixed params) if np.any(lower > upper): invalid_indices = np.where(lower > upper)[0] raise ValueError( f"Invalid bounds: lower > upper at indices {invalid_indices}. " f"Bounds must satisfy lower <= upper elementwise.", ) # NLSQ uses the same (lower, upper) tuple format as homodyne # Just return validated bounds return (lower, upper) def _create_residual_function( self, data: Any, analysis_mode: str, per_angle_scaling: bool = True ) -> Any: """Create JAX-compatible model function for NLSQ with per-angle scaling support. IMPORTANT: NLSQ's curve_fit_large expects a MODEL FUNCTION f(x, *params) -> y, NOT a residual function. NLSQ internally computes residuals = data - model. Args: data: XPCS experimental data analysis_mode: Analysis mode determining model computation per_angle_scaling: If True (default), use per-angle contrast/offset parameters. This is the physically correct behavior. If False, use legacy single contrast/offset for all angles. Returns: Model function with signature f(xdata, *params) -> ydata_theory where xdata is a dummy variable for NLSQ compatibility Raises: AttributeError: If data is missing required attributes """ # Import NLSQ physics backend for g2 computation from homodyne.core.physics_nlsq import compute_g2_scaled # Validate data has required attributes required_attrs = ["phi", "t1", "t2", "g2", "sigma", "q", "L"] for attr in required_attrs: if not hasattr(data, attr): raise AttributeError( f"Data must have '{attr}' attribute for residual computation", ) # Extract data attributes and convert to JAX arrays # CRITICAL FIX (Nov 11, 2025): Handle stratified vs non-stratified data differently # # Stratified data: phi_flat, t1_flat, t2_flat are all per-point arrays (same length) # Non-stratified data: phi, t1, t2 are unique grid values (different lengths) is_stratified = hasattr(data, "phi_flat") if is_stratified: # Stratified data: use per-point flat arrays phi = jnp.asarray(data.phi_flat) # Shape: (n_data,) t1 = jnp.asarray(data.t1_flat) # Shape: (n_data,) t2 = jnp.asarray(data.t2_flat) # Shape: (n_data,) else: # Non-stratified data: use unique grid values phi = jnp.asarray(data.phi) # Shape: (n_phi,) t1 = jnp.asarray(data.t1) # Shape: (n_t1,) t2 = jnp.asarray(data.t2) # Shape: (n_t2,) q = float(data.q) L = float(data.L) # Get dt from data — required for compute_g2_scaled (float, not Optional). dt = getattr(data, "dt", None) if dt is not None: dt = float(dt) # Validate dt before JIT compilation (avoid JAX tracing issues) if dt <= 0: raise ValueError(f"dt must be positive, got {dt}") if not np.isfinite(dt): raise ValueError(f"dt must be finite, got {dt}") else: # Fallback: derive from t1 minimum spacing t1_arr = np.asarray(data.t1) t1_unique = np.unique(t1_arr) if len(t1_unique) > 1: dt = float(np.min(np.diff(t1_unique))) else: dt = 0.001 import warnings warnings.warn( f"data.dt missing; derived dt={dt:.6g} from t1 spacing", stacklevel=2, ) # Pre-compute phi_unique for per-angle parameter mapping phi_unique = jnp.asarray(np.unique(np.asarray(phi))) n_phi = len(phi_unique) # Determine parameter structure based on analysis mode and per_angle_scaling # Legacy (per_angle_scaling=False): [contrast, offset, *physical_params] # Static isotropic: 5 params total (2 scaling + 3 physical) # Laminar flow: 9 params total (2 scaling + 7 physical) # # Per-angle (per_angle_scaling=True): [contrast_0, ..., contrast_{n_phi-1}, # offset_0, ..., offset_{n_phi-1}, *physical_params] # Static isotropic: (2*n_phi + 3) params total # Laminar flow: (2*n_phi + 7) params total def model_function(xdata: jnp.ndarray, *params_tuple: float) -> jnp.ndarray: """Compute theoretical g2 model for NLSQ optimization with per-angle scaling. IMPORTANT: xdata contains indices into the flattened data array. This function MUST respect xdata size for curve_fit_large chunking. When curve_fit_large chunks the data, xdata will be a subset of indices. NLSQ will internally compute residuals as: (ydata - model) / sigma Args: xdata: Array of indices into flattened g2 array. Full dataset: [0, 1, ..., n-1] Chunked: [0, 1, ..., chunk_size-1] (subset) *params_tuple: Unpacked parameters (per-angle scaling only) - Format: [contrast_0, ..., contrast_{n_phi-1}, offset_0, ..., offset_{n_phi-1}, *physical] Returns: Theoretical g2 values at requested indices (size matches xdata) """ # Convert params tuple to array (stack avoids retracing vs asarray) params_array = jnp.stack(params_tuple) # Extract per-angle scaling parameters (legacy mode removed Nov 2025) # Per-angle mode: first n_phi are contrasts, next n_phi are offsets contrast = params_array[:n_phi] # Array of shape (n_phi,) offset = params_array[n_phi : 2 * n_phi] # Array of shape (n_phi,) physical_params = params_array[2 * n_phi :] # Get requested data point indices # Use int64 to prevent overflow when n_phi * n_t1 * n_t2 > 2.147B. indices = jnp.asarray(xdata, dtype=jnp.int64) # CRITICAL FIX (Nov 11, 2025): Handle stratified vs non-stratified data if is_stratified: # STRATIFIED DATA PATH (per-point arrays) # Extract per-point values for requested indices phi_requested = phi[indices] # Shape: (chunk_size,) t1_requested = t1[indices] # Shape: (chunk_size,) t2_requested = t2[indices] # Shape: (chunk_size,) # Map phi values to indices in phi_unique to get correct contrast/offset # Find which unique phi each requested phi corresponds to # Since phi values come from phi_unique, we can use searchsorted # CRITICAL: Keep all arrays in JAX (no np.asarray) for JIT compatibility # Note: clip removed - phi_requested is a subset of phi which was used to # build phi_unique, so all values are guaranteed to be in range. # The clip was causing optimization to converge to wrong local minima. phi_idx = jnp.searchsorted( phi_unique, phi_requested ) # Shape: (chunk_size,) # Select per-angle contrast and offset for each data point contrast_requested = contrast[phi_idx] # Shape: (chunk_size,) offset_requested = offset[phi_idx] # Shape: (chunk_size,) # Compute g2 per-point using vmap # Each point has its own (phi, t1, t2, contrast, offset) compute_g2_per_point = jax.vmap( lambda phi_val, t1_val, t2_val, c_val, o_val: compute_g2_scaled( params=physical_params, t1=jnp.array([t1_val]), # Single value as 1D array t2=jnp.array([t2_val]), phi=phi_val, q=q, L=L, contrast=c_val, offset=o_val, dt=dt, )[0, 0], # Extract scalar from (1, 1) output in_axes=(0, 0, 0, 0, 0), # Vmap over all arrays ) g2_theory = compute_g2_per_point( phi_requested, t1_requested, t2_requested, contrast_requested, offset_requested, ) # Shape: (chunk_size,) or possibly (chunk_size, 1) # Ensure 1D output by squeezing any trailing dimensions g2_theory = jnp.squeeze(g2_theory) return g2_theory else: # NON-STRATIFIED DATA PATH (grid-based computation) # Original grid-based logic for non-stratified data compute_g2_scaled_vmap = jax.vmap( lambda phi_val, contrast_val, offset_val: jnp.squeeze( compute_g2_scaled( params=physical_params, t1=t1, # 1D arrays t2=t2, phi=phi_val, # Single phi value q=q, L=L, contrast=contrast_val, # Per-angle contrast offset=offset_val, # Per-angle offset dt=dt, ), axis=0, # Squeeze the phi dimension ), in_axes=(0, 0, 0), # Vectorize over all three arrays ) # Compute on grid for all unique angles g2_theory = compute_g2_scaled_vmap(phi_unique, contrast, offset) # Shape: (n_phi, n_t1, n_t2) # Apply diagonal correction from homodyne.core.jax_backend import apply_diagonal_correction apply_diagonal_vmap = jax.vmap(apply_diagonal_correction, in_axes=0) g2_theory = apply_diagonal_vmap(g2_theory) # Grid-based indexing for non-stratified data n_t1 = len(t1) n_t2 = len(t2) grid_size_per_angle = n_t1 * n_t2 # Decompose flat indices into grid coordinates phi_idx = indices // grid_size_per_angle remaining = indices % grid_size_per_angle t1_idx = remaining // n_t2 t2_idx = remaining % n_t2 return g2_theory[phi_idx, t1_idx, t2_idx] return model_function def _update_best_parameters( self, params: np.ndarray, loss: float, batch_idx: int, logger: Any, ) -> None: """Update best parameters if current loss is better. Parameters ---------- params : np.ndarray Current parameter values loss : float Current loss value batch_idx : int Current batch index logger : logging.Logger Logger instance for reporting """ if params is None: return # Cannot update without parameters if loss < self.best_loss: prev_best = self.best_loss self.best_params = params.copy() self.best_loss = loss self.best_batch_idx = batch_idx logger.info( f"New best loss: {loss:.6e} at batch {batch_idx} " f"(improved from {prev_best:.6e})" ) def _fit_with_streaming_optimizer( self, residual_fn: Any, xdata: np.ndarray, ydata: np.ndarray, initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray] | None, logger: Any, checkpoint_config: dict | None = None, ) -> tuple[np.ndarray, np.ndarray, dict]: """Deprecated: raises RuntimeError.""" return fit_with_streaming_optimizer_deprecated( residual_fn=residual_fn, xdata=xdata, ydata=ydata, initial_params=initial_params, bounds=bounds, logger=logger, checkpoint_config=checkpoint_config, ) def _fit_with_hybrid_streaming_optimizer( self, residual_fn: Any, xdata: np.ndarray, ydata: np.ndarray, initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray] | None, logger: Any, nlsq_config: Any = None, ) -> tuple[np.ndarray, np.ndarray, dict]: """Fit using NLSQ AdaptiveHybridStreamingOptimizer.""" return fit_with_hybrid_streaming_optimizer( residual_fn=residual_fn, xdata=xdata, ydata=ydata, initial_params=initial_params, bounds=bounds, logger=logger, nlsq_config=nlsq_config, fast_mode=self.fast_mode, ) def _create_stratified_chunks( self, stratified_data: Any, target_chunk_size: int = 100_000, ) -> Any: """Convert stratified flat arrays into chunks for StratifiedResidualFunction.""" return create_stratified_chunks(stratified_data, target_chunk_size) def _fit_with_stratified_least_squares( self, stratified_data: Any, per_angle_scaling: bool, physical_param_names: list[str], initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray] | None, logger: Any, target_chunk_size: int = 100_000, anti_degeneracy_config: dict | None = None, nlsq_config_dict: dict | None = None, ) -> tuple[np.ndarray, np.ndarray, dict]: """Fit using NLSQ's least_squares() with stratified residual function.""" return fit_with_stratified_least_squares( stratified_data=stratified_data, per_angle_scaling=per_angle_scaling, physical_param_names=physical_param_names, initial_params=initial_params, bounds=bounds, log=logger, target_chunk_size=target_chunk_size, anti_degeneracy_config=anti_degeneracy_config, nlsq_config_dict=nlsq_config_dict, ) def _fit_with_streaming_optimizer( self, stratified_data: Any, per_angle_scaling: bool, physical_param_names: list[str], initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray] | None, logger: Any, streaming_config: dict | None = None, ) -> tuple[np.ndarray, np.ndarray, dict]: """Deprecated: delegates to fit_with_streaming_optimizer_stratified_deprecated.""" return fit_with_streaming_optimizer_stratified_deprecated( stratified_data=stratified_data, per_angle_scaling=per_angle_scaling, physical_param_names=physical_param_names, initial_params=initial_params, bounds=bounds, logger=logger, streaming_config=streaming_config, ) # NOTE: Dead streaming optimizer code removed (NLSQ 0.4.0+ removed StreamingOptimizer) def _fit_with_out_of_core_accumulation( self, stratified_data: Any, data: Any, per_angle_scaling: bool, physical_param_names: list[str], initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray] | None, logger: Any, config: Any, fast_chi2_mode: bool = False, anti_degeneracy_config: dict | None = None, ) -> tuple[np.ndarray, np.ndarray, dict]: """Fit using Out-of-Core Global Accumulation for massive datasets.""" return fit_with_out_of_core_accumulation( stratified_data=stratified_data, data=data, per_angle_scaling=per_angle_scaling, physical_param_names=physical_param_names, initial_params=initial_params, bounds=bounds, log=logger, config=config, fast_chi2_mode=fast_chi2_mode, anti_degeneracy_config=anti_degeneracy_config, ) def _fit_with_stratified_hybrid_streaming( self, stratified_data: Any, per_angle_scaling: bool, physical_param_names: list[str], initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray] | None, logger: Any, hybrid_config: dict | None = None, anti_degeneracy_config: dict | None = None, ) -> tuple[np.ndarray, np.ndarray, dict]: """Fit using NLSQ AdaptiveHybridStreamingOptimizer for large datasets.""" return fit_with_stratified_hybrid_streaming( stratified_data=stratified_data, per_angle_scaling=per_angle_scaling, physical_param_names=physical_param_names, initial_params=initial_params, bounds=bounds, logger=logger, hybrid_config=hybrid_config, anti_degeneracy_config=anti_degeneracy_config, ) def _estimate_memory_for_stratified_ls( self, n_points: int, n_params: int, n_chunks: int, ) -> float: """Estimate peak memory usage for stratified least-squares optimization.""" return estimate_memory_for_stratified_ls(n_points, n_params, n_chunks) def _should_use_streaming( self, n_points: int, n_params: int, n_chunks: int, memory_threshold_gb: float | None = None, memory_fraction: float | None = None, ) -> tuple[bool, float, str]: """Determine if streaming optimizer should be used based on memory estimate.""" return should_use_streaming( n_points=n_points, n_params=n_params, n_chunks=n_chunks, memory_threshold_gb=memory_threshold_gb, memory_fraction=memory_fraction, ) def _create_fit_result( self, popt: np.ndarray, pcov: np.ndarray, residuals: np.ndarray, n_data: int, iterations: int, execution_time: float, convergence_status: str = "converged", recovery_actions: list[str] | None = None, streaming_diagnostics: dict[str, Any] | None = None, stratification_diagnostics: StratificationDiagnostics | None = None, diagnostics_payload: dict[str, Any] | None = None, n_params_effective: int | None = None, ) -> OptimizationResult: """Convert NLSQ output to OptimizationResult. Args: popt: Optimized parameters pcov: Parameter covariance matrix residuals: Final residuals n_data: Number of data points iterations: Optimization iterations execution_time: Execution time in seconds convergence_status: Convergence status string recovery_actions: List of recovery actions taken streaming_diagnostics: Enhanced diagnostics for streaming optimization (Task 5.4) Returns: Complete OptimizationResult dataclass """ # Convert to numpy arrays popt = np.asarray(popt) pcov = np.asarray(pcov) residuals = np.asarray(residuals) # Compute uncertainties from covariance diagonal uncertainties = _safe_uncertainties_from_pcov(pcov, len(popt)) # Compute chi-squared chi_squared = float(np.sum(residuals**2)) # Compute reduced chi-squared. # Use n_params_effective when provided — in auto_averaged mode the compressed # optimizer vector (e.g. 9 params) has fewer entries than the true model DOF # (e.g. 2*n_phi + n_physical = 53). Using len(popt) would underestimate the # true DOF, producing an artificially low reduced chi-squared and a falsely # optimistic quality_flag ("good" when the fit is "marginal" or "poor"). n_params = n_params_effective if n_params_effective is not None else len(popt) degrees_of_freedom = n_data - n_params reduced_chi_squared = ( chi_squared / degrees_of_freedom if degrees_of_freedom > 0 else np.inf ) # Get device information devices = jax.devices() device_info = { "platform": devices[0].platform, "device": str(devices[0]), "device_kind": devices[0].device_kind, "n_devices": len(devices), } # Determine quality flag based on reduced chi-squared if reduced_chi_squared < 1.5: quality_flag = "good" elif reduced_chi_squared < 3.0: quality_flag = "marginal" else: quality_flag = "poor" # Task 5.4: Build enhanced streaming diagnostics if batch statistics available enhanced_streaming_diagnostics = None if streaming_diagnostics is not None: # Start with provided diagnostics enhanced_streaming_diagnostics = streaming_diagnostics.copy() # Add batch statistics if available if ( hasattr(self, "batch_statistics") and self.batch_statistics.total_batches > 0 ): batch_stats = self.batch_statistics.get_statistics() # Extract key metrics for enhanced diagnostics enhanced_streaming_diagnostics.update( { "batch_success_rate": batch_stats["success_rate"], "failed_batch_indices": [ b["batch_idx"] for b in batch_stats["recent_batches"] if not b["success"] ], "error_type_distribution": batch_stats["error_distribution"], "average_iterations_per_batch": batch_stats[ "average_iterations" ], "total_batches_processed": batch_stats["total_batches"], } ) # Create result result = OptimizationResult( parameters=popt, uncertainties=uncertainties, covariance=pcov, chi_squared=chi_squared, reduced_chi_squared=reduced_chi_squared, convergence_status=convergence_status, iterations=iterations, execution_time=execution_time, device_info=device_info, recovery_actions=recovery_actions or [], quality_flag=quality_flag, streaming_diagnostics=enhanced_streaming_diagnostics, # Task 5.4 stratification_diagnostics=stratification_diagnostics, # v2.2.1: Stratification diagnostics nlsq_diagnostics=diagnostics_payload, ) return result