Source code for homodyne.optimization.nlsq.adapter_base

"""Abstract base class for NLSQ adapters (FR-012).

Provides shared methods for NLSQAdapter and NLSQWrapper to reduce code duplication.

Created as part of architecture refactoring (T059-T061).
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, cast

import numpy as np

from homodyne.utils.logging import get_logger

logger = get_logger(__name__)


[docs] class NLSQAdapterBase(ABC): """Abstract base class for NLSQ optimization adapters. Provides shared methods for data preparation, validation, result building, error handling, bounds setup, and covariance computation. Subclasses must implement the `fit()` method. """
[docs] @abstractmethod def fit(self, *args: Any, **kwargs: Any) -> Any: """Fit the model to data. Must be implemented by subclasses. """ ...
def _prepare_data( self, t1: np.ndarray, t2: np.ndarray, phi: np.ndarray, g2: np.ndarray, weights: np.ndarray | None = None, ) -> dict[str, Any]: """Prepare input data for optimization. Parameters ---------- t1 : np.ndarray First time coordinates t2 : np.ndarray Second time coordinates phi : np.ndarray Angle coordinates g2 : np.ndarray g2 correlation values weights : np.ndarray | None, optional Optional weights for weighted least squares Returns ------- dict[str, Any] Prepared data structure with keys: - 't1': validated t1 array - 't2': validated t2 array - 'phi': validated phi array - 'g2': validated g2 array - 'weights': weights or None - 'n_points': number of data points - 'phi_unique': unique phi values - 'n_phi': number of unique phi values """ # Convert to numpy arrays t1 = np.asarray(t1, dtype=np.float64) t2 = np.asarray(t2, dtype=np.float64) phi = np.asarray(phi, dtype=np.float64) g2 = np.asarray(g2, dtype=np.float64) if weights is not None: weights = np.asarray(weights, dtype=np.float64) # Get unique phi values phi_unique = np.unique(phi) n_phi = len(phi_unique) return { "t1": t1, "t2": t2, "phi": phi, "g2": g2, "weights": weights, "n_points": len(t1), "phi_unique": phi_unique, "n_phi": n_phi, } def _validate_input( self, t1: np.ndarray, t2: np.ndarray, phi: np.ndarray, g2: np.ndarray, weights: np.ndarray | None = None, ) -> None: """Validate input arrays for consistency. Parameters ---------- t1 : np.ndarray First time coordinates t2 : np.ndarray Second time coordinates phi : np.ndarray Angle coordinates g2 : np.ndarray g2 correlation values weights : np.ndarray | None, optional Optional weights for weighted least squares Raises ------ ValueError If arrays have inconsistent shapes or invalid values """ # Check array lengths match n = len(t1) if len(t2) != n: raise ValueError(f"t1 and t2 must have same length: {n} vs {len(t2)}") if len(phi) != n: raise ValueError(f"t1 and phi must have same length: {n} vs {len(phi)}") if len(g2) != n: raise ValueError(f"t1 and g2 must have same length: {n} vs {len(g2)}") if weights is not None and len(weights) != n: raise ValueError( f"weights must have same length as data: {n} vs {len(weights)}" ) # Check for NaN/Inf values if np.any(~np.isfinite(t1)): raise ValueError("t1 contains NaN or Inf values") if np.any(~np.isfinite(t2)): raise ValueError("t2 contains NaN or Inf values") if np.any(~np.isfinite(phi)): raise ValueError("phi contains NaN or Inf values") if np.any(~np.isfinite(g2)): raise ValueError("g2 contains NaN or Inf values") if weights is not None and np.any(~np.isfinite(weights)): raise ValueError("weights contains NaN or Inf values") # Check for empty arrays if n == 0: raise ValueError("Input arrays cannot be empty") logger.debug(f"Input validation passed: {n} points") def _build_result( self, params: np.ndarray, chi_squared: float, covariance: np.ndarray | None, param_names: list[str], n_iter: int, success: bool, message: str, diagnostics: dict[str, Any] | None = None, ) -> dict[str, Any]: """Build standardized result dictionary. Parameters ---------- params : np.ndarray Optimized parameter values chi_squared : float Final chi-squared value covariance : np.ndarray | None Parameter covariance matrix param_names : list[str] Parameter names n_iter : int Number of iterations success : bool Whether optimization succeeded message : str Status message diagnostics : dict[str, Any] | None, optional Additional diagnostics Returns ------- dict[str, Any] Standardized result dictionary """ result = { "params": params, "chi_squared": chi_squared, "covariance": covariance, "param_names": param_names, "n_iter": n_iter, "success": success, "message": message, } if diagnostics is not None: result["diagnostics"] = diagnostics # Compute parameter uncertainties from covariance if covariance is not None: try: uncertainties = np.sqrt(np.diag(covariance)) result["uncertainties"] = uncertainties except (ValueError, np.linalg.LinAlgError): result["uncertainties"] = None else: result["uncertainties"] = None return result def _handle_error( self, error: Exception, context: str, params: np.ndarray | None = None, raise_on_error: bool = False, ) -> dict[str, Any] | None: """Handle optimization errors gracefully. Parameters ---------- error : Exception The exception that occurred context : str Context description for logging params : np.ndarray | None, optional Current parameter values at time of error raise_on_error : bool, optional Whether to re-raise the error after logging Returns ------- dict[str, Any] | None Error result dictionary if not raising, None otherwise Raises ------ Exception Re-raises original error if raise_on_error is True """ logger.error(f"Error in {context}: {error}") if raise_on_error: raise error return { "params": params, "chi_squared": np.inf, "covariance": None, "param_names": [], "n_iter": 0, "success": False, "message": f"Error: {error}", "error": str(error), } def _setup_bounds( self, param_names: list[str], bounds_dict: dict[str, tuple[float, float]] | None = None, default_bounds: tuple[float, float] | None = None, ) -> tuple[np.ndarray, np.ndarray]: """Setup parameter bounds arrays. Parameters ---------- param_names : list[str] Parameter names bounds_dict : dict[str, tuple[float, float]] | None, optional Dictionary mapping parameter names to (lower, upper) bounds default_bounds : tuple[float, float] | None, optional Default bounds if not specified (defaults to (-inf, inf)) Returns ------- tuple[np.ndarray, np.ndarray] (lower_bounds, upper_bounds) arrays """ n_params = len(param_names) if default_bounds is None: default_bounds = (-np.inf, np.inf) lower = np.full(n_params, default_bounds[0]) upper = np.full(n_params, default_bounds[1]) if bounds_dict is not None: for i, name in enumerate(param_names): if name in bounds_dict: lb, ub = bounds_dict[name] lower[i] = lb upper[i] = ub logger.debug(f"Bounds setup: {n_params} parameters") return lower, upper def _compute_covariance( self, jacobian: np.ndarray, residuals: np.ndarray, n_params: int, ) -> np.ndarray | None: """Compute parameter covariance matrix from Jacobian. Uses the standard formula: cov = (J^T J)^{-1} * s^2 where s^2 = sum(residuals^2) / (n - p) Parameters ---------- jacobian : np.ndarray Jacobian matrix (n_points x n_params) residuals : np.ndarray Residual vector n_params : int Number of parameters Returns ------- np.ndarray | None Covariance matrix or None if computation fails """ try: n_points = len(residuals) dof = n_points - n_params if dof <= 0: logger.warning( f"Insufficient degrees of freedom: {n_points} points, {n_params} params" ) return None # Compute J^T J jtj = jacobian.T @ jacobian # Check condition number for numerical stability cond = np.linalg.cond(jtj) if cond > 1e12: logger.warning(f"J^T J ill-conditioned (cond={cond:.2e}), using SVD") # Use pseudo-inverse for ill-conditioned case jtj_inv = np.linalg.pinv(jtj) else: jtj_inv = np.linalg.inv(jtj) # Compute variance estimate s2 = np.sum(residuals**2) / dof # Covariance matrix covariance = jtj_inv * s2 return cast(np.ndarray, covariance) except (np.linalg.LinAlgError, ValueError) as e: logger.warning(f"Covariance computation failed: {e}") return None
__all__ = ["NLSQAdapterBase"]