Source code for homodyne.optimization.nlsq.cmaes_wrapper

"""CMA-ES global optimization wrapper for homodyne.

Provides CMA-ES integration using NLSQ's CMAESOptimizer with:
- Automatic memory configuration for large datasets
- BIPOP restart strategy for robust convergence
- Scale-ratio based method selection
- Integration with homodyne's model caching

CMA-ES (Covariance Matrix Adaptation Evolution Strategy) is particularly
beneficial for XPCS laminar_flow mode where parameters have vastly different
scales (e.g., D₀ ~ 1e4 vs γ̇₀ ~ 1e-3, scale ratio > 1e7).

NLSQ v0.6.4+ Features:
- evosax backend for JAX-accelerated evolution
- BIPOP restart strategy (alternating large/small populations)
- Memory batching: population_batch_size, data_chunk_size
- MethodSelector for auto-selection based on scale ratio

Usage
-----
>>> from homodyne.optimization.nlsq.cmaes_wrapper import CMAESWrapper
>>> wrapper = CMAESWrapper()
>>> if wrapper.should_use_cmaes(bounds):
...     result = wrapper.fit(model_func, xdata, ydata, p0, bounds)
"""

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

import numpy as np

from homodyne.utils.logging import get_logger, log_exception, log_phase

if TYPE_CHECKING:
    from homodyne.optimization.nlsq.config import NLSQConfig

logger = get_logger(__name__)


def _format_bounds_summary(bounds: tuple[np.ndarray, np.ndarray]) -> str:
    """Format bounds summary for logging.

    Parameters
    ----------
    bounds : tuple[np.ndarray, np.ndarray]
        Lower and upper bounds as (lower, upper) arrays.

    Returns
    -------
    str
        Human-readable bounds summary.
    """
    lower, upper = bounds
    ranges = upper - lower
    n_params = len(lower)

    # Find min/max ranges for summary
    valid_ranges = ranges[ranges > 0]
    if len(valid_ranges) == 0:
        return f"{n_params} params (no valid ranges)"

    min_range = np.min(valid_ranges)
    max_range = np.max(valid_ranges)

    return f"{n_params} params, range=[{min_range:.2e}, {max_range:.2e}]"


# =============================================================================
# Parameter Normalization Utilities (v2.16.0)
# =============================================================================
# These functions implement bounds-based normalization to improve CMA-ES
# convergence for multi-scale problems (e.g., D₀ ~ 1e4 vs γ̇₀ ~ 1e-3).


