"""Hybrid streaming optimization strategy for NLSQ optimization.
Extracted from wrapper.py to reduce file size and improve maintainability.
This module provides:
- Hybrid streaming optimizer (L-BFGS warmup + Gauss-Newton refinement)
- Stratified hybrid streaming with anti-degeneracy defense
- Memory estimation and streaming decision logic
- Deprecated streaming optimizer stubs
"""
from __future__ import annotations
import time
from typing import Any, cast
import jax
import jax.numpy as jnp
import numpy as np
from homodyne.optimization.exceptions import NLSQOptimizationError
from homodyne.optimization.nlsq.adaptive_regularization import (
AdaptiveRegularizationConfig,
AdaptiveRegularizer,
)
from homodyne.optimization.nlsq.fourier_reparam import (
FourierReparamConfig,
FourierReparameterizer,
)
from homodyne.optimization.nlsq.gradient_monitor import (
GradientCollapseMonitor,
GradientMonitorConfig,
)
from homodyne.optimization.nlsq.hierarchical import (
HierarchicalConfig,
HierarchicalOptimizer,
)
from homodyne.optimization.nlsq.memory import get_adaptive_memory_threshold
from homodyne.optimization.nlsq.parameter_utils import (
classify_parameter_status as _classify_parameter_status,
)
from homodyne.optimization.nlsq.parameter_utils import (
compute_quantile_per_angle_scaling as _compute_quantile_per_angle_scaling,
)
from homodyne.optimization.nlsq.recovery import safe_uncertainties_from_pcov
from homodyne.optimization.nlsq.shear_weighting import (
ShearSensitivityWeighting,
ShearWeightingConfig,
)
from homodyne.utils.logging import get_logger
logger = get_logger(__name__)
# Lazy imports to avoid circular dependencies
_memory_logger = get_logger("homodyne.optimization.nlsq.memory")
# Try importing AdaptiveHybridStreamingOptimizer (available in NLSQ >= 0.3.2)
try:
from nlsq import AdaptiveHybridStreamingOptimizer, HybridStreamingConfig
HYBRID_STREAMING_AVAILABLE = True
except ImportError:
HYBRID_STREAMING_AVAILABLE = False
AdaptiveHybridStreamingOptimizer = None
HybridStreamingConfig = None
[docs]
def fit_with_streaming_optimizer_deprecated(
residual_fn: Any,
xdata: np.ndarray,
ydata: np.ndarray,
initial_params: np.ndarray,
bounds: tuple[np.ndarray, np.ndarray] | None,
logger: Any,
checkpoint_config: dict | None = None,
) -> tuple[np.ndarray, np.ndarray, dict]:
"""Fit using streaming optimizer for large datasets.
.. deprecated:: 2.9.1
The old non-stratified StreamingOptimizer was removed in NLSQ 0.4.0.
Use the stratified optimization path with per-angle scaling instead,
which automatically uses AdaptiveHybridStreamingOptimizer for large datasets.
Raises
------
RuntimeError
Always - this code path is no longer supported
"""
raise RuntimeError(
"Non-stratified StreamingOptimizer was removed in NLSQ 0.4.0. "
"Use the stratified optimization path with per_angle_scaling=True, "
"which automatically switches to AdaptiveHybridStreamingOptimizer "
"for memory-constrained datasets. "
"See CLAUDE.md section 'NLSQ Adaptive Hybrid Streaming Mode' for details."
)
[docs]
def fit_with_hybrid_streaming_optimizer(
residual_fn: Any,
xdata: np.ndarray,
ydata: np.ndarray,
initial_params: np.ndarray,
bounds: tuple[np.ndarray, np.ndarray] | None,
logger: Any,
nlsq_config: Any = None,
fast_mode: bool = False,
) -> tuple[np.ndarray, np.ndarray, dict]:
"""Fit using NLSQ AdaptiveHybridStreamingOptimizer for large datasets.
This method uses NLSQ's four-phase hybrid optimizer to fix three key issues:
1. Shear-term weak gradients (scale imbalance) - via parameter normalization
2. Slow convergence - via L-BFGS warmup + Gauss-Newton refinement
3. Crude covariance - via exact J^T J accumulation + covariance transform
Four Phases:
- Phase 0: Parameter normalization setup (bounds-based)
- Phase 1: L-BFGS warmup with adaptive switching
- Phase 2: Streaming Gauss-Newton with exact J^T J accumulation
- Phase 3: Denormalization and covariance transform
Parameters
----------
residual_fn : callable
Residual function (StratifiedResidualFunction or similar)
xdata : np.ndarray
Independent variable data (flattened)
ydata : np.ndarray
Dependent variable data (flattened)
initial_params : np.ndarray
Initial parameter guess
bounds : tuple of np.ndarray or None
Parameter bounds (lower, upper)
logger : logging.Logger
Logger instance
nlsq_config : NLSQConfig, optional
NLSQ configuration with hybrid streaming settings
Returns
-------
popt : np.ndarray
Optimized parameters
pcov : np.ndarray
Covariance matrix (properly transformed to original space)
info : dict
Optimization information including phase diagnostics
Raises
------
RuntimeError
If AdaptiveHybridStreamingOptimizer is not available
NLSQOptimizationError
If optimization fails
"""
if not HYBRID_STREAMING_AVAILABLE:
raise RuntimeError(
"AdaptiveHybridStreamingOptimizer not available. "
"Please upgrade NLSQ to version >= 0.3.2: pip install --upgrade nlsq"
)
logger.info("Initializing NLSQ AdaptiveHybridStreamingOptimizer...")
logger.info("Fixes: 1) Shear-term gradients, 2) Convergence, 3) Covariance")
# Create HybridStreamingConfig from NLSQConfig with 4-layer defense
if nlsq_config is not None:
config = HybridStreamingConfig(
normalize=nlsq_config.hybrid_normalize,
normalization_strategy=nlsq_config.hybrid_normalization_strategy,
warmup_iterations=nlsq_config.hybrid_warmup_iterations,
max_warmup_iterations=nlsq_config.hybrid_max_warmup_iterations,
warmup_learning_rate=nlsq_config.hybrid_warmup_learning_rate,
gauss_newton_max_iterations=nlsq_config.hybrid_gauss_newton_max_iterations,
gauss_newton_tol=nlsq_config.hybrid_gauss_newton_tol,
chunk_size=nlsq_config.hybrid_chunk_size,
trust_region_initial=nlsq_config.hybrid_trust_region_initial,
regularization_factor=nlsq_config.hybrid_regularization_factor,
enable_checkpoints=nlsq_config.hybrid_enable_checkpoints,
checkpoint_frequency=nlsq_config.hybrid_checkpoint_frequency,
validate_numerics=nlsq_config.hybrid_validate_numerics,
# 4-Layer Defense Strategy (NLSQ 0.3.6)
enable_warm_start_detection=nlsq_config.hybrid_enable_warm_start_detection,
warm_start_threshold=nlsq_config.hybrid_warm_start_threshold,
enable_adaptive_warmup_lr=nlsq_config.hybrid_enable_adaptive_warmup_lr,
warmup_lr_refinement=nlsq_config.hybrid_warmup_lr_refinement,
warmup_lr_careful=nlsq_config.hybrid_warmup_lr_careful,
enable_cost_guard=nlsq_config.hybrid_enable_cost_guard,
cost_increase_tolerance=nlsq_config.hybrid_cost_increase_tolerance,
enable_step_clipping=nlsq_config.hybrid_enable_step_clipping,
max_warmup_step_size=nlsq_config.hybrid_max_warmup_step_size,
)
else:
# Use NLSQ 0.3.6 defaults with 4-layer defense enabled
config = HybridStreamingConfig(
normalize=True,
normalization_strategy="auto",
warmup_iterations=200,
max_warmup_iterations=500,
gauss_newton_max_iterations=100,
gauss_newton_tol=1e-8,
chunk_size=10000,
# 4-Layer Defense enabled by default
enable_warm_start_detection=True,
warm_start_threshold=0.01,
enable_adaptive_warmup_lr=True,
warmup_lr_refinement=1e-6,
warmup_lr_careful=1e-5,
enable_cost_guard=True,
cost_increase_tolerance=0.05,
enable_step_clipping=True,
max_warmup_step_size=0.1,
)
logger.info(f" Normalization: {config.normalization_strategy}")
logger.info(f" Warmup iterations: {config.warmup_iterations}")
logger.info(f" Gauss-Newton max: {config.gauss_newton_max_iterations}")
logger.info(f" Chunk size: {config.chunk_size}")
# Initialize optimizer
optimizer = AdaptiveHybridStreamingOptimizer(config)
# Create model function from residual function
# The hybrid optimizer expects: func(x, *params) -> predictions
# Our residual function computes: residuals = y - predictions
# We need: predictions = y - residuals
if hasattr(residual_fn, "jax_residual"):
# Stratified residual function
def model_fn(x: Any, *params: float) -> Any:
params_array = jnp.asarray(params)
residuals = residual_fn.jax_residual(params_array)
return ydata - residuals
else:
# Standard residual function
def model_fn(x: Any, *params: float) -> Any:
residuals = residual_fn(x, *params)
return ydata - residuals
try:
# Run optimization
result = optimizer.fit(
data_source=(xdata, ydata),
func=model_fn,
p0=initial_params,
bounds=bounds,
sigma=None, # TODO: Add sigma support if needed
verbose=1 if not fast_mode else 0,
)
# Extract results
popt = np.asarray(result["x"])
pcov = np.asarray(result.get("pcov", np.eye(len(popt))))
perr = np.asarray(
result.get("perr", safe_uncertainties_from_pcov(pcov, len(popt)))
)
# Build info dict with phase diagnostics
info = {
"success": result.get("success", True),
"message": result.get("message", "Hybrid optimization completed"),
"hybrid_streaming_diagnostics": result.get("streaming_diagnostics", {}),
"perr": perr,
"sigma_sq": result.get("streaming_diagnostics", {})
.get("gauss_newton_diagnostics", {})
.get("final_cost"),
"phase_timings": result.get("streaming_diagnostics", {}).get(
"phase_timings", {}
),
}
logger.info("Hybrid streaming optimization completed successfully")
phase_timings = info.get("phase_timings", {})
if phase_timings:
logger.info(
f" Phase 0 (normalization): {phase_timings.get('phase0_normalization', 0):.3f}s"
)
logger.info(
f" Phase 1 (L-BFGS warmup): {phase_timings.get('phase1_warmup', 0):.3f}s"
)
logger.info(
f" Phase 2 (Gauss-Newton): {phase_timings.get('phase2_gauss_newton', 0):.3f}s"
)
logger.info(
f" Phase 3 (covariance): {phase_timings.get('phase3_finalize', 0):.3f}s"
)
return popt, pcov, info
except (
ValueError,
RuntimeError,
TypeError,
AttributeError,
OSError,
MemoryError,
) as e:
# T031: Log detailed warning explaining failure and lost capabilities
logger.error(f"AdaptiveHybridStreamingOptimizer failed: {e}")
logger.warning(
"=" * 60 + "\n"
"HYBRID OPTIMIZER FAILURE - Falling back to basic streaming\n"
"=" * 60 + "\n"
"The AdaptiveHybridStreamingOptimizer encountered an error.\n"
"\n"
"Capabilities lost with fallback:\n"
" - Parameter normalization (gradient equalization)\n"
" - L-BFGS warmup + Gauss-Newton hybrid convergence\n"
" - Exact J^T J covariance accumulation\n"
"\n"
"Fallback uses basic streaming optimizer which may:\n"
" - Converge slower (1000+ vs ~110 iterations)\n"
" - Miss shear parameters (imbalanced gradients)\n"
" - Produce less accurate uncertainties\n"
"\n"
f"Error details: {type(e).__name__}: {str(e)}\n"
"=" * 60
)
# T030: TODO - Implement 3-attempt retry with HybridRecoveryConfig
# For now, immediately raise to trigger fallback to streaming
if isinstance(e, NLSQOptimizationError):
raise
else:
raise NLSQOptimizationError(
f"AdaptiveHybridStreamingOptimizer failed: {str(e)}",
error_context={"original_error": type(e).__name__},
) from e
[docs]
def fit_with_streaming_optimizer_stratified_deprecated(
stratified_data: Any,
per_angle_scaling: bool,
physical_param_names: list[str],
initial_params: np.ndarray,
bounds: tuple[np.ndarray, np.ndarray] | None,
logger: Any,
streaming_config: dict | None = None,
) -> tuple[np.ndarray, np.ndarray, dict]:
"""Fit using NLSQ streaming optimizer for memory-constrained large datasets.
.. deprecated:: 2.9.1
The old StreamingOptimizer was removed in NLSQ 0.4.0.
This method now delegates to _fit_with_stratified_hybrid_streaming
which uses AdaptiveHybridStreamingOptimizer.
Args:
stratified_data: StratifiedData object with flat stratified arrays
per_angle_scaling: Whether per-angle parameters are enabled
physical_param_names: List of physical parameter names
initial_params: Initial parameter guess
bounds: Parameter bounds (lower, upper) tuple
logger: Logger instance
streaming_config: Optional config dict (converted to hybrid_config)
Returns:
(popt, pcov, info) tuple
Raises:
RuntimeError: If AdaptiveHybridStreamingOptimizer is not available
"""
# Delegate to hybrid streaming optimizer (old StreamingOptimizer removed in NLSQ 0.4.0)
logger.info(
"Note: StreamingOptimizer was removed in NLSQ 0.4.0. "
"Using AdaptiveHybridStreamingOptimizer instead."
)
# Convert streaming_config to hybrid_config format
hybrid_config = None
if streaming_config:
hybrid_config = {
"chunk_size": streaming_config.get("batch_size", 50000),
"warmup_iterations": streaming_config.get("max_epochs", 100),
"gauss_newton_tol": streaming_config.get("convergence_tol", 1e-8),
"warmup_learning_rate": streaming_config.get("learning_rate", 0.001),
}
return fit_with_stratified_hybrid_streaming(
stratified_data=stratified_data,
per_angle_scaling=per_angle_scaling,
physical_param_names=physical_param_names,
initial_params=initial_params,
bounds=bounds,
logger=logger,
hybrid_config=hybrid_config,
)
[docs]
def fit_with_stratified_hybrid_streaming(
stratified_data: Any,
per_angle_scaling: bool,
physical_param_names: list[str],
initial_params: np.ndarray,
bounds: tuple[np.ndarray, np.ndarray] | None,
logger: Any,
hybrid_config: dict | None = None,
anti_degeneracy_config: dict | None = None,
) -> tuple[np.ndarray, np.ndarray, dict]:
"""Fit using NLSQ AdaptiveHybridStreamingOptimizer for large datasets.
This method implements the 4-phase hybrid optimization from NLSQ >=0.3.2:
- Phase 0: Parameter normalization setup (bounds-based)
- Phase 1: L-BFGS warmup with adaptive switching
- Phase 2: Streaming Gauss-Newton with exact J^T J accumulation
- Phase 3: Denormalization and covariance transform
With Anti-Degeneracy Defense System v2.9.0 integration:
- Layer 1: Fourier Reparameterization (reduces per-angle DoF)
- Layer 2: Hierarchical Optimization (alternating stage fitting)
- Layer 3: Adaptive CV-based Regularization (scales properly)
- Layer 4: Gradient Collapse Detection (runtime monitoring)
Key improvements over basic StreamingOptimizer:
1. Shear-term weak gradients: Fixed via parameter normalization
2. Slow convergence: Fixed via L-BFGS warmup + Gauss-Newton refinement
3. Crude covariance: Fixed via exact J^T J accumulation
4. Structural degeneracy: Fixed via anti-degeneracy defense layers
Args:
stratified_data: StratifiedData object with flat stratified arrays
per_angle_scaling: Whether per-angle parameters are enabled
physical_param_names: List of physical parameter names
initial_params: Initial parameter guess
bounds: Parameter bounds (lower, upper) tuple
logger: Logger instance
hybrid_config: Optional config dict with keys:
- normalize: Enable parameter normalization (default: True)
- normalization_strategy: "bounds" or "scale" (default: "bounds")
- warmup_iterations: L-BFGS warmup iterations (default: 100)
- max_warmup_iterations: Max L-BFGS iterations (default: 500)
- warmup_learning_rate: L-BFGS line search scale (default: 0.001)
- gauss_newton_max_iterations: GN iterations (default: 50)
- gauss_newton_tol: Convergence tolerance (default: 1e-8)
- chunk_size: Points per chunk for streaming (default: 50000)
anti_degeneracy_config: Optional config dict for Anti-Degeneracy Defense:
- per_angle_mode: "independent", "fourier", or "auto" (default: "auto")
- fourier_order: Fourier harmonic order (default: 2)
- fourier_auto_threshold: n_phi threshold for auto mode (default: 6)
- hierarchical.enable: Enable hierarchical optimization (default: True)
- regularization.mode: "absolute", "relative", or "auto" (default: "relative")
- regularization.lambda: Base regularization strength (default: 1.0)
- gradient_monitoring.enable: Enable gradient collapse detection (default: True)
Returns:
(popt, pcov, info) tuple
Raises:
RuntimeError: If AdaptiveHybridStreamingOptimizer is not available
"""
if not HYBRID_STREAMING_AVAILABLE:
raise RuntimeError(
"AdaptiveHybridStreamingOptimizer not available. "
"Please upgrade NLSQ to version >= 0.3.2: pip install --upgrade nlsq"
)
logger.info("Initializing NLSQ AdaptiveHybridStreamingOptimizer...")
logger.info("Fixes: 1) Shear-term gradients, 2) Convergence, 3) Covariance")
start_time = time.perf_counter()
# Parse hybrid streaming configuration
# Uses NLSQ 0.3.6 defaults which include 4-layer defense strategy
config_dict = hybrid_config or {}
normalize = config_dict.get("normalize", True)
normalization_strategy = config_dict.get("normalization_strategy", "auto")
# Standard warmup iterations - NLSQ 0.3.6 has 4-layer defense to prevent
# divergence when starting from good parameters
warmup_iterations = config_dict.get("warmup_iterations", 200)
max_warmup_iterations = config_dict.get("max_warmup_iterations", 500)
warmup_learning_rate = config_dict.get("warmup_learning_rate", 0.001)
gauss_newton_max_iterations = config_dict.get("gauss_newton_max_iterations", 100)
gauss_newton_tol = config_dict.get("gauss_newton_tol", 1e-8)
chunk_size = config_dict.get("chunk_size", 10_000)
trust_region_initial = config_dict.get("trust_region_initial", 1.0)
regularization_factor = config_dict.get("regularization_factor", 1e-10)
enable_checkpoints = config_dict.get("enable_checkpoints", True)
checkpoint_frequency = config_dict.get("checkpoint_frequency", 100)
validate_numerics = config_dict.get("validate_numerics", True)
# Learning rate scheduling
use_learning_rate_schedule = config_dict.get("use_learning_rate_schedule", False)
lr_schedule_warmup_steps = config_dict.get(
"lr_schedule_warmup_steps", warmup_iterations
)
lr_schedule_decay_steps = config_dict.get(
"lr_schedule_decay_steps", max_warmup_iterations - warmup_iterations
)
lr_schedule_end_value = config_dict.get("lr_schedule_end_value", 0.0001)
# 4-Layer Defense Strategy (NLSQ 0.3.6)
# Prevents L-BFGS warmup from diverging when starting from good parameters
# Layer 1: Warm Start Detection - skip warmup if already at good solution
enable_warm_start_detection = config_dict.get("enable_warm_start_detection", True)
warm_start_threshold = float(config_dict.get("warm_start_threshold", 0.01))
# Layer 2: Adaptive Learning Rate - scale LR based on initial loss quality
enable_adaptive_warmup_lr = config_dict.get("enable_adaptive_warmup_lr", True)
warmup_lr_refinement = float(config_dict.get("warmup_lr_refinement", 1e-6))
warmup_lr_careful = float(config_dict.get("warmup_lr_careful", 1e-5))
# Layer 3: Cost-Increase Guard - abort if loss increases during warmup
enable_cost_guard = config_dict.get("enable_cost_guard", True)
cost_increase_tolerance = float(config_dict.get("cost_increase_tolerance", 0.05))
# Layer 4: Step Clipping - limit max parameter change per L-BFGS iteration
enable_step_clipping = config_dict.get("enable_step_clipping", True)
max_warmup_step_size = float(config_dict.get("max_warmup_step_size", 0.1))
# Group Variance Regularization (NLSQ 0.3.8)
# Prevents per-angle parameters from absorbing angle-dependent physical signals
enable_group_variance_regularization = config_dict.get(
"enable_group_variance_regularization", False
)
group_variance_lambda = float(config_dict.get("group_variance_lambda", 0.01))
# group_variance_indices will be auto-computed if not provided
group_variance_indices_raw = config_dict.get("group_variance_indices", None)
# Compute n_phi early for auto-computing group_variance_indices
# Extract unique phi angles from stratified data
all_phi_early = []
if hasattr(stratified_data, "chunks") and len(stratified_data.chunks) > 0:
for chunk in stratified_data.chunks:
all_phi_early.extend(chunk.phi.tolist())
else:
all_phi_early = stratified_data.phi_flat.tolist()
n_phi = len(set(all_phi_early))
phi_unique = np.array(sorted(set(all_phi_early))) # For shear weighting
# Auto-compute group_variance_indices for laminar_flow with per-angle scaling
is_laminar_flow = "gamma_dot_t0" in physical_param_names
# =====================================================================
# Anti-Degeneracy Defense System v2.9.0 Initialization
# =====================================================================
# CRITICAL FIX (Jan 2026): Define n_physical unconditionally FIRST
# This variable is used by multiple conditional blocks (hierarchical,
# gradient_monitor, shear_weighter). Previously it was only defined
# inside conditional blocks, causing UnboundLocalError when those
# conditions were false but shear_weighter tried to use it.
n_physical = len(physical_param_names)
# Parse anti-degeneracy configuration
ad_config = anti_degeneracy_config or {}
hierarchical_config = ad_config.get("hierarchical", {})
regularization_config = ad_config.get("regularization", {})
gradient_monitoring_config = ad_config.get("gradient_monitoring", {})
# Layer 1: Fourier Reparameterization / Constant Scaling Configuration
# v2.18.0: Distinct semantics for auto vs explicit constant mode
per_angle_mode = ad_config.get("per_angle_mode", "auto")
fourier_order = ad_config.get("fourier_order", 2)
fourier_auto_threshold = ad_config.get("fourier_auto_threshold", 6)
constant_scaling_threshold = ad_config.get("constant_scaling_threshold", 3)
# Determine actual per-angle mode
# v2.18.0: Distinct semantics:
# - auto (n_phi >= threshold): "auto_averaged" → 9 params, OPTIMIZED averaged scaling
# - constant (explicit): "fixed_constant" → 7 params, FIXED per-angle scaling
# - individual: per-angle scaling OPTIMIZED
if per_angle_mode == "auto":
if n_phi >= constant_scaling_threshold:
# AUTO mode with large n_phi: optimize averaged scaling (9 params)
# Computes N quantile estimates, averages to 1 contrast + 1 offset
# These 2 averaged values ARE OPTIMIZED along with 7 physical params
per_angle_mode_actual = "auto_averaged"
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY DEFENSE: Auto-selected 'auto_averaged' mode")
logger.info(
f" Reason: n_phi ({n_phi}) >= "
f"constant_scaling_threshold ({constant_scaling_threshold})"
)
logger.info(" Behavior: Quantile estimates -> AVERAGED -> OPTIMIZED")
logger.info(
" Parameters: 7 physical + 2 averaged (contrast, offset) = 9 total"
)
logger.info("=" * 60)
else:
# Use individual per-angle parameters for few angles (N < 3)
per_angle_mode_actual = "individual"
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY DEFENSE: Auto-selected 'individual' mode")
logger.info(
f" Reason: n_phi ({n_phi}) < "
f"constant_scaling_threshold ({constant_scaling_threshold})"
)
logger.info(
f" Parameters: 7 physical + {2 * n_phi} per-angle = {7 + 2 * n_phi} total"
)
logger.info("=" * 60)
elif per_angle_mode == "constant":
# EXPLICIT constant mode: FIXED per-angle scaling (7 params)
# Computes N quantile estimates, uses per-angle values DIRECTLY (NOT averaged)
# Only 7 physical params are optimized; scaling is FIXED
per_angle_mode_actual = "fixed_constant"
logger.info("=" * 60)
logger.info(
"ANTI-DEGENERACY DEFENSE: Explicit 'constant' mode -> fixed_constant"
)
logger.info(f" n_phi: {n_phi}")
logger.info(
" Behavior: Quantile estimates -> per-angle values FIXED (NOT optimized)"
)
logger.info(" Parameters: 7 physical only (scaling FIXED from quantiles)")
logger.info("=" * 60)
else:
# Other explicit modes (fourier or individual)
per_angle_mode_actual = per_angle_mode
logger.debug(
f"ANTI-DEGENERACY: Using explicit per_angle_mode: {per_angle_mode_actual}"
)
# T031: Determine mode flags
# use_constant: True for both auto_averaged and fixed_constant (constant-style mapping)
# use_fixed_scaling: True only for fixed_constant (scaling NOT optimized)
# use_averaged_scaling: True only for auto_averaged (scaling optimized)
use_constant = per_angle_mode_actual in ("auto_averaged", "fixed_constant")
use_averaged_scaling = per_angle_mode_actual == "auto_averaged"
# use_fixed_scaling will be set True after quantile estimation for fixed_constant mode
# Initialize Fourier reparameterizer if using fourier mode
fourier_reparameterizer = None
if per_angle_mode_actual == "fourier" and per_angle_scaling and is_laminar_flow:
# Get unique phi angles in radians
phi_unique_rad = np.deg2rad(np.array(sorted(set(all_phi_early))))
# Extract user-configured bounds for contrast and offset from bounds tuple
# Bounds layout: [contrast(n_phi), offset(n_phi), physical(7)]
# Use first contrast/offset bound as the c0/o0 (mean) bounds
c0_bounds = (0.1, 0.8) # Default
o0_bounds = (0.5, 1.5) # Default
if bounds is not None:
lower_bounds, upper_bounds = bounds
if len(lower_bounds) >= n_phi and len(upper_bounds) >= n_phi:
# Extract contrast bounds from first contrast element
c0_bounds = (float(lower_bounds[0]), float(upper_bounds[0]))
# Extract offset bounds from first offset element
o0_bounds = (float(lower_bounds[n_phi]), float(upper_bounds[n_phi]))
logger.debug(
f" Using user-configured Fourier bounds: "
f"c0={c0_bounds}, o0={o0_bounds}"
)
fourier_config = FourierReparamConfig(
mode="fourier",
fourier_order=fourier_order,
auto_threshold=fourier_auto_threshold,
c0_bounds=c0_bounds,
o0_bounds=o0_bounds,
)
fourier_reparameterizer = FourierReparameterizer(phi_unique_rad, fourier_config)
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY DEFENSE: Layer 1 - Fourier Reparameterization")
logger.info(f" Mode: {per_angle_mode_actual}")
logger.info(f" n_phi: {n_phi}, Fourier order: {fourier_order}")
logger.info(f" Contrast bounds (c0): {c0_bounds}")
logger.info(f" Offset bounds (o0): {o0_bounds}")
logger.info(
f" Parameter reduction: {2 * n_phi} -> {fourier_reparameterizer.n_coeffs}"
)
logger.info("=" * 60)
elif (
per_angle_mode_actual == "fixed_constant"
and per_angle_scaling
and is_laminar_flow
):
# fixed_constant mode: per-angle scaling is FIXED, not optimized
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY DEFENSE: Layer 1 - Fixed Constant Mode (v2.18.0)")
logger.info(f" Mode: {per_angle_mode_actual}")
logger.info(f" n_phi: {n_phi}")
logger.info(" Method: Quantile-based per-angle scaling (FIXED, not optimized)")
logger.info(
" Per-angle contrast/offset will be estimated from c2 data quantiles"
)
logger.info(" These values are FIXED (not optimized) during fitting")
logger.info(f" Parameter reduction: {2 * n_phi} -> 0 (physical only)")
logger.info("=" * 60)
elif (
per_angle_mode_actual == "auto_averaged"
and per_angle_scaling
and is_laminar_flow
):
# auto_averaged mode: averaged scaling is OPTIMIZED (9 params)
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY DEFENSE: Layer 1 - Auto Averaged Mode (v2.18.0)")
logger.info(f" Mode: {per_angle_mode_actual}")
logger.info(f" n_phi: {n_phi}")
logger.info(" Method: Quantile estimates -> averaged -> OPTIMIZED")
logger.info(" Initial values: averaged from per-angle quantile estimates")
logger.info(
f" Parameter reduction: {2 * n_phi} -> 2 (averaged contrast + offset)"
)
logger.info("=" * 60)
# =====================================================================
# CONSTANT/AUTO_AVERAGED MODES (v2.18.0): Quantile-Based Scaling
# =====================================================================
# - fixed_constant: per-angle values are FIXED (not optimized), 7 params
# - auto_averaged: averaged values are OPTIMIZED as initial values, 9 params
# =====================================================================
use_fixed_scaling = False
fixed_contrast_per_angle: np.ndarray | None = None
fixed_offset_per_angle: np.ndarray | None = None
fixed_contrast_jax: jnp.ndarray | None = None
fixed_offset_jax: jnp.ndarray | None = None
# For auto_averaged mode: averaged values to use as initial optimization values
averaged_contrast_init: float | None = None
averaged_offset_init: float | None = None
if use_constant and per_angle_scaling and is_laminar_flow:
logger.info("Computing quantile-based per-angle scaling estimates...")
try:
# Extract bounds for clipping
contrast_bounds = (0.0, 1.0) # Default
offset_bounds = (0.5, 1.5) # Default
if bounds is not None:
lower_bounds, upper_bounds = bounds
if len(lower_bounds) >= n_phi and len(upper_bounds) >= n_phi:
contrast_bounds = (
float(lower_bounds[0]),
float(upper_bounds[0]),
)
offset_bounds = (
float(lower_bounds[n_phi]),
float(upper_bounds[n_phi]),
)
# Compute quantile-based per-angle scaling
fixed_contrast_per_angle, fixed_offset_per_angle = (
_compute_quantile_per_angle_scaling(
stratified_data=stratified_data,
contrast_bounds=contrast_bounds,
offset_bounds=offset_bounds,
logger=logger,
)
)
if (
fixed_contrast_per_angle is not None
and fixed_offset_per_angle is not None
):
if per_angle_mode_actual == "fixed_constant":
# fixed_constant: Use per-angle values DIRECTLY as FIXED
use_fixed_scaling = True
fixed_contrast_jax = jnp.asarray(fixed_contrast_per_angle)
fixed_offset_jax = jnp.asarray(fixed_offset_per_angle)
logger.info(
"Fixed per-angle scaling computed (FIXED, not optimized):"
)
logger.info(
f" Contrast: mean={np.nanmean(fixed_contrast_per_angle):.4f}, "
f"range=[{np.nanmin(fixed_contrast_per_angle):.4f}, "
f"{np.nanmax(fixed_contrast_per_angle):.4f}]"
)
logger.info(
f" Offset: mean={np.nanmean(fixed_offset_per_angle):.4f}, "
f"range=[{np.nanmin(fixed_offset_per_angle):.4f}, "
f"{np.nanmax(fixed_offset_per_angle):.4f}]"
)
elif per_angle_mode_actual == "auto_averaged":
# auto_averaged: AVERAGE per-angle values → use as INITIAL for optimization
averaged_contrast_init = float(np.nanmean(fixed_contrast_per_angle))
averaged_offset_init = float(np.nanmean(fixed_offset_per_angle))
logger.info(
"Averaged scaling computed (initial values for optimization):"
)
logger.info(f" Averaged contrast: {averaged_contrast_init:.4f}")
logger.info(f" Averaged offset: {averaged_offset_init:.4f}")
logger.info(
" These will be OPTIMIZED along with 7 physical params (9 total)"
)
# Do NOT set use_fixed_scaling = True for auto_averaged
# The averaged values are just initial guesses for optimization
else: # pragma: no cover – defensive; function always returns arrays
logger.warning( # type: ignore[unreachable]
"Failed to compute quantile-based scaling, "
"falling back to standard constant mode (optimizing 2 params)"
)
except (ValueError, RuntimeError, np.linalg.LinAlgError) as e:
logger.warning(
f"Error computing quantile-based scaling: {e}, "
f"falling back to standard constant mode"
)
use_fixed_scaling = False
# Layer 2: Hierarchical Optimization Configuration
# =====================================================================
# CRITICAL FIX (Jan 2026): Auto-enable hierarchical when shear_weighting
# is enabled. Shear weighting is ONLY applied inside hierarchical
# optimizer's loss function. Without hierarchical, the gradient
# cancellation for gamma_dot_t0 is NOT prevented!
#
# Root cause: The shear gradient ∂L/∂γ̇₀ ∝ Σ cos(φ₀-φ) cancels when
# summing over angles spanning 360° (e.g., 23 angles → 94.6% cancellation).
# Shear weighting emphasizes shear-sensitive angles to prevent this.
# =====================================================================
shear_weighting_config_early = ad_config.get("shear_weighting", {})
shear_weighting_will_be_enabled = (
shear_weighting_config_early.get("enable", True)
and is_laminar_flow
and n_phi > 3
)
enable_hierarchical = hierarchical_config.get("enable", True)
# Override: shear weighting requires hierarchical optimization to function
if shear_weighting_will_be_enabled and not enable_hierarchical:
logger.warning("=" * 60)
logger.warning(
"ANTI-DEGENERACY: Shear weighting enabled but hierarchical disabled!"
)
logger.warning(
" Auto-enabling hierarchical optimization to apply shear weights."
)
logger.warning(
" Without this, gradient cancellation will collapse gamma_dot_t0."
)
logger.warning("=" * 60)
enable_hierarchical = True
hierarchical_optimizer = None
# Skip hierarchical optimization in constant scaling mode:
# - Constant mode already prevents per-angle absorption (2 DoF vs 46)
# - HierarchicalOptimizer expects n_per_angle = 2*n_phi or n_coeffs (Fourier)
# - Using hierarchical with constant mode causes index mismatch error
if (
enable_hierarchical
and per_angle_scaling
and is_laminar_flow
and not use_constant
):
# n_physical defined unconditionally above
hier_config = HierarchicalConfig(
enable=True,
max_outer_iterations=hierarchical_config.get("max_outer_iterations", 5),
outer_tolerance=float(hierarchical_config.get("outer_tolerance", 1e-6)),
physical_max_iterations=hierarchical_config.get(
"physical_max_iterations", 100
),
per_angle_max_iterations=hierarchical_config.get(
"per_angle_max_iterations", 50
),
)
hierarchical_optimizer = HierarchicalOptimizer(
config=hier_config,
n_phi=n_phi,
n_physical=n_physical,
fourier_reparameterizer=fourier_reparameterizer,
)
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY DEFENSE: Layer 2 - Hierarchical Optimization")
logger.info(f" Enabled: {enable_hierarchical}")
logger.info(f" Max outer iterations: {hier_config.max_outer_iterations}")
logger.info(f" Outer tolerance: {hier_config.outer_tolerance}")
if shear_weighting_will_be_enabled:
logger.info(
" Shear weighting: WILL BE APPLIED via hierarchical loss function"
)
logger.info("=" * 60)
elif use_constant and enable_hierarchical and per_angle_scaling and is_laminar_flow:
# Log that hierarchical is skipped due to constant scaling mode
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY DEFENSE: Layer 2 - Hierarchical Optimization")
logger.info(
" Skipped: constant scaling mode already prevents per-angle absorption"
)
logger.info(
" Reason: Only 2 per-angle DoF (vs 46), no need for hierarchical alternation"
)
logger.info("=" * 60)
# Layer 3: Adaptive Relative Regularization Configuration
# Replaces/enhances the basic group variance regularization with CV-based approach
regularization_mode = regularization_config.get("mode", "relative")
regularization_lambda = float(regularization_config.get("lambda", 1.0))
target_cv = float(regularization_config.get("target_cv", 0.10))
target_contribution = float(regularization_config.get("target_contribution", 0.10))
max_cv = float(regularization_config.get("max_cv", 0.20))
adaptive_regularizer = None
if per_angle_scaling and is_laminar_flow:
# Compute mode-aware group indices
# Group indices depend on per-angle mode: fixed_constant, auto_averaged, fourier, or individual
if use_fixed_scaling:
# fixed_constant: No scaling params to regularize (7 physical only)
mode_group_indices = []
logger.debug(
"Fixed-constant mode: No per-angle regularization (scaling is fixed)"
)
elif use_averaged_scaling:
# auto_averaged: 2 per-angle params (1 contrast + 1 offset) to regularize
mode_group_indices = [(0, 1), (1, 2)]
logger.debug(
f"Auto-averaged regularization groups: {mode_group_indices} "
f"(1 contrast + 1 offset)"
)
elif (
fourier_reparameterizer is not None and fourier_reparameterizer.use_fourier
):
n_coeffs_per_param = fourier_reparameterizer.n_coeffs_per_param
mode_group_indices = [
(0, n_coeffs_per_param), # contrast Fourier coefficients
(
n_coeffs_per_param,
2 * n_coeffs_per_param,
), # offset Fourier coefficients
]
logger.debug(
f"Fourier-aware regularization groups: {mode_group_indices} "
f"(n_coeffs_per_param={n_coeffs_per_param})"
)
else:
mode_group_indices = None # Use default: [(0, n_phi), (n_phi, 2*n_phi)]
logger.debug(
f"Using default regularization groups (Fourier mode not active): "
f"fourier_reparameterizer={fourier_reparameterizer is not None}, "
f"use_fourier={fourier_reparameterizer.use_fourier if fourier_reparameterizer else 'N/A'}"
)
reg_config = AdaptiveRegularizationConfig(
enable=True,
mode=regularization_mode,
lambda_base=regularization_lambda,
target_cv=target_cv,
target_contribution=target_contribution,
max_cv=max_cv,
group_indices=mode_group_indices,
)
adaptive_regularizer = AdaptiveRegularizer(reg_config, n_phi)
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY DEFENSE: Layer 3 - Adaptive Regularization")
logger.info(f" Mode: {regularization_mode}")
logger.info(f" Auto-tuned lambda: {adaptive_regularizer.lambda_value:.2f}")
logger.info(f" Target CV: {target_cv} ({target_cv * 100:.0f}% variation)")
logger.info(f" Max CV: {max_cv}")
logger.info(f" Group indices: {adaptive_regularizer.group_indices}")
logger.info("=" * 60)
# Update group variance settings to use adaptive regularizer's lambda
# This ensures NLSQ's built-in regularization is consistent
enable_group_variance_regularization = True
group_variance_lambda = adaptive_regularizer.lambda_value
# Layer 4: Gradient Collapse Monitor Configuration
gradient_monitor_enabled = gradient_monitoring_config.get("enable", True)
gradient_monitor = None
if gradient_monitor_enabled and per_angle_scaling and is_laminar_flow:
# Compute mode-aware parameter count
# n_per_angle depends on per-angle mode: fixed_constant, auto_averaged, fourier, or individual
if use_fixed_scaling:
# fixed_constant: 0 per-angle params (scaling is fixed)
n_per_angle = 0
elif use_averaged_scaling:
# auto_averaged: 2 per-angle params (1 contrast + 1 offset)
n_per_angle = 2
elif fourier_reparameterizer is not None:
# Fourier mode: n_coeffs Fourier coefficients
n_per_angle = fourier_reparameterizer.n_coeffs
else:
# Independent mode: 2 * n_phi per-angle params
n_per_angle = 2 * n_phi
# n_physical defined unconditionally above
# Use numpy arrays for indices (JAX compatibility)
per_angle_indices = np.arange(n_per_angle, dtype=np.intp)
physical_indices = np.arange(
n_per_angle, n_per_angle + n_physical, dtype=np.intp
)
# Compute gamma_dot_t0 index for watch_parameters
# In laminar_flow, physical params are [D0, alpha, D_offset, gamma_dot_t0, beta, gamma_dot_t_offset, phi0]
# gamma_dot_t0 is at physical_indices[3] = n_per_angle + 3
gamma_dot_t0_idx = n_per_angle + 3 # Index of gamma_dot_t0 in full param vector
monitor_config = GradientMonitorConfig(
enable=True,
ratio_threshold=float(
gradient_monitoring_config.get("ratio_threshold", 0.01)
),
consecutive_triggers=gradient_monitoring_config.get(
"consecutive_triggers", 5
),
response_mode=gradient_monitoring_config.get("response", "hierarchical"),
# NEW (Dec 2025): Watch gamma_dot_t0 specifically for gradient collapse
# This detects when shear parameter gradient vanishes during L-BFGS warmup
watch_parameters=[gamma_dot_t0_idx],
watch_threshold=float(
gradient_monitoring_config.get("watch_threshold", 1e-8)
),
)
gradient_monitor = GradientCollapseMonitor(
config=monitor_config,
physical_indices=physical_indices,
per_angle_indices=per_angle_indices,
)
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY DEFENSE: Layer 4 - Gradient Collapse Monitor")
logger.info(f" Enabled: {gradient_monitor_enabled}")
logger.info(f" Ratio threshold: {monitor_config.ratio_threshold}")
logger.info(f" Consecutive triggers: {monitor_config.consecutive_triggers}")
logger.info(f" Response mode: {monitor_config.response_mode}")
logger.info("=" * 60)
# Layer 5: Shear-Sensitivity Weighting (v2.9.1)
# Prevents gradient cancellation for shear parameters by emphasizing
# shear-sensitive angles (parallel/antiparallel to flow direction)
shear_weighting_config = ad_config.get("shear_weighting", {})
shear_weighting_enabled = shear_weighting_config.get("enable", True)
shear_weighter: ShearSensitivityWeighting | None = None
if is_laminar_flow and shear_weighting_enabled and n_phi > 3:
# Get initial phi0 from config or use default
initial_phi0 = shear_weighting_config.get("initial_phi0", None)
if initial_phi0 is None:
# Try to get from initial parameters
initial_phi0 = float(initial_params[-1]) if len(initial_params) > 0 else 0.0
sw_config = ShearWeightingConfig(
enable=True,
min_weight=float(shear_weighting_config.get("min_weight", 0.3)),
alpha=float(shear_weighting_config.get("alpha", 1.0)),
update_frequency=int(shear_weighting_config.get("update_frequency", 1)),
initial_phi0=initial_phi0,
normalize=shear_weighting_config.get("normalize", True),
)
shear_weighter = ShearSensitivityWeighting(
phi_angles=phi_unique,
n_physical=n_physical,
phi0_index=6, # phi0 is last of 7 physical params
config=sw_config,
)
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY DEFENSE: Layer 5 - Shear-Sensitivity Weighting")
logger.info(f" Enabled: {shear_weighting_enabled}")
logger.info(f" n_phi: {n_phi}")
logger.info(f" min_weight: {sw_config.min_weight:.2f}")
logger.info(f" alpha: {sw_config.alpha:.1f}")
logger.info(f" initial_phi0: {initial_phi0:.1f} deg")
logger.info("=" * 60)
# Store anti-degeneracy components for diagnostics
anti_degeneracy_components = {
"per_angle_mode": per_angle_mode_actual,
"use_constant": use_constant, # T031: Track constant mode status
"use_fixed_scaling": use_fixed_scaling, # v2.17.0: Track fixed scaling status
"fourier_reparameterizer": fourier_reparameterizer,
"hierarchical_optimizer": hierarchical_optimizer,
"adaptive_regularizer": adaptive_regularizer,
"gradient_monitor": gradient_monitor,
"shear_weighter": shear_weighter,
}
# ===================================================================== #
if enable_group_variance_regularization and group_variance_indices_raw is None:
if is_laminar_flow and per_angle_scaling and n_phi > 3:
# T031: Handle fixed scaling, constant, Fourier, and individual modes
# Fixed scaling mode (v2.17.0): 0 per-angle params (all fixed)
# Constant mode: 1 value per group (contrast/offset)
# Fourier mode: n_coeffs_per_param values per group
# Individual mode: n_phi values per group
if use_fixed_scaling:
# No per-angle params to regularize - skip group variance
n_per_group = 0
group_variance_indices = []
logger.info(
" Fixed scaling mode: skipping group variance regularization "
"(no per-angle params)"
)
elif use_constant:
n_per_group = 1
elif fourier_reparameterizer is not None:
n_per_group = fourier_reparameterizer.n_coeffs_per_param
else:
n_per_group = n_phi
# Only compute group indices if not using fixed scaling
if not use_fixed_scaling:
# Per-angle contrast: params[0:n_per_group]
# Per-angle offset: params[n_per_group:2*n_per_group]
group_variance_indices = [
(0, n_per_group),
(n_per_group, 2 * n_per_group),
]
logger.info(
f" Auto-computed group_variance_indices for {n_phi} angles: "
f"{group_variance_indices} (mode: {per_angle_mode_actual})"
)
else:
group_variance_indices = None
if enable_group_variance_regularization:
logger.warning(
"Group variance regularization enabled but no indices provided. "
"Auto-computation requires laminar_flow mode with per_angle_scaling "
f"and n_phi > 3. (is_laminar_flow={is_laminar_flow}, "
f"per_angle_scaling={per_angle_scaling}, n_phi={n_phi})"
)
else:
# Convert raw indices to list of tuples if provided
if group_variance_indices_raw is not None:
group_variance_indices = [tuple(idx) for idx in group_variance_indices_raw]
else:
group_variance_indices = None
logger.info("Hybrid streaming config:")
logger.info(f" Normalization: {normalization_strategy}")
logger.info(f" Warmup iterations: {warmup_iterations}")
logger.info(f" Max warmup iterations: {max_warmup_iterations}")
logger.info(f" Learning rate: {warmup_learning_rate}")
if use_learning_rate_schedule:
logger.info(
f" LR schedule: warmup={lr_schedule_warmup_steps}, "
f"decay={lr_schedule_decay_steps}, end={lr_schedule_end_value}"
)
logger.info(f" Gauss-Newton iterations: {gauss_newton_max_iterations}")
logger.info(f" Gauss-Newton tolerance: {gauss_newton_tol}")
logger.info(f" Chunk size: {chunk_size:,}")
logger.info(" 4-Layer Defense Strategy (NLSQ 0.3.6):")
logger.info(f" L1 Warm Start Detection: {enable_warm_start_detection}")
logger.info(f" L2 Adaptive LR: {enable_adaptive_warmup_lr}")
logger.info(f" L3 Cost Guard: {enable_cost_guard}")
logger.info(f" L4 Step Clipping: {enable_step_clipping}")
if enable_group_variance_regularization:
logger.info(" Group Variance Regularization (NLSQ 0.3.8):")
logger.info(f" Enabled: {enable_group_variance_regularization}")
logger.info(f" Lambda: {group_variance_lambda}")
logger.info(f" Indices: {group_variance_indices}")
# Prepare residual weighting for NLSQ optimizer (Layer 5 of Anti-Degeneracy)
# Homodyne computes shear-sensitivity weights and passes them to NLSQ
# as generic residual weights - NLSQ doesn't need to know about XPCS physics
enable_residual_weighting = shear_weighter is not None
residual_weights_list = None
if enable_residual_weighting:
# Compute shear-sensitivity weights in homodyne, pass to NLSQ as generic weights
assert shear_weighter is not None # guarded by enable_residual_weighting
residual_weights_list = shear_weighter.get_weights().tolist()
logger.info(" Residual Weighting (Shear-Sensitivity):")
logger.info(f" Enabled: {enable_residual_weighting}")
logger.info(f" n_weights: {len(residual_weights_list)}")
logger.info(
f" Weight range: [{min(residual_weights_list):.3f}, "
f"{max(residual_weights_list):.3f}]"
)
# Create HybridStreamingConfig with 4-layer defense
optimizer_config = HybridStreamingConfig(
normalize=normalize,
normalization_strategy=normalization_strategy,
warmup_iterations=warmup_iterations,
max_warmup_iterations=max_warmup_iterations,
warmup_learning_rate=warmup_learning_rate,
gauss_newton_max_iterations=gauss_newton_max_iterations,
gauss_newton_tol=gauss_newton_tol,
chunk_size=chunk_size,
trust_region_initial=trust_region_initial,
regularization_factor=regularization_factor,
enable_checkpoints=enable_checkpoints,
checkpoint_frequency=checkpoint_frequency,
validate_numerics=validate_numerics,
use_learning_rate_schedule=use_learning_rate_schedule,
lr_schedule_warmup_steps=lr_schedule_warmup_steps,
lr_schedule_decay_steps=lr_schedule_decay_steps,
lr_schedule_end_value=lr_schedule_end_value,
# 4-Layer Defense Strategy
enable_warm_start_detection=enable_warm_start_detection,
warm_start_threshold=warm_start_threshold,
enable_adaptive_warmup_lr=enable_adaptive_warmup_lr,
warmup_lr_refinement=warmup_lr_refinement,
warmup_lr_careful=warmup_lr_careful,
enable_cost_guard=enable_cost_guard,
cost_increase_tolerance=cost_increase_tolerance,
enable_step_clipping=enable_step_clipping,
max_warmup_step_size=max_warmup_step_size,
# Group Variance Regularization (NLSQ 0.3.8)
enable_group_variance_regularization=enable_group_variance_regularization,
group_variance_lambda=group_variance_lambda,
group_variance_indices=group_variance_indices,
# Residual Weighting (NLSQ 0.4.x)
# Homodyne computes shear-sensitivity weights and passes them as generic
# residual weights - NLSQ just does weighted least squares
enable_residual_weighting=enable_residual_weighting,
residual_weights=residual_weights_list,
verbose=config_dict.get("verbose", 1),
log_frequency=config_dict.get("log_frequency", 1),
)
# Initialize optimizer
optimizer = AdaptiveHybridStreamingOptimizer(optimizer_config)
# Extract global metadata from stratified data
if hasattr(stratified_data, "chunks") and len(stratified_data.chunks) > 0:
first_chunk = stratified_data.chunks[0]
q = first_chunk.q
L = first_chunk.L
dt = first_chunk.dt
else:
q = stratified_data.q
L = stratified_data.L
dt = stratified_data.dt
logger.debug(f"Global metadata: q={q}, L={L}, dt={dt}")
# Extract unique values for theory computation
all_phi = []
all_t1 = []
all_t2 = []
if hasattr(stratified_data, "chunks"):
for chunk in stratified_data.chunks:
all_phi.extend(chunk.phi.tolist())
all_t1.extend(chunk.t1.tolist())
all_t2.extend(chunk.t2.tolist())
else:
all_phi = stratified_data.phi_flat.tolist()
all_t1 = stratified_data.t1_flat.tolist()
all_t2 = stratified_data.t2_flat.tolist()
phi_unique = np.array(sorted(set(all_phi)))
t1_unique = np.array(sorted(set(all_t1)))
n_phi = len(phi_unique)
logger.info(f"Unique values: {n_phi} phi, {len(t1_unique)} t1")
# Import physics utilities
from homodyne.core.physics_utils import (
PI,
calculate_diffusion_coefficient,
calculate_shear_rate,
safe_sinc,
trapezoid_cumsum,
)
# Pre-compute physics factors
wavevector_q_squared_half_dt = 0.5 * (q**2) * dt
sinc_prefactor = 0.5 / PI * q * L * dt
# Convert to JAX arrays
phi_unique_jax = jnp.asarray(phi_unique)
t1_unique_jax = jnp.asarray(t1_unique)
# Create model function
is_laminar_flow = "gamma_dot_t0" in physical_param_names
# T042: Compute n_per_angle for model function based on mode
# In fixed scaling mode: 0 (all params are physical)
# In constant mode (fallback): 1 contrast + 1 offset = 2
# In individual mode: n_phi contrast + n_phi offset = 2*n_phi
# In Fourier mode: n_coeffs contrast + n_coeffs offset = 2*n_coeffs
if use_fixed_scaling:
# Fixed scaling: all params are physical, no per-angle params in vector
n_per_angle = 0
elif use_constant:
n_per_angle = 2
elif fourier_reparameterizer is not None:
n_per_angle = fourier_reparameterizer.n_coeffs
else:
n_per_angle = 2 * n_phi
@jax.jit
def model_fn_pointwise(
x_batch: jnp.ndarray, *params_tuple: jnp.ndarray
) -> jnp.ndarray:
"""Point-wise model function for hybrid streaming optimizer."""
# Handle both single points (1D) and batches (2D)
# The optimizer may call with single points during Jacobian computation
x_batch_2d = jnp.atleast_2d(x_batch)
params_all = jnp.stack(params_tuple)
# Extract indices from x_batch (now guaranteed 2D)
phi_idx = x_batch_2d[:, 0].astype(jnp.int32)
t1_idx = x_batch_2d[:, 1].astype(jnp.int32)
t2_idx = x_batch_2d[:, 2].astype(jnp.int32)
# T042: Extract scaling and physical parameters based on mode
# Fixed scaling mode (v2.17.0): use pre-computed fixed arrays, all params are physical
# Constant mode (fallback): params[0]=contrast, params[1]=offset, params[2:]=physical
# Individual mode: params[:n_phi]=contrast, params[n_phi:2*n_phi]=offset, params[2*n_phi:]=physical
if use_fixed_scaling:
# Use pre-computed fixed per-angle scaling from quantiles
# All params in params_all are physical
contrast_all = fixed_contrast_jax
offset_all = fixed_offset_jax
physical_params = params_all
elif use_constant:
# Single contrast and offset shared across all angles
contrast_all = jnp.full(n_phi, params_all[0])
offset_all = jnp.full(n_phi, params_all[1])
physical_params = params_all[2:]
else:
contrast_all = params_all[:n_phi]
offset_all = params_all[n_phi : 2 * n_phi]
physical_params = params_all[2 * n_phi :]
# Extract physical parameters
D0 = physical_params[0]
alpha = physical_params[1]
D_offset = physical_params[2]
# Compute diffusion
D_t = calculate_diffusion_coefficient(t1_unique_jax, D0, alpha, D_offset)
D_cumsum = trapezoid_cumsum(D_t)
D_diff = D_cumsum[t1_idx] - D_cumsum[t2_idx]
# P0-2: epsilon_abs=1e-12 (was 1e-20, below float32 precision)
D_integral_batch = jnp.sqrt(D_diff**2 + 1e-12)
log_g1_diff = -wavevector_q_squared_half_dt * D_integral_batch
log_g1_diff_bounded = jnp.clip(log_g1_diff, -700.0, 0.0)
g1_diffusion = jnp.exp(log_g1_diff_bounded)
if is_laminar_flow:
# Shear parameters
gamma_dot_0 = physical_params[3]
beta = physical_params[4]
gamma_dot_offset = physical_params[5]
phi0 = physical_params[6]
# Compute shear
gamma_t = calculate_shear_rate(
t1_unique_jax, gamma_dot_0, beta, gamma_dot_offset
)
gamma_cumsum = trapezoid_cumsum(gamma_t)
gamma_diff = gamma_cumsum[t1_idx] - gamma_cumsum[t2_idx]
# P0-2: epsilon_abs=1e-12 (was 1e-20, below float32 precision)
gamma_integral_batch = jnp.sqrt(gamma_diff**2 + 1e-12)
# Shear contribution with angle dependence
# Formula: g₁_shear = [sinc(Φ)]² where Φ = sinc_prefactor * cos(φ₀-φ) * ∫γ̇
phi_values = phi_unique_jax[phi_idx]
angle_diff = jnp.deg2rad(phi0 - phi_values) # Match physics: cos(φ₀-φ)
cos_phi = jnp.cos(angle_diff)
sinc_arg = sinc_prefactor * gamma_integral_batch * cos_phi
sinc_val = safe_sinc(sinc_arg)
g1_shear = sinc_val**2 # CRITICAL: g1_shear = sinc²(Φ)
g1_total = g1_diffusion * g1_shear
# P0-3: Use jnp.where (gradient-safe) instead of jnp.clip.
# log-space clip above guarantees g1 ≤ 1.0; lower floor prevents log(0).
epsilon = 1e-10
g1 = jnp.where(g1_total > epsilon, g1_total, epsilon)
else:
epsilon = 1e-10
g1 = jnp.where(g1_diffusion > epsilon, g1_diffusion, epsilon)
# Compute g2 with per-angle scaling
assert contrast_all is not None # set in all branches above
assert offset_all is not None # set in all branches above
contrast = contrast_all[phi_idx]
offset = offset_all[phi_idx]
g2_theory = offset + contrast * g1**2
# P0-3: Removed jnp.clip(g2, 0.5, 2.5) — kills gradients at boundaries.
# Bounds enforced via parameter bounds in optimizer, not g2 clipping.
g2 = g2_theory
# Squeeze output to match input dimensionality
# Returns 0D scalar for single point, 1D array for batch
return jnp.asarray(g2.squeeze())
# Prepare data
logger.info("Preparing hybrid streaming data...")
prep_start = time.perf_counter()
if hasattr(stratified_data, "chunks"):
all_phi_data = np.concatenate([c.phi for c in stratified_data.chunks])
all_t1_data = np.concatenate([c.t1 for c in stratified_data.chunks])
all_t2_data = np.concatenate([c.t2 for c in stratified_data.chunks])
y_data = np.concatenate([c.g2 for c in stratified_data.chunks])
else:
all_phi_data = stratified_data.phi_flat
all_t1_data = stratified_data.t1_flat
all_t2_data = stratified_data.t2_flat
y_data = stratified_data.g2_flat
# Convert to indices (vectorized).
# NOTE: Both t1 and t2 index into t1_unique because XPCS correlation
# matrices use a shared time grid (t1_unique == t2_unique).
phi_idx_arr = np.clip(
np.searchsorted(phi_unique, all_phi_data), 0, len(phi_unique) - 1
)
t1_idx_arr = np.clip(np.searchsorted(t1_unique, all_t1_data), 0, len(t1_unique) - 1)
t2_idx_arr = np.clip(np.searchsorted(t1_unique, all_t2_data), 0, len(t1_unique) - 1)
x_data = np.column_stack([phi_idx_arr, t1_idx_arr, t2_idx_arr]).astype(np.float64)
y_data = np.asarray(y_data, dtype=np.float64)
# =====================================================================
# Diagonal Handling (v2.14.2+)
# =====================================================================
# Hybrid streaming uses point-wise theory computation (no 2D grid), so
# apply_diagonal_correction() cannot be applied to theory.
#
# Instead, diagonal points are FILTERED OUT from the data entirely:
# - Data: Already has diagonal correction applied at load time
# - Theory: Never computes diagonal values (filtered points excluded)
# - Residual: Diagonal points excluded from loss (equivalent to mask=0)
#
# This is architecturally equivalent to correction + masking used in
# Stratified LS and Out-of-Core methods. The result is the same:
# diagonal points contribute ZERO to the optimization objective.
# =====================================================================
n_points_before = len(y_data)
non_diagonal_mask = t1_idx_arr != t2_idx_arr
x_data = x_data[non_diagonal_mask]
y_data = y_data[non_diagonal_mask]
n_diagonal_removed = n_points_before - len(y_data)
prep_time = time.perf_counter() - prep_start
logger.info(f"Data preparation completed in {prep_time:.2f}s")
logger.info(f" Dataset size: {len(y_data):,} points")
logger.info(
f" Diagonal points removed: {n_diagonal_removed:,} "
f"({100 * n_diagonal_removed / n_points_before:.1f}%)"
)
# NOTE (Dec 2025): Data is already pre-shuffled at stratification stage
# in _apply_stratification_if_needed(). No additional shuffle needed here.
# The pre-shuffle prevents L-BFGS warmup from seeing angle-sequential data,
# which would cause local minimum traps (gamma_dot_t0 -> 0).
# =====================================================================
# Anti-Degeneracy Defense System v2.9.0 - EXECUTION INTEGRATION
# =====================================================================
# Transform parameters and execute appropriate optimization path
use_hierarchical = (
hierarchical_optimizer is not None
and anti_degeneracy_components.get("hierarchical_optimizer") is not None
and ad_config.get("enable", True)
)
use_fourier = (
fourier_reparameterizer is not None
and anti_degeneracy_components.get("fourier_reparameterizer") is not None
and ad_config.get("enable", True)
)
# Track params for fitting
fit_initial_params = initial_params.copy()
fit_bounds = bounds
# T034-T038: Constant mode parameter transformation
# v2.17.0: When use_fixed_scaling=True, use physical params only (fixed contrast/offset from quantiles)
# Fallback: Transform per-angle params (2*n_phi) to constant (2) by taking means
if use_fixed_scaling:
# FIXED SCALING MODE (v2.17.0): Use quantile-derived fixed per-angle scaling
# Parameters are physical-only, contrast/offset are NOT in the param vector
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY EXECUTION: Fixed Per-Angle Scaling (v2.17.0)")
physical_params = initial_params[2 * n_phi :]
# New parameter layout: [physical_params] only
fit_initial_params = physical_params
logger.info(f" Original params: {len(initial_params)}")
logger.info(
f" Fixed scaling params: {len(fit_initial_params)} (physical only)"
)
logger.info(f" Per-angle reduction: {2 * n_phi} -> 0 (using fixed arrays)")
# Transform bounds to physical only
if bounds is not None:
lower_bounds, upper_bounds = bounds
fit_bounds = (lower_bounds[2 * n_phi :], upper_bounds[2 * n_phi :])
logger.info(
f" Bounds reduced to physical only: {len(fit_bounds[0])} params"
)
logger.info("=" * 60)
elif use_averaged_scaling:
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY EXECUTION: Auto Averaged Scaling Mode")
# Transform per-angle params to single values (means) for optimization
per_angle_params = initial_params[: 2 * n_phi]
physical_params = initial_params[2 * n_phi :]
# Split per-angle into contrast and offset groups
contrast_per_angle = per_angle_params[:n_phi]
offset_per_angle = per_angle_params[n_phi : 2 * n_phi]
# Use quantile-based averaged values if computed, else take means
if averaged_contrast_init is not None and averaged_offset_init is not None:
contrast_mean = averaged_contrast_init
offset_mean = averaged_offset_init
logger.info(" Using quantile-based averaged initial values (OPTIMIZED)")
else:
contrast_mean = np.nanmean(contrast_per_angle)
offset_mean = np.nanmean(offset_per_angle)
logger.info(" Using parameter-based averaged initial values (OPTIMIZED)")
# New parameter layout: [contrast_const, offset_const, physical_params]
fit_initial_params = np.concatenate(
[[contrast_mean], [offset_mean], physical_params]
)
logger.info(f" Original params: {len(initial_params)}")
logger.info(f" Constant params: {len(fit_initial_params)}")
logger.info(f" Per-angle reduction: {2 * n_phi} -> 2")
logger.info(f" Contrast mean: {contrast_mean:.6f}")
logger.info(f" Offset mean: {offset_mean:.6f}")
# T039: Transform bounds for constant mode
if bounds is not None:
lower_bounds, upper_bounds = bounds
# For constant mode, use the bounds of the first per-angle param
# (all per-angle bounds are typically the same)
fit_lower = np.concatenate(
[
[lower_bounds[0]],
[lower_bounds[n_phi]],
lower_bounds[2 * n_phi :],
]
)
fit_upper = np.concatenate(
[
[upper_bounds[0]],
[upper_bounds[n_phi]],
upper_bounds[2 * n_phi :],
]
)
fit_bounds = (fit_lower, fit_upper)
logger.info("=" * 60)
# Layer 1: Fourier reparameterization of initial parameters
elif use_fourier:
assert fourier_reparameterizer is not None # guarded by use_fourier
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY EXECUTION: Fourier Reparameterization")
# Transform per-angle params to Fourier coefficients
per_angle_params = initial_params[: 2 * n_phi]
physical_params = initial_params[2 * n_phi :]
# Split per-angle into contrast and offset groups
contrast_per_angle = per_angle_params[:n_phi]
offset_per_angle = per_angle_params[n_phi : 2 * n_phi]
# Transform to Fourier coefficients
contrast_coeffs = fourier_reparameterizer.to_fourier(contrast_per_angle)
offset_coeffs = fourier_reparameterizer.to_fourier(offset_per_angle)
# New parameter layout: [contrast_coeffs, offset_coeffs, physical_params]
fit_initial_params = np.concatenate(
[contrast_coeffs, offset_coeffs, physical_params]
)
logger.info(f" Original params: {len(initial_params)}")
logger.info(f" Fourier params: {len(fit_initial_params)}")
logger.info(
f" Per-angle reduction: {2 * n_phi} -> {len(contrast_coeffs) + len(offset_coeffs)}"
)
# Transform bounds for Fourier space
if bounds is not None:
lower_bounds, upper_bounds = bounds
# Per-angle bounds are typically (0,1) for contrast, (0.5, 1.5) for offset
# Fourier coefficients can have wider bounds since they combine linearly
# Use n_coeffs_per_param (e.g., 5 for order=2), NOT n_coeffs (total=10)
n_half = fourier_reparameterizer.n_coeffs_per_param
# Fourier coefficient bounds: a0 keeps the mean, others can be ±range
contrast_lower = np.concatenate(
[
[lower_bounds[0]], # a0 (mean) lower bound
np.full(n_half - 1, -1.0), # Other coeffs can be negative
]
)
contrast_upper = np.concatenate(
[
[upper_bounds[0]], # a0 (mean) upper bound
np.full(n_half - 1, 1.0), # Other coeffs bounded
]
)
offset_lower = np.concatenate(
[
[lower_bounds[n_phi]], # a0 (mean) lower bound
np.full(n_half - 1, -0.5), # Other coeffs
]
)
offset_upper = np.concatenate(
[
[upper_bounds[n_phi]], # a0 (mean) upper bound
np.full(n_half - 1, 0.5), # Other coeffs
]
)
fit_lower = np.concatenate(
[contrast_lower, offset_lower, lower_bounds[2 * n_phi :]]
)
fit_upper = np.concatenate(
[contrast_upper, offset_upper, upper_bounds[2 * n_phi :]]
)
fit_bounds = (fit_lower, fit_upper)
logger.info("=" * 60)
# =====================================================================
# Anti-Degeneracy Defense: Create Fourier-wrapped model function
# =====================================================================
# When using Fourier mode, wrap model_fn to convert Fourier coeffs -> per-angle
if use_fourier:
assert fourier_reparameterizer is not None # guarded by use_fourier
n_coeffs_per_param = fourier_reparameterizer.n_coeffs_per_param
_fourier_basis_matrix = fourier_reparameterizer._basis_matrix
@jax.jit
def model_fn_fourier(
x_batch: jnp.ndarray, *params_tuple: jnp.ndarray
) -> jnp.ndarray:
"""Model function with Fourier coefficient inputs."""
# Handle both single points (1D) and batches (2D)
x_batch_2d = jnp.atleast_2d(x_batch)
params_all = jnp.stack(params_tuple)
# Extract Fourier coefficients and physical params
# Layout: [contrast_coeffs, offset_coeffs, physical_params]
n_coeffs = n_coeffs_per_param # captured from outer scope
contrast_coeffs = params_all[:n_coeffs]
offset_coeffs = params_all[n_coeffs : 2 * n_coeffs]
physical_params = params_all[2 * n_coeffs :]
# Convert Fourier coefficients to per-angle values
# Uses precomputed basis matrix: values = B @ coeffs
basis_matrix = jnp.asarray(_fourier_basis_matrix)
contrast_all = basis_matrix @ contrast_coeffs
offset_all = basis_matrix @ offset_coeffs
# Extract indices from x_batch (now guaranteed 2D)
phi_idx = x_batch_2d[:, 0].astype(jnp.int32)
t1_idx = x_batch_2d[:, 1].astype(jnp.int32)
t2_idx = x_batch_2d[:, 2].astype(jnp.int32)
# Extract physical parameters
D0 = physical_params[0]
alpha = physical_params[1]
D_offset = physical_params[2]
# Compute diffusion
D_t = calculate_diffusion_coefficient(t1_unique_jax, D0, alpha, D_offset)
D_cumsum = trapezoid_cumsum(D_t)
D_diff = D_cumsum[t1_idx] - D_cumsum[t2_idx]
# P0-2: epsilon_abs=1e-12 (was 1e-20, below float32 precision)
D_integral_batch = jnp.sqrt(D_diff**2 + 1e-12)
log_g1_diff = -wavevector_q_squared_half_dt * D_integral_batch
log_g1_diff_bounded = jnp.clip(log_g1_diff, -700.0, 0.0)
g1_diffusion = jnp.exp(log_g1_diff_bounded)
if is_laminar_flow:
# Shear parameters
gamma_dot_0 = physical_params[3]
beta = physical_params[4]
gamma_dot_offset = physical_params[5]
phi0 = physical_params[6]
# Compute shear
gamma_t = calculate_shear_rate(
t1_unique_jax, gamma_dot_0, beta, gamma_dot_offset
)
gamma_cumsum = trapezoid_cumsum(gamma_t)
gamma_diff = gamma_cumsum[t1_idx] - gamma_cumsum[t2_idx]
# P0-2: epsilon_abs=1e-12 (was 1e-20, below float32 precision)
gamma_integral_batch = jnp.sqrt(gamma_diff**2 + 1e-12)
# Shear contribution with angle dependence
phi_values = phi_unique_jax[phi_idx]
angle_diff = jnp.deg2rad(phi0 - phi_values)
cos_phi = jnp.cos(angle_diff)
sinc_arg = sinc_prefactor * gamma_integral_batch * cos_phi
sinc_val = safe_sinc(sinc_arg)
g1_shear = sinc_val**2
g1_total = g1_diffusion * g1_shear
# P0-3: Use jnp.where (gradient-safe) instead of jnp.clip.
# log-space clip above guarantees g1 ≤ 1.0; lower floor prevents log(0).
epsilon = 1e-10
g1 = jnp.where(g1_total > epsilon, g1_total, epsilon)
else:
epsilon = 1e-10
g1 = jnp.where(g1_diffusion > epsilon, g1_diffusion, epsilon)
# Compute g2 with per-angle scaling (from Fourier-derived values)
contrast = contrast_all[phi_idx]
offset = offset_all[phi_idx]
g2_theory = offset + contrast * g1**2
# P0-3: Removed jnp.clip(g2, 0.5, 2.5) — kills gradients at boundaries.
# Bounds enforced via parameter bounds in optimizer, not g2 clipping.
g2 = g2_theory
return jnp.asarray(g2.squeeze())
# Use Fourier model function for optimization
active_model_fn = model_fn_fourier
logger.info(" Using Fourier-wrapped model function")
else:
# Use standard per-angle model function
active_model_fn = model_fn_pointwise
# Run hybrid optimization
logger.info("Starting hybrid optimization (L-BFGS + Gauss-Newton)...")
opt_start = time.perf_counter()
# Layer 2: Hierarchical optimization path
# Can be combined with Fourier mode (hierarchical operates on Fourier params)
result: dict[str, Any]
if use_hierarchical:
# Use hierarchical two-stage optimization
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY EXECUTION: Hierarchical Two-Stage Optimization")
# Pre-extract phi indices for shear weighting (x_data[:, 0] contains phi indices)
phi_indices_jax = jnp.asarray(x_data[:, 0], dtype=jnp.int32)
shear_weighter_local = cast(
ShearSensitivityWeighting | None,
anti_degeneracy_components.get("shear_weighter"),
)
def loss_fn(params: Any) -> Any:
"""Loss function for hierarchical optimizer.
CRITICAL: Must use jnp (JAX) operations, NOT np (NumPy).
Using np.mean breaks the JAX autodiff computation graph,
resulting in zero gradients for all parameters.
Layer 5: Shear-sensitivity weighting is applied here to prevent
gradient cancellation for shear parameters (gamma_dot_t0, phi0).
"""
# Convert params to JAX array if needed for tracing
params_jax = jnp.asarray(params)
pred = active_model_fn(x_data, *params_jax)
# Convert y_data to JAX for proper gradient flow
y_data_jax = jnp.asarray(y_data)
residuals = y_data_jax - pred
# Layer 5: Apply shear-sensitivity weighting if enabled
# This emphasizes angles parallel/antiparallel to flow direction,
# preventing gradient cancellation for shear parameters
if shear_weighter_local is not None:
# Use shear-weighted loss instead of uniform MSE
weighted_loss = shear_weighter_local.apply_weights_to_loss(
residuals, phi_indices_jax
)
else:
# CRITICAL: Use jnp.mean, NOT np.mean!
# np.mean breaks JAX autodiff and causes zero gradients
weighted_loss = jnp.mean(residuals**2) * len(y_data)
# Add adaptive regularization if enabled
if adaptive_regularizer is not None:
# Use JAX-compatible method for autodiff compatibility
# Note: weighted_loss already includes the normalization
mse_for_reg = weighted_loss / len(y_data)
reg_term = adaptive_regularizer.compute_regularization_jax(
params_jax, mse_for_reg, len(y_data)
)
return weighted_loss + reg_term
return weighted_loss
def grad_fn(params: Any) -> Any:
"""Gradient function with optional monitoring."""
# Use JAX autodiff for gradient computation
grad = jax.grad(lambda p: loss_fn(p))(params)
# Layer 4: Gradient monitoring
if gradient_monitor is not None:
gradient_monitor.check(
grad, iteration_counter[0], params, loss_fn(params)
)
iteration_counter[0] += 1
return grad
iteration_counter = [0] # Mutable counter for gradient monitor
# Layer 5: Create callback for shear weight updates
# Updates weights based on current phi0 estimate at start of each outer iteration
def shear_weight_update_callback(params: np.ndarray, outer_iter: int) -> None:
"""Update shear-sensitivity weights based on current phi0."""
if shear_weighter_local is not None:
shear_weighter_local.update_phi0(params, outer_iter)
assert hierarchical_optimizer is not None # guarded by use_hierarchical
assert fit_bounds is not None # hierarchical requires bounds
hier_result = hierarchical_optimizer.fit(
loss_fn=loss_fn,
grad_fn=grad_fn,
p0=fit_initial_params,
bounds=fit_bounds,
outer_iteration_callback=shear_weight_update_callback,
)
# Compute covariance from Hessian of loss function (BUG-15 fix)
# Gauss-Newton approximation: H ≈ 2 * J^T J for least-squares loss
# So pcov = s² * inv(J^T J) ≈ 2 * s² * inv(H)
n_hier_data = len(y_data)
n_hier_params = len(hier_result.x)
s2_hier = hier_result.fun / max(n_hier_data - n_hier_params, 1)
try:
popt_jax = jnp.asarray(hier_result.x)
H = np.asarray(jax.hessian(loss_fn)(popt_jax))
except Exception as e:
logger.warning(
f"Could not compute Hessian: {e}. Using identity placeholder."
)
H = None
if H is not None:
try:
pcov_hier = 2.0 * s2_hier * np.linalg.inv(H)
logger.info(
f"Hierarchical covariance from Hessian: s^2={s2_hier:.6e} "
f"(n_data={n_hier_data}, n_params={n_hier_params})"
)
except np.linalg.LinAlgError:
logger.warning(
"Singular Hessian in hierarchical path, using pseudo-inverse"
)
pcov_hier = 2.0 * s2_hier * np.linalg.pinv(H)
else:
pcov_hier = np.eye(n_hier_params)
# Convert HierarchicalResult to standard format
result = {
"x": hier_result.x,
"pcov": pcov_hier,
"success": hier_result.success,
"message": hier_result.message,
"function_evaluations": hier_result.n_outer_iterations * 150, # Estimate
"streaming_diagnostics": {
"phase_iterations": {
"phase1": 0,
"phase2": hier_result.n_outer_iterations,
},
"warmup_diagnostics": {},
"gauss_newton_diagnostics": {
"final_cost": hier_result.fun,
},
"hierarchical_history": hier_result.history,
},
}
logger.info(f" Hierarchical result: success={hier_result.success}")
logger.info(f" Outer iterations: {hier_result.n_outer_iterations}")
logger.info(f" Final loss: {hier_result.fun:.6e}")
logger.info("=" * 60)
else:
# Standard hybrid streaming optimization path
result = optimizer.fit(
data_source=(x_data, y_data),
func=active_model_fn,
p0=fit_initial_params,
bounds=fit_bounds,
verbose=1,
)
opt_time = time.perf_counter() - opt_start
total_time = time.perf_counter() - start_time
# Extract diagnostics from NLSQ result structure
# NLSQ uses nested dicts: streaming_diagnostics -> phase_iterations/warmup_diagnostics
diagnostics = result.get("streaming_diagnostics", {})
phase_iterations = diagnostics.get("phase_iterations", {})
warmup_diag = diagnostics.get("warmup_diagnostics", {})
gn_diag = diagnostics.get("gauss_newton_diagnostics", {})
lbfgs_epochs = phase_iterations.get("phase1", 0)
gn_iterations = phase_iterations.get("phase2", 0)
final_lbfgs_loss = warmup_diag.get("final_loss", float("inf"))
final_gn_cost = gn_diag.get("final_cost", float("inf"))
logger.info("=" * 80)
logger.info("HYBRID STREAMING OPTIMIZATION COMPLETE")
logger.info(f" Success: {result.get('success', False)}")
logger.info(f" L-BFGS final loss: {final_lbfgs_loss:.6e}")
logger.info(f" GN final cost: {final_gn_cost:.6e}")
logger.info(f" L-BFGS epochs: {lbfgs_epochs}")
logger.info(f" GN iterations: {gn_iterations}")
logger.info(f" Optimization time: {opt_time:.2f}s")
logger.info(f" Total time: {total_time:.2f}s")
logger.info("=" * 80)
# Extract results
popt = np.asarray(result["x"])
# =====================================================================
# Anti-Degeneracy Defense System v2.9.0 - INVERSE TRANSFORMATION
# =====================================================================
# Transform Fourier coefficients back to per-angle parameters
if use_fourier:
assert fourier_reparameterizer is not None # guarded by use_fourier
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY EXECUTION: Inverse Fourier Transform")
# Use n_coeffs_per_param (e.g., 5 for order=2), NOT n_coeffs (total=10)
# Layout: [contrast_coeffs (5), offset_coeffs (5), physical (7)]
n_half = fourier_reparameterizer.n_coeffs_per_param
# Extract Fourier coefficients and physical params from optimized result
fourier_contrast_coeffs = popt[:n_half]
fourier_offset_coeffs = popt[n_half : 2 * n_half]
physical_params_opt = popt[2 * n_half :]
# Transform back to per-angle parameters
contrast_per_angle_opt = fourier_reparameterizer.from_fourier(
fourier_contrast_coeffs
)
offset_per_angle_opt = fourier_reparameterizer.from_fourier(
fourier_offset_coeffs
)
# Reconstruct full parameter vector in original layout
popt = np.concatenate(
[contrast_per_angle_opt, offset_per_angle_opt, physical_params_opt]
)
logger.info(f" Fourier params: {2 * n_half + len(physical_params_opt)}")
logger.info(f" Restored per-angle params: {len(popt)}")
# Transform covariance from Fourier space to per-angle space
# J_fourier = d(per_angle)/d(fourier_coeffs)
# pcov_per_angle = J_full @ pcov_fourier @ J_full.T
pcov_fourier = result.get("pcov", None)
n_fourier_total = 2 * n_half + len(physical_params_opt)
if (
pcov_fourier is not None
and pcov_fourier.shape[0] == n_fourier_total
and pcov_fourier.shape[1] == n_fourier_total
):
# Get Jacobian for per-angle transformation
# This is the Fourier basis matrix that maps coefficients to per-angle values
jacobian_per_angle = fourier_reparameterizer.get_jacobian_transform()
# jacobian_per_angle shape: (2 * n_phi, n_coeffs_fourier)
# where n_coeffs_fourier = 2 * n_half
# Build full Jacobian for complete parameter space transformation
# Layout: [n_phi contrast, n_phi offset, n_physical]
# Fourier layout: [n_half contrast_coeffs, n_half offset_coeffs, n_physical]
n_per_angle_total = 2 * n_phi # contrast + offset per-angle
n_physical = len(physical_params_opt)
n_total_restored = n_per_angle_total + n_physical
J_full = np.zeros((n_total_restored, n_fourier_total))
# Block for per-angle params: use Fourier Jacobian
J_full[:n_per_angle_total, : 2 * n_half] = jacobian_per_angle
# Block for physical params: identity (pass-through)
J_full[n_per_angle_total:, 2 * n_half :] = np.eye(n_physical)
# Transform covariance: pcov_full = J @ pcov_fourier @ J.T
try:
pcov_transformed = J_full @ pcov_fourier @ J_full.T
# Store for later use (override the result dict lookup)
result["pcov_transformed"] = pcov_transformed
logger.info(" Covariance transformed from Fourier to per-angle space")
except (ValueError, RuntimeError, np.linalg.LinAlgError) as e:
logger.warning(
f" Covariance transformation failed: {e}. Using identity fallback."
)
result["pcov_transformed"] = None
else:
pcov_shape = pcov_fourier.shape if pcov_fourier is not None else None
logger.warning(
f" Fourier covariance unavailable or wrong shape (got {pcov_shape}, "
f"expected ({n_fourier_total}, {n_fourier_total})). "
"Using identity fallback."
)
result["pcov_transformed"] = None
logger.info("=" * 60)
# v2.17.0: Fixed scaling mode inverse transformation
# Expand physical-only params back to per-angle format using fixed scaling arrays
elif use_fixed_scaling:
assert (
fixed_contrast_per_angle is not None
) # set when use_fixed_scaling is True
assert fixed_offset_per_angle is not None # set when use_fixed_scaling is True
logger.info("=" * 60)
logger.info(
"ANTI-DEGENERACY EXECUTION: Inverse Fixed Scaling Transform (v2.17.0)"
)
# Layout: [physical_params] - popt contains ONLY physical parameters
physical_params_opt = popt
# Use the pre-computed fixed per-angle scaling from quantiles
contrast_per_angle_opt = fixed_contrast_per_angle
offset_per_angle_opt = fixed_offset_per_angle
# Reconstruct full parameter vector in original layout
popt = np.concatenate(
[contrast_per_angle_opt, offset_per_angle_opt, physical_params_opt]
)
logger.info(f" Physical params: {len(physical_params_opt)}")
logger.info(f" Fixed per-angle scaling restored: {len(popt)} total params")
logger.info(
f" Contrast (fixed): mean={np.nanmean(contrast_per_angle_opt):.4f}, "
f"range=[{np.nanmin(contrast_per_angle_opt):.4f}, {np.nanmax(contrast_per_angle_opt):.4f}]"
)
logger.info(
f" Offset (fixed): mean={np.nanmean(offset_per_angle_opt):.4f}, "
f"range=[{np.nanmin(offset_per_angle_opt):.4f}, {np.nanmax(offset_per_angle_opt):.4f}]"
)
# Transform covariance from physical-only space to full space
# For fixed scaling mode, the Jacobian is simpler:
# Per-angle params are fixed (variance = 0), physical params have identity
# J[i, j] = 0 for per-angle params (i < 2*n_phi)
# J[2*n_phi+i, i] = 1 for physical params (identity)
pcov_physical = result.get("pcov", None)
n_physical = len(physical_params_opt)
if (
pcov_physical is not None
and pcov_physical.shape[0] == n_physical
and pcov_physical.shape[1] == n_physical
):
n_per_angle_total = 2 * n_phi # contrast + offset per-angle
n_total_restored = n_per_angle_total + n_physical
# Build full covariance matrix
# Per-angle params have zero covariance (they're fixed)
# Physical params have the original covariance
try:
pcov_full = np.zeros((n_total_restored, n_total_restored))
# Physical params covariance block
pcov_full[2 * n_phi :, 2 * n_phi :] = pcov_physical
result["pcov_transformed"] = pcov_full
logger.info(
" Covariance expanded: per-angle=0 (fixed), physical=preserved"
)
except (
ValueError,
RuntimeError,
MemoryError,
np.linalg.LinAlgError,
) as e:
logger.warning(
f" Covariance expansion failed: {e}. Using identity fallback."
)
result["pcov_transformed"] = None
else:
pcov_shape = pcov_physical.shape if pcov_physical is not None else None
logger.warning(
f" Physical covariance unavailable or wrong shape (got {pcov_shape}, "
f"expected ({n_physical}, {n_physical})). "
"Using identity fallback."
)
result["pcov_transformed"] = None
logger.info("=" * 60)
# T046-T049: Auto averaged mode inverse transformation
# Expand averaged parameters back to per-angle format for backward compatibility
elif use_averaged_scaling:
logger.info("=" * 60)
logger.info("ANTI-DEGENERACY EXECUTION: Inverse Auto Averaged Transform")
# Layout: [contrast_const, offset_const, physical_params]
from homodyne.optimization.nlsq.data_prep import (
expand_per_angle_parameters,
)
contrast_const = popt[0]
offset_const = popt[1]
n_physical_opt = len(popt) - 2
expanded = expand_per_angle_parameters(
popt,
None,
n_phi,
n_physical_opt,
)
popt = expanded.params
logger.info(f" Constant params: 2 + {n_physical_opt} physical")
logger.info(f" Restored per-angle params: {len(popt)}")
logger.info(f" Contrast (uniform): {contrast_const:.6f}")
logger.info(f" Offset (uniform): {offset_const:.6f}")
# Transform covariance from constant space to per-angle space
# For constant mode, the Jacobian is simpler: broadcasting matrix
# J[i, 0] = 1 for i in 0..n_phi-1 (contrast params)
# J[n_phi+i, 1] = 1 for i in 0..n_phi-1 (offset params)
# J[2*n_phi+i, 2+i] = 1 for physical params (identity)
pcov_constant = result.get("pcov", None)
n_constant_total = 2 + n_physical_opt
if (
pcov_constant is not None
and pcov_constant.shape[0] == n_constant_total
and pcov_constant.shape[1] == n_constant_total
):
n_per_angle_total = 2 * n_phi # contrast + offset per-angle
n_physical = n_physical_opt
n_total_restored = n_per_angle_total + n_physical
# Build Jacobian for constant → per-angle transformation
J_full = np.zeros((n_total_restored, n_constant_total))
# Contrast broadcast: d(contrast_per_angle[i])/d(contrast_const) = 1
J_full[:n_phi, 0] = 1.0
# Offset broadcast: d(offset_per_angle[i])/d(offset_const) = 1
J_full[n_phi : 2 * n_phi, 1] = 1.0
# Physical params: identity (pass-through)
J_full[2 * n_phi :, 2:] = np.eye(n_physical)
# Transform covariance: pcov_full = J @ pcov_constant @ J.T
try:
pcov_transformed = J_full @ pcov_constant @ J_full.T
result["pcov_transformed"] = pcov_transformed
logger.info(" Covariance transformed from constant to per-angle space")
except (ValueError, RuntimeError, np.linalg.LinAlgError) as e:
logger.warning(
f" Covariance transformation failed: {e}. Using identity fallback."
)
result["pcov_transformed"] = None
else:
pcov_shape = pcov_constant.shape if pcov_constant is not None else None
logger.warning(
f" Constant covariance unavailable or wrong shape (got {pcov_shape}, "
f"expected ({n_constant_total}, {n_constant_total})). "
"Using identity fallback."
)
result["pcov_transformed"] = None
logger.info("=" * 60)
# Log gradient monitor summary if available
if gradient_monitor is not None:
gradient_monitor.log_summary()
if gradient_monitor.collapse_detected:
logger.warning("=" * 60)
logger.warning("GRADIENT COLLAPSE WAS DETECTED DURING OPTIMIZATION")
logger.warning(
f" Collapse events: {len(gradient_monitor.collapse_events)}"
)
for event in gradient_monitor.collapse_events:
logger.warning(
f" Iteration {event.iteration}: ratio={event.ratio:.6f}"
)
logger.warning("=" * 60)
# Get covariance (properly transformed from normalized space)
# Priority: 1) pcov_transformed (from Fourier space), 2) pcov, 3) identity fallback
pcov = result.get("pcov_transformed", None)
if pcov is None:
pcov = result.get("pcov", None)
if pcov is None or pcov.shape[0] != len(popt):
logger.debug(
f"Covariance size mismatch or unavailable: expected ({len(popt)}, {len(popt)}), "
f"got {pcov.shape if pcov is not None else None}. Using identity fallback."
)
pcov = np.eye(len(popt))
# Enforce bounds on final parameters
if bounds is not None:
lower_bounds, upper_bounds = bounds
popt = np.clip(popt, lower_bounds, upper_bounds)
# Check for parameters stuck at bounds with zero/near-zero uncertainty
# This indicates the optimizer could not move these parameters away from bounds
bound_stuck_warning = None
if bounds is not None and is_laminar_flow:
perr = safe_uncertainties_from_pcov(pcov, len(popt))
param_statuses = _classify_parameter_status(
popt, lower_bounds, upper_bounds, atol=1e-6
)
# Map indices to physical parameter names for laminar_flow mode
# Layout: [n_phi contrasts] + [n_phi offsets] + [7 physical params]
physical_indices_list = list(range(2 * n_phi, len(popt)))
physical_param_names_local = [
"D0",
"alpha",
"D_offset",
"gamma_dot_t0",
"beta",
"gamma_dot_t_offset",
"phi0",
]
bound_stuck_params = []
for i, idx in enumerate(physical_indices_list):
if idx < len(param_statuses) and idx < len(popt):
status = param_statuses[idx]
uncertainty = perr[idx] if idx < len(perr) else 0.0
if status != "active" and (uncertainty == 0.0 or uncertainty < 1e-15):
param_name = (
physical_param_names_local[i]
if i < len(physical_param_names_local)
else f"param[{idx}]"
)
bound_stuck_params.append(
(param_name, status, popt[idx], uncertainty)
)
if bound_stuck_params:
logger.warning("=" * 80)
logger.warning("PARAMETER BOUNDS WARNING")
logger.warning(
"The following parameters are stuck at bounds with zero uncertainty:"
)
for param_name, status, value, unc in bound_stuck_params:
logger.warning(
f" {param_name}: {value:.6e} ({status}, uncertainty={unc:.2e})"
)
logger.warning("")
logger.warning("This may indicate:")
logger.warning(
" 1. The optimizer cannot find gradient information for these parameters"
)
logger.warning(" 2. The initial guess was already at or near the bounds")
logger.warning(
" 3. The model is insensitive to these parameters with this data coverage"
)
logger.warning("")
logger.warning("RECOMMENDED ACTIONS:")
logger.warning(
" - Enable phi_filtering to use only angles near 0 and 90 deg for laminar flow"
)
logger.warning(
" - Use multi-start optimization to explore multiple parameter basins"
)
logger.warning(
" - Check if gamma_dot_t0 ~ 0 means shear contribution is missing"
)
logger.warning("=" * 80)
# Store for info dict
bound_stuck_warning = {
"parameters_at_bounds": [
{
"name": name,
"status": status,
"value": float(val),
"uncertainty": float(unc),
}
for name, status, val, unc in bound_stuck_params
]
}
# Build info dict
info: dict[str, Any] = {
"success": result.get("success", False),
"message": result.get("message", "Hybrid streaming optimization completed"),
"nfev": result.get("function_evaluations", 0),
"nit": lbfgs_epochs + gn_iterations,
"final_loss": final_gn_cost
if final_gn_cost != float("inf")
else final_lbfgs_loss,
"lbfgs_epochs": lbfgs_epochs,
"gauss_newton_iterations": gn_iterations,
"optimization_time": opt_time,
"total_time": total_time,
"method": "adaptive_hybrid_streaming",
"hybrid_streaming_diagnostics": diagnostics,
}
# Add anti-degeneracy defense diagnostics
info["anti_degeneracy"] = {
"version": "2.18.0",
"per_angle_mode": anti_degeneracy_components["per_angle_mode"],
"use_constant": anti_degeneracy_components.get("use_constant", False),
"use_fixed_scaling": use_fixed_scaling,
"fourier_enabled": fourier_reparameterizer is not None,
"hierarchical_enabled": hierarchical_optimizer is not None,
"adaptive_regularization_enabled": adaptive_regularizer is not None,
"gradient_monitor_enabled": gradient_monitor is not None,
"shear_weighting_enabled": shear_weighter is not None,
}
if fourier_reparameterizer is not None:
info["anti_degeneracy"]["fourier"] = {
"order": fourier_order,
"n_coeffs": fourier_reparameterizer.n_coeffs,
"param_reduction": f"{2 * n_phi} -> {fourier_reparameterizer.n_coeffs}",
}
# T048: Add constant mode diagnostics
if use_fixed_scaling:
# v2.18.0: Fixed scaling mode - per-angle values are fixed, not optimized
assert (
fixed_contrast_per_angle is not None
) # set when use_fixed_scaling is True
assert fixed_offset_per_angle is not None # set when use_fixed_scaling is True
info["anti_degeneracy"]["fixed_scaling"] = {
"param_reduction": f"{2 * n_phi} -> 0 (physical only)",
"method": "quantile_estimation",
"contrast_mean": float(np.nanmean(fixed_contrast_per_angle)),
"contrast_range": [
float(np.nanmin(fixed_contrast_per_angle)),
float(np.nanmax(fixed_contrast_per_angle)),
],
"offset_mean": float(np.nanmean(fixed_offset_per_angle)),
"offset_range": [
float(np.nanmin(fixed_offset_per_angle)),
float(np.nanmax(fixed_offset_per_angle)),
],
}
elif use_averaged_scaling:
# v2.18.0: Auto averaged mode - averaged values are OPTIMIZED
info["anti_degeneracy"]["auto_averaged"] = {
"param_reduction": f"{2 * n_phi} -> 2 (averaged scaling)",
"method": "quantile_estimation_averaged",
# After inverse transform, popt[0] is first contrast (uniform)
"contrast_optimized": float(popt[0]) if len(popt) > 0 else None,
"offset_optimized": float(popt[n_phi]) if len(popt) > n_phi else None,
}
if hierarchical_optimizer is not None:
info["anti_degeneracy"]["hierarchical"] = (
hierarchical_optimizer.get_diagnostics()
)
if adaptive_regularizer is not None:
info["anti_degeneracy"]["regularization"] = (
adaptive_regularizer.get_diagnostics()
)
if gradient_monitor is not None:
info["anti_degeneracy"]["gradient_monitor"] = gradient_monitor.get_diagnostics()
if shear_weighter is not None:
info["anti_degeneracy"]["shear_weighting"] = shear_weighter.get_diagnostics()
# Add bounds warning info if detected
if bound_stuck_warning is not None:
info["bound_stuck_warning"] = bound_stuck_warning
# Check for shear collapse: gamma_dot_t0 essentially zero
if is_laminar_flow and len(popt) > 2 * n_phi + 3:
gamma_dot_t0_idx = 2 * n_phi + 3
gamma_dot_t0_value = popt[gamma_dot_t0_idx]
# Check if shear rate is effectively zero (< 1e-5 s^-1)
if abs(gamma_dot_t0_value) < 1e-5:
logger.warning("=" * 80)
logger.warning("SHEAR COLLAPSE WARNING")
logger.warning(
f"gamma_dot_t0 = {gamma_dot_t0_value:.2e} s^-1 is effectively zero"
)
logger.warning("")
logger.warning("This means the shear contribution to g1 is negligible.")
logger.warning(
"The model has effectively collapsed to static_isotropic mode."
)
logger.warning("")
logger.warning("POSSIBLE CAUSES:")
logger.warning(" 1. Per-angle contrast/offset absorbed the shear signal")
logger.warning(
" 2. Inconsistent initialization of per-angle vs physical params"
)
logger.warning(
" 3. Physical parameters at bounds with weak gradient signal"
)
logger.warning(" 4. The data may genuinely have no measurable shear")
logger.warning("")
logger.warning("RECOMMENDED ACTIONS:")
logger.warning(
" - Enable multi-start optimization to explore parameter basins"
)
logger.warning(
" - Check reduced chi-squared: if worse than expected, re-run optimization"
)
logger.warning(
" - Verify per-angle contrast/offset are not varying excessively"
)
logger.warning(
" - Consider static_isotropic mode if shear is truly absent"
)
logger.warning("=" * 80)
info["shear_collapse_warning"] = {
"gamma_dot_t0": float(gamma_dot_t0_value),
"threshold": 1e-5,
"message": "Shear contribution effectively zero",
}
return popt, pcov, info
[docs]
def estimate_memory_for_stratified_ls(
n_points: int,
n_params: int,
n_chunks: int,
) -> float:
"""Estimate peak memory usage for stratified least-squares optimization.
The main memory consumers are:
1. Padded arrays: n_chunks × max_chunk_size × 5 arrays × 8 bytes
2. Dense Jacobian: n_points × n_params × 8 bytes
3. JAX autodiff intermediates: ~3× Jacobian size for backprop
4. JAX compilation cache: ~5-10 GB
Args:
n_points: Total number of data points
n_params: Number of parameters
n_chunks: Number of stratified chunks
Returns:
Estimated peak memory in bytes
"""
bytes_per_float = 8
# Padded arrays (5 arrays: phi, t1, t2, g2, mask)
max_chunk_size = (n_points + n_chunks - 1) // n_chunks
padded_arrays = n_chunks * max_chunk_size * 5 * bytes_per_float
# Dense Jacobian
jacobian = n_points * n_params * bytes_per_float
# JAX autodiff intermediates (keep all grids for backprop)
# This is the main memory killer - originally estimated at 3× Jacobian
# but empirical testing shows 5× is more accurate for large datasets
# (C020 dataset: estimated 44.9 GB at 3×, actual ~60 GB at 96% pressure)
autodiff_intermediates = jacobian * 5
# JAX compilation cache
jax_cache = 5 * 1e9 # ~5 GB
total = padded_arrays + jacobian + autodiff_intermediates + jax_cache
return total
[docs]
def should_use_streaming(
n_points: int,
n_params: int,
n_chunks: int,
memory_threshold_gb: float | None = None,
memory_fraction: float | None = None,
) -> tuple[bool, float, str]:
"""Determine if streaming optimizer should be used based on memory estimate.
Uses adaptive memory thresholding (v2.7.0+) to automatically compute
an appropriate threshold based on total system memory.
Args:
n_points: Total number of data points
n_params: Number of parameters
n_chunks: Number of stratified chunks
memory_threshold_gb: Memory threshold in GB above which to use streaming.
If None (default), computes adaptive threshold as 75% of total memory.
memory_fraction: Fraction of total memory for adaptive threshold (0.1-0.9).
Only used if memory_threshold_gb is None.
Returns:
(use_streaming, estimated_gb, reason) tuple
"""
try:
import psutil
except ImportError:
from homodyne.optimization.nlsq.memory import detect_total_system_memory
total_bytes = detect_total_system_memory()
if total_bytes is not None:
total_gb = total_bytes / (1024**3)
return (
False,
0.0,
f"psutil not available; system has {total_gb:.1f} GB",
)
return (False, 0.0, "psutil not available; system memory unknown")
# Compute adaptive threshold if not explicitly provided
if memory_threshold_gb is None:
memory_threshold_gb, threshold_info = get_adaptive_memory_threshold(
memory_fraction=memory_fraction
)
_memory_logger.debug(
f"_should_use_streaming using adaptive threshold: "
f"{memory_threshold_gb:.1f} GB ({threshold_info})"
)
# Get available system memory
mem = psutil.virtual_memory()
available_gb = mem.available / (1024**3)
# Estimate memory for stratified LS
estimated_bytes = estimate_memory_for_stratified_ls(n_points, n_params, n_chunks)
estimated_gb = estimated_bytes / (1024**3)
# Decision logic
# Use streaming if:
# 1. Estimated memory exceeds threshold, OR
# 2. Estimated memory exceeds 85% of available memory
#
# Note: Increased from 70% to 85% because non-streaming Levenberg-Marquardt
# is more accurate than streaming optimization. The 85% threshold allows
# more datasets to use the preferred non-streaming path.
use_streaming = False
reason = ""
if estimated_gb > memory_threshold_gb:
use_streaming = True
reason = (
f"Estimated memory ({estimated_gb:.1f} GB) exceeds "
f"threshold ({memory_threshold_gb:.1f} GB)"
)
elif estimated_gb > available_gb * 0.85:
use_streaming = True
reason = (
f"Estimated memory ({estimated_gb:.1f} GB) exceeds "
f"85% of available memory ({available_gb:.1f} GB available)"
)
else:
reason = (
f"Estimated memory ({estimated_gb:.1f} GB) within limits "
f"(threshold={memory_threshold_gb:.1f} GB, "
f"available={available_gb:.1f} GB)"
)
return use_streaming, estimated_gb, reason