Source code for homodyne.optimization.cmc.io

"""I/O utilities for CMC results.

This module provides functions for saving CMC results to files:

- samples.npz: ArviZ-compatible posterior samples
- fitted_data.npz: Fitted data matching NLSQ format
- parameters.json: Posterior statistics
- diagnostics.json: Convergence diagnostics
"""

from __future__ import annotations

import json
import math
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np

from homodyne.optimization.cmc.diagnostics import create_diagnostics_dict
from homodyne.utils.logging import get_logger

if TYPE_CHECKING:
    from homodyne.optimization.cmc.results import CMCResult

logger = get_logger(__name__)

# Schema version for samples.npz
SAMPLES_SCHEMA_VERSION = (1, 0)


[docs] def save_samples_npz( result: CMCResult, output_path: Path, ) -> None: """Save posterior samples in ArviZ-compatible format. The saved file can be loaded directly with numpy and converted to ArviZ InferenceData without modification. Parameters ---------- result : CMCResult CMC result with samples. output_path : Path Output file path. File Format ----------- - schema_version: (major, minor) version tuple - posterior_samples: (n_chains, n_samples, n_params) array - param_names: Parameter names in sampling order - r_hat: R-hat per parameter - ess_bulk: Bulk ESS per parameter - ess_tail: Tail ESS per parameter - divergences: Divergences per chain - analysis_mode: Analysis mode string - n_phi: Number of phi angles - n_chains: Number of chains - n_samples: Samples per chain """ # Build samples array (n_chains, n_samples, n_params) samples_3d = result.get_samples_array() # Build arrays for diagnostics r_hat_arr = np.array( [result.r_hat.get(name, np.nan) for name in result.param_names] ) ess_bulk_arr = np.array( [result.ess_bulk.get(name, np.nan) for name in result.param_names] ) ess_tail_arr = np.array( [result.ess_tail.get(name, np.nan) for name in result.param_names] ) # Count per-angle parameters to get n_phi. # n_phi here counts sampled contrast sites (1 for auto, n for individual), not physical angles # Individual mode: "contrast_0", "contrast_1", ... → n_phi = count of indexed sites. # Auto mode: "contrast" (single unindexed site) → n_phi = 1 (at least one angle). # Constant/constant_averaged mode: no sampled contrast sites → n_phi = 0. # Pick the first scaling name from the registry to count indexed sites from homodyne.config.parameter_registry import ParameterRegistry _first_scaling = ParameterRegistry().scaling_names[0] _contrast_indexed = sum( 1 for name in result.param_names if name.startswith(f"{_first_scaling}_") ) _has_contrast_auto = _first_scaling in result.param_names n_phi = ( _contrast_indexed if _contrast_indexed > 0 else (1 if _has_contrast_auto else 0) ) # Save to compressed npz (P2-R5-01: same fix as nlsq_writers.py P2-17) # Float64 posterior samples compress ~5-10x via zlib deflate. np.savez_compressed( output_path, # Schema version schema_version=np.array(SAMPLES_SCHEMA_VERSION), # Samples posterior_samples=samples_3d, param_names=np.array(result.param_names), # Diagnostics r_hat=r_hat_arr, ess_bulk=ess_bulk_arr, ess_tail=ess_tail_arr, divergences=np.array([result.divergences]), # Metadata analysis_mode=np.array([result.analysis_mode]), n_phi=np.array([n_phi]), n_chains=np.array([result.n_chains]), n_samples=np.array([result.n_samples]), ) logger.info(f"Saved samples.npz: {output_path} ({samples_3d.shape})")
[docs] def load_samples_npz( input_path: Path, ) -> dict[str, Any]: """Load samples from npz file. Parameters ---------- input_path : Path Path to samples.npz file. Returns ------- dict[str, Any] Loaded data dictionary. Raises ------ ValueError If path validation fails (path traversal, non-existent file). FileNotFoundError If the file does not exist. """ # Validate path for security input_path = Path(input_path).resolve() # Check file exists if not input_path.exists(): raise FileNotFoundError(f"Samples file not found: {input_path}") # Check file extension if input_path.suffix != ".npz": raise ValueError(f"Expected .npz file, got: {input_path.suffix}") # Check file is readable (not a directory, symlink to non-file, etc.) if not input_path.is_file(): raise ValueError(f"Path is not a regular file: {input_path}") # All arrays use native numpy dtypes (string, float, int) — no pickle needed data = np.load(input_path, allow_pickle=False) return { "schema_version": tuple(data["schema_version"]), "posterior_samples": data["posterior_samples"], "param_names": data["param_names"].tolist(), "r_hat": data["r_hat"], "ess_bulk": data["ess_bulk"], "ess_tail": data["ess_tail"], "divergences": int(data["divergences"][0]), "analysis_mode": str(data["analysis_mode"][0]), "n_phi": int(data["n_phi"][0]), "n_chains": int(data["n_chains"][0]), "n_samples": int(data["n_samples"][0]), }
[docs] def samples_to_arviz( samples_data: dict[str, Any], ): """Convert loaded samples to ArviZ InferenceData. Parameters ---------- samples_data : dict[str, Any] Data from load_samples_npz(). Returns ------- az.InferenceData ArviZ-compatible data structure. """ import arviz as az samples = samples_data["posterior_samples"] param_names = samples_data["param_names"] posterior_dict = {name: samples[:, :, i] for i, name in enumerate(param_names)} return az.from_dict({"posterior": posterior_dict})
[docs] def save_fitted_data_npz( result: CMCResult, c2_exp: np.ndarray, c2_fitted: np.ndarray, c2_fitted_std: np.ndarray, t1: np.ndarray, t2: np.ndarray, phi_angles: np.ndarray, q: float, output_path: Path, ) -> None: """Save fitted data in NLSQ-compatible format. Parameters ---------- result : CMCResult CMC result. c2_exp : np.ndarray Experimental C2 data. c2_fitted : np.ndarray Fitted C2 (posterior mean). c2_fitted_std : np.ndarray Fitted C2 uncertainty. t1 : np.ndarray Time coordinates t1. t2 : np.ndarray Time coordinates t2. phi_angles : np.ndarray Phi angles. q : float Wavevector. output_path : Path Output file path. File Format ----------- - c2_exp: Experimental data - c2_fitted: Posterior mean prediction - residuals: c2_exp - c2_fitted - c2_fitted_std: Posterior uncertainty - c2_fitted_5pct: 5th percentile - c2_fitted_95pct: 95th percentile - q: Wavevector - phi_angles: Phi angles - t1, t2: Time coordinates """ residuals = c2_exp - c2_fitted # Compute percentiles if we have samples c2_fitted_5pct = c2_fitted - 1.645 * c2_fitted_std # ~90% CI c2_fitted_95pct = c2_fitted + 1.645 * c2_fitted_std np.savez_compressed( output_path, # Core data (NLSQ parity) c2_exp=c2_exp, c2_fitted=c2_fitted, residuals=residuals, # Coordinates q=np.array([q]), phi_angles=phi_angles, t1=t1, t2=t2, # CMC-specific uncertainty c2_fitted_std=c2_fitted_std, c2_fitted_5pct=c2_fitted_5pct, c2_fitted_95pct=c2_fitted_95pct, ) logger.info(f"Saved fitted_data.npz: {output_path} (shape={c2_exp.shape})")
[docs] def save_parameters_json( result: CMCResult, output_path: Path, ) -> None: """Save posterior parameter statistics to JSON. Parameters ---------- result : CMCResult CMC result. output_path : Path Output file path. """ stats = result.get_posterior_stats() # Convert numpy types to Python types for JSON stats_json: dict[str, dict[str, float | str | None]] = {} for name, param_stats in stats.items(): converted: dict[str, float | str | None] = {} for k, v in param_stats.items(): fv = float(v) if math.isnan(fv): converted[k] = None elif math.isinf(fv): converted[k] = "Infinity" if fv > 0 else "-Infinity" else: converted[k] = fv stats_json[name] = converted with open(output_path, "w", encoding="utf-8") as f: json.dump(stats_json, f, indent=2) logger.info(f"Saved parameters.json: {output_path} ({len(stats)} params)")
[docs] def save_diagnostics_json( result: CMCResult, output_path: Path, warnings: list[str] | None = None, ) -> None: """Save convergence diagnostics to JSON. Parameters ---------- result : CMCResult CMC result. output_path : Path Output file path. warnings : list[str] | None Warning messages from convergence check. """ diagnostics = create_diagnostics_dict( r_hat=result.r_hat, ess_bulk=result.ess_bulk, ess_tail=result.ess_tail, divergences=result.divergences, convergence_status=result.convergence_status, warnings=warnings or [], n_chains=result.n_chains, n_warmup=result.n_warmup, n_samples=result.n_samples, warmup_time=result.warmup_time, sampling_time=max(0.0, result.execution_time - result.warmup_time), num_shards=result.num_shards, ) # Convert numpy types with NaN/Inf sanitization def convert_numpy(obj): if isinstance(obj, np.ndarray): return convert_numpy(obj.tolist()) if isinstance(obj, (np.integer, np.floating)): v = float(obj) if math.isnan(v): return None if math.isinf(v): return "Infinity" if v > 0 else "-Infinity" return v if isinstance(obj, dict): return {k: convert_numpy(v) for k, v in obj.items()} if isinstance(obj, list): return [convert_numpy(item) for item in obj] if isinstance(obj, float): if math.isnan(obj): return None if math.isinf(obj): return "Infinity" if obj > 0 else "-Infinity" return obj diagnostics_json = convert_numpy(diagnostics) with open(output_path, "w", encoding="utf-8") as f: json.dump(diagnostics_json, f, indent=2) logger.info(f"Saved diagnostics.json: {output_path}")
[docs] def save_all_results( result: CMCResult, output_dir: Path, c2_exp: np.ndarray | None = None, c2_fitted: np.ndarray | None = None, c2_fitted_std: np.ndarray | None = None, t1: np.ndarray | None = None, t2: np.ndarray | None = None, phi_angles: np.ndarray | None = None, q: float | None = None, ) -> dict[str, Path]: """Save all CMC result files. Parameters ---------- result : CMCResult CMC result. output_dir : Path Output directory. c2_exp, c2_fitted, c2_fitted_std : np.ndarray | None Data for fitted_data.npz. t1, t2, phi_angles : np.ndarray | None Coordinates. q : float | None Wavevector. Returns ------- dict[str, Path] Paths to saved files. """ output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) saved_files: dict[str, Path] = {} # Save samples.npz samples_path = output_dir / "samples.npz" save_samples_npz(result, samples_path) saved_files["samples"] = samples_path # Save parameters.json params_path = output_dir / "parameters.json" save_parameters_json(result, params_path) saved_files["parameters"] = params_path # Save diagnostics.json diag_path = output_dir / "diagnostics.json" save_diagnostics_json(result, diag_path) saved_files["diagnostics"] = diag_path # Save fitted_data.npz if data provided if all( x is not None for x in [c2_exp, c2_fitted, c2_fitted_std, t1, t2, phi_angles, q] ): fitted_path = output_dir / "fitted_data.npz" save_fitted_data_npz( result=result, c2_exp=c2_exp, c2_fitted=c2_fitted, c2_fitted_std=c2_fitted_std, t1=t1, t2=t2, phi_angles=phi_angles, q=q, output_path=fitted_path, ) saved_files["fitted_data"] = fitted_path return saved_files