Source code for homodyne.viz.nlsq_plots

"""NLSQ optimization plotting functions for homodyne XPCS analysis.

This module provides functions for visualizing NLSQ optimization results,
including simulated data, fitted data, and comparison heatmaps.
Extracted from cli/commands.py for better modularity.
"""

import json
import multiprocessing
from pathlib import Path
from typing import Any

import jax.numpy as jnp
import matplotlib
import numpy as np

# Set Agg backend only if no interactive backend is already active.
# Checking is_interactive() alone can be True for non-GUI backends in some
# environments; compare against the known interactive backend families instead.
_current_backend = matplotlib.get_backend().lower()
_interactive_backends = ("qt", "gtk", "wx", "tk", "macosx", "nbagg", "webagg")
if not any(_current_backend.startswith(b) for b in _interactive_backends):
    matplotlib.use("Agg")
import matplotlib.pyplot as plt  # noqa: E402

from homodyne.io.json_utils import json_serializer as _json_serializer  # noqa: E402
from homodyne.utils.logging import get_logger  # noqa: E402

logger = get_logger(__name__)

# Check if Datashader backend is available (actual import done lazily)
try:
    import homodyne.viz.datashader_backend  # noqa: F401

    DATASHADER_AVAILABLE = True
except ImportError:
    DATASHADER_AVAILABLE = False


