"""Result saving for Homodyne CLI.
Handles saving NLSQ and MCMC/CMC optimization results including
JSON files, NPZ data, and plot generation.
"""
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any, cast
import numpy as np
from homodyne.config.parameter_names import get_physical_param_names
from homodyne.config.types import (
LAMINAR_FLOW_PARAM_NAMES,
SCALING_PARAM_NAMES,
STATIC_PARAM_NAMES,
)
from homodyne.io.json_utils import json_safe as _io_json_safe
from homodyne.io.mcmc_writers import (
create_mcmc_analysis_dict as _io_create_mcmc_analysis_dict,
)
from homodyne.io.mcmc_writers import (
create_mcmc_diagnostics_dict as _io_create_mcmc_diagnostics_dict,
)
from homodyne.io.mcmc_writers import (
create_mcmc_parameters_dict as _io_create_mcmc_parameters_dict,
)
from homodyne.io.nlsq_writers import (
save_nlsq_json_files as _io_save_nlsq_json_files,
)
from homodyne.io.nlsq_writers import (
save_nlsq_npz_file as _io_save_nlsq_npz_file,
)
from homodyne.optimization.nlsq.fit_computation import (
compute_theoretical_fits,
)
from homodyne.optimization.nlsq.validation.fit_quality import (
FitQualityConfig,
validate_fit_quality,
)
from homodyne.utils.logging import get_logger
logger = get_logger(__name__)
def _json_safe(value: Any) -> Any:
"""Convert nested objects to JSON-serializable primitives.
This is a wrapper that delegates to homodyne.io.json_utils.
"""
return _io_json_safe(value)
def _json_serializer(obj: Any) -> Any:
"""JSON serializer for numpy arrays and other objects.
Delegates to homodyne.io.json_utils.json_serializer for NaN/Inf sanitization.
"""
from homodyne.io.json_utils import json_serializer
return json_serializer(obj)
def _save_nlsq_json_files(
param_dict: dict[str, Any],
analysis_dict: dict[str, Any],
convergence_dict: dict[str, Any],
output_dir: Path,
) -> None:
"""Save 3 JSON files: parameters, analysis results, convergence metrics.
This is a wrapper that delegates to homodyne.io.nlsq_writers.
"""
_io_save_nlsq_json_files(param_dict, analysis_dict, convergence_dict, output_dir)
def _save_nlsq_npz_file(
phi_angles: np.ndarray,
c2_exp: np.ndarray,
c2_raw: np.ndarray,
c2_scaled: np.ndarray,
c2_solver: np.ndarray | None,
per_angle_scaling: np.ndarray,
per_angle_scaling_solver: np.ndarray,
residuals: np.ndarray,
residuals_norm: np.ndarray,
t1: np.ndarray,
t2: np.ndarray,
q: float,
output_dir: Path,
) -> None:
"""Save NPZ file with experimental/theoretical data and metadata.
This is a wrapper that delegates to homodyne.io.nlsq_writers.
"""
_io_save_nlsq_npz_file(
phi_angles,
c2_exp,
c2_raw,
c2_scaled,
c2_solver,
per_angle_scaling,
per_angle_scaling_solver,
residuals,
residuals_norm,
t1,
t2,
q,
output_dir,
)
def _extract_nlsq_metadata(config: Any, data: dict[str, Any]) -> dict[str, Any]:
"""Extract required metadata for NLSQ theoretical fit computation.
Implements multi-level fallback hierarchy for robust metadata extraction:
- L (characteristic length): stator_rotor_gap -> sample_detector_distance -> default
- dt (time step): analyzer_parameters.dt -> experimental_data.dt -> None
- q (wavevector): from data['wavevector_q_list'][0]
Parameters
----------
config : ConfigManager
Configuration manager with analyzer_parameters and experimental_data
data : dict[str, Any]
Experimental data dictionary with wavevector_q_list
Returns
-------
dict[str, Any]
Dictionary with keys 'L', 'dt', 'q' (may be None if not found)
Notes
-----
Default L = 2000000.0 A (200 um, typical rheology-XPCS stator-rotor gap).
Missing dt or q will log warnings but not crash - downstream functions
must handle None values appropriately.
"""
metadata: dict[str, Any] = {}
# Normalize config access: support dict, ConfigManager, or object with .config
if isinstance(config, dict):
config_dict = config
elif hasattr(config, "config") and isinstance(config.config, dict):
config_dict = config.config
elif hasattr(config, "get_config"):
config_dict = config.get_config()
else:
config_dict = getattr(config, "config", {})
# L (characteristic length) extraction with fallback hierarchy
try:
analyzer_params = config_dict.get("analyzer_parameters", {})
geometry = analyzer_params.get("geometry", {})
if "stator_rotor_gap" in geometry:
metadata["L"] = float(geometry["stator_rotor_gap"])
logger.debug(f"Using stator_rotor_gap L = {metadata['L']:.1f} A")
else:
exp_config = config_dict.get("experimental_data", {})
exp_geometry = exp_config.get("geometry", {})
if "stator_rotor_gap" in exp_geometry:
metadata["L"] = float(exp_geometry["stator_rotor_gap"])
logger.debug("Using L from experimental_data.geometry")
elif "sample_detector_distance" in exp_config:
metadata["L"] = float(exp_config["sample_detector_distance"])
logger.debug(
f"Using sample_detector_distance L = {metadata['L']:.1f} A",
)
else:
metadata["L"] = 2000000.0 # Default: 200 um
logger.warning(
f"No L parameter found, using default L = {metadata['L']:.1f} A",
)
except (AttributeError, TypeError, ValueError) as e:
metadata["L"] = 2000000.0
logger.warning(f"Error reading L: {e}, using default L = 2000000.0 A")
# dt (time step) extraction (optional)
try:
analyzer_params = config_dict.get("analyzer_parameters", {})
dt_value = analyzer_params.get("dt")
if dt_value is None:
exp_config = config_dict.get("experimental_data", {})
dt_value = exp_config.get("dt")
if dt_value is not None:
metadata["dt"] = float(dt_value)
logger.debug(f"Using dt = {metadata['dt']:.6f} s")
else:
metadata["dt"] = None
logger.warning("dt not found in config - may need manual specification")
except (AttributeError, TypeError, ValueError) as e:
metadata["dt"] = None
logger.warning(f"Error reading dt: {e}")
# q (wavevector magnitude) extraction from data
try:
q_list = np.asarray(data["wavevector_q_list"])
if q_list.size > 0:
metadata["q"] = float(q_list[0])
logger.debug(f"Using q = {metadata['q']:.6f} A^-1")
else:
metadata["q"] = None
logger.warning("Empty wavevector_q_list")
except (KeyError, IndexError, TypeError) as e:
metadata["q"] = None
logger.error(f"Error extracting q: {e}")
return metadata
def _prepare_parameter_data(
result: Any,
analysis_mode: str,
n_angles: int | None = None,
) -> dict[str, Any]:
"""Prepare parameter data dictionary for JSON saving.
Extracts parameter values and uncertainties from OptimizationResult and
organizes them by name according to the analysis mode.
Handles both legacy scalar scaling (9 params) and per-angle scaling (13+ params).
Parameters
----------
result : OptimizationResult
NLSQ optimization result
analysis_mode : str
Analysis mode ("static_isotropic"/"static" or "laminar_flow")
n_angles : int, optional
Number of angles in the data (used to detect per-angle scaling).
If omitted, it is inferred from the parameter vector assuming the
canonical 2*n_angles + n_physical layout.
Returns
-------
dict[str, Any]
Dictionary mapping parameter names to {value, uncertainty} dicts
"""
# Get parameter names for analysis mode
normalized_mode = analysis_mode.lower()
if normalized_mode in {"static", "static_isotropic"}:
param_names = SCALING_PARAM_NAMES + STATIC_PARAM_NAMES
n_physical = len(STATIC_PARAM_NAMES)
mode_key = "static"
elif normalized_mode == "laminar_flow":
param_names = SCALING_PARAM_NAMES + LAMINAR_FLOW_PARAM_NAMES
n_physical = len(LAMINAR_FLOW_PARAM_NAMES)
mode_key = "laminar_flow"
else:
raise ValueError(f"Unknown analysis_mode: {analysis_mode}")
# Detect if per-angle scaling was used
n_params_expected_legacy = len(param_names) # 9 for laminar_flow, 5 for static
if n_angles is None:
remainder = max(0, len(result.parameters) - n_physical)
if remainder % 2 != 0 and remainder > 0:
logger.warning(
"Cannot cleanly infer n_angles: parameter count %d - n_physical %d = %d (odd). "
"Falling back to n_angles=1.",
len(result.parameters),
n_physical,
remainder,
)
inferred = remainder // 2 if remainder % 2 == 0 and remainder else 1
n_angles = max(1, inferred)
logger.debug(
"Inferred n_angles=%s for _prepare_parameter_data (mode=%s, params=%s)",
n_angles,
mode_key,
len(result.parameters),
)
n_params_expected_per_angle = 2 * n_angles + n_physical # Per-angle scaling format
n_params_actual = len(result.parameters)
# Check if per-angle scaling was used
if n_params_actual == n_params_expected_per_angle:
# Per-angle scaling detected
logger.info(
f"Detected per-angle scaling: {n_params_actual} parameters for {n_angles} angles"
)
logger.debug(
f"Parameter structure: [{n_angles} contrast] + [{n_angles} offset] + [{n_physical} physical]"
)
# Extract per-angle contrast and offset
contrast_per_angle = result.parameters[:n_angles]
offset_per_angle = result.parameters[n_angles : 2 * n_angles]
# Extract physical parameters (start after 2*n_angles)
physical_params = result.parameters[2 * n_angles :]
physical_uncertainties = (
result.uncertainties[2 * n_angles :]
if result.uncertainties is not None
else None
)
logger.debug(
f"Physical params array (indices {2 * n_angles}-{len(result.parameters) - 1}): {physical_params[:7]}"
)
# Use mean contrast/offset for JSON (representative value; NaN-safe for failed angles)
contrast_mean = float(np.nanmean(contrast_per_angle))
offset_mean = float(np.nanmean(offset_per_angle))
# Compute uncertainties for contrast/offset (RMS of per-angle uncertainties)
if result.uncertainties is not None:
contrast_unc_per_angle = result.uncertainties[:n_angles]
offset_unc_per_angle = result.uncertainties[n_angles : 2 * n_angles]
contrast_unc = float(np.sqrt(np.nanmean(contrast_unc_per_angle**2)))
offset_unc = float(np.sqrt(np.nanmean(offset_unc_per_angle**2)))
else:
contrast_unc = None
offset_unc = None
# Build parameter dictionary
param_dict = {
"contrast": {"value": contrast_mean, "uncertainty": contrast_unc},
"offset": {"value": offset_mean, "uncertainty": offset_unc},
}
# Add physical parameters
physical_param_names = (
STATIC_PARAM_NAMES if mode_key == "static" else LAMINAR_FLOW_PARAM_NAMES
)
for i, name in enumerate(physical_param_names):
param_dict[name] = {
"value": float(physical_params[i]),
"uncertainty": (
float(physical_uncertainties[i])
if physical_uncertainties is not None
else None
),
}
logger.debug(
f"Extracted parameters - contrast_mean={contrast_mean:.4f}, "
f"offset_mean={offset_mean:.4f}, "
f"D0={param_dict.get('D0', {}).get('value', 'N/A')}, "
f"alpha={param_dict.get('alpha', {}).get('value', 'N/A')}, "
f"D_offset={param_dict.get('D_offset', {}).get('value', 'N/A')}, "
f"gamma_dot_t0={param_dict.get('gamma_dot_t0', {}).get('value', 'N/A')}, "
f"beta={param_dict.get('beta', {}).get('value', 'N/A')}, "
f"gamma_dot_t_offset={param_dict.get('gamma_dot_t_offset', {}).get('value', 'N/A')}, "
f"phi0={param_dict.get('phi0', {}).get('value', 'N/A')}"
)
elif n_params_actual == n_params_expected_legacy:
# Legacy scalar scaling format
logger.warning(
f"Legacy format detected: {n_params_actual} parameters (expected {n_params_expected_per_angle} for per-angle). "
f"This typically indicates a failed optimization."
)
param_dict = {}
for i, name in enumerate(param_names):
param_dict[name] = {
"value": float(result.parameters[i]),
"uncertainty": (
float(result.uncertainties[i])
if result.uncertainties is not None
and i < len(result.uncertainties)
else None
),
}
logger.debug(f"Extracted legacy parameters: {list(param_dict.keys())}")
else:
# Unexpected parameter count
raise ValueError(
f"Unexpected parameter count: got {n_params_actual}, expected {n_params_expected_per_angle} "
f"for per-angle scaling with {n_angles} angles, or {n_params_expected_legacy} for legacy format."
)
return param_dict
def _save_results(
args: Any,
result: Any,
device_config: dict[str, Any],
data: dict[str, Any],
config: Any,
) -> None:
"""Save optimization results to output directory.
Parameters
----------
args : argparse.Namespace
Command-line arguments
result : Any
Optimization result (OptimizationResult or MCMC result)
device_config : dict
Device configuration
data : dict
Experimental data dictionary
config : ConfigManager
Configuration manager
"""
import yaml
from homodyne.utils.async_io import AsyncWriter
logger.info(f"Saving results to: {args.output_dir}")
writer = AsyncWriter(max_workers=2)
# Route to appropriate saving method based on optimization method.
if args.method == "nlsq":
logger.info("Using comprehensive NLSQ result saving (async)")
writer.submit_task(save_nlsq_results, result, data, config, args.output_dir)
if args.output_format != "json":
logger.info("Saving legacy results summary for backward compatibility")
elif args.method == "cmc":
logger.info("Using comprehensive CMC result saving (async)")
writer.submit_task(save_mcmc_results, result, data, config, args.output_dir)
if args.output_format != "json":
logger.info("Saving legacy results summary for backward compatibility")
# Create results summary
results_summary = {
"method": args.method,
"analysis_mode": getattr(result, "analysis_mode", "unknown"),
"success": getattr(result, "success", True),
"optimization_time": getattr(result, "optimization_time", 0.0),
"device_config": device_config,
"parameters": {},
"diagnostics": {},
}
# Extract parameters based on result type
if hasattr(result, "parameters"):
results_summary["parameters"] = (
result.parameters.tolist()
if hasattr(result.parameters, "tolist")
else result.parameters
)
elif hasattr(result, "mean_params"):
# MCMC result format
results_summary["parameters"] = {
"contrast": result.mean_contrast,
"offset": result.mean_offset,
"physical_params": (
result.mean_params.tolist()
if hasattr(result.mean_params, "tolist")
else result.mean_params
),
}
# Extract diagnostics
if hasattr(result, "chi_squared"):
results_summary["diagnostics"]["chi_squared"] = result.chi_squared
if hasattr(result, "converged"):
results_summary["diagnostics"]["converged"] = result.converged
# Save in requested format
output_file = args.output_dir / f"homodyne_results.{args.output_format}"
try:
if args.output_format == "yaml":
with open(output_file, "w", encoding="utf-8") as f:
yaml.dump(_json_safe(results_summary), f, default_flow_style=False)
elif args.output_format == "json":
with open(output_file, "w", encoding="utf-8") as f:
json.dump(results_summary, f, indent=2, default=_json_serializer)
elif args.output_format == "npz":
# Save numpy arrays
arrays_to_save: dict[str, Any] = {
"results_summary": np.array([results_summary], dtype=object),
}
if hasattr(result, "samples_params") and result.samples_params is not None:
arrays_to_save["samples_params"] = result.samples_params
np.savez_compressed(output_file, **arrays_to_save)
logger.info(f"OK: Results saved: {output_file}")
except (OSError, ValueError, TypeError) as e:
logger.warning(f"Failed to save results: {e}")
# Wait for background writes to complete
logger.debug("Waiting for background result writes to complete...")
errors = writer.wait_all(timeout=60.0)
if errors:
logger.warning(
"Background save errors (%d): %s",
len(errors),
[f"{type(e).__name__}: {e}" for e in errors],
)
writer.shutdown()
[docs]
def save_nlsq_results(
result: Any,
data: dict[str, Any],
config: Any,
output_dir: Path,
) -> None:
"""Save complete NLSQ optimization results to structured directory.
Main orchestrator function that coordinates all helper functions to save:
- parameters.json: Parameter values and uncertainties
- fitted_data.npz: Experimental + theoretical + residuals
- analysis_results_nlsq.json: Analysis summary
- convergence_metrics.json: Convergence diagnostics
Parameters
----------
result : OptimizationResult
NLSQ optimization result with parameters, uncertainties, chi-squared, etc.
data : dict[str, Any]
Experimental data with phi_angles_list, c2_exp, t1, t2, wavevector_q_list
config : ConfigManager
Configuration with analysis_mode and metadata
output_dir : Path
Output directory (nlsq/ subdirectory will be created)
"""
from homodyne.cli.data_pipeline import _apply_angle_filtering_for_optimization
from homodyne.cli.plot_dispatch import generate_nlsq_plots
# Create nlsq subdirectory
nlsq_dir = output_dir / "nlsq"
nlsq_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Saving NLSQ results to {nlsq_dir}")
# Get analysis mode
analysis_mode = config.config.get("analysis_mode", "static_isotropic")
# Apply phi filtering to data (if enabled in config)
filtered_data = _apply_angle_filtering_for_optimization(data, config)
logger.debug(
f"Applied phi filtering for NLSQ saving: "
f"{len(filtered_data['phi_angles_list'])} angles selected"
)
# Step 1: Extract metadata
logger.debug("Extracting metadata (L, dt, q)")
metadata = _extract_nlsq_metadata(config, filtered_data)
# Step 2: Prepare parameter data
n_angles = len(filtered_data["phi_angles_list"])
logger.debug(
f"Preparing parameter data for {analysis_mode} mode with {n_angles} angles"
)
param_dict = _prepare_parameter_data(result, analysis_mode, n_angles)
# Add timestamp and convergence info to parameters
param_dict_complete = {
"timestamp": datetime.now().isoformat(),
"analysis_mode": analysis_mode,
"chi_squared": float(result.chi_squared),
"reduced_chi_squared": float(result.reduced_chi_squared),
"convergence_status": result.convergence_status,
"parameters": param_dict,
}
# Step 3: Compute theoretical fits with per-angle scaling
logger.info("Computing theoretical fits with per-angle scaling")
try:
fits_dict = compute_theoretical_fits(
result,
filtered_data,
metadata,
analysis_mode=analysis_mode,
include_solver_surface=True,
)
except ValueError as exc:
logger.warning(
f"Could not compute theoretical fits (skipping NPZ and plots): {exc}"
)
fits_dict = None
if fits_dict is not None:
scalar_expanded = fits_dict.pop("scalar_per_angle_expansion", False)
if scalar_expanded:
logger.warning(
"Recorded scalar_per_angle_expansion=true in diagnostics (scalar contrast/offset replicated per angle)."
)
if getattr(result, "nlsq_diagnostics", None) is None:
result.nlsq_diagnostics = {"scalar_per_angle_expansion": True}
elif isinstance(result.nlsq_diagnostics, dict):
result.nlsq_diagnostics["scalar_per_angle_expansion"] = True
# Step 4: Prepare analysis results dictionary
phi_angles = np.asarray(filtered_data["phi_angles_list"])
c2_exp = np.asarray(filtered_data["c2_exp"])
n_angles = len(phi_angles)
n_data_points = c2_exp.size
n_params = len(result.parameters)
degrees_of_freedom = n_data_points - n_params
analysis_dict = {
"method": "nlsq",
"timestamp": datetime.now().isoformat(),
"analysis_mode": analysis_mode,
"fit_quality": {
"chi_squared": float(result.chi_squared),
"reduced_chi_squared": float(result.reduced_chi_squared),
"degrees_of_freedom": degrees_of_freedom,
"quality_flag": result.quality_flag,
},
"dataset_info": {
"n_angles": n_angles,
"n_time_points": c2_exp.shape[1] * c2_exp.shape[2],
"total_data_points": n_data_points,
"q_value": float(metadata["q"]) if metadata.get("q") is not None else None,
},
"optimization_summary": {
"convergence_status": result.convergence_status,
"iterations": result.iterations,
"execution_time": float(result.execution_time),
},
}
# Step 5: Prepare convergence metrics dictionary
convergence_dict = {
"convergence": {
"status": result.convergence_status,
"iterations": result.iterations,
"execution_time": float(result.execution_time),
"final_chi_squared": float(result.chi_squared),
"chi_squared_reduction": (
1.0 - result.reduced_chi_squared
if result.reduced_chi_squared < 1.0
else 0.0
),
},
"recovery_actions": result.recovery_actions,
"quality_flag": result.quality_flag,
"device_info": result.device_info,
}
# Step 5.5: Validate fit quality (T056)
param_labels = getattr(result, "param_labels", None)
if param_labels is None:
if analysis_mode == "laminar_flow":
physical_labels = list(LAMINAR_FLOW_PARAM_NAMES)
else:
physical_labels = list(STATIC_PARAM_NAMES)
scaling_labels = []
for i in range(n_angles):
scaling_labels.append(f"contrast[{i}]")
for i in range(n_angles):
scaling_labels.append(f"offset[{i}]")
param_labels = scaling_labels + physical_labels
# Get bounds from config if available
bounds = None
try:
if hasattr(config, "get_parameter_bounds"):
bounds_list = config.get_parameter_bounds(param_labels)
if bounds_list:
lower = np.array([b["min"] for b in bounds_list])
upper = np.array([b["max"] for b in bounds_list])
bounds = (lower, upper)
except (AttributeError, KeyError, TypeError, ValueError) as e:
logger.debug(f"Could not get bounds for quality validation: {e}")
# Create quality config from nlsq settings
nlsq_config = config.config.get("optimization", {}).get("nlsq", {})
quality_config = FitQualityConfig(
enable=nlsq_config.get("enable_quality_validation", True),
reduced_chi_squared_threshold=nlsq_config.get(
"quality_reduced_chi_squared_threshold", 10.0
),
warn_on_max_restarts=nlsq_config.get("quality_warn_on_max_restarts", True),
warn_on_bounds_hit=nlsq_config.get("quality_warn_on_bounds_hit", True),
warn_on_convergence_failure=nlsq_config.get(
"quality_warn_on_convergence_failure", True
),
bounds_tolerance=nlsq_config.get("quality_bounds_tolerance", 1e-9),
)
# Run validation
quality_report = validate_fit_quality(
result, bounds=bounds, config=quality_config, param_labels=param_labels
)
# Add quality report to convergence dict
convergence_dict.update(quality_report.to_dict())
# Step 6: Save JSON files
logger.info("Saving JSON files (parameters, analysis, convergence)")
_save_nlsq_json_files(
param_dict_complete,
analysis_dict,
convergence_dict,
nlsq_dir,
)
# Step 8b: Persist diagnostics payload if available
diagnostics_payload = getattr(result, "nlsq_diagnostics", None)
if diagnostics_payload:
diagnostics_file = nlsq_dir / "diagnostics.json"
try:
with open(diagnostics_file, "w", encoding="utf-8") as f:
json.dump(_json_safe(diagnostics_payload), f, indent=2)
logger.info(f"Saved diagnostics to {diagnostics_file}")
except (TypeError, OSError) as exc:
logger.warning(
"Failed to save diagnostics.json (%s). Payload keys: %s",
exc,
list(diagnostics_payload.keys()),
)
if fits_dict is None:
logger.info(f"OK: NLSQ results saved successfully to {nlsq_dir}")
logger.info(
" - 3 JSON files (parameters, analysis results, convergence metrics)"
)
logger.info(" - NPZ and plots skipped (theoretical fits unavailable)")
return
# Step 7: Compute normalized residuals
residuals_norm = np.divide(
fits_dict["residuals"],
0.05 * c2_exp,
out=np.zeros_like(fits_dict["residuals"]),
where=(c2_exp != 0),
)
# Convert time arrays to 1D
t1 = np.asarray(filtered_data["t1"])
t2 = np.asarray(filtered_data["t2"])
if t1.ndim == 2:
t1 = t1[:, 0]
if t2.ndim == 2:
t2 = t2[0, :]
logger.debug("Using time arrays in seconds (already converted by data loader)")
# Step 8: Save NPZ file
logger.info("Saving NPZ file with all arrays")
_save_nlsq_npz_file(
phi_angles=phi_angles,
c2_exp=c2_exp,
c2_raw=fits_dict["c2_theoretical_raw"],
c2_scaled=fits_dict["c2_theoretical_scaled"],
c2_solver=fits_dict["c2_solver_scaled"],
per_angle_scaling=fits_dict["per_angle_scaling"],
per_angle_scaling_solver=fits_dict["per_angle_scaling_solver"],
residuals=fits_dict["residuals"],
residuals_norm=residuals_norm,
t1=t1,
t2=t2,
q=metadata["q"],
output_dir=nlsq_dir,
)
logger.info(f"OK: NLSQ results saved successfully to {nlsq_dir}")
logger.info(" - 3 JSON files (parameters, analysis results, convergence metrics)")
n_arrays = 10 + (1 if fits_dict.get("c2_solver_scaled") is not None else 0)
logger.info(
f" - 1 NPZ file ({n_arrays} arrays: experimental + theoretical + residuals)"
)
# Step 9: Generate plots with graceful degradation
try:
logger.info("Generating heatmap plots")
generate_nlsq_plots(
phi_angles=phi_angles,
c2_exp=c2_exp,
c2_theoretical_scaled=fits_dict["c2_theoretical_scaled"],
residuals=fits_dict["residuals"],
t1=t1,
t2=t2,
output_dir=nlsq_dir,
config=config,
c2_solver_scaled=fits_dict["c2_solver_scaled"],
)
logger.info(f" - {len(phi_angles)} PNG plots")
except Exception as e:
logger.warning(f"Plot generation failed (data files still saved): {e}")
logger.debug("Plot error details:", exc_info=True)
[docs]
def save_mcmc_results(
result: Any,
data: dict[str, Any],
config: Any,
output_dir: Path,
) -> None:
"""Save CMC results with comprehensive diagnostics.
Creates cmc/ directory and saves:
1. parameters.json: Posterior mean +/- std for each parameter
2. analysis_results_cmc.json: Sampling summary and diagnostics
3. samples.npz: Full posterior samples, r_hat, ess
4. diagnostics.json: Convergence metrics
5. fitted_data.npz: Experimental + theoretical data (optional)
6. c2_heatmaps_phi_*.png: Comparison plots using posterior mean
7. ArviZ diagnostic plots: pair, forest, energy, autocorr, rank, ess
Parameters
----------
result : CMCResult
CMC optimization result with posterior samples and diagnostics
data : dict
Experimental data dictionary containing c2_exp, phi_angles_list, t1, t2, q
config : ConfigManager
Configuration manager with analysis settings
output_dir : Path
Base output directory (cmc/ subdirectory will be created)
"""
from homodyne.cli.data_pipeline import _apply_angle_filtering_for_optimization
from homodyne.cli.plot_dispatch import generate_nlsq_plots
# CMC-only - always use "cmc" directory
method_name = "cmc"
# Create method-specific directory
method_dir = output_dir / method_name
method_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Saving {method_name.upper()} results to: {method_dir}")
# Step 1: Save parameters.json with posterior statistics
try:
param_dict = _create_mcmc_parameters_dict(result)
param_file = method_dir / "parameters.json"
with open(param_file, "w", encoding="utf-8") as f:
json.dump(param_dict, f, indent=2, default=_json_serializer)
logger.debug(f"Saved parameters to {param_file}")
except (OSError, ValueError, TypeError) as e:
logger.warning(f"Failed to save parameters.json: {e}")
logger.debug("Parameter saving error:", exc_info=True)
# Step 2: Save samples.npz with full posterior
try:
samples_file = method_dir / "samples.npz"
save_dict: dict[str, Any] = {}
# Combine samples from separate attributes
if hasattr(result, "samples_params") and result.samples_params is not None:
samples_list = [result.samples_params]
contrast_inserted = False
if (
hasattr(result, "samples_contrast")
and result.samples_contrast is not None
):
contrast_samples = result.samples_contrast
if contrast_samples.ndim == 1:
contrast_samples = contrast_samples[:, np.newaxis]
samples_list.insert(0, contrast_samples)
contrast_inserted = True
if hasattr(result, "samples_offset") and result.samples_offset is not None:
offset_samples = result.samples_offset
if offset_samples.ndim == 1:
offset_samples = offset_samples[:, np.newaxis]
samples_list.insert(1 if contrast_inserted else 0, offset_samples)
save_dict["samples"] = np.concatenate(samples_list, axis=1)
elif (
hasattr(result, "samples")
and isinstance(result.samples, dict)
and result.samples
):
param_names = getattr(result, "param_names", list(result.samples.keys()))
arrays = []
for name in param_names:
if name in result.samples:
arr = result.samples[name].flatten()
arrays.append(arr[:, np.newaxis])
if arrays:
save_dict["samples"] = np.concatenate(arrays, axis=1)
save_dict["param_names"] = np.array(param_names)
# Add optional diagnostics if available
if hasattr(result, "log_prob") and result.log_prob is not None:
save_dict["log_prob"] = result.log_prob
if hasattr(result, "r_hat") and result.r_hat is not None:
if isinstance(result.r_hat, dict):
save_dict["r_hat"] = np.array(list(result.r_hat.values()))
else:
save_dict["r_hat"] = result.r_hat
# ESS: try ess_bulk (CMCResult) then effective_sample_size (legacy)
ess_source = None
if hasattr(result, "ess_bulk") and result.ess_bulk is not None:
ess_source = result.ess_bulk
elif (
hasattr(result, "effective_sample_size")
and result.effective_sample_size is not None
):
ess_source = result.effective_sample_size
if ess_source is not None:
if isinstance(ess_source, dict):
save_dict["ess"] = np.array(list(ess_source.values()))
else:
save_dict["ess"] = ess_source
if hasattr(result, "acceptance_rate") and result.acceptance_rate is not None:
save_dict["acceptance_rate"] = np.array([result.acceptance_rate])
if save_dict:
np.savez_compressed(str(samples_file), **save_dict)
try:
samples_size_mb = samples_file.stat().st_size / (1024 * 1024)
except OSError:
samples_size_mb = 0
logger.debug("Could not stat samples file")
logger.debug(
f"Saved posterior samples to {samples_file} ({samples_size_mb:.2f} MB)"
)
except (OSError, ValueError, TypeError) as e:
logger.warning(f"Failed to save samples.npz: {e}")
logger.debug("Samples saving error:", exc_info=True)
# Step 3: Save analysis_results_mcmc.json
try:
analysis_dict = _create_mcmc_analysis_dict(result, data, method_name)
analysis_file = method_dir / f"analysis_results_{method_name}.json"
with open(analysis_file, "w", encoding="utf-8") as f:
json.dump(analysis_dict, f, indent=2, default=_json_serializer)
logger.debug(f"Saved analysis results to {analysis_file}")
except (OSError, ValueError, TypeError) as e:
logger.warning(f"Failed to save analysis_results_{method_name}.json: {e}")
logger.debug("Analysis results saving error:", exc_info=True)
# Step 4: Save diagnostics.json
try:
diagnostics_dict = _create_mcmc_diagnostics_dict(result)
diagnostics_file = method_dir / "diagnostics.json"
with open(diagnostics_file, "w", encoding="utf-8") as f:
json.dump(diagnostics_dict, f, indent=2, default=_json_serializer)
logger.debug(f"Saved diagnostics to {diagnostics_file}")
except (OSError, ValueError, TypeError) as e:
logger.warning(f"Failed to save diagnostics.json: {e}")
logger.debug("Diagnostics saving error:", exc_info=True)
# Step 4b: Save shard_diagnostics.json for CMC results
if (
method_name == "cmc"
and hasattr(result, "per_shard_diagnostics")
and result.per_shard_diagnostics
):
try:
shard_diag_file = method_dir / "shard_diagnostics.json"
with open(shard_diag_file, "w", encoding="utf-8") as f:
json.dump(
result.per_shard_diagnostics, f, indent=2, default=_json_serializer
)
logger.debug(f"Saved per-shard diagnostics to {shard_diag_file}")
except (OSError, ValueError, TypeError) as e:
logger.warning(f"Failed to save shard_diagnostics.json: {e}")
logger.debug("Shard diagnostics saving error:", exc_info=True)
# Step 5: Generate heatmap plots (reuse NLSQ plotting)
try:
logger.info("Generating comparison heatmap plots")
# Apply phi filtering to data (if enabled in config)
filtered_data = _apply_angle_filtering_for_optimization(data, config)
logger.debug(
f"Applied phi filtering for MCMC plotting: "
f"{len(filtered_data['phi_angles_list'])} angles selected"
)
# Compute theoretical C2 using posterior mean parameters
c2_result = _compute_theoretical_c2_from_mcmc(result, filtered_data, config)
c2_theoretical_scaled = c2_result["c2_theoretical_scaled"]
c2_theoretical_raw = c2_result["c2_theoretical_raw"]
per_angle_scaling = c2_result["per_angle_scaling"]
# Calculate residuals
c2_exp = filtered_data["c2_exp"]
residuals = c2_exp - c2_theoretical_scaled
# Convert time arrays
t1 = np.asarray(filtered_data["t1"])
t2 = np.asarray(filtered_data["t2"])
if t1.ndim == 2:
t1 = t1[:, 0]
if t2.ndim == 2:
t2 = t2[0, :]
# Save fitted_data.npz (before plotting so data is saved even if plots fail)
phi_angles = np.asarray(filtered_data["phi_angles_list"])
q_val = filtered_data.get("wavevector_q_list", [1.0])[0]
residuals_normalized = residuals / (0.05 * np.where(c2_exp != 0, c2_exp, 1.0))
npz_file = method_dir / "fitted_data.npz"
np.savez_compressed(
npz_file,
phi_angles=phi_angles,
c2_exp=np.asarray(c2_exp),
c2_theoretical_raw=c2_theoretical_raw,
c2_theoretical_scaled=c2_theoretical_scaled,
per_angle_scaling=per_angle_scaling,
residuals=residuals,
residuals_normalized=residuals_normalized,
t1=t1,
t2=t2,
q=np.array([q_val]),
)
try:
fitted_size_mb = npz_file.stat().st_size / (1024 * 1024)
except OSError:
fitted_size_mb = 0
logger.debug("Could not stat fitted data file")
logger.info(f"Saved fitted_data.npz ({fitted_size_mb:.2f} MB)")
# Generate plots using NLSQ plotting function
generate_nlsq_plots(
phi_angles=filtered_data["phi_angles_list"],
c2_exp=c2_exp,
c2_theoretical_scaled=c2_theoretical_scaled,
residuals=residuals,
t1=t1,
t2=t2,
output_dir=method_dir,
config=config,
c2_solver_scaled=None,
)
logger.info(f" - {len(filtered_data['phi_angles_list'])} PNG heatmap plots")
except Exception as e:
logger.warning(f"Heatmap plot generation failed (data files still saved): {e}")
logger.debug("Plot error details:", exc_info=True)
# T057: Calculate and log total file sizes
json_files = list(method_dir.glob("*.json"))
npz_files = list(method_dir.glob("*.npz"))
try:
total_json_kb = sum(f.stat().st_size for f in json_files) / 1024
except OSError:
total_json_kb = 0
try:
total_npz_mb = sum(f.stat().st_size for f in npz_files) / (1024 * 1024)
except OSError:
total_npz_mb = 0
logger.info(f"OK: {method_name.upper()} results saved successfully to {method_dir}")
if (
method_name == "cmc"
and hasattr(result, "per_shard_diagnostics")
and result.per_shard_diagnostics
):
logger.info(
f" - 4 JSON files (parameters, analysis results, diagnostics, shard diagnostics) "
f"({total_json_kb:.1f} KB total)"
)
else:
logger.info(
f" - 3 JSON files (parameters, analysis results, diagnostics) "
f"({total_json_kb:.1f} KB total)"
)
logger.info(
f" - {len(npz_files)} NPZ file(s) (posterior samples) ({total_npz_mb:.2f} MB)"
)
def _create_mcmc_parameters_dict(result: Any) -> dict:
"""Create parameters dictionary with posterior statistics.
This is a wrapper that delegates to homodyne.io.mcmc_writers.
"""
return _io_create_mcmc_parameters_dict(result)
def _create_mcmc_analysis_dict(
result: Any,
data: dict[str, Any],
method_name: str,
) -> dict:
"""Create analysis results dictionary for MCMC/CMC.
This is a wrapper that delegates to homodyne.io.mcmc_writers.
"""
return _io_create_mcmc_analysis_dict(result, data, method_name)
def _create_mcmc_diagnostics_dict(result: Any) -> dict:
"""Create diagnostics dictionary for MCMC/CMC.
This is a wrapper that delegates to homodyne.io.mcmc_writers.
"""
return _io_create_mcmc_diagnostics_dict(result)
def _get_parameter_names(analysis_mode: str) -> list[str]:
"""Get physical parameter names for given analysis mode.
This is a thin wrapper around get_physical_param_names() that handles
unknown modes gracefully with a warning instead of raising an exception.
Parameters
----------
analysis_mode : str
Analysis mode ("static", "static_isotropic", or "laminar_flow")
Returns
-------
list[str]
List of physical parameter names (without scaling params)
"""
# Normalize "static" to "static_isotropic" for compatibility
normalized_mode = "static_isotropic" if analysis_mode == "static" else analysis_mode
try:
# Type cast to handle literal type requirement
if normalized_mode in ("static_isotropic", "laminar_flow"):
return get_physical_param_names(cast(Any, normalized_mode))
else:
logger.warning(
f"Unknown analysis mode: {analysis_mode}, assuming static_isotropic"
)
return get_physical_param_names("static_isotropic")
except ValueError:
logger.warning(
f"Unknown analysis mode: {analysis_mode}, assuming static_isotropic"
)
return get_physical_param_names("static_isotropic")
def _compute_theoretical_c2_from_mcmc(
result: Any,
data: dict[str, Any],
config: Any,
) -> dict[str, np.ndarray]:
"""Compute theoretical C2 using MCMC posterior mean parameters.
Parameters
----------
result : MCMCResult
MCMC result with posterior mean parameters
data : dict
Experimental data dictionary
config : ConfigManager
Configuration manager
Returns
-------
dict[str, np.ndarray]
Dictionary with keys:
- "c2_theoretical_scaled": shape (n_angles, n_t1, n_t2), contrast * g1^2 + offset
- "c2_theoretical_raw": shape (n_angles, n_t1, n_t2), unscaled g1^2
- "per_angle_scaling": shape (n_angles, 2), [contrast, offset] per angle
"""
# Extract parameters from MCMC result
contrast = getattr(result, "mean_contrast", 0.5)
offset = getattr(result, "mean_offset", 1.0)
mean_params_obj = getattr(result, "mean_params", None)
analysis_mode = getattr(result, "analysis_mode", "static")
ordered_names = _get_parameter_names(analysis_mode)
# mean_params may be a ParameterStats (hybrid), dict, or array-like
if mean_params_obj is not None and hasattr(mean_params_obj, "get"):
mean_params = np.array(
[mean_params_obj.get(name, np.nan) for name in ordered_names]
)
elif mean_params_obj is not None and hasattr(mean_params_obj, "as_array"):
mean_params_array = mean_params_obj.as_array
mean_params = (
np.asarray(mean_params_array)
if mean_params_array is not None
else np.array([])
)
elif mean_params_obj is not None:
mean_params = np.asarray(mean_params_obj)
else:
mean_params = np.array([])
# Log parameter values for debugging
logger.info("Computing theoretical C2 with posterior means:")
logger.info(f" Contrast: {contrast:.6f}")
logger.info(f" Offset: {offset:.6f}")
if mean_params_obj is not None and hasattr(mean_params_obj, "get"):
D0_val = mean_params_obj.get("D0", np.nan)
alpha_val = mean_params_obj.get("alpha", np.nan)
D_offset_val = mean_params_obj.get("D_offset", np.nan)
logger.info(
f" Physical params: D0={D0_val:.2f}, alpha={alpha_val:.4f}, D_offset={D_offset_val:.4f}"
)
else:
logger.info(
f" Physical params (positional): [{', '.join(f'{v:.4f}' for v in mean_params[:3])}]"
)
# Validate parameters for reasonable theoretical prediction
if contrast < 0.05:
logger.warning(
f"Very small contrast ({contrast:.4f} < 0.05) may produce nearly constant c2_theory. "
"This suggests poor MCMC convergence or inappropriate initial values."
)
D0_for_validation = (
mean_params_obj.get("D0", 0.0)
if mean_params_obj is not None and hasattr(mean_params_obj, "get")
else mean_params[0]
)
if D0_for_validation >= 99990:
logger.warning(
f"D0 ({D0_for_validation:.1f}) near upper bound (100000). "
"Consider increasing max D0 bound or improving initial values."
)
# Get data arrays
phi_angles = np.asarray(data["phi_angles_list"])
t1 = np.asarray(data["t1"])
t2 = np.asarray(data["t2"])
q_val = data.get("wavevector_q_list", [1.0])[0]
# Convert to 1D if needed
if t1.ndim == 2:
t1 = t1[:, 0]
if t2.ndim == 2:
t2 = t2[0, :]
# Get analysis mode
config_dict_mcmc: dict[str, Any] = (
config.get_config()
if hasattr(config, "get_config")
else cast(dict[str, Any], config)
)
_analysis_mode = config_dict_mcmc.get("analysis_mode", "static_isotropic") # noqa: F841
# Get L parameter (stator-rotor gap) from correct config path
analyzer_params = config_dict_mcmc.get("analyzer_parameters", {})
L = analyzer_params.get("geometry", {}).get("stator_rotor_gap", 2000000.0)
# Get dt parameter
dt = analyzer_params.get("dt", 0.001)
# Compute theoretical C2 for all angles with per-angle scaling estimation
c2_theoretical_list: list[np.ndarray] = []
c2_raw_list: list[np.ndarray] = []
scaling_list: list[list[float]] = []
# Get experimental data for per-angle lstsq fitting
c2_exp = data.get("c2_exp", None)
use_per_angle_lstsq = c2_exp is not None and len(c2_exp) == len(phi_angles)
if use_per_angle_lstsq:
logger.info(
"Using per-angle least squares estimation for contrast/offset "
"(fixes CMC sharding aggregation issue)"
)
# Pre-compute angle-independent factors outside the loop
import jax.numpy as jnp
from homodyne.core.jax_backend import _compute_g1_total_core
params_jax = jnp.array(mean_params)
wavevector_q_squared_half_dt = 0.5 * (q_val**2) * dt
sinc_prefactor = 0.5 / jnp.pi * q_val * L * dt
t1_jax = jnp.array(t1)
t2_jax = jnp.array(t2)
for i, phi in enumerate(phi_angles):
t1_grid, t2_grid = jnp.meshgrid(t1_jax, t2_jax, indexing="ij")
t1_flat = t1_grid.ravel()
t2_flat = t2_grid.ravel()
phi_flat = jnp.full_like(t1_flat, phi)
g1_flat = _compute_g1_total_core(
params=params_jax,
t1=t1_flat,
t2=t2_flat,
phi=phi_flat,
wavevector_q_squared_half_dt=wavevector_q_squared_half_dt,
sinc_prefactor=sinc_prefactor,
dt=dt,
)
g2_theory_np = np.array(g1_flat**2).reshape(len(t1), len(t2))
if use_per_angle_lstsq and c2_exp is not None:
c2_exp_angle = np.array(c2_exp[i])
g2_flat = g2_theory_np.flatten()
c2_flat = c2_exp_angle.flatten()
A = np.vstack([g2_flat, np.ones_like(g2_flat)]).T
try:
lstsq_result = np.linalg.lstsq(A, c2_flat, rcond=None)
contrast_raw, offset_raw = lstsq_result[0]
if contrast_raw > 1.0:
logger.warning(
f"Angle {i} (phi={phi:.2f} deg): lstsq contrast={contrast_raw:.4f} > 1.0 "
"(unphysical). This indicates the physics model underestimates "
"the C2 variation. Clipping to 1.0."
)
elif contrast_raw <= 0:
logger.warning(
f"Angle {i} (phi={phi:.2f} deg): lstsq contrast={contrast_raw:.4f} <= 0 "
"(unphysical). Clipping to 0.01."
)
contrast_i = float(np.clip(np.asarray(contrast_raw), 0.01, 1.0))
offset_i = float(np.clip(np.asarray(offset_raw), 0.5, 1.5))
if i == 0:
logger.debug(
f" Angle {i} (phi={phi:.2f} deg): fitted contrast={contrast_i:.4f}, "
f"offset={offset_i:.4f}"
)
except (np.linalg.LinAlgError, ValueError) as e:
logger.warning(f"lstsq failed for angle {i}, using global values: {e}")
contrast_i = contrast
offset_i = offset
else:
contrast_i = contrast
offset_i = offset
c2_raw_list.append(g2_theory_np)
scaling_list.append([contrast_i, offset_i])
c2_theoretical_np = contrast_i * g2_theory_np + offset_i
c2_theoretical_list.append(c2_theoretical_np)
# Stack all angles
c2_theoretical_scaled = np.array(c2_theoretical_list)
# Validate theoretical prediction quality (NaN-safe)
c2_min = float(np.nanmin(c2_theoretical_scaled))
c2_max = float(np.nanmax(c2_theoretical_scaled))
c2_range = c2_max - c2_min
logger.info(
f"Theoretical C2 range: [{c2_min:.6f}, {c2_max:.6f}], variation: {c2_range:.6f}"
)
# Check if per-angle scaling produced reasonable variation
if use_per_angle_lstsq and c2_exp is not None:
c2_exp_arr = np.asarray(c2_exp)
exp_min = float(np.nanmin(c2_exp_arr))
exp_max = float(np.nanmax(c2_exp_arr))
exp_range = exp_max - exp_min
coverage = c2_range / exp_range if exp_range > 0.01 else 0
logger.info(
f"Per-angle lstsq scaling: fitted range covers {coverage:.1%} of "
f"experimental range [{exp_min:.4f}, {exp_max:.4f}]"
)
if c2_range < 0.01:
logger.warning(
f"Theoretical C2 has very low variation ({c2_range:.6f} < 0.01). "
f"The model prediction is nearly constant (c2 ~ {c2_min:.4f}). "
"This indicates:\n"
" 1. Poor MCMC convergence to local minimum\n"
" 2. Inappropriate initial parameter values\n"
" 3. Physical parameters may have hit bounds\n"
"Recommendations:\n"
" - Run NLSQ first to get better initial values\n"
" - Check parameter bounds (especially D0 upper limit)\n"
" - Verify initial_parameters.values in config are reasonable"
)
return {
"c2_theoretical_scaled": c2_theoretical_scaled,
"c2_theoretical_raw": np.array(c2_raw_list),
"per_angle_scaling": np.array(scaling_list),
}