Source code for homodyne.optimization.nlsq.strategies.sequential

"""
Sequential Per-Angle Optimization Module

Provides fallback optimization strategy when angle-stratified chunking cannot be used.
Optimizes each phi angle independently and combines results.

Use Cases:
- Extreme angle imbalance (ratio > 5.0)
- Stratification explicitly disabled
- Debugging and validation
- Memory-constrained environments

Author: Homodyne Development Team
Version: 2.3.0
Date: 2026-01-14
"""

from collections.abc import Callable, Mapping, Sequence
from dataclasses import dataclass
from typing import Any

import jax
import jax.numpy as jnp
import numpy as np
from nlsq import LeastSquares

from homodyne.utils.logging import get_logger

logger = get_logger(__name__)

# Global NLSQ LeastSquares instance for reuse
_nlsq_engine: LeastSquares | None = None


def _get_nlsq_engine() -> LeastSquares:
    """Get or create global NLSQ LeastSquares engine."""
    global _nlsq_engine
    if _nlsq_engine is None:
        _nlsq_engine = LeastSquares(enable_stability=True)
    return _nlsq_engine


def _jax_jacobian(
    func: Callable[[np.ndarray], np.ndarray],
    params: np.ndarray,
) -> np.ndarray:
    """Compute Jacobian using JAX autodiff.

    Uses forward-mode autodiff (jacfwd) which is O(n_params × cost_f),
    efficient when n_params << n_residuals (typical in least squares).

    Parameters
    ----------
    func : callable
        Function to differentiate: f(params) -> residuals
    params : np.ndarray
        Point at which to evaluate Jacobian

    Returns
    -------
    np.ndarray
        Jacobian matrix of shape (n_residuals, n_params)
    """
    # Convert to JAX array
    params_jax = jnp.asarray(params)

    # Wrap function for JAX compatibility
    def jax_func(p: jnp.ndarray) -> jnp.ndarray:
        return jnp.asarray(func(np.asarray(p)))

    # Compute Jacobian using forward-mode autodiff
    jac_fn = jax.jacfwd(jax_func)
    jac = jac_fn(params_jax)

    return np.asarray(jac)


def _coerce_mapping_to_array(
    mapping: Mapping[Any, Any],
    n_params: int,
    parameter_names: Sequence[str] | None,
    label: str,
) -> np.ndarray:
    """Convert a parameter mapping to a float64 array aligned with solver order."""

    if parameter_names and len(parameter_names) == n_params:
        index_map = {name: idx for idx, name in enumerate(parameter_names)}
        array = np.ones(n_params, dtype=np.float64)
        for key, value in mapping.items():
            if key not in index_map:
                logger.warning(
                    "%s mapping key '%s' not found in parameter_names; ignoring",
                    label,
                    key,
                )
                continue
            array[index_map[key]] = float(value)
        return array

    # Fallback: assume integer indices
    array = np.ones(n_params, dtype=np.float64)
    for key, value in mapping.items():
        try:
            idx = int(key)
        except (TypeError, ValueError) as exc:
            raise ValueError(
                f"Cannot align {label} mapping without parameter names; "
                f"invalid key: {key}"
            ) from exc
        if idx < 0 or idx >= n_params:
            raise ValueError(
                f"{label} mapping index {idx} out of range for {n_params} parameters"
            )
        array[idx] = float(value)
    return array


def _coerce_numeric_array(
    value: Any,
    n_params: int,
    label: str,
) -> np.ndarray:
    """Ensure a numeric array of length n_params."""

    arr = np.asarray(value, dtype=np.float64)
    if arr.ndim == 0:
        arr = np.full(n_params, float(arr), dtype=np.float64)
    elif arr.size != n_params:
        raise ValueError(
            f"{label} must have {n_params} entries (got shape {arr.shape})"
        )
    if not np.all(np.isfinite(arr)):
        raise ValueError(f"{label} must contain finite float64 values")
    return arr.reshape(n_params)


