Source code for homodyne.core.scaling_utils

"""Shared scaling utilities for per-angle contrast/offset estimation.

This module provides unified quantile-based estimation of contrast and offset
parameters that can be used by both NLSQ and CMC optimization backends.

The physics basis:
    C2 = contrast × g1² + offset

    - At large time lags, g1² → 0, so C2 → offset (the "floor")
    - At small time lags, g1² ≈ 1, so C2 ≈ contrast + offset (the "ceiling")

Version: 2.18.0
"""

from __future__ import annotations

from typing import TYPE_CHECKING

import numpy as np

from homodyne.utils.logging import get_logger

if TYPE_CHECKING:
    from logging import Logger, LoggerAdapter

logger = get_logger(__name__)


[docs] def estimate_contrast_offset_from_quantiles( c2_data: np.ndarray, delta_t: np.ndarray, contrast_bounds: tuple[float, float] = (0.0, 1.0), offset_bounds: tuple[float, float] = (0.5, 1.5), lag_floor_quantile: float = 0.80, lag_ceiling_quantile: float = 0.20, value_quantile_low: float = 0.10, value_quantile_high: float = 0.90, ) -> tuple[float, float]: """Estimate contrast and offset from C2 data using physics-informed quantile analysis. Uses the correlation decay structure: C2 = contrast × g1² + offset - At large time lags, g1² → 0, so C2 → offset (the "floor") - At small time lags, g1² ≈ 1, so C2 ≈ contrast + offset (the "ceiling") Parameters ---------- c2_data : np.ndarray C2 correlation values (1D array). delta_t : np.ndarray Time lag values ``abs(t1 - t2)`` (same shape as c2_data). contrast_bounds : tuple[float, float] Valid bounds for contrast parameter. offset_bounds : tuple[float, float] Valid bounds for offset parameter. lag_floor_quantile : float Quantile threshold for "large lag" region (default: 0.80 = top 20% of lags). lag_ceiling_quantile : float Quantile threshold for "small lag" region (default: 0.20 = bottom 20% of lags). value_quantile_low : float Quantile for robust floor estimation (default: 0.10). value_quantile_high : float Quantile for robust ceiling estimation (default: 0.90). Returns ------- tuple[float, float] (contrast_est, offset_est) - Estimated values clipped to bounds. Notes ----- The estimation is robust to outliers by using quantiles instead of min/max. The lag-based segmentation ensures we're sampling from the appropriate regions of the correlation decay curve. """ c2 = np.asarray(c2_data) dt = np.asarray(delta_t) # Filter non-finite values before estimation finite_mask = np.isfinite(c2) if not np.all(finite_mask): c2 = c2[finite_mask] dt = dt[finite_mask] # Sanity checks if len(c2) < 100: # Not enough data for robust estimation - return midpoints contrast_mid = (contrast_bounds[0] + contrast_bounds[1]) / 2.0 offset_mid = (offset_bounds[0] + offset_bounds[1]) / 2.0 return contrast_mid, offset_mid # Find lag thresholds lag_threshold_high = np.percentile(dt, lag_floor_quantile * 100) lag_threshold_low = np.percentile(dt, lag_ceiling_quantile * 100) # OFFSET estimation: From large-lag region where g1² ≈ 0 # C2 → offset at large lags large_lag_mask = dt >= lag_threshold_high if np.sum(large_lag_mask) >= 10: c2_floor_region = c2[large_lag_mask] # Use low quantile for robustness (in case of noise spikes) offset_est = np.percentile(c2_floor_region, value_quantile_low * 100) else: # Fallback: use overall low quantile offset_est = np.percentile(c2, value_quantile_low * 100) # Clip offset to bounds offset_est = float(np.clip(offset_est, offset_bounds[0], offset_bounds[1])) # CONTRAST estimation: From small-lag region where g1² ≈ 1 # C2 ≈ contrast + offset at small lags small_lag_mask = dt <= lag_threshold_low if np.sum(small_lag_mask) >= 10: c2_ceiling_region = c2[small_lag_mask] # Use high quantile for robustness c2_ceiling = np.percentile(c2_ceiling_region, value_quantile_high * 100) else: # Fallback: use overall high quantile c2_ceiling = np.percentile(c2, value_quantile_high * 100) # contrast ≈ c2_ceiling - offset contrast_est = c2_ceiling - offset_est # Clip contrast to bounds contrast_est = float(np.clip(contrast_est, contrast_bounds[0], contrast_bounds[1])) return contrast_est, offset_est
[docs] def estimate_per_angle_scaling( c2_data: np.ndarray, t1: np.ndarray, t2: np.ndarray, phi_indices: np.ndarray, n_phi: int, contrast_bounds: tuple[float, float], offset_bounds: tuple[float, float], log: Logger | LoggerAdapter[Logger] | None = None, ) -> dict[str, float]: """Estimate contrast and offset initial values for each phi angle. This is the unified implementation used by both NLSQ and CMC backends. Optimization (v2.9.1): Uses vectorized grouped operations instead of sequential loop over angles. Provides 3-5x speedup for typical datasets with 20+ phi angles. Parameters ---------- c2_data : np.ndarray Pooled C2 correlation values. t1 : np.ndarray Pooled first time coordinates. t2 : np.ndarray Pooled second time coordinates. phi_indices : np.ndarray Index mapping each data point to its phi angle (0 to n_phi-1). n_phi : int Number of unique phi angles. contrast_bounds : tuple[float, float] Valid bounds for contrast. offset_bounds : tuple[float, float] Valid bounds for offset. log : logging.Logger | None Logger for diagnostic messages. Returns ------- dict[str, float] Dictionary with keys 'contrast_0', 'offset_0', 'contrast_1', 'offset_1', etc. """ if log is None: log = logger c2 = np.asarray(c2_data) t1_arr = np.asarray(t1) t2_arr = np.asarray(t2) phi_idx = np.asarray(phi_indices) # Pre-compute time lags once (vectorized) delta_t = np.abs(t1_arr - t2_arr) # Pre-compute midpoint defaults contrast_mid = (contrast_bounds[0] + contrast_bounds[1]) / 2.0 offset_mid = (offset_bounds[0] + offset_bounds[1]) / 2.0 # Pre-allocate result arrays contrast_results = np.full(n_phi, contrast_mid) offset_results = np.full(n_phi, offset_mid) # Count points per angle using bincount (vectorized) points_per_angle = np.bincount(phi_idx, minlength=n_phi) # Identify angles with sufficient data sufficient_mask = points_per_angle >= 100 n_sufficient = np.sum(sufficient_mask) if n_sufficient == 0: # No angles have enough data - return defaults log.info(f"All {n_phi} angles have insufficient data, using midpoint defaults") return { **{f"contrast_{i}": contrast_mid for i in range(n_phi)}, **{f"offset_{i}": offset_mid for i in range(n_phi)}, } # Sort data by phi index for efficient grouped operations sort_idx = np.argsort(phi_idx) c2_sorted = c2[sort_idx] delta_t_sorted = delta_t[sort_idx] phi_sorted = phi_idx[sort_idx] # Find group boundaries group_starts = np.searchsorted(phi_sorted, np.arange(n_phi)) group_ends = np.searchsorted(phi_sorted, np.arange(n_phi), side="right") # Process each angle with sufficient data for i in range(n_phi): if not sufficient_mask[i]: log.debug( f"Angle {i}: insufficient data ({points_per_angle[i]} points), " f"using midpoint init contrast={contrast_mid:.4f}, offset={offset_mid:.4f}" ) continue # Extract data for this angle using pre-sorted arrays start, end = group_starts[i], group_ends[i] c2_angle = c2_sorted[start:end] delta_t_angle = delta_t_sorted[start:end] # Use shared estimation function contrast_est, offset_est = estimate_contrast_offset_from_quantiles( c2_angle, delta_t_angle, contrast_bounds=contrast_bounds, offset_bounds=offset_bounds, ) contrast_results[i] = contrast_est offset_results[i] = offset_est log.debug( f"Angle {i}: estimated contrast={contrast_est:.4f}, offset={offset_est:.4f} " f"from {end - start:,} data points" ) # Build result dictionary estimates: dict[str, float] = {} for i in range(n_phi): estimates[f"contrast_{i}"] = float(contrast_results[i]) estimates[f"offset_{i}"] = float(offset_results[i]) return estimates
[docs] def compute_averaged_scaling( c2_data: np.ndarray, t1: np.ndarray, t2: np.ndarray, phi_indices: np.ndarray, n_phi: int, contrast_bounds: tuple[float, float], offset_bounds: tuple[float, float], log: Logger | LoggerAdapter[Logger] | None = None, ) -> tuple[float, float, np.ndarray, np.ndarray]: """Compute averaged contrast and offset for constant mode. This function estimates per-angle contrast/offset using quantile analysis, then averages them to produce single values for constant mode optimization. Parameters ---------- c2_data : np.ndarray Pooled C2 correlation values. t1 : np.ndarray Pooled first time coordinates. t2 : np.ndarray Pooled second time coordinates. phi_indices : np.ndarray Index mapping each data point to its phi angle (0 to n_phi-1). n_phi : int Number of unique phi angles. contrast_bounds : tuple[float, float] Valid bounds for contrast. offset_bounds : tuple[float, float] Valid bounds for offset. log : logging.Logger | None Logger for diagnostic messages. Returns ------- tuple[float, float, np.ndarray, np.ndarray] (contrast_avg, offset_avg, contrast_per_angle, offset_per_angle) - contrast_avg: Averaged contrast for constant mode - offset_avg: Averaged offset for constant mode - contrast_per_angle: Per-angle estimates (for diagnostics) - offset_per_angle: Per-angle estimates (for diagnostics) """ if log is None: log = logger # Get per-angle estimates estimates = estimate_per_angle_scaling( c2_data=c2_data, t1=t1, t2=t2, phi_indices=phi_indices, n_phi=n_phi, contrast_bounds=contrast_bounds, offset_bounds=offset_bounds, log=log, ) # Extract arrays contrast_per_angle = np.array([estimates[f"contrast_{i}"] for i in range(n_phi)]) offset_per_angle = np.array([estimates[f"offset_{i}"] for i in range(n_phi)]) # Compute averaged values contrast_avg = float(np.nanmean(contrast_per_angle)) offset_avg = float(np.nanmean(offset_per_angle)) log.info( f"Computed averaged scaling for constant mode:\n" f" contrast_avg: {contrast_avg:.4f} (per-angle range: " f"[{float(np.nanmin(contrast_per_angle)):.4f}, {float(np.nanmax(contrast_per_angle)):.4f}])\n" f" offset_avg: {offset_avg:.4f} (per-angle range: " f"[{float(np.nanmin(offset_per_angle)):.4f}, {float(np.nanmax(offset_per_angle)):.4f}])" ) return contrast_avg, offset_avg, contrast_per_angle, offset_per_angle
__all__ = [ "estimate_contrast_offset_from_quantiles", "estimate_per_angle_scaling", "compute_averaged_scaling", ]