Source code for homodyne.optimization.cmc.reparameterization
# homodyne/optimization/cmc/reparameterization.py
"""Internal reparameterization for CMC sampling.
Transforms correlated parameters to orthogonal sampling space,
then converts back to physics parameters for output.
Reparameterizations (v2.23.0, reference-time):
- D0, D_offset → log_D_ref, D_offset_ratio (decorrelates D0/alpha via observable)
- gamma_dot_t0 → log_gamma_ref (decorrelates gamma_dot_t0/beta via observable)
Theory:
For f(t) = A × t^α, the value at a reference time f_ref = A × t_ref^α is
well-constrained by data. Sampling (f_ref, α) instead of (A, α) decorrelates
them because f_ref relates to observables while α controls the shape.
t_ref = √(dt × t_max) is the geometric mean of the time range.
D_offset reparameterization (v2.23.0):
D_offset_ratio = D_offset / D_ref is a simple linear map that handles
negative D_offset (jammed/arrested systems) without singularity.
The old D_offset_frac = D_offset / (D_ref + D_offset) had a pole at
D_offset = -D_ref and was restricted to [0, 0.5], silently clipping
negative NLSQ warm-starts. D_offset_ratio uses a Normal prior (unbounded).
"""
from __future__ import annotations
import math
from dataclasses import dataclass
import numpy as np
from homodyne.utils.logging import get_logger
logger = get_logger(__name__)
[docs]
def compute_t_ref(
dt: float, t_max: float, *, fallback_value: float | None = None
) -> float:
"""Compute reference time as geometric mean of time range.
Parameters
----------
dt : float
Time step (minimum time scale).
t_max : float
Maximum time in the dataset.
fallback_value : float | None
If provided and inputs are invalid, return this value with a warning
instead of raising ValueError. If None (default), raises on invalid inputs.
Returns
-------
float
Reference time t_ref = sqrt(dt * t_max), or fallback_value if inputs invalid.
Raises
------
ValueError
If dt or t_max are non-positive or non-finite and fallback_value is None.
"""
if dt <= 0 or t_max <= 0 or not math.isfinite(dt) or not math.isfinite(t_max):
if fallback_value is not None:
logger.warning(
f"Invalid inputs for t_ref computation: dt={dt}, t_max={t_max}. "
f"Using fallback t_ref={fallback_value}."
)
return fallback_value
raise ValueError(
f"Invalid inputs for t_ref computation: dt={dt}, t_max={t_max}. "
"Both must be positive and finite."
)
return math.sqrt(dt * t_max)
[docs]
def transform_nlsq_to_reparam_space(
nlsq_values: dict[str, float],
nlsq_uncertainties: dict[str, float] | None,
t_ref: float,
) -> tuple[dict[str, float], dict[str, float]]:
"""Transform NLSQ estimates to reference-time reparameterized space.
Computes D_ref, log_D_ref, gamma_ref, log_gamma_ref from NLSQ physics
parameters, with delta-method uncertainty propagation.
Parameters
----------
nlsq_values : dict[str, float]
NLSQ parameter estimates (D0, alpha, D_offset, gamma_dot_t0, beta, ...).
nlsq_uncertainties : dict[str, float] | None
NLSQ standard errors for each parameter.
t_ref : float
Reference time for reparameterization.
Returns
-------
tuple[dict[str, float], dict[str, float]]
(reparam_values, reparam_uncertainties) in the reparameterized space.
Keys: "log_D_ref", "D_offset_ratio", "log_gamma_ref" (when available).
"""
reparam_values: dict[str, float] = {}
reparam_uncertainties: dict[str, float] = {}
# --- Diffusion reparameterization ---
D0 = nlsq_values.get("D0")
alpha = nlsq_values.get("alpha")
D_offset = nlsq_values.get("D_offset")
if D0 is not None and alpha is not None:
# D_ref = D0 * t_ref^alpha
D_ref = D0 * (t_ref**alpha)
D_ref = max(D_ref, 1e-10) # Prevent log(0)
log_D_ref = math.log(D_ref)
reparam_values["log_D_ref"] = log_D_ref
# D_offset_ratio = D_offset / D_ref (linear, handles negative D_offset)
# Normal prior in model.py; inverse is D_offset = D_ref * ratio (no singularity).
if D_offset is not None:
reparam_values["D_offset_ratio"] = float(D_offset / D_ref)
# Delta-method uncertainty propagation for log_D_ref
# log_D_ref = log(D0) + alpha * log(t_ref)
# Var(log_D_ref) ≈ (1/D0)^2 * Var(D0) + log(t_ref)^2 * Var(alpha)
# + 2 * (1/D0) * log(t_ref) * Cov(D0, alpha)
# We ignore covariance (conservative, slightly overestimates uncertainty)
if nlsq_uncertainties:
D0_std = nlsq_uncertainties.get("D0", 0.0)
alpha_std = nlsq_uncertainties.get("alpha", 0.0)
D_offset_std = nlsq_uncertainties.get("D_offset", 0.0)
# Var(log_D_ref) via delta method (ignoring covariance)
log_t_ref = math.log(max(t_ref, 1e-10))
var_log_D_ref = 0.0
if D0 > 0 and D0_std > 0:
var_log_D_ref += (D0_std / D0) ** 2
if alpha_std > 0:
var_log_D_ref += (log_t_ref * alpha_std) ** 2
reparam_uncertainties["log_D_ref"] = math.sqrt(max(var_log_D_ref, 1e-20))
# Uncertainty for D_offset_ratio via delta method (ignoring covariance):
# ratio = D_offset / D_ref, D_ref = D0 * t_ref^alpha
# Var(ratio) = Var(D_offset)/D_ref^2 + ratio^2 * Var(log_D_ref)
# The second term captures D0/alpha uncertainty propagated through D_ref.
if D_offset is not None and D_offset_std > 0:
ratio_val = D_offset / D_ref
var_ratio = (D_offset_std / D_ref) ** 2 + ratio_val**2 * var_log_D_ref
reparam_uncertainties["D_offset_ratio"] = math.sqrt(
max(var_ratio, 1e-20)
)
# --- Shear reparameterization ---
gamma_dot_t0 = nlsq_values.get("gamma_dot_t0")
beta = nlsq_values.get("beta")
if gamma_dot_t0 is not None and beta is not None:
# gamma_ref = gamma_dot_t0 * t_ref^beta
if gamma_dot_t0 <= 0:
logger.warning(
f"gamma_dot_t0={gamma_dot_t0:.4g} is non-positive. "
"NLSQ may have converged to a physically invalid value. "
"log_gamma_ref will be floored at log(1e-20)."
)
gamma_ref = gamma_dot_t0 * (t_ref**beta)
gamma_ref = max(gamma_ref, 1e-20) # Prevent log(0)
log_gamma_ref = math.log(gamma_ref)
reparam_values["log_gamma_ref"] = log_gamma_ref
# Delta-method uncertainty for log_gamma_ref
if nlsq_uncertainties:
gamma_std = nlsq_uncertainties.get("gamma_dot_t0", 0.0)
beta_std = nlsq_uncertainties.get("beta", 0.0)
log_t_ref = math.log(max(t_ref, 1e-10))
var_log_gamma_ref = 0.0
if gamma_dot_t0 > 0 and gamma_std > 0:
var_log_gamma_ref += (gamma_std / gamma_dot_t0) ** 2
if beta_std > 0:
var_log_gamma_ref += (log_t_ref * beta_std) ** 2
reparam_uncertainties["log_gamma_ref"] = math.sqrt(
max(var_log_gamma_ref, 1e-20)
)
return reparam_values, reparam_uncertainties
[docs]
@dataclass
class ReparamConfig:
"""Configuration for internal reparameterization.
Attributes
----------
enable_d_ref : bool
Enable D_ref reparameterization (D0, alpha → log_D_ref, alpha).
enable_gamma_ref : bool
Sample log(gamma_ref) where gamma_ref = gamma_dot_t0 * t_ref^beta.
t_ref : float
Reference time for reparameterization (geometric mean of time range).
"""
enable_d_ref: bool = True
enable_gamma_ref: bool = True
t_ref: float = 1.0
# Backward compatibility properties
@property
def enable_d_total(self) -> bool:
"""Backward-compatible alias for enable_d_ref."""
return self.enable_d_ref
@property
def enable_log_gamma(self) -> bool:
"""Backward-compatible alias for enable_gamma_ref."""
return self.enable_gamma_ref
[docs]
def transform_to_sampling_space(
params: dict[str, float],
config: ReparamConfig,
) -> dict[str, float]:
"""Transform physics params → sampling params.
Parameters
----------
params : dict[str, float]
Physics parameters (D0, D_offset, gamma_dot_t0, etc.).
config : ReparamConfig
Reparameterization configuration.
Returns
-------
dict[str, float]
Transformed parameters for sampling.
"""
result = dict(params)
t_ref = config.t_ref
if config.enable_d_ref and "D0" in params and "D_offset" in params:
D0 = params["D0"]
alpha = params.get("alpha", 0.0)
D_offset = params["D_offset"]
# D_ref = D0 * t_ref^alpha
D_ref = D0 * (t_ref**alpha)
D_ref = max(D_ref, 1e-10)
log_D_ref = np.log(D_ref)
# D_offset_ratio = D_offset / D_ref (linear map, no singularity)
# Handles negative D_offset (jammed/arrested systems) correctly.
# Clamped to (-1+eps, inf) to match TruncatedNormal prior floor in model.py:
# ratio <= -1 ↔ D_ref + D_offset <= 0 (non-physical total diffusion at t_ref).
# Inverse: D_offset = D_ref * ratio (used in transform_to_physics_space).
D_offset_ratio = float(max(D_offset / D_ref, -1.0 + 1e-4))
result["log_D_ref"] = float(log_D_ref)
result["D_offset_ratio"] = D_offset_ratio
del result["D0"]
del result["D_offset"]
if config.enable_gamma_ref and "gamma_dot_t0" in params:
gamma_dot_t0 = params["gamma_dot_t0"]
beta = params.get("beta", 0.0)
# gamma_ref = gamma_dot_t0 * t_ref^beta
gamma_ref = gamma_dot_t0 * (t_ref**beta)
gamma_ref = max(gamma_ref, 1e-20)
result["log_gamma_ref"] = float(np.log(gamma_ref))
del result["gamma_dot_t0"]
return result
[docs]
def transform_to_physics_space(
samples: dict[str, np.ndarray],
config: ReparamConfig,
) -> dict[str, np.ndarray]:
"""Transform sampling params → physics params (vectorized).
Parameters
----------
samples : dict[str, np.ndarray]
Sampled parameters from MCMC.
config : ReparamConfig
Reparameterization configuration.
Returns
-------
dict[str, np.ndarray]
Physics parameters.
"""
result = dict(samples)
t_ref = config.t_ref
if config.enable_d_ref and "log_D_ref" in samples:
log_D_ref = samples["log_D_ref"]
D_offset_ratio = samples["D_offset_ratio"]
alpha = samples.get("alpha", np.zeros_like(log_D_ref))
D_ref = np.exp(log_D_ref)
# D0 = D_ref * t_ref^(-alpha)
result["D0"] = D_ref * (t_ref ** (-alpha))
# D_offset = D_ref * ratio (exact inverse, handles negative ratio)
result["D_offset"] = D_ref * D_offset_ratio
del result["log_D_ref"]
del result["D_offset_ratio"]
elif config.enable_d_ref and "D_total" in samples:
# Legacy D_total path (backward compatibility)
D_total = samples["D_total"]
D_offset_frac = samples["D_offset_frac"]
result["D0"] = D_total * (1 - D_offset_frac)
result["D_offset"] = D_total * D_offset_frac
del result["D_total"]
del result["D_offset_frac"]
if config.enable_gamma_ref and "log_gamma_ref" in samples:
log_gamma_ref = samples["log_gamma_ref"]
beta = samples.get("beta", np.zeros_like(log_gamma_ref))
# gamma_dot_t0 = exp(log_gamma_ref) * t_ref^(-beta)
result["gamma_dot_t0"] = np.exp(log_gamma_ref) * (t_ref ** (-beta))
del result["log_gamma_ref"]
elif config.enable_gamma_ref and "log_gamma_dot_t0" in samples:
# Legacy log_gamma_dot_t0 path (backward compatibility)
result["gamma_dot_t0"] = np.exp(samples["log_gamma_dot_t0"])
del result["log_gamma_dot_t0"]
return result