def _normalize_least_squares_kwargs(
    optimizer_kwargs: dict[str, Any],
    n_params: int,
    parameter_names: Sequence[str] | None,
) -> dict[str, Any]:
    """Normalize NLSQ least_squares kwargs to numeric-friendly forms."""

    if not optimizer_kwargs:
        return {}

    normalized: dict[str, Any] = {}

    for key, value in optimizer_kwargs.items():
        normalized[key] = value

    if "x_scale" in normalized:
        x_scale_value = normalized["x_scale"]
        try:
            if isinstance(x_scale_value, Mapping):
                normalized["x_scale"] = _coerce_mapping_to_array(
                    x_scale_value, n_params, parameter_names, "x_scale"
                )
            else:
                normalized["x_scale"] = _coerce_numeric_array(
                    x_scale_value, n_params, "x_scale"
                )
        except (TypeError, ValueError) as exc:
            logger.warning(
                "Dropping non-numeric x_scale due to %s; reverting to default",
                exc,
            )
            normalized.pop("x_scale", None)

    for scalar_key in ("diff_step", "f_scale"):
        if scalar_key not in normalized:
            continue
        raw_value = normalized[scalar_key]
        try:
            if isinstance(raw_value, Mapping):
                normalized[scalar_key] = _coerce_mapping_to_array(
                    raw_value, n_params, parameter_names, scalar_key
                )
            else:
                arr = np.asarray(raw_value, dtype=np.float64)
                if arr.ndim == 0:
                    normalized[scalar_key] = float(arr)
                elif arr.size == n_params:
                    normalized[scalar_key] = _coerce_numeric_array(
                        arr, n_params, scalar_key
                    )
                else:
                    raise ValueError(
                        f"{scalar_key} must be scalar or length {n_params}, got {arr.shape}"
                    )
        except (TypeError, ValueError) as exc:
            logger.warning(
                "Dropping non-numeric %s due to %s; reverting to default",
                scalar_key,
                exc,
            )
            normalized.pop(scalar_key, None)

    for tol_key in ("ftol", "xtol", "gtol"):
        if tol_key not in normalized:
            continue
        try:
            normalized[tol_key] = float(normalized[tol_key])
        except (TypeError, ValueError) as exc:
            logger.warning(
                "Dropping non-numeric %s due to %s; reverting to default",
                tol_key,
                exc,
            )
            normalized.pop(tol_key, None)

    if "max_nfev" in normalized:
        try:
            normalized["max_nfev"] = int(normalized["max_nfev"])
        except (TypeError, ValueError) as exc:
            logger.warning(
                "Dropping non-numeric max_nfev due to %s; reverting to default",
                exc,
            )
            normalized.pop("max_nfev", None)

    if normalized:
        summary = []
        for key, value in normalized.items():
            if isinstance(value, np.ndarray):
                summary.append(f"{key}=array(shape={value.shape}, dtype={value.dtype})")
            else:
                summary.append(f"{key}={type(value).__name__}")
        logger.debug(
            "Sequential least_squares kwargs sanitized: %s",
            "; ".join(summary),
        )

    return normalized


