"""Error recovery and diagnostics for NLSQ optimization.
Extracted from wrapper.py to reduce file size and improve maintainability.
This module provides:
- Safe uncertainty extraction from covariance matrices
- Automatic error recovery with retry strategies (T022-T024)
- Error diagnosis with actionable recovery guidance
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from typing import Any
import numpy as np
from homodyne.optimization.nlsq.fallback_chain import OptimizationStrategy
from homodyne.optimization.nlsq.results import FunctionEvaluationCounter
from homodyne.utils.logging import get_logger, log_exception
logger = get_logger(__name__)
[docs]
def safe_uncertainties_from_pcov(pcov: np.ndarray, n_params: int) -> np.ndarray:
"""Extract uncertainties with diagonal regularization for singular pcov."""
if pcov.shape[0] != n_params:
return np.zeros(n_params)
diag = np.diag(pcov)
if np.any(diag < 1e-15):
logger.warning(
f"Singular covariance: {np.sum(diag < 1e-15)}/{n_params} near-zero entries. "
"Applying regularization."
)
diag = np.diag(pcov + np.eye(n_params) * 1e-10)
return np.asarray(np.sqrt(np.maximum(diag, 0.0)))
[docs]
def execute_with_recovery(
residual_fn: Callable[[np.ndarray], np.ndarray],
xdata: np.ndarray,
ydata: np.ndarray,
initial_params: np.ndarray,
bounds: tuple[np.ndarray, np.ndarray] | None,
strategy: OptimizationStrategy,
log: logging.Logger | logging.LoggerAdapter[logging.Logger],
loss_name: str,
x_scale_value: float | str | np.ndarray,
handle_nlsq_result_fn: Callable,
curve_fit_fn: Callable,
curve_fit_large_fn: Callable,
) -> tuple[np.ndarray, np.ndarray, dict, list[str], str]:
"""Execute optimization with automatic error recovery (T022-T024).
Implements intelligent retry strategies:
- Attempt 1: Original parameters with selected strategy
- Attempt 2: Perturbed parameters (+/-10%)
- Attempt 3: Relaxed convergence tolerance
- Final failure: Comprehensive diagnostics
Args:
residual_fn: Residual function
xdata, ydata: Data arrays
initial_params: Initial parameter guess
bounds: Parameter bounds tuple
strategy: Optimization strategy to use
log: Logger instance
loss_name: Loss function name
x_scale_value: Scaling value for parameters
handle_nlsq_result_fn: Function to normalize NLSQ results
curve_fit_fn: Standard curve_fit function
curve_fit_large_fn: Large dataset curve_fit function
Returns:
(popt, pcov, info, recovery_actions, convergence_status)
"""
recovery_actions = []
max_retries = 3
current_params = initial_params.copy()
# Compute initial cost for optimization success tracking
if hasattr(residual_fn, "n_total_points") or isinstance(
residual_fn, FunctionEvaluationCounter
):
initial_residuals = residual_fn(initial_params)
else:
initial_residuals = residual_fn(xdata, *initial_params)
initial_cost = np.sum(initial_residuals**2)
# Determine if we should use large dataset functions
use_large = strategy != OptimizationStrategy.STANDARD
show_progress = strategy in [
OptimizationStrategy.LARGE,
OptimizationStrategy.CHUNKED,
OptimizationStrategy.STREAMING,
]
for attempt in range(max_retries):
try:
log.info(
f"Optimization attempt {attempt + 1}/{max_retries} ({strategy.value} strategy)"
)
if use_large:
log.debug("Using curve_fit_large with NLSQ automatic memory management")
if isinstance(x_scale_value, (int, float)):
x_scale_large = np.abs(current_params) + 1e-3
log.info(
f"Replacing scalar x_scale={x_scale_value} with magnitude-based scaling"
)
elif isinstance(x_scale_value, np.ndarray):
x_scale_large = x_scale_value
else:
x_scale_large = np.abs(current_params) + 1e-3
result = curve_fit_large_fn(
residual_fn,
xdata,
ydata,
p0=current_params.tolist(),
bounds=bounds,
loss=loss_name,
x_scale=x_scale_large,
gtol=1e-6,
ftol=1e-6,
max_nfev=5000,
verbose=2,
show_progress=show_progress,
stability="auto",
)
popt, pcov, info = handle_nlsq_result_fn(
result, OptimizationStrategy.LARGE
)
info["initial_cost"] = initial_cost
else:
x_scale_array = np.abs(current_params) + 1e-3
n_show = min(8, len(current_params))
log.info(
f"DEBUG: Bounds and scaling (showing first {n_show} of {len(current_params)} params):"
)
if bounds is not None:
lower, upper = bounds
for i in range(n_show):
log.info(
f" param[{i}]: [{lower[i]:.6f}, {upper[i]:.6f}], "
f"initial={current_params[i]:.6f}, x_scale={x_scale_array[i]:.6e}"
)
else:
log.info(" bounds=None (unbounded)")
for i in range(n_show):
log.info(
f" param[{i}]: initial={current_params[i]:.6f}, "
f"x_scale={x_scale_array[i]:.6e}"
)
popt, pcov = curve_fit_fn(
residual_fn,
xdata,
ydata,
p0=current_params.tolist(),
bounds=bounds,
loss=loss_name,
x_scale=x_scale_array,
gtol=1e-6,
ftol=1e-6,
max_nfev=5000,
verbose=2,
stability="auto",
rescale_data=False,
)
info = {"initial_cost": initial_cost}
log.info("=" * 80)
log.info("NLSQ curve_fit RESULT DIAGNOSTICS")
log.info("=" * 80)
log.info(f" Initial params (p0): {current_params}")
log.info(f" Fitted params (popt): {popt}")
log.info(
f" Params changed: {not np.allclose(popt, current_params, rtol=1e-10)}"
)
log.info(f" pcov shape: {pcov.shape}")
log.info(f" pcov diagonal (uncertainties^2): {np.diag(pcov)}")
log.info(f" pcov condition number: {np.linalg.cond(pcov):.2e}")
zero_unc_mask = np.abs(np.diag(pcov)) < 1e-15
if np.any(zero_unc_mask):
zero_indices = np.where(zero_unc_mask)[0]
log.warning(
f"ZERO UNCERTAINTIES detected for parameters at indices: {zero_indices}"
)
log.warning(
" This indicates singular/ill-conditioned Jacobian matrix!"
)
log.warning(
" Affected parameters were likely NOT optimized by NLSQ."
)
log.info("=" * 80)
# Validate result
params_unchanged = np.allclose(popt, current_params, rtol=1e-10)
identity_covariance = np.allclose(pcov, np.eye(len(popt)), rtol=1e-10)
if params_unchanged or identity_covariance:
log.warning(
f"Potential optimization failure detected:\n"
f" Parameters unchanged: {params_unchanged}\n"
f" Identity covariance: {identity_covariance}\n"
f" This may indicate NLSQ streaming bug or failed optimization"
)
if attempt < max_retries - 1:
recovery_actions.append("detected_parameter_stagnation")
log.info("Retrying with perturbed parameters...")
_rng = np.random.default_rng(seed=42 + attempt)
perturbation = (
0.05
* current_params
* _rng.uniform(-1, 1, size=len(current_params))
)
current_params = current_params + perturbation
if bounds is not None:
current_params = np.clip(current_params, bounds[0], bounds[1])
continue
else:
log.error(
"Optimization returned unchanged parameters after all retries. "
"This may indicate a bug in NLSQ or an intractable problem."
)
# Success!
convergence_status = (
"converged" if attempt == 0 else "converged_with_recovery"
)
log.info(f"Optimization converged on attempt {attempt + 1}")
return popt, pcov, info, recovery_actions, convergence_status
except (
ValueError,
RuntimeError,
TypeError,
AttributeError,
OSError,
MemoryError,
) as e:
log_exception(
log,
e,
context={
"attempt": attempt + 1,
"max_retries": max_retries,
"strategy": strategy.value,
"n_params": len(current_params),
"params_summary": f"[{current_params[0]:.4g}, ..., {current_params[-1]:.4g}]",
},
level=logging.WARNING,
)
diagnostic = diagnose_error(
error=e,
params=current_params,
bounds=bounds,
attempt=attempt,
)
log.warning(
f"Attempt {attempt + 1} failed: {diagnostic['error_type']}",
)
log.info(f"Diagnostic: {diagnostic['message']}")
recovery_strategy = diagnostic["recovery_strategy"]
if recovery_strategy.get("action") == "no_recovery_available":
error_msg = (
f"Optimization failed: {diagnostic['error_type']} (unrecoverable)\n"
f"Diagnostic: {diagnostic['message']}\n"
f"Suggestions:\n"
)
for suggestion in diagnostic["suggestions"]:
error_msg += f" - {suggestion}\n"
log.error(error_msg)
raise RuntimeError(error_msg) from e
if attempt < max_retries - 1:
recovery_actions.append(recovery_strategy["action"])
params_before = current_params.copy()
log.info(f"Applying recovery: {recovery_strategy['action']}")
current_params = recovery_strategy["new_params"]
log.info(
f"Recovery parameter adjustment:\n"
f" Before: [{params_before[0]:.4g}, ..., {params_before[-1]:.4g}]\n"
f" After: [{current_params[0]:.4g}, ..., {current_params[-1]:.4g}]\n"
f" Max change: {np.max(np.abs(current_params - params_before)):.4g}"
)
continue
else:
error_msg = (
f"Optimization failed after {max_retries} attempts.\n"
f"Recovery actions attempted: {recovery_actions}\n"
f"Final diagnostic: {diagnostic['message']}\n"
f"Suggestions:\n"
)
for suggestion in diagnostic["suggestions"]:
error_msg += f" - {suggestion}\n"
log.error(error_msg)
raise RuntimeError(error_msg) from e
# Unreachable: loop always returns or raises, but mypy needs this
raise RuntimeError("Optimization failed: exhausted all retry attempts")
[docs]
def diagnose_error(
error: Exception,
params: np.ndarray,
bounds: tuple[np.ndarray, np.ndarray] | None,
attempt: int,
) -> dict[str, Any]:
"""Diagnose optimization error and provide actionable recovery strategy (T023).
Args:
error: Exception raised during optimization
params: Current parameter values
bounds: Parameter bounds
attempt: Current attempt number (0-indexed)
Returns:
Diagnostic dictionary with error analysis and recovery strategy
"""
error_str = str(error).lower()
error_type = type(error).__name__
diagnostic: dict[str, Any] = {
"error_type": error_type,
"message": str(error),
"suggestions": [],
"recovery_strategy": {},
}
if "resource_exhausted" in error_str or "out of memory" in error_str:
diagnostic["error_type"] = "out_of_memory"
diagnostic["suggestions"] = [
"Dataset too large for available CPU memory",
"IMMEDIATE FIX: Reduce dataset size:",
" - Enable phi angle filtering in config (reduce angles from 23 to 8-12)",
" - Reduce time points via subsampling (1001x1001 -> 200x200)",
" - Use smaller time window in config (frames: 1000-2000 -> 1000-1500)",
"ALTERNATIVE: Increase system memory or use machine with more RAM",
"NOTE: curve_fit_large() is disabled - residual function not chunk-aware",
]
diagnostic["recovery_strategy"] = {
"action": "no_recovery_available",
"reason": "Memory exhaustion requires data reduction",
"suggested_actions": [
"enable_angle_filtering",
"reduce_time_points",
"increase_system_memory",
],
}
elif "convergence" in error_str or "max" in error_str or "iteration" in error_str:
diagnostic["error_type"] = "convergence_failure"
diagnostic["suggestions"] = [
"Try different initial parameters",
"Relax convergence tolerance",
"Check if data quality is sufficient",
"Verify parameter bounds are reasonable",
]
if attempt == 0:
perturbation = (
np.random.default_rng(seed=42).standard_normal(params.shape) * 0.1
)
new_params = params * (1.0 + perturbation)
if bounds is not None:
new_params = np.clip(new_params, bounds[0], bounds[1])
diagnostic["recovery_strategy"] = {
"action": "perturb_initial_parameters_10pct",
"new_params": new_params,
}
else:
perturbation = (
np.random.default_rng(seed=123).standard_normal(params.shape) * 0.2
)
new_params = params * (1.0 + perturbation)
if bounds is not None:
new_params = np.clip(new_params, bounds[0], bounds[1])
diagnostic["recovery_strategy"] = {
"action": "perturb_initial_parameters_20pct",
"new_params": new_params,
}
elif "bound" in error_str or "constraint" in error_str:
diagnostic["error_type"] = "bounds_violation"
diagnostic["suggestions"] = [
"Check that lower bounds < upper bounds",
"Verify bounds are physically reasonable",
"Consider expanding bounds if parameters consistently hit limits",
]
if bounds is not None:
lower, upper = bounds
range_width = upper - lower
new_params = lower + 0.5 * range_width
diagnostic["recovery_strategy"] = {
"action": "reset_to_bounds_center",
"new_params": new_params,
}
else:
new_params = params * 0.9
diagnostic["recovery_strategy"] = {
"action": "scale_parameters_0.9x",
"new_params": new_params,
}
elif "singular" in error_str or "condition" in error_str or "rank" in error_str:
diagnostic["error_type"] = "ill_conditioned_jacobian"
diagnostic["suggestions"] = [
"Data may be insufficient to constrain all parameters",
"Consider fixing some parameters",
"Check for parameter correlation",
"Verify data quality and noise levels",
]
new_params = params * 0.1
if bounds is not None:
new_params = np.clip(new_params, bounds[0], bounds[1])
diagnostic["recovery_strategy"] = {
"action": "scale_parameters_0.1x_for_conditioning",
"new_params": new_params,
}
elif "nan" in error_str or "inf" in error_str:
diagnostic["error_type"] = "numerical_instability"
diagnostic["suggestions"] = [
"Check for extreme parameter values",
"Verify data contains no NaN/Inf values",
"Consider parameter rescaling",
"Check residual function implementation",
]
if bounds is not None:
lower, upper = bounds
new_params = np.sqrt(np.abs(lower * upper))
new_params = np.clip(new_params, lower, upper)
else:
new_params = np.ones_like(params) * 0.5
diagnostic["recovery_strategy"] = {
"action": "reset_to_geometric_mean_of_bounds",
"new_params": new_params,
}
else:
diagnostic["error_type"] = "unknown_error"
diagnostic["suggestions"] = [
f"Unexpected error: {error_type}",
"Check data format and residual function",
"Verify NLSQ package installation",
"Consult error message for details",
]
perturbation = np.random.randn(*params.shape) * 0.05
new_params = params * (1.0 + perturbation)
if bounds is not None:
new_params = np.clip(new_params, bounds[0], bounds[1])
diagnostic["recovery_strategy"] = {
"action": "generic_perturbation_5pct",
"new_params": new_params,
}
return diagnostic