"""Parameter Utilities for NLSQ Optimization.
Provides utility functions for parameter handling, labeling, status classification,
and per-angle initialization in NLSQ optimization.
Key Functions:
- build_parameter_labels: Create parameter labels with per-angle support
- classify_parameter_status: Identify parameters at bounds
- sample_xdata: Subsample x-data for diagnostic computations
- compute_consistent_per_angle_init: Initialize per-angle params consistently
- compute_jacobian_stats: Compute Jacobian-based statistics
"""
from collections.abc import Callable
from typing import Any
import jax
import jax.numpy as jnp
import numpy as np
[docs]
def build_parameter_labels(
per_angle_scaling: bool,
n_phi: int,
physical_param_names: list[str],
) -> list[str]:
"""Build parameter labels including per-angle scaling parameters.
Parameters
----------
per_angle_scaling : bool
Whether per-angle contrast/offset are used
n_phi : int
Number of phi angles
physical_param_names : list[str]
Names of physical parameters
Returns
-------
list[str]
Full list of parameter labels
"""
labels: list[str] = []
if per_angle_scaling:
labels.extend([f"contrast[{i}]" for i in range(n_phi)])
labels.extend([f"offset[{i}]" for i in range(n_phi)])
labels.extend(physical_param_names)
return labels
[docs]
def classify_parameter_status(
values: np.ndarray,
lower: np.ndarray | None,
upper: np.ndarray | None,
atol: float = 1e-9,
) -> list[str]:
"""Classify parameters as active or at bounds.
Parameters
----------
values : np.ndarray
Current parameter values
lower : np.ndarray | None
Lower bounds
upper : np.ndarray | None
Upper bounds
atol : float, optional
Absolute tolerance for bound comparison
Returns
-------
list[str]
Status for each parameter: 'active', 'at_lower_bound', or 'at_upper_bound'
"""
if lower is None or upper is None:
return ["active"] * len(values)
statuses: list[str] = []
for value, lo, hi in zip(values, lower, upper, strict=False):
if np.isclose(value, lo, atol=atol * (1.0 + abs(lo))):
statuses.append("at_lower_bound")
elif np.isclose(value, hi, atol=atol * (1.0 + abs(hi))):
statuses.append("at_upper_bound")
else:
statuses.append("active")
return statuses
[docs]
def sample_xdata(xdata: np.ndarray, max_points: int) -> np.ndarray:
"""Subsample x-data for diagnostic computations.
Parameters
----------
xdata : np.ndarray
Input data
max_points : int
Maximum number of points to return
Returns
-------
np.ndarray
Subsampled data
"""
if max_points <= 0 or xdata.size <= max_points:
return xdata
indices = np.linspace(0, xdata.size - 1, max_points, dtype=np.int64)
return xdata[indices]
[docs]
def compute_jacobian_stats(
residual_fn: Callable[..., Any],
x_subset: np.ndarray,
params: np.ndarray,
scaling_factor: float,
) -> tuple[np.ndarray | None, np.ndarray | None]:
"""Compute Jacobian statistics for diagnostics.
Parameters
----------
residual_fn : Callable
Residual function
x_subset : np.ndarray
Subset of x data for computation
params : np.ndarray
Current parameters
scaling_factor : float
Scaling factor for statistics
Returns
-------
tuple
(J^T J matrix, column norms) or (None, None) on failure
"""
try:
params_jnp = jnp.asarray(params)
if hasattr(residual_fn, "jax_residual"):
def residual_vector(p):
return jnp.asarray(residual_fn.jax_residual(jnp.asarray(p))).reshape(-1)
else:
def residual_vector(p):
return jnp.asarray(residual_fn(x_subset, *tuple(p))).reshape(-1)
jac = jax.jacfwd(residual_vector)(params_jnp)
jac_np = np.asarray(jac)
jtj = jac_np.T @ jac_np * scaling_factor
col_norms = np.linalg.norm(jac_np, axis=0) * np.sqrt(scaling_factor)
return jtj, col_norms
except (ValueError, RuntimeError, np.linalg.LinAlgError):
return None, None
[docs]
def compute_consistent_per_angle_init(
stratified_data: Any,
physical_params: np.ndarray,
physical_param_names: list[str],
default_contrast: float = 0.5,
default_offset: float = 1.0,
logger: Any = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Compute per-angle contrast/offset consistent with initial physical parameters.
This function solves a critical initialization problem in laminar_flow mode:
when physical shear parameters (gamma_dot_t0) are nonzero, the model predicts
DIFFERENT g2 values at different angles. If per-angle contrast/offset are
initialized uniformly, large initial residuals can cause the optimizer to
incorrectly reduce gamma_dot_t0 to zero.
Instead, we compute per-angle contrast/offset by fitting:
g2_data[angle] ≈ offset[angle] + contrast[angle] × g1_model²[angle]
where g1_model is computed using the initial physical parameters.
Parameters
----------
stratified_data : StratifiedData
Data containing per-angle g2, phi, t1, t2 arrays
physical_params : np.ndarray
Initial physical parameters [D0, alpha, D_offset, (gamma_dot_t0, beta, gamma_dot_t_offset, phi0)]
physical_param_names : list[str]
Names of physical parameters to determine analysis mode
default_contrast : float
Default contrast value if fitting fails
default_offset : float
Default offset value if fitting fails
logger : logging.Logger, optional
Logger for diagnostic messages
Returns
-------
contrast_per_angle : np.ndarray
Per-angle contrast values consistent with physical params
offset_per_angle : np.ndarray
Per-angle offset values consistent with physical params
"""
from homodyne.core.physics_utils import (
calculate_diffusion_coefficient,
calculate_shear_rate,
safe_sinc,
trapezoid_cumsum,
)
# Extract data by angle
if hasattr(stratified_data, "chunks"):
phi_unique = np.array(
sorted(
{phi for chunk in stratified_data.chunks for phi in chunk.phi.tolist()}
)
)
n_phi = len(phi_unique)
# Get metadata from first chunk
first_chunk = stratified_data.chunks[0]
q = first_chunk.q
L = first_chunk.L
dt = first_chunk.dt
t1_unique = np.array(
sorted({t for chunk in stratified_data.chunks for t in chunk.t1.tolist()})
)
else:
phi_unique = np.unique(stratified_data.phi_flat)
n_phi = len(phi_unique)
q = stratified_data.q
L = stratified_data.L
dt = stratified_data.dt
t1_unique = np.unique(stratified_data.t1_flat)
# Extract physical parameters
is_laminar_flow = "gamma_dot_t0" in physical_param_names
D0 = physical_params[0]
alpha = physical_params[1]
D_offset = physical_params[2]
if is_laminar_flow:
gamma_dot_t0 = physical_params[3]
beta = physical_params[4]
gamma_dot_t_offset = physical_params[5]
phi0 = physical_params[6]
else:
gamma_dot_t0 = 0.0
beta = 0.0
gamma_dot_t_offset = 0.0
phi0 = 0.0
# Precompute time-dependent quantities (same for all angles)
t_values = np.asarray(t1_unique)
D_t = calculate_diffusion_coefficient(t_values, D0, alpha, D_offset)
D_cumsum = np.asarray(trapezoid_cumsum(D_t))
wavevector_q_squared_half_dt = 0.5 * (q**2) * dt
if is_laminar_flow:
gamma_t = calculate_shear_rate(t_values, gamma_dot_t0, beta, gamma_dot_t_offset)
gamma_cumsum = np.asarray(trapezoid_cumsum(gamma_t))
sinc_prefactor = 0.5 / np.pi * q * L * dt
# Initialize output arrays
contrast_per_angle = np.full(n_phi, default_contrast)
offset_per_angle = np.full(n_phi, default_offset)
# Process each angle
for i, phi in enumerate(phi_unique):
try:
# Get data for this angle
if hasattr(stratified_data, "chunks"):
# Find data in chunks
g2_list = []
t1_list = []
t2_list = []
for chunk in stratified_data.chunks:
mask = np.isclose(chunk.phi, phi, atol=0.1)
if np.any(mask):
g2_list.extend(chunk.g2[mask].tolist())
t1_list.extend(chunk.t1[mask].tolist())
t2_list.extend(chunk.t2[mask].tolist())
if not g2_list:
continue
g2_data = np.array(g2_list)
t1_data = np.array(t1_list)
t2_data = np.array(t2_list)
else:
mask = np.isclose(stratified_data.phi_flat, phi, atol=0.1)
if not np.any(mask):
continue
g2_data = stratified_data.g2_flat[mask]
t1_data = stratified_data.t1_flat[mask]
t2_data = stratified_data.t2_flat[mask]
# Compute g1_model for each data point at this angle.
# NOTE: Both t1 and t2 index into t1_unique because XPCS correlation
# matrices C2(t1, t2) use a shared time grid (t1_unique == t2_unique).
# The physics model computes D(t) on this single grid, and differences
# D_cumsum[t1_idx] - D_cumsum[t2_idx] give the integral over |t1-t2|.
t1_idx = np.clip(np.searchsorted(t1_unique, t1_data), 0, len(t1_unique) - 1)
t2_idx = np.clip(np.searchsorted(t1_unique, t2_data), 0, len(t1_unique) - 1)
# Diffusion term
D_diff = D_cumsum[t1_idx] - D_cumsum[t2_idx]
D_integral_batch = np.abs(D_diff)
log_g1_diff = -wavevector_q_squared_half_dt * D_integral_batch
g1_diffusion = np.exp(np.clip(log_g1_diff, -700.0, 0.0))
if is_laminar_flow:
# Shear term
angle_diff = np.deg2rad(phi0 - phi)
cos_phi = np.cos(angle_diff)
gamma_diff = gamma_cumsum[t1_idx] - gamma_cumsum[t2_idx]
gamma_integral_batch = np.abs(gamma_diff)
sinc_arg = sinc_prefactor * cos_phi * gamma_integral_batch
sinc_val = safe_sinc(sinc_arg)
g1_shear = sinc_val**2
g1_model = g1_diffusion * g1_shear
else:
g1_model = g1_diffusion
# Clip for numerical stability (g1 ∈ [0, 1] by physics)
g1_model = np.clip(g1_model, 1e-10, 1.0)
g1_sq = g1_model**2
# Linear regression: g2 = offset + contrast × g1²
if len(g2_data) > 2:
A = np.column_stack([np.ones_like(g1_sq), g1_sq])
result = np.linalg.lstsq(A, g2_data, rcond=None)
fit_offset, fit_contrast = result[0]
# Sanity checks
if 0.0 < fit_contrast < 2.0 and 0.5 < fit_offset < 1.5:
contrast_per_angle[i] = fit_contrast
offset_per_angle[i] = fit_offset
except (ValueError, RuntimeError, np.linalg.LinAlgError) as e:
if logger:
logger.debug(f"Failed to compute consistent init for angle {phi}: {e}")
# Keep default values
if logger:
logger.info(
f"Computed consistent per-angle initialization:\n"
f" Contrast range: [{contrast_per_angle.min():.4f}, {contrast_per_angle.max():.4f}]\n"
f" Offset range: [{offset_per_angle.min():.4f}, {offset_per_angle.max():.4f}]"
)
return contrast_per_angle, offset_per_angle
[docs]
def compute_quantile_per_angle_scaling(
stratified_data: Any,
contrast_bounds: tuple[float, float] = (0.0, 1.0),
offset_bounds: tuple[float, float] = (0.5, 1.5),
lag_floor_quantile: float = 0.80,
lag_ceiling_quantile: float = 0.20,
value_quantile_low: float = 0.10,
value_quantile_high: float = 0.90,
logger: Any = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Estimate per-angle contrast/offset from quantiles of c2_experimental values.
This function uses physics-informed quantile analysis to estimate contrast and
offset for each phi angle independently. Unlike least-squares fitting, this
approach does not require a model and directly extracts scaling from the data.
Physics basis:
C2 = contrast × g1² + offset
- At large time lags, g1² → 0, so C2 → offset (the "floor")
- At small time lags, g1² ≈ 1, so C2 ≈ contrast + offset (the "ceiling")
Parameters
----------
stratified_data : StratifiedData
Data containing per-angle g2_flat, phi_flat, t1_flat, t2_flat arrays.
contrast_bounds : tuple[float, float]
Valid bounds for contrast parameter.
offset_bounds : tuple[float, float]
Valid bounds for offset parameter.
lag_floor_quantile : float
Quantile threshold for "large lag" region (default: 0.80 = top 20% of lags).
lag_ceiling_quantile : float
Quantile threshold for "small lag" region (default: 0.20 = bottom 20% of lags).
value_quantile_low : float
Quantile for robust floor estimation (default: 0.10).
value_quantile_high : float
Quantile for robust ceiling estimation (default: 0.90).
logger : logging.Logger, optional
Logger for diagnostic messages.
Returns
-------
contrast_per_angle : np.ndarray
Per-angle contrast values from quantile estimation.
offset_per_angle : np.ndarray
Per-angle offset values from quantile estimation.
Notes
-----
The estimation is robust to outliers by using quantiles instead of min/max.
The lag-based segmentation ensures we sample from appropriate regions of
the correlation decay curve.
This function is designed for the "constant" mode in anti-degeneracy defense,
where per-angle contrast/offset are estimated once and treated as fixed
parameters during optimization.
"""
from homodyne.utils.logging import get_logger as _get_logger
if logger is None:
logger = _get_logger(__name__)
# Extract data from stratified_data
if hasattr(stratified_data, "chunks"):
# ChunkedData format
phi_unique = np.array(
sorted(
{phi for chunk in stratified_data.chunks for phi in chunk.phi.tolist()}
)
)
n_phi = len(phi_unique)
# Collect all data into flat arrays
g2_list = []
t1_list = []
t2_list = []
phi_list = []
for chunk in stratified_data.chunks:
g2_list.extend(chunk.g2.tolist())
t1_list.extend(chunk.t1.tolist())
t2_list.extend(chunk.t2.tolist())
phi_list.extend(chunk.phi.tolist())
g2_flat = np.array(g2_list)
t1_flat = np.array(t1_list)
t2_flat = np.array(t2_list)
phi_flat = np.array(phi_list)
else:
# StratifiedData format with flat arrays
phi_unique = np.unique(stratified_data.phi_flat)
n_phi = len(phi_unique)
g2_flat = stratified_data.g2_flat
t1_flat = stratified_data.t1_flat
t2_flat = stratified_data.t2_flat
phi_flat = stratified_data.phi_flat
# Pre-compute time lags (vectorized)
delta_t = np.abs(t1_flat - t2_flat)
# Pre-compute midpoint defaults
contrast_mid = (contrast_bounds[0] + contrast_bounds[1]) / 2.0
offset_mid = (offset_bounds[0] + offset_bounds[1]) / 2.0
# Initialize output arrays with defaults
contrast_per_angle = np.full(n_phi, contrast_mid)
offset_per_angle = np.full(n_phi, offset_mid)
# Pre-compute per-point angle indices for exact matching
phi_indices = np.searchsorted(phi_unique, phi_flat)
phi_indices = np.clip(phi_indices, 0, n_phi - 1)
# Process each angle
for i, phi in enumerate(phi_unique):
# Get mask for this angle — use exact index matching (not tolerance-based)
# to avoid bleeding adjacent phi bins on dense angular grids.
mask = phi_indices == i
n_points = np.sum(mask)
if n_points < 100:
logger.debug(
f"Angle {i} (phi={phi:.1f} deg): insufficient data ({n_points} points), "
f"using midpoint defaults"
)
continue
# Extract data for this angle
c2_angle = g2_flat[mask]
delta_t_angle = delta_t[mask]
# Find lag thresholds for this angle
lag_threshold_high = np.nanpercentile(delta_t_angle, lag_floor_quantile * 100)
lag_threshold_low = np.nanpercentile(delta_t_angle, lag_ceiling_quantile * 100)
# OFFSET estimation: From large-lag region where g1² ≈ 0
large_lag_mask = delta_t_angle >= lag_threshold_high
if np.sum(large_lag_mask) >= 10:
c2_floor_region = c2_angle[large_lag_mask]
offset_est = np.nanpercentile(c2_floor_region, value_quantile_low * 100)
if not np.isfinite(offset_est):
offset_est = offset_mid
else:
# Fallback: use overall low quantile
offset_est = np.nanpercentile(c2_angle, value_quantile_low * 100)
if not np.isfinite(offset_est):
offset_est = offset_mid
# Clip offset to bounds
offset_est = float(np.clip(offset_est, offset_bounds[0], offset_bounds[1]))
# CONTRAST estimation: From small-lag region where g1² ≈ 1
small_lag_mask = delta_t_angle <= lag_threshold_low
if np.sum(small_lag_mask) >= 10:
c2_ceiling_region = c2_angle[small_lag_mask]
c2_ceiling = np.nanpercentile(c2_ceiling_region, value_quantile_high * 100)
if not np.isfinite(c2_ceiling):
c2_ceiling = contrast_mid + offset_mid
else:
# Fallback: use overall high quantile
c2_ceiling = np.nanpercentile(c2_angle, value_quantile_high * 100)
if not np.isfinite(c2_ceiling):
c2_ceiling = contrast_mid + offset_mid
# contrast ≈ c2_ceiling - offset
contrast_est = c2_ceiling - offset_est
# Clip contrast to bounds
contrast_est = float(
np.clip(contrast_est, contrast_bounds[0], contrast_bounds[1])
)
contrast_per_angle[i] = contrast_est
offset_per_angle[i] = offset_est
logger.debug(
f"Angle {i} (phi={phi:.1f} deg): quantile estimation "
f"contrast={contrast_est:.4f}, offset={offset_est:.4f} "
f"from {n_points:,} points"
)
logger.info(
f"Quantile-based per-angle estimation complete:\n"
f" Contrast range: [{contrast_per_angle.min():.4f}, {contrast_per_angle.max():.4f}]\n"
f" Offset range: [{offset_per_angle.min():.4f}, {offset_per_angle.max():.4f}]"
)
return contrast_per_angle, offset_per_angle
__all__ = [
"build_parameter_labels",
"classify_parameter_status",
"sample_xdata",
"compute_jacobian_stats",
"compute_consistent_per_angle_init",
"compute_quantile_per_angle_scaling",
]