[docs] @dataclass class AngleSubset: """Data subset for a single phi angle. Attributes ---------- phi_angle : float The phi angle value for this subset phi_indices : np.ndarray Indices where phi == phi_angle n_points : int Number of data points for this angle phi : np.ndarray Phi values (all equal to phi_angle) t1 : np.ndarray Time 1 values t2 : np.ndarray Time 2 values g2_exp : np.ndarray Experimental g2 values """ phi_angle: float phi_indices: np.ndarray n_points: int phi: np.ndarray t1: np.ndarray t2: np.ndarray g2_exp: np.ndarray
[docs] @dataclass class SequentialResult: """Result from sequential per-angle optimization. Attributes ---------- combined_parameters : np.ndarray Combined optimized parameters (weighted average) combined_covariance : np.ndarray Combined covariance matrix per_angle_results : list[dict] Individual results for each angle n_angles_optimized : int Number of angles successfully optimized n_angles_failed : int Number of angles that failed optimization total_cost : float Combined optimization cost success_rate : float Fraction of angles that converged (0.0-1.0) """ combined_parameters: np.ndarray combined_covariance: np.ndarray per_angle_results: list[dict[str, Any]] n_angles_optimized: int n_angles_failed: int total_cost: float success_rate: float initial_jacobian_norms: np.ndarray | None = None final_jacobian_norms: np.ndarray | None = None
JAC_SAMPLE_SIZE = 4096 def _select_jacobian_sample(subset: AngleSubset, sample_size: int) -> dict[str, Any]: """Select a representative subset of rows for Jacobian diagnostics.""" size = min(sample_size, subset.n_points) if size <= 0: return { "phi": subset.phi, "t1": subset.t1, "t2": subset.t2, "g2": subset.g2_exp, "scale": 1.0, "indices": slice(None), } if size == subset.n_points: idx = slice(None) scale = 1.0 else: idx = np.linspace(0, subset.n_points - 1, size).astype(int) scale = np.sqrt(subset.n_points / float(size)) return { "phi": subset.phi[idx], "t1": subset.t1[idx], "t2": subset.t2[idx], "g2": subset.g2_exp[idx], "scale": scale, "indices": idx, "size": size, } def _estimate_initial_jacobian_norms( residual_func: Callable, params: np.ndarray, sample: dict[str, Any], ) -> np.ndarray | None: """Estimate column norms at the starting point via JAX autodiff.""" if sample["phi"].size == 0: return None def sample_residual_vector(p: np.ndarray) -> np.ndarray: return residual_func(p, sample["phi"], sample["t1"], sample["t2"], sample["g2"]) try: jac = _jax_jacobian(sample_residual_vector, params) norms = np.linalg.norm(jac, axis=0) * sample.get("scale", 1.0) return norms except ( ValueError, RuntimeError, TypeError, np.linalg.LinAlgError, ) as exc: # pragma: no cover - diagnostic logger.debug(f"Initial Jacobian estimation failed: {exc}") return None def _compute_final_jacobian_norms( jacobian: np.ndarray | None, sample: dict[str, Any], total_rows: int, ) -> np.ndarray | None: """Compute column norms from NLSQ's final Jacobian, using subsampling.""" if jacobian is None: return None try: if isinstance(sample["indices"], slice): jac_subset = jacobian scale = 1.0 else: idx = sample["indices"] jac_subset = jacobian[idx] scale = np.sqrt(total_rows / float(len(idx))) norms = np.linalg.norm(jac_subset, axis=0) * scale return norms except ( ValueError, RuntimeError, TypeError, IndexError, np.linalg.LinAlgError, ) as exc: # pragma: no cover logger.debug(f"Final Jacobian norm computation failed: {exc}") return None
[docs] def split_data_by_angle( phi: np.ndarray, t1: np.ndarray, t2: np.ndarray, g2_exp: np.ndarray, min_points_per_angle: int = 10, ) -> list[AngleSubset]: """Split dataset into per-angle subsets. Parameters ---------- phi : np.ndarray Phi angle values (flattened) t1 : np.ndarray Time 1 values (flattened) t2 : np.ndarray Time 2 values (flattened) g2_exp : np.ndarray Experimental g2 values (flattened) min_points_per_angle : int, optional Minimum points required per angle, default: 10 Returns ------- list[AngleSubset] List of angle subsets, one per unique phi value Raises ------ ValueError If any angle has fewer than min_points_per_angle points Examples -------- >>> phi = np.array([0, 0, 90, 90, 180, 180]) >>> t1 = np.linspace(0, 1, 6) >>> t2 = np.linspace(0, 1, 6) >>> g2 = np.ones(6) >>> subsets = split_data_by_angle(phi, t1, t2, g2) >>> len(subsets) 3 >>> subsets[0].phi_angle 0.0 >>> subsets[0].n_points 2 """ # Convert to numpy for indexing phi_np = np.asarray(phi) t1_np = np.asarray(t1) t2_np = np.asarray(t2) g2_np = np.asarray(g2_exp) # Get unique angles unique_angles = np.unique(phi_np) logger.info(f"Splitting data into {len(unique_angles)} angle subsets") subsets = [] for angle in unique_angles: # Find indices for this angle indices = np.where(np.isclose(phi_np, angle, atol=1e-6))[0] n_points = len(indices) if n_points < min_points_per_angle: raise ValueError( f"Angle {angle:.2f} deg has only {n_points} points, " f"minimum required: {min_points_per_angle}" ) # Extract subset subset = AngleSubset( phi_angle=float(angle), phi_indices=indices, n_points=n_points, phi=phi_np[indices], t1=t1_np[indices], t2=t2_np[indices], g2_exp=g2_np[indices], ) subsets.append(subset) logger.debug( f" Angle {angle:6.2f} deg: {n_points:,} points " f"({n_points / len(phi_np) * 100:.1f}% of total)" ) return subsets
[docs] def optimize_single_angle( subset: AngleSubset, residual_func: Callable, initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray], **optimizer_kwargs, ) -> dict[str, Any]: """Optimize parameters for a single phi angle. Parameters ---------- subset : AngleSubset Data for this angle residual_func : callable Residual function: residual_func(params, phi, t1, t2) -> residuals initial_params : np.ndarray Initial parameter guess bounds : tuple of np.ndarray (lower_bounds, upper_bounds) for parameters **optimizer_kwargs Additional arguments passed to NLSQ optimizer Returns ------- dict Result dictionary with keys: - 'parameters': Optimized parameters - 'covariance': Covariance matrix - 'cost': Final cost - 'success': Whether optimization converged - 'n_iterations': Number of iterations - 'message': Status message - 'n_points': Number of points used - 'phi_angle': Angle value Notes ----- Uses NLSQ LeastSquares for JAX-accelerated optimization. """ logger.debug( f"Optimizing angle {subset.phi_angle:.2f} deg ({subset.n_points:,} points)" ) try: # Sanitize dtypes: enforce float64 arrays initial_params = np.asarray(initial_params, dtype=np.float64) lower_bounds, upper_bounds = bounds lower_bounds = np.asarray(lower_bounds, dtype=np.float64) upper_bounds = np.asarray(upper_bounds, dtype=np.float64) if ( initial_params.shape[0] != lower_bounds.shape[0] or initial_params.shape[0] != upper_bounds.shape[0] ): raise ValueError( "Initial parameters and bounds must have matching shapes: " f"init={initial_params.shape}, lower={lower_bounds.shape}, upper={upper_bounds.shape}" ) if not ( np.all(np.isfinite(initial_params)) and np.all(np.isfinite(lower_bounds)) and np.all(np.isfinite(upper_bounds)) ): raise ValueError( "Initial parameters and bounds must be finite float64 values" ) # TRF method requires non-degenerate intervals (lower < upper) for all parameters. # Fixed parameters (lower == upper) must be handled by the caller before passing to sequential. if not np.all(lower_bounds < upper_bounds): raise ValueError( "Sequential optimizer requires strict lower < upper bounds for all parameters. " "Fixed parameters (lower == upper) should be removed before sequential optimization." ) logger.debug( "Angle %.2f deg dtype check: init=%s%s lower=%s%s upper=%s%s", subset.phi_angle, initial_params.dtype, initial_params.shape, lower_bounds.dtype, lower_bounds.shape, upper_bounds.dtype, upper_bounds.shape, ) # Prepare Jacobian sampling subset for diagnostics jac_sample = _select_jacobian_sample(subset, JAC_SAMPLE_SIZE) initial_jacobian_norms = _estimate_initial_jacobian_norms( residual_func, initial_params, jac_sample ) # Define residual function for this angle (JAX-compatible) def residuals(params: np.ndarray) -> np.ndarray: return jnp.asarray( residual_func(params, subset.phi, subset.t1, subset.t2, subset.g2_exp) ) # Get NLSQ engine and run optimization engine = _get_nlsq_engine() result = engine.least_squares( fun=residuals, x0=initial_params, bounds=(lower_bounds, upper_bounds), method="trf", **optimizer_kwargs, ) # Extract Jacobian for diagnostics (NLSQ returns 'jac' key) jac = result.get("jac") final_jacobian_norms = _compute_final_jacobian_norms( jac, jac_sample, subset.n_points ) # Compute covariance if possible try: # Covariance from Jacobian: (J^T J)^{-1} * s^2 # where s^2 = sum(r^2) / (n - p) is the residual variance estimate if jac is not None: # result["fun"] is scalar cost, not residual vector — always recompute residuals_final = residuals(result["x"]) n_residuals = len(residuals_final) n_params = len(initial_params) s2 = (residuals_final @ residuals_final) / max( n_residuals - n_params, 1 ) try: cov = np.linalg.inv(jac.T @ jac) * s2 except np.linalg.LinAlgError: try: cov = np.linalg.pinv(jac.T @ jac) * s2 logger.warning( "Singular J^T J - used pinv fallback for covariance" ) except np.linalg.LinAlgError: cov = np.eye(len(initial_params)) else: raise ValueError("No Jacobian available") except (np.linalg.LinAlgError, ValueError): # Fallback to identity if singular logger.warning( f"Could not compute covariance for angle {subset.phi_angle:.2f} deg" ) cov = np.eye(len(initial_params)) # NLSQ result has different key names success = result.get("success", False) cost = result.get("cost", np.inf) n_iterations = result.get("nfev", 0) message = result.get("message", "") return { "parameters": np.asarray(result["x"]), "covariance": cov, "cost": float(cost), "success": bool(success), "n_iterations": int(n_iterations), "message": str(message), "n_points": subset.n_points, "phi_angle": subset.phi_angle, "jac_initial_norms": initial_jacobian_norms, "jac_final_norms": final_jacobian_norms, } except (ValueError, RuntimeError, OSError) as e: logger.error(f"Optimization failed for angle {subset.phi_angle:.2f} deg: {e}") return { "parameters": initial_params, "covariance": np.eye(len(initial_params)), "cost": np.inf, "success": False, "n_iterations": 0, "message": f"Failed: {str(e)}", "n_points": subset.n_points, "phi_angle": subset.phi_angle, "jac_initial_norms": None, "jac_final_norms": None, }
[docs] def combine_angle_results( per_angle_results: list[dict[str, Any]], weighting: str = "inverse_variance", ) -> tuple[np.ndarray, np.ndarray, float]: """Combine per-angle optimization results. Parameters ---------- per_angle_results : list of dict Results from optimize_single_angle for each angle weighting : str, optional Weighting scheme: 'inverse_variance' | 'uniform' | 'n_points' Default: 'inverse_variance' (optimal statistical weighting) Returns ------- combined_params : np.ndarray Weighted average of parameters combined_cov : np.ndarray Combined covariance matrix total_cost : float Sum of individual costs Notes ----- Inverse variance weighting: w_i = 1 / σ²_i μ = Σ(w_i × x_i) / Σ(w_i) σ² = 1 / Σ(w_i) This provides optimal statistical combination when errors are independent. """ # Filter to successful optimizations successful = [r for r in per_angle_results if r["success"]] if not successful: raise ValueError("No angles converged - cannot combine results") logger.info( f"Combining results from {len(successful)}/{len(per_angle_results)} " f"successful angle optimizations" ) # Extract parameters and covariances params_list = np.array([r["parameters"] for r in successful]) cov_list = np.array([r["covariance"] for r in successful]) _n_params = params_list.shape[1] # noqa: F841 - Reserved for future validation # Compute weights if weighting == "inverse_variance": # Weight by 1/σ² (diagonal of inverse covariance) # Add small epsilon to prevent division by zero weights = np.array([1.0 / (np.diag(cov).mean() + 1e-10) for cov in cov_list]) elif weighting == "n_points": # Weight by number of data points weights = np.array([r["n_points"] for r in successful], dtype=float) elif weighting == "uniform": # Equal weights weights = np.ones(len(successful)) else: raise ValueError(f"Unknown weighting: {weighting}") # Normalize weights # Add small epsilon to prevent division by zero weights = weights / (weights.sum() + 1e-10) # Weighted average of parameters combined_params = np.sum(params_list * weights[:, np.newaxis], axis=0) # Combined covariance (inverse variance weighting formula) if weighting == "inverse_variance": # σ² = 1 / Σ(1/σ²_i) # Add small epsilon to prevent division by zero inv_vars = np.array([1.0 / (np.diag(cov) + 1e-10) for cov in cov_list]) combined_var = 1.0 / inv_vars.sum(axis=0) combined_cov = np.diag(combined_var) else: # Weighted average of covariances combined_cov = np.sum(cov_list * weights[:, np.newaxis, np.newaxis], axis=0) # Total cost total_cost = sum(r["cost"] for r in successful) logger.info(f"Combined parameters using {weighting} weighting") logger.debug(f" Weights: {weights}") logger.debug(f" Total cost: {total_cost:.4f}") return combined_params, combined_cov, total_cost
[docs] def strip_fixed_parameters( initial_params: np.ndarray, lower_bounds: np.ndarray, upper_bounds: np.ndarray, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Remove fixed parameters (lower == upper) from the optimizer inputs. The TRF solver used by sequential optimization requires strict lower < upper for every parameter. Fixed parameters (equality constraints encoded as lower == upper) must be stripped before the call and their known values re-inserted into the result. Parameters ---------- initial_params : np.ndarray Full parameter vector including fixed parameters. lower_bounds : np.ndarray Lower bounds array (same length as initial_params). upper_bounds : np.ndarray Upper bounds array (same length as initial_params). Returns ------- free_params : np.ndarray Subset of initial_params where lower < upper. free_lower : np.ndarray Lower bounds for free parameters. free_upper : np.ndarray Upper bounds for free parameters. free_mask : np.ndarray Boolean mask (length == len(initial_params)), True where free. Examples -------- >>> p = np.array([1.0, 2.0, 3.0]) >>> lo = np.array([0.0, 2.0, 0.0]) >>> hi = np.array([5.0, 2.0, 5.0]) >>> free, fl, fu, mask = strip_fixed_parameters(p, lo, hi) >>> free # array([1.0, 3.0]) >>> mask # array([True, False, True]) """ free_mask = lower_bounds < upper_bounds return ( initial_params[free_mask], lower_bounds[free_mask], upper_bounds[free_mask], free_mask, )
[docs] def restore_fixed_parameters( free_result: np.ndarray, fixed_values: np.ndarray, free_mask: np.ndarray, ) -> np.ndarray: """Re-insert fixed parameter values into the optimized result. Inverse of :func:`strip_fixed_parameters`. Parameters ---------- free_result : np.ndarray Optimized values for the free parameters. fixed_values : np.ndarray Full reference parameter vector (fixed positions taken from here). free_mask : np.ndarray Boolean mask returned by :func:`strip_fixed_parameters`. Returns ------- np.ndarray Full parameter vector with fixed values restored. """ result = np.array(fixed_values, dtype=np.float64) result[free_mask] = free_result return result
[docs] def optimize_per_angle_sequential( phi: np.ndarray, t1: np.ndarray, t2: np.ndarray, g2_exp: np.ndarray, residual_func: callable, initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray], weighting: str = "inverse_variance", min_success_rate: float = 0.5, parameter_names: Sequence[str] | None = None, **optimizer_kwargs, ) -> SequentialResult: """Optimize parameters sequentially for each phi angle. Main entry point for sequential per-angle optimization. Parameters ---------- phi : np.ndarray Phi angle values (flattened) t1 : np.ndarray Time 1 values (flattened) t2 : np.ndarray Time 2 values (flattened) g2_exp : np.ndarray Experimental g2 values (flattened) residual_func : callable Residual function: residual_func(params, phi, t1, t2, g2) -> residuals initial_params : np.ndarray Initial parameter guess bounds : tuple of np.ndarray (lower_bounds, upper_bounds) weighting : str, optional Result combination weighting: 'inverse_variance' | 'uniform' | 'n_points' min_success_rate : float, optional Minimum fraction of angles that must converge (0.0-1.0), default: 0.5 parameter_names : Sequence[str], optional Parameter ordering used to align per-parameter kwargs (e.g., x_scale) **optimizer_kwargs Additional arguments passed to NLSQ LeastSquares.least_squares Returns ------- SequentialResult Combined optimization results Raises ------ RuntimeError If success rate < min_success_rate Examples -------- >>> # Simple example with 3 angles >>> phi = np.array([0]*100 + [90]*100 + [180]*100) >>> t1 = np.tile(np.linspace(0, 1, 100), 3) >>> t2 = np.tile(np.linspace(0, 1, 100), 3) >>> g2 = np.ones(300) >>> >>> def residuals(params, phi, t1, t2, g2): ... # Simple model ... return g2 - (1.0 + params[0] * np.exp(-params[1] * t1)) >>> >>> result = optimize_per_angle_sequential( ... phi, t1, t2, g2, ... residuals, ... initial_params=np.array([0.5, 1.0]), ... bounds=(np.array([0.0, 0.0]), np.array([1.0, 10.0])) ... ) >>> result.success_rate 1.0 >>> len(result.per_angle_results) 3 """ logger.info( f"Starting sequential per-angle optimization\n" f" Total points: {len(phi):,}\n" f" Unique angles: {len(np.unique(phi))}\n" f" Parameters: {len(initial_params)}\n" f" Weighting: {weighting}" ) optimizer_kwargs = _normalize_least_squares_kwargs( optimizer_kwargs, n_params=len(initial_params), parameter_names=parameter_names, ) # Split data by angle subsets = split_data_by_angle(phi, t1, t2, g2_exp) # Strip fixed parameters (lower == upper) before passing to TRF solver lower_bounds, upper_bounds = bounds free_params, free_lower, free_upper, free_mask = strip_fixed_parameters( initial_params, lower_bounds, upper_bounds ) free_bounds = (free_lower, free_upper) # Re-normalize optimizer kwargs for the reduced (free) parameter count has_fixed = np.any(~free_mask) if has_fixed: optimizer_kwargs = _normalize_least_squares_kwargs( optimizer_kwargs, n_params=len(free_params), parameter_names=( [n for n, m in zip(parameter_names, free_mask, strict=False) if m] if parameter_names is not None else None ), ) # Wrap residual_func to expand free params back to full params # before evaluation, since residual_func uses hard-coded slicing # that expects the full parameter vector _original_residual = residual_func def residual_func(params, *args, **kwargs): full_params = restore_fixed_parameters(params, initial_params, free_mask) return _original_residual(full_params, *args, **kwargs) # Optimize each angle per_angle_results = [] for subset in subsets: result = optimize_single_angle( subset, residual_func, free_params if has_fixed else initial_params, free_bounds if has_fixed else bounds, **optimizer_kwargs, ) # Restore fixed parameters into the result if has_fixed: result["parameters"] = restore_fixed_parameters( result["parameters"], initial_params, free_mask ) per_angle_results.append(result) status = "OK" if result["success"] else "FAIL" logger.info( f" {status} Angle {result['phi_angle']:6.2f} deg: " f"cost={result['cost']:.4f}, " f"iterations={result['n_iterations']}" ) # Aggregate Jacobian diagnostics def _aggregate_norms(key: str) -> np.ndarray | None: values = [r[key] for r in per_angle_results if r.get(key) is not None] if not values: return None stacked = np.vstack(values) return stacked.mean(axis=0) aggregated_initial = _aggregate_norms("jac_initial_norms") aggregated_final = _aggregate_norms("jac_final_norms") # Check success rate n_success = sum(1 for r in per_angle_results if r["success"]) n_total = len(per_angle_results) success_rate = n_success / n_total if success_rate < min_success_rate: raise RuntimeError( f"Insufficient convergence: {n_success}/{n_total} angles converged " f"({success_rate:.1%}), minimum required: {min_success_rate:.1%}" ) # Combine results combined_params, combined_cov, total_cost = combine_angle_results( per_angle_results, weighting=weighting ) logger.info( f"Sequential optimization complete:\n" f" Success rate: {success_rate:.1%} ({n_success}/{n_total})\n" f" Combined cost: {total_cost:.4f}\n" f" Combined parameters: {combined_params}" ) return SequentialResult( combined_parameters=combined_params, combined_covariance=combined_cov, per_angle_results=per_angle_results, n_angles_optimized=n_success, n_angles_failed=n_total - n_success, total_cost=total_cost, success_rate=success_rate, initial_jacobian_norms=aggregated_initial, final_jacobian_norms=aggregated_final, )