[docs] def plot_simulated_data( config: dict[str, Any], contrast: float, offset: float, phi_angles_str: str | None, plots_dir: Path, data: dict[str, Any] | None = None, ) -> None: """Generate plots of simulated/theoretical data. Parameters ---------- config : dict[str, Any] Configuration dictionary with analysis_mode and parameters contrast : float Contrast parameter value offset : float Offset parameter value phi_angles_str : str | None Comma-separated phi angles string from CLI plots_dir : Path Output directory for plot files data : dict[str, Any] | None Optional experimental data dictionary for extracting phi angles """ from homodyne.core.models import CombinedModel if contrast < 0.1: logger.warning(f"Very low contrast={contrast:.3f}; plot may appear flat") logger.info( f"Generating simulated data plots (contrast={contrast:.3f}, offset={offset:.3f})", ) # Determine analysis mode analysis_mode = config.get("analysis_mode", "static") logger.info(f"Analysis mode: {analysis_mode}") # Create model model = CombinedModel(analysis_mode) # Get parameters from configuration initial_params_config = config.get("initial_parameters", {}) param_names = initial_params_config.get("parameter_names", []) param_values = initial_params_config.get("values", []) params_dict = ( dict(zip(param_names, param_values, strict=False)) if param_names and param_values else {} ) if analysis_mode.startswith("static"): params = jnp.array( [ params_dict.get("D0", 100.0), params_dict.get("alpha", -0.5), params_dict.get("D_offset", 0.0), ], ) else: params = jnp.array( [ params_dict.get("D0", 100.0), params_dict.get("alpha", -0.5), params_dict.get("D_offset", 0.0), params_dict.get("gamma_dot_t0", 0.01), params_dict.get("beta", 0.5), params_dict.get("gamma_dot_t_offset", 0.0), params_dict.get("phi0", 0.0), ], ) logger.debug( f"Using parameters: {dict(zip(model.parameter_names, params, strict=False))}", ) # Determine phi angles for theoretical simulation plots if phi_angles_str: phi_degrees = np.array([float(x.strip()) for x in phi_angles_str.split(",")]) phi = phi_degrees logger.info( f"Using CLI-provided phi angles for theoretical plots: {phi_degrees}" ) elif data is not None and "phi_angles_list" in data: phi_degrees = np.array(data["phi_angles_list"]) phi = phi_degrees logger.info( f"Using experimental data phi angles for theoretical plots: {phi_degrees}" ) logger.warning( "Theoretical plots using potentially filtered phi angles from experimental data. " "To use all angles, disable phi_filtering in config or provide --phi-angles explicitly.", ) else: phi_degrees = np.linspace(0, 180, 8) phi = phi_degrees logger.info(f"Using default phi angles for theoretical plots: {phi_degrees}") logger.debug(f"Generating simulated data for {len(phi)} phi angles") # Generate time arrays matching configuration specification analyzer_params = config.get("analyzer_parameters", {}) dt = analyzer_params.get("dt", 0.1) start_frame = analyzer_params.get("start_frame", 1) end_frame = analyzer_params.get("end_frame", 8000) n_time_points = end_frame - start_frame + 1 t_vals = dt * np.arange(1, n_time_points + 1) t1_grid, t2_grid = np.meshgrid(t_vals, t_vals, indexing="ij") logger.debug( f"Simulated data time grid: dt={dt}, start_frame={start_frame}, end_frame={end_frame}", ) logger.debug( f"Time range: [{float(t_vals[0]):.4f}, {float(t_vals[-1]):.2f}] seconds with {n_time_points} points", ) # Get wavevector_q and stator_rotor_gap from correct config sections scattering_config = analyzer_params.get("scattering", {}) geometry_config = analyzer_params.get("geometry", {}) q = scattering_config.get("wavevector_q", 0.0054) L_angstroms = geometry_config.get("stator_rotor_gap", 2000000) L_microns = L_angstroms / 10000.0 logger.info( f"Generating theoretical C2 with q={q:.6f} A^-1, L={L_microns:.1f} um ({L_angstroms:.0f} A)", ) # Generate simulated C2 for each phi angle c2_simulated_list = [] for _i, phi_val in enumerate(phi): phi_array = jnp.array([phi_val]) logger.debug(f"Computing C2 for phi={phi_val} deg (phi_array={phi_array})") c2_phi = model.compute_g2( params, t1_grid, t2_grid, phi_array, q, L_angstroms, contrast, offset, dt, ) c2_result = np.array(c2_phi[0]) logger.debug( f" C2 shape: {c2_result.shape}, range: [{float(np.nanmin(c2_result)):.4f}, {float(np.nanmax(c2_result)):.4f}]", ) c2_simulated_list.append(c2_result) c2_simulated = np.array(c2_simulated_list) logger.info(f"Generated simulated C2 with shape: {c2_simulated.shape}") # Compute global color scale c2_min = float(np.nanmin(c2_simulated)) c2_max = float(np.nanmax(c2_simulated)) if not np.isfinite(c2_min) or not np.isfinite(c2_max): c2_min, c2_max = 1.0, 1.5 vmin = max(1.0, c2_min) vmax = min(1.6, c2_max) logger.debug( f"Simulated C2 range [{c2_min:.4f}, {c2_max:.4f}] -> " f"color scale [{vmin:.4f}, {vmax:.4f}] (clamped to [1.0, 1.6])" ) # Save individual C2 heatmap for EACH phi angle n_phi = len(phi) logger.info( f"Generating individual simulated C2 heatmaps for {n_phi} phi angles..." ) for idx in range(n_phi): fig, ax = plt.subplots(figsize=(8, 7)) im = ax.imshow( c2_simulated[idx].T, extent=(t_vals[0], t_vals[-1], t_vals[0], t_vals[-1]), aspect="equal", cmap="jet", origin="lower", interpolation="bilinear", vmin=vmin, vmax=vmax, ) ax.set_xlabel("t₁ (s)", fontsize=11) ax.set_ylabel("t₂ (s)", fontsize=11) ax.set_title( f"Simulated C₂(t₁, t₂) at φ={phi_degrees[idx]:.1f}°", fontsize=13, fontweight="bold", ) cbar = plt.colorbar(im, ax=ax, label="C₂", shrink=0.9) cbar.ax.tick_params(labelsize=9) mean_val = np.nanmean(c2_simulated[idx]) max_val = np.nanmax(c2_simulated[idx]) min_val = np.nanmin(c2_simulated[idx]) stats_text = f"Mean: {mean_val:.4f}\nRange: [{min_val:.4f}, {max_val:.4f}]" ax.text( 0.02, 0.98, stats_text, transform=ax.transAxes, fontsize=9, verticalalignment="top", bbox={"boxstyle": "round", "facecolor": "white", "alpha": 0.8}, ) mode_text = ( f"Mode: {analysis_mode}\nContrast: {contrast:.3f}\nOffset: {offset:.3f}" ) ax.text( 0.02, 0.02, mode_text, transform=ax.transAxes, fontsize=8, verticalalignment="bottom", bbox={"boxstyle": "round", "facecolor": "lightblue", "alpha": 0.7}, ) plt.tight_layout() filename = f"simulated_data_phi_{phi_degrees[idx]:.1f}.png" plt.savefig(plots_dir / filename, dpi=150, bbox_inches="tight") plt.close() logger.debug(f" Saved: {filename}") logger.info(f"Generated {n_phi} individual simulated C2 heatmaps") # Plot diagonal (t1=t2) for all phi angles fig, ax = plt.subplots(figsize=(10, 6)) for idx in range(min(10, len(phi))): diagonal = np.diag(c2_simulated[idx]) ax.plot( t_vals, diagonal, label=f"φ={phi_degrees[idx]:.1f}°", alpha=0.7, linewidth=2, ) ax.set_xlabel("Time t (s)", fontsize=12) ax.set_ylabel("C₂(t, t)", fontsize=12) ax.set_title( f"Simulated C₂ Along Diagonal (t₁=t₂)\n(contrast={contrast:.3f}, offset={offset:.3f}, mode={analysis_mode})", fontsize=13, fontweight="bold", ) ax.legend(loc="best", fontsize=9, ncol=2) ax.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(plots_dir / "simulated_data_diagonal.png", dpi=150, bbox_inches="tight") plt.close() logger.info("Generated diagonal plot: simulated_data_diagonal.png") logger.info("Simulated data plots generated successfully")
[docs] def generate_and_plot_fitted_simulations( result: Any, data: dict[str, Any], config: dict[str, Any], output_dir: Path, angle_filter_func: Any | None = None, ) -> None: """Generate and plot C2 simulations using fitted parameters from optimization. Parameters ---------- result : Any Optimization result containing fitted parameters data : dict[str, Any] Experimental data dictionary containing phi_angles_list, t1, t2, c2_exp config : dict[str, Any] Configuration dictionary with analysis_mode and physics parameters output_dir : Path Output directory path (simulated_data/ subdirectory will be created) angle_filter_func : callable, optional Function to apply angle filtering for optimization """ from homodyne.config.manager import ConfigManager from homodyne.core.models import CombinedModel logger.info("Generating fitted C2 simulations...") # Apply phi filtering to data (if enabled in config) if angle_filter_func is not None: config_for_filtering = ConfigManager(config_override=config) filtered_data = angle_filter_func(data, config_for_filtering) logger.debug( f"Applied phi filtering for fitted simulation plots: " f"{len(filtered_data['phi_angles_list'])} angles selected" ) else: filtered_data = data # Create simulated_data subdirectory simulated_data_dir = output_dir / "simulated_data" simulated_data_dir.mkdir(parents=True, exist_ok=True) # Extract fitted parameters from result if hasattr(result, "parameters") and isinstance(result.parameters, np.ndarray): params_array = result.parameters if len(params_array) >= 2: contrast = float(params_array[0]) offset = float(params_array[1]) physical_params = params_array[2:] if len(params_array) > 2 else [] else: logger.warning( f"Insufficient parameters in result.parameters: {len(params_array)}" ) contrast = 0.5 offset = 1.0 physical_params = [] elif hasattr(result, "mean_params"): contrast = result.mean_contrast offset = result.mean_offset physical_params = result.mean_params else: logger.warning("Cannot extract fitted parameters from result") return # Convert to JAX array if isinstance(physical_params, list): params = jnp.array(physical_params) elif hasattr(physical_params, "tolist"): params = jnp.array(physical_params.tolist()) else: params = jnp.array(physical_params) logger.info( f"Using fitted parameters: contrast={contrast:.4f}, offset={offset:.4f}", ) logger.debug(f"Physical parameters: {params}") # Get analysis mode analysis_mode = config.get("analysis_mode", "static_isotropic") logger.info(f"Analysis mode: {analysis_mode}") # Create model model = CombinedModel(analysis_mode) # Get experimental data structure (using filtered data) phi_angles_list = filtered_data.get("phi_angles_list", None) t1 = filtered_data.get("t1", None) t2 = filtered_data.get("t2", None) if phi_angles_list is None or t1 is None or t2 is None: logger.warning("Missing experimental data structure (phi_angles_list, t1, t2)") return t1_grid = jnp.array(t1) t2_grid = jnp.array(t2) # Get physics parameters from config analyzer_params = config.get("analyzer_parameters", {}) scattering_config = analyzer_params.get("scattering", {}) geometry_config = analyzer_params.get("geometry", {}) dt = analyzer_params.get("dt", 0.1) q = scattering_config.get("wavevector_q", 0.0054) L_angstroms = geometry_config.get("stator_rotor_gap", 2000000) logger.debug(f"Physics: q={q:.6f} A^-1, L={L_angstroms:.0f} A, dt={dt}") # Generate fitted C2 for each phi angle c2_fitted_list = [] for _i, phi_deg in enumerate(phi_angles_list): phi_array = jnp.array([phi_deg]) logger.debug(f"Generating fitted C2 for phi={phi_deg:.1f} deg") c2_phi = model.compute_g2( params, t1_grid, t2_grid, phi_array, q, L_angstroms, contrast, offset, dt, ) c2_result = np.array(c2_phi[0]) c2_fitted_list.append(c2_result) logger.debug( f" C2 range: [{float(np.nanmin(c2_result)):.4f}, {float(np.nanmax(c2_result)):.4f}]" ) c2_fitted = np.array(c2_fitted_list) logger.info(f"Generated fitted C2 with shape: {c2_fitted.shape}") # Save fitted C2 data as NPZ npz_file = simulated_data_dir / "c2_fitted_data.npz" np.savez_compressed( npz_file, c2_data=c2_fitted, phi_angles=phi_angles_list, t1=t1, t2=t2, initial_params=params, contrast=contrast, offset=offset, ) logger.info(f"Saved fitted C2 data: {npz_file}") # Save configuration for fitted simulation config_file = simulated_data_dir / "simulation_config_fitted.json" sim_config = { "command_line_args": { "contrast": float(contrast), "offset": float(offset), "phi_angles": ",".join(f"{x:.1f}" for x in phi_angles_list), }, "parameters": { "values": params.tolist() if hasattr(params, "tolist") else list(params), "names": model.parameter_names, }, "data_type": "fitted", "analysis_mode": analysis_mode, } with open(config_file, "w", encoding="utf-8") as f: json.dump(sim_config, f, indent=2, default=_json_serializer) logger.info(f"Saved simulation config: {config_file}") # Generate individual plots for each phi angle logger.info( f"Generating individual fitted C2 plots for {len(phi_angles_list)} angles...", ) # Get time extent for plotting if t1 is not None and t2 is not None: t1_min, t1_max = float(np.nanmin(t1)), float(np.nanmax(t1)) t2_min, t2_max = float(np.nanmin(t2)), float(np.nanmax(t2)) # imshow uses .T so after transpose: x=t1, y=t2 extent = [t1_min, t1_max, t2_min, t2_max] xlabel = "t₁ (s)" ylabel = "t₂ (s)" else: extent = None xlabel = "t₁ Index" ylabel = "t₂ Index" for i, phi_deg in enumerate(phi_angles_list): fig, ax = plt.subplots(figsize=(8, 7)) im = ax.imshow( c2_fitted[i].T, aspect="equal", cmap="jet", origin="lower", extent=extent, ) ax.set_xlabel(xlabel, fontsize=11) ax.set_ylabel(ylabel, fontsize=11) ax.set_title( f"Fitted C₂(t₁, t₂) at φ={phi_deg:.1f}°", fontsize=13, fontweight="bold", ) cbar = plt.colorbar(im, ax=ax, label="C₂", shrink=0.9) cbar.ax.tick_params(labelsize=9) mean_val = float(np.nanmean(c2_fitted[i])) max_val = float(np.nanmax(c2_fitted[i])) min_val = float(np.nanmin(c2_fitted[i])) stats_text = f"Mean: {mean_val:.4f}\nRange: [{min_val:.4f}, {max_val:.4f}]" ax.text( 0.02, 0.98, stats_text, transform=ax.transAxes, fontsize=9, verticalalignment="top", bbox={"boxstyle": "round", "facecolor": "white", "alpha": 0.8}, ) fit_text = f"Fitted Parameters\nContrast: {contrast:.3f}\nOffset: {offset:.3f}" ax.text( 0.02, 0.02, fit_text, transform=ax.transAxes, fontsize=8, verticalalignment="bottom", bbox={"boxstyle": "round", "facecolor": "lightgreen", "alpha": 0.7}, ) plt.tight_layout() filename = f"simulated_c2_fitted_phi_{phi_deg:.1f}deg.png" plt.savefig(simulated_data_dir / filename, dpi=150, bbox_inches="tight") plt.close() logger.debug(f" Saved: {filename}") logger.info(f"Generated {len(phi_angles_list)} individual fitted C2 plots") logger.info(f"Fitted simulation data saved to: {simulated_data_dir}")
[docs] def generate_nlsq_plots( phi_angles: np.ndarray, c2_exp: np.ndarray, c2_theoretical_scaled: np.ndarray, residuals: np.ndarray, t1: np.ndarray, t2: np.ndarray, output_dir: Path, config: Any = None, use_datashader: bool = True, parallel: bool = True, *, c2_solver_scaled: np.ndarray | None = None, ) -> None: """Generate 3-panel heatmap plots for NLSQ fit visualization. Parameters ---------- phi_angles : np.ndarray Scattering angles in degrees (n_angles,) c2_exp : np.ndarray Experimental correlation data (n_angles, n_t1, n_t2) c2_theoretical_scaled : np.ndarray Scaled theoretical fits (n_angles, n_t1, n_t2) residuals : np.ndarray Residuals: exp - scaled (n_angles, n_t1, n_t2) t1 : np.ndarray Time array 1 in seconds (n_t1,) t2 : np.ndarray Time array 2 in seconds (n_t2,) output_dir : Path Output directory for PNG files config : ConfigManager or dict, optional Configuration object/dict containing output.plots settings use_datashader : bool, default=True Legacy parameter for backward compatibility parallel : bool, default=True Generate plots in parallel using multiprocessing c2_solver_scaled : np.ndarray | None Optional solver-computed scaled C2 for display """ logger.info(f"Generating heatmap plots for {len(phi_angles)} angles") # Determine rendering mode from config preview_mode = use_datashader width = 1200 height = 1200 fit_surface_mode = "solver" color_scale_cfg: dict[str, Any] = {} if config is not None: config_dict = config.config if hasattr(config, "config") else config output_config = config_dict.get("output", {}) plots_config = output_config.get("plots", {}) preview_mode = plots_config.get("preview_mode", preview_mode) fit_surface_mode = plots_config.get("fit_surface", fit_surface_mode) color_scale_cfg = plots_config.get("color_scale", {}) datashader_config = plots_config.get("datashader", {}) width = datashader_config.get("canvas_width", width) height = datashader_config.get("canvas_height", height) logger.debug( f"Plot config: preview_mode={preview_mode}, " f"canvas={width}x{height}, parallel={parallel}", ) # Determine which fit surface to display use_solver_surface = fit_surface_mode == "solver" and c2_solver_scaled is not None c2_fit_display = c2_solver_scaled if use_solver_surface else c2_theoretical_scaled residuals_display = c2_exp - c2_fit_display percentile_min = color_scale_cfg.get("percentile_min", 1.0) percentile_max = color_scale_cfg.get("percentile_max", 99.0) # Color scaling configuration # Don't set explicit vmin/vmax here - let _resolve_color_limits compute # per-panel percentile-based limits to show structure in both exp and fit data _c2_min = min(float(np.nanmin(c2_exp)), float(np.nanmin(c2_fit_display))) # noqa: F841 _c2_max = max(float(np.nanmax(c2_exp)), float(np.nanmax(c2_fit_display))) # noqa: F841 logger.debug( f"C2 data range: exp=[{float(np.nanmin(c2_exp)):.4f}, {float(np.nanmax(c2_exp)):.4f}], " f"fit=[{float(np.nanmin(c2_fit_display)):.4f}, {float(np.nanmax(c2_fit_display)):.4f}]" ) # Pass percentile settings but NOT fixed vmin/vmax - let each panel auto-scale color_options = { "percentile_min": percentile_min, "percentile_max": percentile_max, } surface_label = "solver" if use_solver_surface else "posthoc" logger.info(f"Plotting fit surface: {surface_label}") # Select backend based on mode if preview_mode and DATASHADER_AVAILABLE: logger.info("Using Datashader backend (preview mode, fast rendering)") _generate_plots_datashader( phi_angles, c2_exp, c2_fit_display, residuals_display, t1, t2, output_dir, parallel=parallel, width=1200, height=1200, color_options=color_options, ) else: if preview_mode and not DATASHADER_AVAILABLE: logger.warning( "Preview mode (Datashader) requested but Datashader not available. " "Install with: pip install datashader xarray colorcet" ) logger.info("Falling back to matplotlib backend (publication quality)") else: logger.info("Using matplotlib backend (publication quality)") _generate_plots_matplotlib( phi_angles, c2_exp, c2_fit_display, residuals_display, t1, t2, output_dir, color_options=color_options, )
def _worker_init_cpu_only(): """Initialize worker process with CPU-only mode.""" import os os.environ["JAX_PLATFORMS"] = "cpu" os.environ["CUDA_VISIBLE_DEVICES"] = "-1" os.environ["XLA_PYTHON_CLIENT_PREALLOCATE"] = "false" os.environ["XLA_PYTHON_CLIENT_ALLOCATOR"] = "platform" os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" def _plot_single_angle_datashader(args): """Plot single angle for parallel processing (picklable module-level function).""" import os os.environ["JAX_PLATFORMS"] = "cpu" os.environ["CUDA_VISIBLE_DEVICES"] = "-1" os.environ["XLA_PYTHON_CLIENT_PREALLOCATE"] = "false" os.environ["XLA_PYTHON_CLIENT_ALLOCATOR"] = "platform" os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" from homodyne.viz.datashader_backend import plot_c2_comparison_fast ( i, phi_angles, c2_exp, c2_fit, residuals, t1, t2, output_dir, width, height, color_options, ) = args phi = phi_angles[i] output_file = output_dir / f"c2_heatmaps_phi_{phi:.1f}deg.png" c2_exp_cpu = np.asarray(c2_exp) c2_fit_cpu = np.asarray(c2_fit) residuals_cpu = np.asarray(residuals) t1_cpu = np.asarray(t1) t2_cpu = np.asarray(t2) plot_c2_comparison_fast( c2_exp_cpu, c2_fit_cpu, residuals_cpu, t1_cpu, t2_cpu, output_file, phi_angle=phi, width=width, height=height, **color_options, ) return output_file def _generate_plots_datashader( phi_angles: np.ndarray, c2_exp: np.ndarray, c2_fit_display: np.ndarray, residuals: np.ndarray, t1: np.ndarray, t2: np.ndarray, output_dir: Path, parallel: bool = True, width: int = 1200, height: int = 1200, color_options: dict[str, Any] | None = None, ) -> None: """Generate plots using Datashader backend with optional parallelization.""" if parallel and len(phi_angles) > 1: ctx = multiprocessing.get_context("spawn") n_workers = min(multiprocessing.cpu_count(), len(phi_angles)) logger.info(f"Using {n_workers} parallel workers for plotting (spawn method)") args_list = [ ( i, phi_angles, c2_exp[i], c2_fit_display[i], residuals[i], t1, t2, output_dir, width, height, color_options or {}, ) for i in range(len(phi_angles)) ] try: with ctx.Pool( processes=n_workers, initializer=_worker_init_cpu_only ) as pool: timeout_seconds = (60 * len(phi_angles) / n_workers) + 120 logger.debug(f"Parallel plotting timeout: {timeout_seconds:.0f}s") result = pool.map_async(_plot_single_angle_datashader, args_list) result.get(timeout=timeout_seconds) logger.info(f"Generated {len(phi_angles)} heatmap plots (parallel)") except (OSError, RuntimeError, TimeoutError) as e: logger.warning(f"Parallel plotting failed: {e.__class__.__name__}: {e}") logger.info("Falling back to sequential plotting...") for i in range(len(phi_angles)): args = ( i, phi_angles, c2_exp[i], c2_fit_display[i], residuals[i], t1, t2, output_dir, width, height, color_options or {}, ) _plot_single_angle_datashader(args) logger.info( f"Generated {len(phi_angles)} heatmap plots (sequential fallback)" ) else: for i in range(len(phi_angles)): args = ( i, phi_angles, c2_exp[i], c2_fit_display[i], residuals[i], t1, t2, output_dir, width, height, color_options or {}, ) _plot_single_angle_datashader(args) logger.info(f"Generated {len(phi_angles)} heatmap plots (sequential)") def _generate_plots_matplotlib( phi_angles: np.ndarray, c2_exp: np.ndarray, c2_fit_display: np.ndarray, residuals: np.ndarray, t1: np.ndarray, t2: np.ndarray, output_dir: Path, color_options: dict[str, Any] | None = None, ) -> None: """Generate plots using matplotlib backend (publication quality).""" logger.info(f"Generating heatmap plots for {len(phi_angles)} angles") for i, phi in enumerate(phi_angles): fig, axes = plt.subplots(1, 3, figsize=(15, 5)) # Use separate color limits for experimental and fitted panels # This allows seeing structure in both even if they have different amplitudes vmin_exp, vmax_exp = _resolve_color_limits(c2_exp[i], color_options) vmin_fit, vmax_fit = _resolve_color_limits(c2_fit_display[i], color_options) # Panel 1: Experimental data with its own scale im0 = axes[0].imshow( c2_exp[i].T, origin="lower", aspect="equal", cmap="jet", extent=[t1[0], t1[-1], t2[0], t2[-1]], vmin=vmin_exp, vmax=vmax_exp, ) axes[0].set_title(f"Experimental C₂ (φ={phi:.1f}°)", fontsize=12) axes[0].set_xlabel("t₁ (s)", fontsize=10) axes[0].set_ylabel("t₂ (s)", fontsize=10) cbar0 = plt.colorbar(im0, ax=axes[0], label="C₂(t₁,t₂)") cbar0.ax.tick_params(labelsize=8) # Panel 2: Theoretical fit with its own scale (shows structure even if amplitude differs) im1 = axes[1].imshow( c2_fit_display[i].T, origin="lower", aspect="equal", cmap="jet", extent=[t1[0], t1[-1], t2[0], t2[-1]], vmin=vmin_fit, vmax=vmax_fit, ) axes[1].set_title(f"Classical Fit (φ={phi:.1f}°)", fontsize=12) axes[1].set_xlabel("t₁ (s)", fontsize=10) axes[1].set_ylabel("t₂ (s)", fontsize=10) cbar1 = plt.colorbar(im1, ax=axes[1], label="C₂(t₁,t₂)") cbar1.ax.tick_params(labelsize=8) # Panel 3: Residuals residual_min = float(np.nanmin(residuals[i])) residual_max = float(np.nanmax(residuals[i])) if not np.isfinite(residual_min) or not np.isfinite(residual_max): residual_min, residual_max = -0.1, 0.1 im2 = axes[2].imshow( residuals[i].T, origin="lower", aspect="equal", cmap="jet", vmin=residual_min, vmax=residual_max, extent=[t1[0], t1[-1], t2[0], t2[-1]], ) axes[2].set_title(f"Residuals (φ={phi:.1f}°)", fontsize=12) axes[2].set_xlabel("t₁ (s)", fontsize=10) axes[2].set_ylabel("t₂ (s)", fontsize=10) cbar2 = plt.colorbar(im2, ax=axes[2], label="ΔC₂") cbar2.ax.tick_params(labelsize=8) plt.tight_layout() plot_file = output_dir / f"c2_heatmaps_phi_{phi:.1f}deg.png" plt.savefig(plot_file, dpi=300, bbox_inches="tight") plt.close(fig) logger.debug(f"Saved plot: {plot_file}") logger.info(f"Generated {len(phi_angles)} heatmap plots (matplotlib)") def _resolve_color_limits( matrix: np.ndarray, color_options: dict[str, Any] | None, ) -> tuple[float, float]: """Resolve color limits for heatmap plotting. Uses percentile-based limits by default to show data structure, with fallback to [1.0, 1.5] only for empty data. """ opts = color_options or {} vmin = opts.get("vmin") vmax = opts.get("vmax") percentile_min = opts.get("percentile_min", 1.0) percentile_max = opts.get("percentile_max", 99.0) # Always use percentile-based limits if not explicitly set # Use nan-safe variants so masked/dead-pixel data does not yield NaN limits. if matrix.size > 0: if vmin is None: vmin = float(np.nanpercentile(matrix, percentile_min)) if vmax is None: vmax = float(np.nanpercentile(matrix, percentile_max)) # Fallback only for empty data or all-NaN data if vmin is None or not np.isfinite(vmin): vmin = 1.0 if vmax is None or not np.isfinite(vmax): vmax = 1.5 # Guard against degenerate range (constant/flat data) which causes imshow # to emit a warning and render a blank image with an invalid colorbar. if vmin >= vmax: vmax = vmin + 1.0 return vmin, vmax