def _compute_normalization_factors(
    bounds: tuple[np.ndarray, np.ndarray],
    epsilon: float = 1e-12,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Compute normalization factors for bounds-based normalization.

    Normalization: x_norm = (x - lower) / (upper - lower)

    Parameters
    ----------
    bounds : tuple[np.ndarray, np.ndarray]
        Lower and upper bounds as (lower, upper) arrays.
    epsilon : float
        Small value to prevent division by zero for fixed parameters.

    Returns
    -------
    tuple[np.ndarray, np.ndarray, np.ndarray]
        (scale, offset, range) where:
        - scale = 1 / (upper - lower + epsilon) for safe division
        - offset = lower (subtracted before scaling)
        - range = upper - lower (for covariance adjustment)
    """
    lower, upper = bounds
    param_range = upper - lower

    # Prevent division by zero for fixed parameters (where upper == lower)
    safe_range = np.where(param_range > epsilon, param_range, 1.0)
    scale = 1.0 / safe_range

    return scale, lower, param_range


def _normalize_params(
    params: np.ndarray,
    scale: np.ndarray,
    offset: np.ndarray,
) -> np.ndarray:
    """Normalize parameters from physical space to [0, 1] space.

    Parameters
    ----------
    params : np.ndarray
        Parameters in physical space.
    scale : np.ndarray
        Scale factors (1 / range).
    offset : np.ndarray
        Offset values (lower bounds).

    Returns
    -------
    np.ndarray
        Parameters normalized to [0, 1] space.
    """
    return (params - offset) * scale


def _denormalize_params(
    params_norm: np.ndarray,
    scale: np.ndarray,
    offset: np.ndarray,
) -> np.ndarray:
    """Denormalize parameters from [0, 1] space back to physical space.

    Parameters
    ----------
    params_norm : np.ndarray
        Parameters in normalized [0, 1] space.
    scale : np.ndarray
        Scale factors (1 / range).
    offset : np.ndarray
        Offset values (lower bounds).

    Returns
    -------
    np.ndarray
        Parameters in physical space.
    """
    return params_norm / scale + offset


def _normalize_bounds(
    bounds: tuple[np.ndarray, np.ndarray],
) -> tuple[np.ndarray, np.ndarray]:
    """Normalize bounds to [0, 1] space.

    Parameters
    ----------
    bounds : tuple[np.ndarray, np.ndarray]
        Physical bounds as (lower, upper) arrays.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        Normalized bounds (zeros, ones).
    """
    n_params = len(bounds[0])
    return (np.zeros(n_params), np.ones(n_params))


def _adjust_covariance_for_normalization(
    covariance: np.ndarray | None,
    param_range: np.ndarray,
) -> np.ndarray | None:
    """Transform covariance from normalized to physical space.

    Uses Jacobian scaling: Cov_phys = J @ Cov_norm @ J.T
    where J is diagonal with elements = param_range.

    Parameters
    ----------
    covariance : np.ndarray | None
        Covariance matrix in normalized space.
    param_range : np.ndarray
        Parameter ranges (upper - lower bounds).

    Returns
    -------
    np.ndarray | None
        Covariance matrix in physical space, or None if input is None.
    """
    if covariance is None:
        return None

    # Jacobian is diagonal with elements = param_range
    # Cov_phys[i,j] = range_i * Cov_norm[i,j] * range_j
    jacobian_outer = np.outer(param_range, param_range)
    return covariance * jacobian_outer


def _is_cmaes_available() -> bool:
    """Check if CMA-ES is available via NLSQ."""
    try:
        from nlsq.global_optimization import is_evosax_available

        return is_evosax_available()
    except ImportError:
        return False


CMAES_AVAILABLE = _is_cmaes_available()

# Skip L-M refinement when CMA-ES chi2 exceeds this multiple of the
# warm-start chi2 — the comparison in core.py will discard it anyway.
_REFINEMENT_SKIP_CHI2_RATIO = 10.0


[docs] @dataclass class CMAESWrapperConfig: """Configuration for CMA-ES wrapper. Attributes ---------- preset : str CMA-ES preset: "cmaes-fast" (50 gen), "cmaes" (100 gen), "cmaes-global" (200 gen). max_generations : int | None Maximum CMA-ES generations. None = use preset default + adaptive scaling. sigma : float Initial step size as fraction of search range (0, 1]. tol_fun : float Function value tolerance for convergence. tol_x : float Parameter tolerance for convergence. restart_strategy : str Restart strategy: "none" or "bipop". max_restarts : int Maximum number of BIPOP restarts. population_batch_size : int | None Batch size for population evaluation (None = auto). data_chunk_size : int | None Chunk size for data streaming (None = auto). refine_with_nlsq : bool Whether to refine CMA-ES solution with NLSQ TRF. auto_memory : bool Whether to auto-configure memory parameters. memory_limit_gb : float Memory limit for auto-configuration in GB. refinement_workflow : str NLSQ workflow for refinement: "auto" (recommended), "standard", "streaming". refinement_ftol : float Function tolerance for NLSQ refinement. refinement_xtol : float Parameter tolerance for NLSQ refinement. refinement_gtol : float Gradient tolerance for NLSQ refinement. refinement_max_nfev : int Maximum function evaluations for NLSQ refinement. refinement_loss : str Loss function for NLSQ refinement: "linear", "soft_l1", "huber", etc. """ # CMA-ES global search settings preset: str = "cmaes" max_generations: int | None = None # None = use preset + adaptive scaling popsize: int | None = None # None = auto from 4+3*ln(n) sigma: float = 0.5 sigma_warmstart: float = 0.05 # Reduced sigma for warm-start (local refinement) tol_fun: float = 1e-8 tol_x: float = 1e-8 restart_strategy: str = "bipop" max_restarts: int = 9 population_batch_size: int | None = None data_chunk_size: int | None = None auto_memory: bool = True memory_limit_gb: float = 8.0 # NLSQ TRF refinement settings (post-CMA-ES) refine_with_nlsq: bool = True refinement_workflow: str = "auto" # "auto" auto-selects memory strategy refinement_ftol: float = 1e-10 # Tighter than CMA-ES for local refinement refinement_xtol: float = 1e-10 refinement_gtol: float = 1e-10 refinement_max_nfev: int = 500 # Refinement shouldn't need many iterations refinement_loss: str = "linear" # Linear loss for final refinement # Parameter normalization (v2.16.0) # Normalizes parameters to [0,1] based on bounds for better scale handling normalize: bool = True # Enable bounds-based normalization normalization_epsilon: float = 1e-12 # Prevent division by zero
[docs] @classmethod def from_nlsq_config(cls, config: NLSQConfig) -> CMAESWrapperConfig: """Create CMAESWrapperConfig from NLSQConfig. Parameters ---------- config : NLSQConfig NLSQ configuration object. Returns ------- CMAESWrapperConfig CMA-ES wrapper configuration. """ # NLSQConfig might not have all fields if it's an older version or partial # Use getattr with defaults where appropriate or access directly if we are sure _pop_batch = getattr(config, "cmaes_population_batch_size", None) _data_chunk = getattr(config, "cmaes_data_chunk_size", None) return cls( # CMA-ES global search settings preset=getattr(config, "cmaes_preset", "cmaes"), max_generations=getattr(config, "cmaes_max_generations", None), popsize=getattr(config, "cmaes_popsize", None), sigma=getattr(config, "cmaes_sigma", 0.5), sigma_warmstart=getattr(config, "cmaes_sigma_warmstart", 0.05), tol_fun=getattr(config, "cmaes_tol_fun", 1e-8), tol_x=getattr(config, "cmaes_tol_x", 1e-8), restart_strategy=getattr(config, "cmaes_restart_strategy", "bipop"), max_restarts=getattr(config, "cmaes_max_restarts", 9), population_batch_size=_pop_batch, data_chunk_size=_data_chunk, auto_memory=_pop_batch is None and _data_chunk is None, memory_limit_gb=getattr(config, "cmaes_memory_limit_gb", 8.0), # NLSQ TRF refinement settings refine_with_nlsq=getattr(config, "cmaes_refine_with_nlsq", True), refinement_workflow=getattr(config, "cmaes_refinement_workflow", "auto"), refinement_ftol=getattr(config, "cmaes_refinement_ftol", 1e-10), refinement_xtol=getattr(config, "cmaes_refinement_xtol", 1e-10), refinement_gtol=getattr(config, "cmaes_refinement_gtol", 1e-10), refinement_max_nfev=getattr(config, "cmaes_refinement_max_nfev", 500), refinement_loss=getattr(config, "cmaes_refinement_loss", "linear"), # Parameter normalization (v2.16.0) normalize=getattr(config, "cmaes_normalize", True), normalization_epsilon=getattr(config, "cmaes_normalization_epsilon", 1e-12), )
[docs] def to_cmaes_config( self, n_params: int, *, sigma_override: float | None = None ) -> Any: """Convert to NLSQ CMAESConfig. Parameters ---------- n_params : int Number of parameters for popsize calculation. sigma_override : float or None If provided, override the default sigma value. Used to apply warm-start sigma when NLSQ warm-start is active. Returns ------- CMAESConfig NLSQ CMAESConfig object. Raises ------ ImportError If NLSQ CMA-ES is not available. """ if not CMAES_AVAILABLE: raise ImportError( "CMA-ES requires NLSQ 0.6.4+ with evosax backend. " "Install with: pip install nlsq[evosax]" ) from nlsq.global_optimization import CMAESConfig, compute_default_popsize # Use configured popsize if specified, otherwise compute default if self.popsize is not None: popsize = self.popsize else: popsize = compute_default_popsize(n_params) # Map preset to max_generations if using preset preset_generations = { "cmaes-fast": 50, "cmaes": 100, "cmaes-global": 200, } max_gen = preset_generations.get(self.preset, self.max_generations or 100) effective_sigma = sigma_override if sigma_override is not None else self.sigma return CMAESConfig( popsize=popsize, max_generations=max_gen, sigma=effective_sigma, tol_fun=self.tol_fun, tol_x=self.tol_x, restart_strategy=self.restart_strategy, max_restarts=self.max_restarts, population_batch_size=self.population_batch_size, data_chunk_size=self.data_chunk_size, # Disable NLSQ's internal refinement - we do it explicitly in homodyne refine_with_nlsq=False, )
[docs] @dataclass class CMAESResult: """Result from CMA-ES optimization. Attributes ---------- parameters : np.ndarray Optimized parameter values. covariance : np.ndarray | None Parameter covariance matrix (if computed). chi_squared : float Final chi-squared value. success : bool Whether optimization converged successfully. diagnostics : dict CMA-ES diagnostics (generations, evaluations, etc.). method_used : str Method used: "cmaes" or "multi-start". nlsq_refined : bool Whether result was refined with NLSQ L-M. message : str Convergence message. """ parameters: np.ndarray covariance: np.ndarray | None chi_squared: float success: bool diagnostics: dict = field(default_factory=dict) method_used: str = "cmaes" nlsq_refined: bool = False message: str = ""
[docs] class CMAESWrapper: """Wrapper around NLSQ's CMAESOptimizer for homodyne integration. This wrapper provides: - Scale-ratio based method selection (CMA-ES vs multi-start) - Automatic memory configuration for large datasets - BIPOP restart strategy for robust global optimization - Optional L-M refinement of CMA-ES solutions Parameters ---------- config : CMAESWrapperConfig | None Configuration for CMA-ES wrapper. If None, uses defaults. Examples -------- >>> wrapper = CMAESWrapper() >>> if wrapper.should_use_cmaes(bounds, scale_threshold=1000): ... result = wrapper.fit(model_func, xdata, ydata, p0, bounds) """
[docs] def __init__(self, config: CMAESWrapperConfig | None = None) -> None: """Initialize CMA-ES wrapper. Parameters ---------- config : CMAESWrapperConfig | None Configuration for wrapper. Uses defaults if None. """ self.config = config or CMAESWrapperConfig() self._optimizer = None self._restarter = None
@property def is_available(self) -> bool: """Check if CMA-ES is available.""" return CMAES_AVAILABLE
[docs] def compute_scale_ratio( self, bounds: tuple[np.ndarray, np.ndarray], ) -> float: """Compute scale ratio from parameter bounds. The scale ratio is the ratio of the largest to smallest parameter range. High scale ratios (> 1000) indicate multi-scale problems where CMA-ES excels. Parameters ---------- bounds : tuple[np.ndarray, np.ndarray] Lower and upper bounds as (lower, upper) arrays. Returns ------- float Scale ratio (max_range / min_range). Examples -------- >>> lower = np.array([0, 0.001, 100]) >>> upper = np.array([1, 0.01, 10000]) >>> wrapper.compute_scale_ratio((lower, upper)) 11000.0 # (10000-100) / (0.01-0.001) """ lower, upper = bounds ranges = upper - lower # Avoid division by zero valid_ranges = ranges[ranges > 0] if len(valid_ranges) < 2: return 1.0 min_range = np.min(valid_ranges) max_range = np.max(valid_ranges) return max_range / min_range if min_range > 0 else 1.0
[docs] def should_use_cmaes( self, bounds: tuple[np.ndarray, np.ndarray], scale_threshold: float = 1000.0, ) -> bool: """Determine if CMA-ES should be used based on scale ratio. CMA-ES adapts its covariance matrix to different parameter scales, making it ideal for multi-scale optimization problems. This method checks if the scale ratio exceeds the threshold. Parameters ---------- bounds : tuple[np.ndarray, np.ndarray] Parameter bounds as (lower, upper) arrays. scale_threshold : float Scale ratio threshold for CMA-ES selection. Default: 1000. Returns ------- bool True if CMA-ES should be used. Notes ----- XPCS laminar_flow mode typically has scale ratios > 1e7: - D₀ ~ 1e4 (diffusion coefficient) - γ̇₀ ~ 1e-3 (shear rate) """ if not self.is_available: logger.info( "[CMA-ES] Method unavailable: evosax not installed. " "Install with: pip install nlsq[evosax]" ) return False scale_ratio = self.compute_scale_ratio(bounds) should_use = scale_ratio >= scale_threshold bounds_summary = _format_bounds_summary(bounds) if should_use: # Log at INFO when CMA-ES is selected - this is an important decision logger.info( f"[CMA-ES] Auto-selected: scale_ratio={scale_ratio:.2e} " f"(threshold={scale_threshold:.2e}), {bounds_summary}" ) else: # Log at DEBUG when not selected - less important logger.debug( f"[CMA-ES] Not selected: scale_ratio={scale_ratio:.2e} " f"< threshold={scale_threshold:.2e}" ) return should_use
def _run_nlsq_refinement( self, model_func: Callable, xdata: np.ndarray, ydata: np.ndarray, p0: np.ndarray, bounds: tuple[np.ndarray, np.ndarray], sigma: np.ndarray | None = None, ) -> dict[str, Any]: """Run NLSQ TRF refinement on CMA-ES solution. Uses NLSQ's curve_fit with workflow="auto" for memory-aware strategy selection, similar to NLSQ's "auto_global" workflow. Parameters ---------- model_func : Callable Model function: ``y = f(x, *params)``. xdata : np.ndarray Independent variable data. ydata : np.ndarray Dependent variable data. p0 : np.ndarray Initial parameters (CMA-ES solution). bounds : tuple[np.ndarray, np.ndarray] Parameter bounds as (lower, upper). sigma : np.ndarray | None Data uncertainties (optional). Returns ------- dict[str, Any] Refinement result with keys: popt, pcov, infodict, mesg, ier. """ from nlsq import curve_fit n_data = len(ydata) logger.info( f"[CMA-ES] Refinement starting: workflow={self.config.refinement_workflow}, " f"n_data={n_data:,}, ftol={self.config.refinement_ftol:.1e}, " f"max_nfev={self.config.refinement_max_nfev}" ) try: with log_phase("CMA-ES refinement", logger, track_memory=True) as phase: # Build refinement kwargs for NLSQ curve_fit # Note: NLSQ curve_fit uses 'workflow' instead of full scipy interface refinement_kwargs: dict[str, Any] = { "ftol": self.config.refinement_ftol, "xtol": self.config.refinement_xtol, "gtol": self.config.refinement_gtol, "max_nfev": self.config.refinement_max_nfev, "loss": self.config.refinement_loss, } # Run NLSQ curve_fit with workflow="auto" for memory-aware selection # NLSQ returns (popt, pcov), not scipy's 5-tuple with full_output popt, pcov = curve_fit( f=model_func, xdata=xdata, ydata=ydata, p0=p0, sigma=sigma, bounds=bounds, workflow=self.config.refinement_workflow, tr_solver="exact", # Model uses closure data, not xdata **refinement_kwargs, ) # Compute refined chi-squared pred = model_func(xdata, *popt) residuals = ydata - pred if sigma is not None: residuals = residuals / sigma chi_squared = float(np.sum(residuals**2)) logger.info( f"[CMA-ES] Refinement completed: chi2={chi_squared:.4e}, " f"time={phase.duration:.1f}s" ) return { "popt": np.asarray(popt), "pcov": np.asarray(pcov) if pcov is not None else None, "chi_squared": chi_squared, "infodict": {}, # NLSQ doesn't return infodict "mesg": "NLSQ TRF refinement converged", "ier": 1, # Success "success": True, "duration_s": phase.duration, } except (ValueError, RuntimeError, TypeError, OSError, MemoryError) as e: log_exception( logger, e, context={ "phase": "NLSQ refinement", "workflow": self.config.refinement_workflow, "n_data": n_data, }, level=30, # WARNING include_traceback=False, # Keep it concise ) # Return original parameters on refinement failure return { "popt": p0, "pcov": None, "chi_squared": None, "infodict": {}, "mesg": f"Refinement failed: {e}", "ier": -1, "success": False, } def _configure_memory( self, n_data: int, n_params: int, ) -> tuple[int | None, int | None]: """Auto-configure memory parameters for large datasets. Parameters ---------- n_data : int Number of data points. n_params : int Number of parameters. Returns ------- tuple[int | None, int | None] (population_batch_size, data_chunk_size) or (None, None) if auto-configuration is disabled. """ if not self.config.auto_memory: return self.config.population_batch_size, self.config.data_chunk_size if not CMAES_AVAILABLE: return None, None try: from nlsq.global_optimization import ( auto_configure_cmaes_memory, compute_default_popsize, ) # Use configured popsize if specified, otherwise compute default if self.config.popsize is not None: popsize = self.config.popsize else: popsize = compute_default_popsize(n_params) pop_batch, data_chunk = auto_configure_cmaes_memory( n_data=n_data, popsize=popsize, available_memory_gb=self.config.memory_limit_gb, ) # Calculate estimated memory usage for logging # Each individual evaluation: n_data * 8 bytes (float64) est_memory_mb = (pop_batch * n_data * 8) / (1024 * 1024) if pop_batch else 0 # Format data_chunk safely (may be None if auto-configured) data_chunk_str = f"{data_chunk:,}" if data_chunk is not None else "auto" logger.info( f"[CMA-ES] Memory configured: population_batch={pop_batch}, " f"data_chunk={data_chunk_str}, popsize={popsize}, " f"limit={self.config.memory_limit_gb:.1f}GB, " f"est_batch_memory={est_memory_mb:.0f}MB" ) return pop_batch, data_chunk except (ValueError, RuntimeError, TypeError, OSError, MemoryError) as e: log_exception( logger, e, context={ "phase": "memory auto-configuration", "n_data": n_data, "n_params": n_params, "memory_limit_gb": self.config.memory_limit_gb, }, level=30, # WARNING include_traceback=False, ) return None, None
[docs] def fit( self, model_func: Callable, xdata: np.ndarray, ydata: np.ndarray, p0: np.ndarray, bounds: tuple[np.ndarray, np.ndarray], sigma: np.ndarray | None = None, warmstart_chi2: float | None = None, ) -> CMAESResult: """Run CMA-ES global optimization. Parameters ---------- model_func : Callable Model function: ``y = f(x, *params)``. xdata : np.ndarray Independent variable data. ydata : np.ndarray Dependent variable data to fit. p0 : np.ndarray Initial parameter guess. bounds : tuple[np.ndarray, np.ndarray] Parameter bounds as (lower, upper). sigma : np.ndarray | None Data uncertainties (optional). warmstart_chi2 : float | None Chi-squared from NLSQ warm-start. If provided and CMA-ES chi2 exceeds 10x this value, refinement is skipped (the comparison in core.py will discard the CMA-ES result anyway). Also triggers use of ``sigma_warmstart`` instead of ``sigma`` for the CMA-ES search. Returns ------- CMAESResult Optimization result with parameters, covariance, diagnostics. Raises ------ ImportError If CMA-ES is not available. RuntimeError If optimization fails. """ if not CMAES_AVAILABLE: raise ImportError( "CMA-ES requires NLSQ 0.6.4+ with evosax backend. " "Install with: pip install nlsq[evosax]" ) from nlsq.global_optimization import CMAESOptimizer n_params = len(p0) n_data = len(ydata) scale_ratio = self.compute_scale_ratio(bounds) bounds_summary = _format_bounds_summary(bounds) # Log comprehensive configuration summary logger.info( f"[CMA-ES] Optimization starting: n_params={n_params}, " f"n_data={n_data:,}, preset={self.config.preset}" ) logger.info( f"[CMA-ES] Problem characteristics: scale_ratio={scale_ratio:.2e}, " f"{bounds_summary}" ) # Log bounds for debugging (v2.20.0) # Bounds order follows _bounds_to_arrays canonical order: # [contrast, offset, D0, alpha, D_offset, gamma_dot_t0, beta, # gamma_dot_t_offset, phi0] (may differ from config parameter order) lower, upper = bounds logger.debug( "[CMA-ES] Parameter bounds (canonical order): lower=%s, upper=%s", np.array2string(lower, precision=4, separator=", "), np.array2string(upper, precision=4, separator=", "), ) # Select sigma based on warm-start state (v2.20.0) # When warm-start provides a near-optimal starting point, use a smaller # sigma for local refinement instead of global exploration. warmstart_active = warmstart_chi2 is not None and warmstart_chi2 < float("inf") if warmstart_active: effective_sigma = self.config.sigma_warmstart logger.info( f"[CMA-ES] Warm-start active: using sigma_warmstart=" f"{effective_sigma:.3f} (default sigma={self.config.sigma:.3f})" ) else: effective_sigma = None # Use config default # Configure memory batching pop_batch, data_chunk = self._configure_memory(n_data, n_params) # Build CMAESConfig with memory settings cmaes_config = self.config.to_cmaes_config( n_params, sigma_override=effective_sigma ) # When warm-start is active, override BIPOP -> none (v2.21.0) # BIPOP large-population restarts are designed for global exploration, # but with sigma_warmstart (small sigma), the large populations sample # densely in a small neighborhood without actually exploring broadly. # A single focused run with the full generation budget is more coherent # for local refinement around the warm-start solution. if warmstart_active and cmaes_config.restart_strategy == "bipop": cmaes_config.restart_strategy = "none" cmaes_config.max_restarts = 0 logger.info( "[CMA-ES] Warm-start: overriding restart_strategy='bipop' -> 'none' " "(BIPOP large-population restarts are incoherent with small sigma_warmstart)" ) # Adaptive population sizing for high scale-ratio problems (v2.19.0) # Default popsize (4+3*ln(9) ~ 11) is too small for multi-scale problems. # Scale up popsize and generations when scale ratio is large, unless # the user explicitly configured a popsize. if scale_ratio > 1e3: from nlsq.global_optimization import compute_default_popsize default_pop = compute_default_popsize(n_params) if self.config.popsize is not None: # Warn when explicit popsize may be too small for the scale ratio if scale_ratio > 1e6: recommended = max(200, default_pop * 10) elif scale_ratio > 1e4: recommended = max(100, default_pop * 5) else: recommended = max(50, default_pop * 3) if self.config.popsize < recommended: logger.warning( f"[CMA-ES] Explicit popsize={self.config.popsize} may be too small " f"for scale_ratio={scale_ratio:.2e}. Adaptive scaling recommends " f"popsize={recommended}. Set popsize: null in config to enable " f"adaptive sizing." ) else: # Auto-scale popsize and generations if scale_ratio > 1e6: adaptive_pop = max(200, default_pop * 10) adaptive_gen = max(500, cmaes_config.max_generations * 3) elif scale_ratio > 1e4: adaptive_pop = max(100, default_pop * 5) adaptive_gen = max(300, cmaes_config.max_generations * 2) else: adaptive_pop = max(50, default_pop * 3) adaptive_gen = max(200, cmaes_config.max_generations) logger.info( f"[CMA-ES] Adaptive scaling: scale_ratio={scale_ratio:.2e} -> " f"popsize {cmaes_config.popsize}->{adaptive_pop}, " f"max_gen {cmaes_config.max_generations}->{adaptive_gen}" ) cmaes_config.popsize = adaptive_pop cmaes_config.max_generations = adaptive_gen # Override with auto-configured memory settings if pop_batch is not None: cmaes_config.population_batch_size = pop_batch if data_chunk is not None: cmaes_config.data_chunk_size = data_chunk # Log algorithm configuration logger.info( f"[CMA-ES] Algorithm settings: max_generations={cmaes_config.max_generations}, " f"popsize={cmaes_config.popsize}, sigma={cmaes_config.sigma:.3f}" f"{' (warm-start)' if warmstart_active else ''}, " f"restart={self.config.restart_strategy}" ) if self.config.refine_with_nlsq: logger.info( f"[CMA-ES] Post-refinement enabled: workflow={self.config.refinement_workflow}, " f"ftol={self.config.refinement_ftol:.1e}" ) # Create optimizer with config optimizer = CMAESOptimizer(config=cmaes_config) # Set up parameter normalization if enabled (v2.16.0) # Normalizes parameters to [0, 1] space for better CMA-ES covariance adaptation norm_state: dict[str, Any] | None = None if self.config.normalize: norm_scale, norm_offset, param_range = _compute_normalization_factors( bounds, self.config.normalization_epsilon ) norm_state = { "scale": norm_scale, "offset": norm_offset, "range": param_range, } # Normalize initial parameters and bounds fit_p0 = _normalize_params(p0, norm_scale, norm_offset) fit_bounds = _normalize_bounds(bounds) # Wrap model function to denormalize params before evaluation # IMPORTANT: Use JAX operations to preserve tracers during JIT compilation original_model_func = model_func import jax.numpy as jnp # noqa: E402 - lazy import, CMA-ES is optional # Pre-convert normalization factors to JAX arrays (captured in closure) norm_scale_jax = jnp.array(norm_scale) norm_offset_jax = jnp.array(norm_offset) def normalized_model_func( xdata: np.ndarray, *params_norm: float ) -> np.ndarray: # Use jnp.stack to preserve JAX tracers during JIT tracing params_norm_jax = jnp.stack(params_norm) # Denormalize: x = x_norm / scale + offset = x_norm * range + lower params_phys = params_norm_jax / norm_scale_jax + norm_offset_jax return original_model_func(xdata, *params_phys) fit_model_func = normalized_model_func logger.info( f"[CMA-ES] Parameter normalization enabled: " f"range=[{np.min(param_range):.2e}, {np.max(param_range):.2e}]" ) else: fit_p0 = p0 fit_bounds = bounds fit_model_func = model_func # Run CMA-ES global search (NLSQ internal refinement is disabled) # - CMA-ES global search with covariance adaptation # - Optional BIPOP restarts (configured via cmaes_config.restart_strategy) logger.info("[CMA-ES] Global search phase starting...") with log_phase( "CMA-ES global search", logger, track_memory=True ) as search_phase: result = optimizer.fit( f=fit_model_func, xdata=xdata, ydata=ydata, p0=fit_p0, bounds=fit_bounds, sigma=sigma, ) # Extract CMA-ES results cmaes_params = np.asarray(result.get("popt", fit_p0)) cmaes_covariance = result.get("pcov", None) if cmaes_covariance is not None: cmaes_covariance = np.asarray(cmaes_covariance) # Denormalize results if normalization was applied (v2.16.0) if norm_state is not None: cmaes_params = _denormalize_params( cmaes_params, norm_state["scale"], norm_state["offset"] ) cmaes_covariance = _adjust_covariance_for_normalization( cmaes_covariance, norm_state["range"] ) logger.debug( "[CMA-ES] Parameters denormalized from [0,1] to physical space" ) # Build CMA-ES diagnostics # NLSQ 0.6.4+ stores diagnostics under 'cmaes_diagnostics' dict cmaes_diag = result.get("cmaes_diagnostics", {}) generations = cmaes_diag.get("total_generations", 0) restarts = cmaes_diag.get("total_restarts", 0) convergence_reason = cmaes_diag.get("convergence_reason", "unknown") # Calculate evaluations from restart history (each generation evaluates popsize candidates) restart_history = cmaes_diag.get("restart_history", []) evaluations = ( sum(r.get("generations", 0) * r.get("popsize", 0) for r in restart_history) if restart_history else generations * cmaes_config.popsize ) diagnostics = { "generations": generations, "evaluations": evaluations, "restarts": restarts, "convergence_reason": convergence_reason, "global_search_time_s": search_phase.duration, "normalization_enabled": norm_state is not None, } if search_phase.memory_peak_gb is not None: diagnostics["global_search_memory_gb"] = search_phase.memory_peak_gb # Compute CMA-ES chi-squared pred = model_func(xdata, *cmaes_params) residuals = ydata - pred if sigma is not None: residuals = residuals / sigma cmaes_chi_squared = float(np.sum(residuals**2)) # Calculate evaluations per second for performance insight evals_per_sec = ( evaluations / search_phase.duration if search_phase.duration > 0 else 0 ) logger.info( f"[CMA-ES] Global search completed: chi2={cmaes_chi_squared:.4e}, " f"generations={generations}, restarts={restarts}" ) logger.info( f"[CMA-ES] Performance: {evaluations:,} evals in {search_phase.duration:.1f}s " f"({evals_per_sec:.0f} evals/s), reason={convergence_reason}" ) # Early-exit: skip refinement if CMA-ES chi2 is much worse than warm-start # The comparison in core.py will discard this result anyway, so refinement # from a bad starting point wastes compute (can take 400+ seconds). skip_refinement = False if ( warmstart_chi2 is not None and cmaes_chi_squared > _REFINEMENT_SKIP_CHI2_RATIO * warmstart_chi2 ): skip_refinement = True logger.warning( f"[CMA-ES] Skipping refinement: CMA-ES chi2={cmaes_chi_squared:.4e} " f"is >{_REFINEMENT_SKIP_CHI2_RATIO:.0f}x worse than warm-start chi2={warmstart_chi2:.4e}. " f"Warm-start result will be selected in comparison phase." ) # Run explicit NLSQ TRF refinement if enabled if self.config.refine_with_nlsq and not skip_refinement: refinement_result = self._run_nlsq_refinement( model_func=model_func, xdata=xdata, ydata=ydata, p0=cmaes_params, # Start from CMA-ES solution bounds=bounds, sigma=sigma, ) if refinement_result["success"]: # Use refined parameters best_params = refinement_result["popt"] best_covariance = refinement_result["pcov"] best_chi_squared = refinement_result["chi_squared"] nlsq_refined = True # Update diagnostics with refinement info diagnostics["refinement_nfev"] = refinement_result["infodict"].get( "nfev", 0 ) diagnostics["refinement_message"] = refinement_result["mesg"] diagnostics["cmaes_chi_squared"] = cmaes_chi_squared diagnostics["refined_chi_squared"] = best_chi_squared # Calculate improvement percentage if cmaes_chi_squared > 0.0: improvement = ( cmaes_chi_squared - best_chi_squared ) / cmaes_chi_squared else: improvement = 0.0 diagnostics["chi_squared_improvement"] = improvement # Add refinement timing if available if "duration_s" in refinement_result: diagnostics["refinement_time_s"] = refinement_result["duration_s"] logger.info( f"[CMA-ES] Refinement improved chi2: " f"{cmaes_chi_squared:.4e} -> {best_chi_squared:.4e} " f"({improvement:.2%} improvement)" ) else: # Refinement failed, use CMA-ES result logger.warning( f"[CMA-ES] Refinement failed, using CMA-ES result. " f"Reason: {refinement_result['mesg']}" ) best_params = cmaes_params best_covariance = cmaes_covariance best_chi_squared = cmaes_chi_squared nlsq_refined = False diagnostics["refinement_failed"] = True diagnostics["refinement_message"] = refinement_result["mesg"] else: # No refinement requested best_params = cmaes_params best_covariance = cmaes_covariance best_chi_squared = cmaes_chi_squared nlsq_refined = False logger.debug( "[CMA-ES] Post-refinement disabled, using global search result" ) # Calculate total time total_time = search_phase.duration if nlsq_refined and "refinement_time_s" in diagnostics: total_time += diagnostics["refinement_time_s"] diagnostics["total_time_s"] = total_time # Log final summary refined_str = " (refined)" if nlsq_refined else "" logger.info( f"[CMA-ES] Optimization completed{refined_str}: " f"chi2={best_chi_squared:.4e}, total_time={total_time:.1f}s" ) converged_reasons = {"tol_fun", "tol_x", "tol_fun_hist", "ftarget"} success = convergence_reason in converged_reasons or nlsq_refined return CMAESResult( parameters=best_params, covariance=best_covariance, chi_squared=best_chi_squared, success=success, diagnostics=diagnostics, method_used="cmaes", nlsq_refined=nlsq_refined, message=f"CMA-ES converged: {convergence_reason}", )
[docs] def fit_with_cmaes( model_func: Callable, xdata: np.ndarray, ydata: np.ndarray, p0: np.ndarray, bounds: tuple[np.ndarray, np.ndarray], sigma: np.ndarray | None = None, config: CMAESWrapperConfig | None = None, ) -> CMAESResult: """Convenience function for CMA-ES optimization. Parameters ---------- model_func : Callable Model function: ``y = f(x, *params)``. xdata : np.ndarray Independent variable data. ydata : np.ndarray Dependent variable data to fit. p0 : np.ndarray Initial parameter guess. bounds : tuple[np.ndarray, np.ndarray] Parameter bounds as (lower, upper). sigma : np.ndarray | None Data uncertainties (optional). config : CMAESWrapperConfig | None Configuration. Uses defaults if None. Returns ------- CMAESResult Optimization result. Examples -------- >>> result = fit_with_cmaes(model, x, y, p0, bounds) >>> print(f"Best params: {result.parameters}") """ wrapper = CMAESWrapper(config) return wrapper.fit(model_func, xdata, ydata, p0, bounds, sigma)