Source code for homodyne.optimization.nlsq.transforms
"""Parameter transformation utilities for NLSQ optimization.
This module extracts shear transform logic from nlsq_wrapper.py
to reduce file size and improve maintainability.
Extracted from nlsq_wrapper.py as part of technical debt remediation (Dec 2025).
"""
from __future__ import annotations
from typing import Any
import numpy as np
# Parameter name aliases for backwards compatibility
PARAMETER_NAME_ALIASES = {
# Legacy names -> canonical names
"gamma_dot_0": "gamma_dot_t0",
"gamma_dot_t_0": "gamma_dot_t0",
"gamma_dot_offset": "gamma_dot_t_offset",
"phi_0": "phi0",
}
# Default shear parameter scales for laminar flow mode
DEFAULT_SHEAR_X_SCALE = {
"gamma_dot_t0": 524.0,
"beta": 4.0,
"gamma_dot_t_offset": 771.0,
}
[docs]
def normalize_param_key(name: str | None) -> str:
"""Normalize parameter name using canonical aliases.
Parameters
----------
name : str | None
Parameter name to normalize.
Returns
-------
str
Canonical parameter name.
"""
if not name:
return ""
key = str(name).strip()
return PARAMETER_NAME_ALIASES.get(key, key)
[docs]
def normalize_x_scale_map(raw_map: Any) -> dict[str, float]:
"""Normalize parameter scaling map.
Parameters
----------
raw_map : Any
Raw scaling map (dict or other).
Returns
-------
dict[str, float]
Normalized scaling map with canonical keys.
"""
if not isinstance(raw_map, dict):
return {}
normalized: dict[str, float] = {}
for raw_key, raw_value in raw_map.items():
key = normalize_param_key(raw_key)
if not key:
continue
try:
normalized[key] = float(raw_value)
except (TypeError, ValueError):
continue
return normalized
[docs]
def build_per_parameter_x_scale(
per_angle_scaling: bool,
n_angles: int,
physical_param_names: list[str],
analysis_mode: str,
override_map: dict[str, float],
) -> np.ndarray | None:
"""Build per-parameter scale array for optimization.
Parameters
----------
per_angle_scaling : bool
Whether per-angle scaling is enabled.
n_angles : int
Number of phi angles.
physical_param_names : list[str]
List of physical parameter names.
analysis_mode : str
Analysis mode ("static" or "laminar_flow").
override_map : dict[str, float]
User overrides for parameter scales.
Returns
-------
np.ndarray | None
Scale array or None if all scales are 1.0.
"""
effective_physical: dict[str, float] = dict.fromkeys(physical_param_names, 1.0)
if analysis_mode == "laminar_flow":
for alias_key, scale in DEFAULT_SHEAR_X_SCALE.items():
canonical = normalize_param_key(alias_key)
if canonical in effective_physical:
effective_physical[canonical] = scale
for key, value in override_map.items():
canonical = normalize_param_key(key)
if canonical in effective_physical:
effective_physical[canonical] = value
contrast_scale = float(override_map.get("contrast", 1.0))
offset_scale = float(override_map.get("offset", 1.0))
has_nonunity = (
any(abs(scale - 1.0) > 1e-12 for scale in effective_physical.values())
or abs(contrast_scale - 1.0) > 1e-12
or abs(offset_scale - 1.0) > 1e-12
)
if not has_nonunity:
return None
scales: list[float] = []
if per_angle_scaling:
if n_angles <= 0:
return None
scales.extend([contrast_scale] * n_angles)
scales.extend([offset_scale] * n_angles)
else:
scales.extend([contrast_scale, offset_scale])
for name in physical_param_names:
scales.append(effective_physical.get(name, 1.0))
return np.asarray(scales, dtype=float)
[docs]
def format_x_scale_for_log(value: Any) -> str:
"""Format x_scale value for logging.
Parameters
----------
value : Any
Scale value to format.
Returns
-------
str
Formatted string.
"""
if isinstance(value, np.ndarray):
return f"array(len={value.size})"
return str(value)
[docs]
def parse_shear_transform_config(config: Any | None) -> dict[str, Any]:
"""Parse shear transform configuration.
Parameters
----------
config : Any | None
Configuration dict or None.
Returns
-------
dict[str, Any]
Parsed configuration with defaults.
"""
if not isinstance(config, dict):
return {
"enable_gamma_dot_log": False,
"enable_beta_centering": False,
"beta_reference": 0.0,
}
return {
"enable_gamma_dot_log": bool(config.get("enable_gamma_dot_log", False)),
"enable_beta_centering": bool(config.get("enable_beta_centering", False)),
"beta_reference": float(config.get("beta_reference", 0.0)),
}
[docs]
def build_physical_index_map(
per_angle_scaling: bool,
n_angles: int,
physical_param_names: list[str],
) -> dict[str, int]:
"""Build mapping from parameter names to indices.
Parameters
----------
per_angle_scaling : bool
Whether per-angle scaling is enabled.
n_angles : int
Number of phi angles.
physical_param_names : list[str]
List of physical parameter names.
Returns
-------
dict[str, int]
Mapping from parameter name to index in parameter vector.
"""
start = 2 * n_angles if per_angle_scaling else 2
return {name: start + idx for idx, name in enumerate(physical_param_names)}
[docs]
def apply_forward_shear_transforms_to_vector(
params: np.ndarray,
index_map: dict[str, int],
transform_cfg: dict[str, Any],
) -> tuple[np.ndarray, dict[str, Any]]:
"""Apply forward shear transforms to parameter vector.
Transforms parameters from physical space to solver space:
- gamma_dot_t0 -> log(gamma_dot_t0) if enable_gamma_dot_log
- beta -> beta - beta_reference if enable_beta_centering
Parameters
----------
params : np.ndarray
Parameter vector in physical space.
index_map : dict[str, int]
Mapping from parameter names to indices.
transform_cfg : dict[str, Any]
Transform configuration.
Returns
-------
tuple[np.ndarray, dict[str, Any]]
(transformed_params, transform_state)
"""
vector = np.asarray(params, dtype=float).copy()
state: dict[str, Any] = {
"gamma_log_idx": None,
"beta_center_idx": None,
"beta_reference": float(transform_cfg.get("beta_reference", 0.0)),
}
if transform_cfg.get("enable_gamma_dot_log", False):
# Try canonical name first, fallback to old name for backwards compatibility
idx = index_map.get("gamma_dot_t0") or index_map.get("gamma_dot_0")
if idx is not None:
value = vector[idx]
if value <= 0:
raise ValueError(
"gamma_dot_t0 must be > 0 when enable_gamma_dot_log is true"
)
vector[idx] = np.log(value)
state["gamma_log_idx"] = idx
if transform_cfg.get("enable_beta_centering", False):
idx = index_map.get("beta")
if idx is not None:
vector[idx] = vector[idx] - state["beta_reference"]
state["beta_center_idx"] = idx
if state["gamma_log_idx"] is None and state["beta_center_idx"] is None:
return np.asarray(params, dtype=float).copy(), {}
return vector, state
[docs]
def apply_forward_shear_transforms_to_bounds(
bounds: tuple[np.ndarray, np.ndarray] | None,
state: dict[str, Any],
) -> tuple[np.ndarray, np.ndarray] | None:
"""Apply forward shear transforms to parameter bounds.
Parameters
----------
bounds : tuple[np.ndarray, np.ndarray] | None
(lower, upper) bounds in physical space.
state : dict[str, Any]
Transform state from apply_forward_shear_transforms_to_vector.
Returns
-------
tuple[np.ndarray, np.ndarray] | None
Transformed bounds or None.
"""
if not bounds or not state:
return bounds
lower, upper = (
np.asarray(bounds[0], dtype=float).copy(),
np.asarray(bounds[1], dtype=float).copy(),
)
gamma_idx = state.get("gamma_log_idx")
if gamma_idx is not None:
if lower[gamma_idx] <= 0 or upper[gamma_idx] <= 0:
raise ValueError(
"gamma_dot_t0 bounds must be > 0 when enable_gamma_dot_log is true"
)
lower[gamma_idx] = np.log(lower[gamma_idx])
upper[gamma_idx] = np.log(upper[gamma_idx])
beta_idx = state.get("beta_center_idx")
if beta_idx is not None:
beta_ref = state.get("beta_reference", 0.0)
lower[beta_idx] = lower[beta_idx] - beta_ref
upper[beta_idx] = upper[beta_idx] - beta_ref
return (lower, upper)
[docs]
def apply_inverse_shear_transforms_to_vector(
params: np.ndarray,
state: dict[str, Any] | None,
) -> np.ndarray:
"""Apply inverse shear transforms to parameter vector.
Transforms parameters from solver space back to physical space.
Parameters
----------
params : np.ndarray
Parameter vector in solver space.
state : dict[str, Any] | None
Transform state from apply_forward_shear_transforms_to_vector.
Returns
-------
np.ndarray
Parameter vector in physical space.
"""
if not state:
return params
vector = np.asarray(params, dtype=float).copy()
gamma_idx = state.get("gamma_log_idx")
if gamma_idx is not None:
vector[gamma_idx] = np.exp(vector[gamma_idx])
beta_idx = state.get("beta_center_idx")
if beta_idx is not None:
vector[beta_idx] = vector[beta_idx] + state.get("beta_reference", 0.0)
return vector
[docs]
def adjust_covariance_for_transforms(
covariance: np.ndarray,
transformed_params: np.ndarray,
physical_params: np.ndarray,
state: dict[str, Any] | None,
) -> np.ndarray:
"""Adjust covariance matrix for parameter transforms.
Parameters
----------
covariance : np.ndarray
Covariance matrix in solver space.
transformed_params : np.ndarray
Parameters in solver space.
physical_params : np.ndarray
Parameters in physical space.
state : dict[str, Any] | None
Transform state.
Returns
-------
np.ndarray
Covariance matrix in physical space.
"""
if not state or covariance.size == 0:
return covariance
adjusted = np.asarray(covariance, dtype=float).copy()
gamma_idx = state.get("gamma_log_idx")
if gamma_idx is not None:
scale = physical_params[gamma_idx]
adjusted[gamma_idx, :] *= scale
adjusted[:, gamma_idx] *= scale
# beta centering derivative is 1, so covariance unchanged
return adjusted
[docs]
def wrap_model_function_with_transforms(
model_fn: Any,
state: dict[str, Any] | None,
) -> Any:
"""Wrap model function to apply inverse transforms to parameters.
Parameters
----------
model_fn : callable
Original model function.
state : dict[str, Any] | None
Transform state.
Returns
-------
callable
Wrapped model function (or original if no transforms).
"""
if not state:
return model_fn
if not callable(model_fn):
return model_fn
def wrapped_model(xdata: np.ndarray, *solver_params: float) -> np.ndarray:
physical = apply_inverse_shear_transforms_to_vector(
np.asarray(solver_params), state
)
result: np.ndarray = model_fn(xdata, *physical)
return result
# Preserve helpful attributes for downstream logging/diagnostics
for attr in ["n_phi", "n_angles", "per_angle_scaling"]:
if hasattr(model_fn, attr):
setattr(wrapped_model, attr, getattr(model_fn, attr))
return wrapped_model
[docs]
def wrap_stratified_function_with_transforms(
residual_fn: Any,
state: dict[str, Any] | None,
) -> Any:
"""Wrap stratified residual function with transforms.
Parameters
----------
residual_fn : Any
Original stratified residual function.
state : dict[str, Any] | None
Transform state.
Returns
-------
Any
Wrapped function (or original if no transforms).
"""
if not state:
return residual_fn
class _TransformedStratified:
def __init__(self, base_fn: Any, transform_state: dict[str, Any]):
self._base_fn = base_fn
self._state = transform_state
def __call__(self, params: np.ndarray) -> np.ndarray:
physical = apply_inverse_shear_transforms_to_vector(params, self._state)
result: np.ndarray = self._base_fn(physical)
return result
def __getattr__(self, item: str) -> Any:
return getattr(self._base_fn, item)
return _TransformedStratified(residual_fn, state)