Source code for homodyne.config.manager

"""Minimal Configuration Management for Homodyne
===================================================

Simplified configuration system with preserved API compatibility.
Provides essential YAML/JSON loading with the same interface as the original
ConfigManager while removing complex features not needed for core functionality.

Note: GPU support removed in v2.3.0 - CPU-only execution.
"""

import json
from pathlib import Path
from typing import Any

# Handle YAML dependency
try:
    from types import ModuleType

    import yaml

    HAS_YAML = True
    yaml_module: ModuleType | None = yaml
    _YAMLError: type[BaseException] = yaml.YAMLError
except ImportError:
    HAS_YAML = False
    yaml_module = None
    _YAMLError = Exception

# Import minimal logging
try:
    from homodyne.utils.logging import get_logger

    HAS_LOGGING = True
except ImportError:
    import logging
    from typing import Any as _Any

    HAS_LOGGING = False

    def get_logger(name: str, **kwargs: _Any) -> logging.Logger:  # type: ignore[misc]
        return logging.getLogger(name)


logger = get_logger(__name__)


[docs] class ConfigManager: """Minimal configuration manager for homodyne v2 scattering analysis. Provides simplified configuration loading with preserved API compatibility. Key Features: - YAML/JSON configuration file loading - Compatible .config attribute access - Preserved constructor signature - Graceful fallback to defaults - CPU-only execution (GPU support removed in v2.3.0) Usage: config_manager = ConfigManager('my_config.yaml') data = config_manager.config """
[docs] def __init__( self, config_file: str = "homodyne_config.yaml", config_override: dict[str, Any] | None = None, ): """Initialize configuration manager. Parameters ---------- config_file : str Path to YAML/JSON configuration file config_override : dict, optional Override configuration data instead of loading from file """ self.config_file = config_file self.config: dict[str, Any] | None = None # Cache for ParameterManager to avoid repeated instantiation self._cached_param_manager: Any | None = None if config_override is not None: self.config = config_override.copy() logger.info("Configuration loaded from override data") else: self.load_config() # Normalize schema for backward compatibility self._normalize_schema()
[docs] def load_config(self) -> None: """Load and parse YAML/JSON configuration file. Supports both YAML and JSON formats with graceful fallback to default configuration if loading fails. """ try: if self.config_file is None: raise ValueError("Configuration file path cannot be None") config_path = Path(self.config_file) if not config_path.exists(): raise FileNotFoundError( f"Configuration file not found: {self.config_file}", ) # Determine file format and load accordingly file_extension = config_path.suffix.lower() # Use 8KB buffering for improved I/O performance on large config files with open(config_path, buffering=8192, encoding="utf-8") as f: if file_extension in [".yaml", ".yml"] and HAS_YAML and yaml_module: self.config = yaml_module.safe_load(f) elif file_extension == ".json": self.config = json.load(f) elif HAS_YAML and yaml_module: # Try YAML first for unknown extensions content = f.read() try: self.config = yaml_module.safe_load(content) except yaml_module.YAMLError: # Fallback to JSON self.config = json.loads(content) else: # Only JSON available self.config = json.load(f) logger.info(f"Configuration loaded from: {self.config_file}") # Display version information if available if self.config is None: logger.warning( "Configuration file '%s' is empty or null; using defaults", self.config_file, ) self.config = self._get_default_config() return if isinstance(self.config, dict) and "metadata" in self.config: version = self.config["metadata"].get("config_version", "Unknown") logger.info(f"Configuration version: {version}") # Optional validation (can be disabled via environment variable) import os if os.environ.get("HOMODYNE_VALIDATE_CONFIG", "true").lower() == "true": self._validate_config() except json.JSONDecodeError as e: logger.error(f"JSON parsing error: {e}") logger.info("Using default configuration...") self.config = self._get_default_config() except FileNotFoundError: # Re-raise immediately: wrong config path must be reported, not silenced. # Proceeding with stub defaults would produce confusing downstream errors. raise except ( OSError, ValueError, UnicodeDecodeError, TypeError, KeyError, _YAMLError, ) as e: logger.error(f"Configuration parsing error: {e}") logger.info("Using default configuration...") self.config = self._get_default_config()
def _get_default_config(self) -> dict[str, Any]: """Get default configuration structure. T052: Logs default value application at DEBUG level. Returns minimal configuration that supports basic analysis modes. CPU-only execution (GPU support removed in v2.3.0). """ # T052: Log default value application logger.debug("Applying default configuration values (fallback)") return { "metadata": { "config_version": "2.18.0", "description": "Default minimal configuration (CPU-only)", }, "analysis_mode": "static", "analyzer_parameters": { "dt": 0.1, "start_frame": 1, "end_frame": -1, }, "experimental_data": { "file_path": None, "cache_directory": "./cache", "use_caching": True, }, "optimization": { "method": "nlsq", "lsq": { "max_iterations": 10000, "tolerance": 1e-8, "method": "trf", }, "mcmc": { "n_samples": 1000, "n_warmup": 1000, "n_chains": 4, "target_accept_prob": 0.8, }, }, "output": { "formats": ["yaml", "npz"], "include_diagnostics": True, }, "logging": { "enabled": True, "level": "INFO", "console": {"enabled": True}, "file": {"enabled": False}, }, }
[docs] def get_config(self) -> dict[str, Any]: """Get the current configuration dictionary. Returns ------- Dict[str, Any] Current configuration dictionary """ if self.config is None: return {} return self.config
[docs] def update_config(self, key: str, value: Any) -> None: """Update a configuration value using dot notation. Parameters ---------- key : str Configuration key (supports dot notation like 'optimization.method') value : Any New value to set """ if self.config is None: self.config = {} keys = key.split(".") config_ref = self.config # Navigate to the parent of the target key for k in keys[:-1]: if k not in config_ref: config_ref[k] = {} config_ref = config_ref[k] # Set the value config_ref[keys[-1]] = value
[docs] def is_static_mode_enabled(self) -> bool: """Check if static analysis mode is enabled.""" if not self.config: return True analysis_mode = self.config.get("analysis_mode", "static_isotropic") return "static" in analysis_mode.lower()
[docs] def get_target_angle_ranges(self) -> dict[str, Any]: """Get angle filtering ranges.""" if not self.config: return {"enabled": False} optimization = self.config.get("optimization", {}) angle_filtering = optimization.get("angle_filtering", {}) if not isinstance(angle_filtering, dict): logger.warning( "optimization.angle_filtering must be a dict, ignoring (got %s)", type(angle_filtering).__name__, ) return {"enabled": False} return angle_filtering
def _get_parameter_manager(self) -> Any: """Get or create cached ParameterManager. This avoids creating a new ParameterManager on every config access, providing ~14x speedup for repeated parameter queries. Returns ------- ParameterManager Cached ParameterManager instance """ if self._cached_param_manager is None: from homodyne.config.parameter_manager import ParameterManager # Determine analysis mode analysis_mode = "laminar_flow" if self.is_static_mode_enabled(): analysis_mode = "static" # Create and cache ParameterManager self._cached_param_manager = ParameterManager(self.config, analysis_mode) logger.debug(f"Created cached ParameterManager for mode: {analysis_mode}") return self._cached_param_manager
[docs] def get_parameter_bounds( self, parameter_names: list[str] | None = None, ) -> list[dict[str, Any]]: """Get parameter bounds from configuration (cached). Uses cached ParameterManager internally for improved performance. Parameters ---------- parameter_names : list of str, optional List of parameter names to get bounds for. If None, returns bounds for all parameters in the current analysis mode. Returns ------- list of dict List of bound dictionaries with keys: 'name', 'min', 'max', 'type' Examples -------- >>> config_mgr = ConfigManager("config.yaml") >>> bounds = config_mgr.get_parameter_bounds(["D0", "alpha"]) >>> bounds[0] {'min': 1.0, 'max': 1000000.0, 'name': 'D0', 'type': 'Normal'} Notes ----- This method uses a cached ParameterManager for ~14x speedup on repeated calls. """ bounds = self._get_parameter_manager().get_parameter_bounds(parameter_names) if not isinstance(bounds, list): raise TypeError( f"ParameterManager.get_parameter_bounds returned {type(bounds).__name__}, expected list" ) return bounds
[docs] def get_active_parameters(self) -> list[str]: """Get list of active (physical) parameters from configuration (cached). Uses cached ParameterManager internally for improved performance. Returns ------- list of str List of parameter names to be optimized. Falls back to mode-appropriate parameters if not specified in config. Examples -------- >>> config_mgr = ConfigManager("config.yaml") >>> config_mgr.get_active_parameters() ['D0', 'alpha', 'D_offset', 'gamma_dot_t0', 'beta', 'gamma_dot_t_offset', 'phi0'] Notes ----- This method uses a cached ParameterManager for ~14x speedup on repeated calls. """ params = self._get_parameter_manager().get_active_parameters() if not isinstance(params, list): raise TypeError( f"ParameterManager.get_active_parameters returned {type(params).__name__}, expected list" ) return params
[docs] def get_initial_parameters( self, use_midpoint_defaults: bool = True, ) -> dict[str, float]: """Get initial parameter values from configuration. Loads initial parameter values from the `initial_parameters.values` section of the configuration. If values are null or missing, calculates mid-point defaults from parameter bounds. Parameters ---------- use_midpoint_defaults : bool If True (default), calculate mid-point defaults when values are null. If False, raise an error when values are missing. Returns ------- dict[str, float] Dictionary mapping parameter names (canonical) to initial values. Only includes active parameters (excludes fixed parameters). Raises ------ ValueError If values are null and use_midpoint_defaults is False. If number of values doesn't match number of parameter names. Examples -------- >>> # With explicit values in config >>> config = { ... 'initial_parameters': { ... 'parameter_names': ['D0', 'alpha', 'D_offset'], ... 'values': [1000.0, 0.5, 10.0] ... } ... } >>> config_mgr = ConfigManager(config_override=config) >>> config_mgr.get_initial_parameters() {'D0': 1000.0, 'alpha': 0.5, 'D_offset': 10.0} >>> # With null values (mid-point defaults) >>> config = { ... 'initial_parameters': { ... 'parameter_names': ['D0', 'alpha'], ... 'values': null ... } ... } >>> config_mgr = ConfigManager(config_override=config) >>> params = config_mgr.get_initial_parameters() >>> # params['D0'] will be mid-point of bounds: (min + max) / 2 Notes ----- - Uses ParameterManager for name mapping (gamma_dot_0 → gamma_dot_t0) - Respects active_parameters and fixed_parameters from config - Logs when using mid-point defaults - Returns only active parameters (fixed parameters excluded) """ if not self.config: logger.warning("No configuration loaded, using empty initial parameters") return {} # Get initial_parameters section initial_params = self.config.get("initial_parameters", {}) if not initial_params: logger.info( "No initial_parameters section in config, using mid-point defaults" ) return self._calculate_midpoint_defaults() # Get parameter names from config param_names_config = initial_params.get("parameter_names") if not param_names_config or not isinstance(param_names_config, list): logger.info( "No parameter_names in initial_parameters, using active parameters from mode" ) return self._calculate_midpoint_defaults() # Get parameter values from config param_values = initial_params.get("values") # Handle null/missing values if param_values is None: if use_midpoint_defaults: logger.info( f"initial_parameters.values is null, calculating mid-point defaults for {len(param_names_config)} parameters" ) return self._calculate_midpoint_defaults() else: raise ValueError( "initial_parameters.values is null and use_midpoint_defaults is False" ) # Validate that values is a list if not isinstance(param_values, list): raise ValueError( f"initial_parameters.values must be a list, got {type(param_values)}" ) # Validate length match if len(param_values) != len(param_names_config): raise ValueError( f"Number of values ({len(param_values)}) does not match " f"number of parameter_names ({len(param_names_config)})" ) # Get ParameterManager for name mapping (used for validation) _param_manager = self._get_parameter_manager() # noqa: F841 # Import name mapping once at the top of this section from homodyne.config.types import PARAMETER_NAME_MAPPING # Build initial parameters dict with name mapping initial_params_dict: dict[str, float] = {} for param_name, value in zip(param_names_config, param_values, strict=False): # Apply name mapping (e.g., gamma_dot_0 → gamma_dot_t0) canonical_name = PARAMETER_NAME_MAPPING.get(param_name, param_name) initial_params_dict[canonical_name] = float(value) # Filter by active_parameters if specified active_params_config = initial_params.get("active_parameters") if active_params_config and isinstance(active_params_config, list): # Map active parameter names to canonical names active_canonical = set() for name in active_params_config: canonical = PARAMETER_NAME_MAPPING.get(name, name) active_canonical.add(canonical) # Filter to only active parameters initial_params_dict = { k: v for k, v in initial_params_dict.items() if k in active_canonical } logger.info( f"Filtered to {len(initial_params_dict)} active parameters: {list(initial_params_dict.keys())}" ) # Exclude fixed_parameters fixed_params = initial_params.get("fixed_parameters") if fixed_params and isinstance(fixed_params, dict): # Map fixed parameter names to canonical names fixed_canonical = set() for name in fixed_params.keys(): canonical = PARAMETER_NAME_MAPPING.get(name, name) fixed_canonical.add(canonical) # Remove fixed parameters from initial_params_dict initial_params_dict = { k: v for k, v in initial_params_dict.items() if k not in fixed_canonical } logger.info( f"Excluded {len(fixed_canonical)} fixed parameters, " f"{len(initial_params_dict)} remaining" ) # Load per-angle scaling parameters (contrast, offset) if present per_angle_scaling = initial_params.get("per_angle_scaling") if per_angle_scaling and isinstance(per_angle_scaling, dict): # Extract contrast and offset arrays contrast_values = per_angle_scaling.get("contrast") offset_values = per_angle_scaling.get("offset") if contrast_values is not None and isinstance(contrast_values, list): if len(contrast_values) == 1: # Single-angle: use scalar contrast initial_params_dict["contrast"] = float(contrast_values[0]) logger.info( f"Loaded scalar contrast from per_angle_scaling: {contrast_values[0]}" ) else: # Multi-angle: use per-angle contrast_0, contrast_1, ... for idx, val in enumerate(contrast_values): initial_params_dict[f"contrast_{idx}"] = float(val) logger.info( f"Loaded {len(contrast_values)} per-angle contrast values" ) if offset_values is not None and isinstance(offset_values, list): if len(offset_values) == 1: # Single-angle: use scalar offset initial_params_dict["offset"] = float(offset_values[0]) logger.info( f"Loaded scalar offset from per_angle_scaling: {offset_values[0]}" ) else: # Multi-angle: use per-angle offset_0, offset_1, ... for idx, val in enumerate(offset_values): initial_params_dict[f"offset_{idx}"] = float(val) logger.info(f"Loaded {len(offset_values)} per-angle offset values") logger.info( f"Loaded initial parameters from config: {list(initial_params_dict.keys())}" ) return initial_params_dict
def _calculate_midpoint_defaults(self) -> dict[str, float]: """Calculate mid-point default values from parameter bounds. Returns ------- dict[str, float] Dictionary mapping parameter names to mid-point values: (min + max) / 2 Notes ----- - Uses ParameterManager to get bounds - Only includes active parameters (excludes fixed) - Logs calculation for transparency """ param_manager = self._get_parameter_manager() # Get active parameter names (already excludes fixed parameters) active_params = param_manager.get_active_parameters() # Get bounds for active parameters bounds_list = param_manager.get_parameter_bounds(active_params) # Calculate mid-points midpoint_dict: dict[str, float] = {} for bound_dict in bounds_list: param_name = bound_dict["name"] min_val = bound_dict["min"] max_val = bound_dict["max"] midpoint = (min_val + max_val) / 2.0 midpoint_dict[param_name] = midpoint logger.info( f"Calculated mid-point defaults for {len(midpoint_dict)} parameters" ) logger.debug(f"Mid-point values: {midpoint_dict}") return midpoint_dict
[docs] def validate_per_angle_scaling(self, n_phi: int) -> list[str]: """Validate per-angle scaling array lengths against number of phi angles. This method should be called after loading phi angles from data to verify that the per_angle_scaling arrays in the config match the actual number of angles in the data. Parameters ---------- n_phi : int Number of phi angles in the loaded data. Returns ------- list[str] List of validation warnings (empty if all valid). Raises ------ ValueError If per-angle scaling arrays have incorrect length and cannot be used. Examples -------- >>> config_mgr = ConfigManager("config.yaml") >>> warnings = config_mgr.validate_per_angle_scaling(n_phi=5) >>> if warnings: ... for w in warnings: ... logger.warning(w) """ warnings: list[str] = [] if not self.config: return warnings initial_params = self.config.get("initial_parameters", {}) per_angle_scaling = initial_params.get("per_angle_scaling") if not per_angle_scaling or not isinstance(per_angle_scaling, dict): return warnings contrast_values = per_angle_scaling.get("contrast") offset_values = per_angle_scaling.get("offset") # Validate contrast array length if contrast_values is not None and isinstance(contrast_values, list): n_contrast = len(contrast_values) if n_contrast != n_phi and n_contrast != 1: raise ValueError( f"per_angle_scaling.contrast has {n_contrast} values but data has " f"{n_phi} phi angles. Must have either 1 (scalar) or {n_phi} values." ) if n_contrast == 1 and n_phi > 1: warnings.append( f"per_angle_scaling.contrast has 1 value but data has {n_phi} angles. " f"Using scalar contrast for all angles." ) # Validate offset array length if offset_values is not None and isinstance(offset_values, list): n_offset = len(offset_values) if n_offset != n_phi and n_offset != 1: raise ValueError( f"per_angle_scaling.offset has {n_offset} values but data has " f"{n_phi} phi angles. Must have either 1 (scalar) or {n_phi} values." ) if n_offset == 1 and n_phi > 1: warnings.append( f"per_angle_scaling.offset has 1 value but data has {n_phi} angles. " f"Using scalar offset for all angles." ) # Cross-check contrast and offset array lengths if ( contrast_values is not None and offset_values is not None and isinstance(contrast_values, list) and isinstance(offset_values, list) ): n_contrast = len(contrast_values) n_offset = len(offset_values) if n_contrast != n_offset and n_contrast > 1 and n_offset > 1: warnings.append( f"per_angle_scaling arrays have different lengths: " f"contrast={n_contrast}, offset={n_offset}. This may cause issues." ) if warnings: for w in warnings: logger.warning(w) return warnings
[docs] def get_cmc_config(self) -> dict[str, Any]: """Get CMC (Consensus Monte Carlo) configuration with validation and defaults. Extracts and validates the CMC configuration section from the optimization settings. Applies default values for missing fields and validates ranges and backend compatibility. Returns ------- dict CMC configuration dictionary with validated settings including: - enable: bool or "auto" - min_points_for_cmc: int - sharding: dict with strategy, num_shards, max_points_per_shard - backend: dict with name, checkpoint settings - combination: dict with method, validation settings - per_shard_mcmc: dict with num_warmup, num_samples, etc. - validation: dict with convergence criteria Raises ------ ValueError If required CMC fields are invalid or incompatible with hardware Examples -------- >>> config_mgr = ConfigManager("cmc_config.yaml") >>> cmc_config = config_mgr.get_cmc_config() >>> print(cmc_config["sharding"]["strategy"]) 'stratified' Notes ----- - Automatically applies sensible defaults for missing fields - Validates value ranges (e.g., num_shards > 0) - Checks backend compatibility with detected hardware - Logs migration warnings for deprecated settings """ if not self.config: return self._get_default_cmc_config() optimization = self.config.get("optimization", {}) cmc_raw = optimization.get("cmc", {}) # If no CMC config, return defaults if not cmc_raw: logger.debug("No CMC configuration found, using defaults") return self._get_default_cmc_config() # Start with defaults and override with user settings cmc_config = self._get_default_cmc_config() self._merge_cmc_config(cmc_config, cmc_raw) # Validate the configuration self._validate_cmc_config(cmc_config) # Check for deprecated settings self._check_cmc_deprecated_settings(optimization) return cmc_config
def _get_default_cmc_config(self) -> dict[str, Any]: """Get default CMC configuration. T052: Logs default value application at DEBUG level. Returns ------- dict Default CMC configuration with sensible defaults """ # T052: Log default value application logger.debug("Applying default CMC configuration values") return { "enable": "auto", "min_points_for_cmc": 100000, "sharding": { "strategy": "random", "num_shards": "auto", "max_points_per_shard": "auto", }, "backend": { "name": "auto", "enable_checkpoints": True, "checkpoint_frequency": 10, "checkpoint_dir": "./checkpoints/cmc", "keep_last_checkpoints": 3, "resume_from_checkpoint": True, }, "combination": { "method": "robust_consensus_mc", "validate_results": True, "min_success_rate": 0.90, "min_success_rate_warning": 0.80, }, # Per-shard NUTS defaults are tuned to keep # laminar_flow CMC workloads below the 2 hour # per-shard timeout on typical CPU nodes. # These values are intentionally lighter than # early prototypes (fewer chains / samples). "per_shard_mcmc": { "num_warmup": 500, "num_samples": 1500, "num_chains": 4, "target_accept_prob": 0.85, "subsample_size": "auto", }, "validation": { "strict_mode": True, "min_per_shard_ess": 100.0, "max_per_shard_rhat": 1.1, "max_between_shard_kl": 2.0, "min_success_rate": 0.90, "max_divergence_rate": 0.10, "require_nlsq_warmstart": False, "use_nlsq_informed_priors": True, "nlsq_prior_width_factor": 2.0, "max_parameter_cv": 1.0, "heterogeneity_abort": True, }, } def _merge_cmc_config(self, defaults: dict[str, Any], user: dict[str, Any]) -> None: """Merge user CMC configuration into defaults (recursive). Parameters ---------- defaults : dict Default configuration dictionary (modified in place) user : dict User-provided configuration to merge """ for key, value in user.items(): if ( key in defaults and isinstance(defaults[key], dict) and isinstance(value, dict) ): # Recursive merge for nested dictionaries self._merge_cmc_config(defaults[key], value) else: # Direct override for non-dict values defaults[key] = value def _validate_cmc_config(self, cmc_config: dict[str, Any]) -> None: """Validate CMC configuration values. Parameters ---------- cmc_config : dict CMC configuration to validate Raises ------ ValueError If configuration values are invalid """ # Validate enable field enable = cmc_config.get("enable") if enable not in [True, False, "auto"]: raise ValueError( f"CMC enable must be True, False, or 'auto', got: {enable}" ) # Validate min_points_for_cmc min_points = cmc_config.get("min_points_for_cmc", 0) if not isinstance(min_points, int) or min_points < 1: raise ValueError( f"min_points_for_cmc must be a positive integer (>= 1), got: {min_points}" ) # Validate sharding sharding = cmc_config.get("sharding", {}) strategy = sharding.get("strategy", "stratified") if strategy not in ["stratified", "random", "contiguous"]: raise ValueError( f"Sharding strategy must be 'stratified', 'random', or 'contiguous', got: {strategy}" ) num_shards = sharding.get("num_shards", "auto") if num_shards != "auto" and ( not isinstance(num_shards, int) or num_shards <= 0 ): raise ValueError( f"num_shards must be 'auto' or positive integer, got: {num_shards}" ) # Note: initialization config section is deprecated in v2.1.0 # CMC now uses identity mass matrix by default (no SVI initialization) # Validate backend (handle both old dict schema and new string schema) backend = cmc_config.get("backend", {}) # Handle new schema: backend is a string ("jax" or "numpy") for computational backend # vs old schema: backend is a dict with name key for parallel execution backend if isinstance(backend, str): # New schema: computational backend as string valid_computational_backends = ["jax", "numpy"] if backend not in valid_computational_backends: raise ValueError( f"Computational backend must be one of {valid_computational_backends}, got: {backend}" ) # Check for new backend_config field (parallel execution) backend_config = cmc_config.get("backend_config", {}) if backend_config: backend_name = backend_config.get("name", "auto") valid_parallel_backends = [ "auto", "pjit", "multiprocessing", "pbs", "slurm", "jax", # legacy alias, mapped to pjit downstream ] if backend_name not in valid_parallel_backends: raise ValueError( f"Parallel execution backend must be one of {valid_parallel_backends}, got: {backend_name}" ) else: # Old schema: backend is dict with name for parallel execution backend_name = backend.get("name", "auto") valid_backends = [ "auto", "pjit", "multiprocessing", "pbs", "slurm", "jax", # legacy alias, mapped to pjit downstream ] if backend_name not in valid_backends: raise ValueError( f"Backend name must be one of {valid_backends}, got: {backend_name}" ) # Validate combination combination = cmc_config.get("combination", {}) comb_method = combination.get("method", "robust_consensus_mc") valid_methods = [ "consensus_mc", "robust_consensus_mc", "weighted_gaussian", "simple_average", "auto", ] if comb_method not in valid_methods: raise ValueError( f"Combination method must be one of {valid_methods}, got: {comb_method}" ) min_success = combination.get("min_success_rate", 0.9) if not isinstance(min_success, (int, float)) or not 0.0 <= min_success <= 1.0: raise ValueError( f"min_success_rate must be between 0.0 and 1.0, got: {min_success}" ) # Validate per_shard_mcmc per_shard = cmc_config.get("per_shard_mcmc", {}) for key in ["num_warmup", "num_samples", "num_chains"]: value = per_shard.get(key, 1) if not isinstance(value, int) or value <= 0: raise ValueError( f"per_shard_mcmc.{key} must be a positive integer, got: {value}" ) # Validate validation settings validation = cmc_config.get("validation", {}) ess = validation.get("min_per_shard_ess", 100) if not isinstance(ess, (int, float)) or ess < 0: raise ValueError(f"min_per_shard_ess must be non-negative, got: {ess}") rhat = validation.get("max_per_shard_rhat", 1.1) if not isinstance(rhat, (int, float)) or rhat < 1.0: raise ValueError(f"max_per_shard_rhat must be >= 1.0, got: {rhat}") logger.debug("CMC configuration validation passed") def _check_cmc_deprecated_settings(self, optimization: dict[str, Any]) -> None: """Check for deprecated CMC settings and log warnings. Parameters ---------- optimization : dict Optimization section of configuration """ # Check for old CMC keys that might have been used in early prototypes deprecated_keys = { "consensus_monte_carlo": "Use 'cmc' instead of 'consensus_monte_carlo'", "parallel_mcmc": "Parallel MCMC is now configured via 'cmc.backend'", } for old_key, message in deprecated_keys.items(): if old_key in optimization: logger.warning( f"Deprecated CMC configuration key '{old_key}' detected. {message}" ) # Check for deprecated sharding keys cmc = optimization.get("cmc", {}) sharding = cmc.get("sharding", {}) if "optimal_shard_size" in sharding: logger.warning( "Deprecated sharding key 'optimal_shard_size' detected. " "Use 'max_points_per_shard' instead." ) def _validate_config(self) -> None: """Lightweight configuration validation. Checks for required sections and valid values. Can be disabled by setting HOMODYNE_VALIDATE_CONFIG=false environment variable. T051: Logs key configuration values at INFO level. T052: Logs default value applications at DEBUG level. T053: Logs unusual settings as warnings. """ _KNOWN_TOP_LEVEL_KEYS = { "metadata", "analysis_mode", "analyzer_parameters", "analysis_settings", "experimental_data", "phi_filtering", "initial_parameters", "parameter_space", "optimization", "noise_estimation", "performance", "logging", "quality_control", "plotting", "output", "validation", "config_version", } if not self.config: logger.warning("Configuration is empty") return # Warn about unknown top-level keys (possible typos) unknown_keys = set(self.config.keys()) - _KNOWN_TOP_LEVEL_KEYS if unknown_keys: logger.warning( "Unknown top-level config keys (possible typo): %s", unknown_keys ) # Check for required sections required_sections = ["analysis_mode"] for section in required_sections: if section not in self.config: logger.warning(f"Missing recommended section: {section}") # Validate analysis_mode value valid_modes = ["static", "laminar_flow"] mode = self.config.get("analysis_mode", "") if mode and mode not in valid_modes: logger.warning( f"Unknown analysis_mode: '{mode}'. Valid modes: {valid_modes}", ) # T051: Log key configuration values at INFO level self._log_key_config_values() # T053: Log unusual but valid settings with warnings self._log_unusual_settings() logger.debug("Configuration validation completed") def _log_key_config_values(self) -> None: """T051: Log key configuration values at INFO level. Logs analysis mode, dataset info, and optimizer selection. """ if not self.config: return # Analysis mode mode = self.config.get("analysis_mode", "unknown") logger.info(f"Analysis mode: {mode}") # Dataset info exp_data = self.config.get("experimental_data", {}) file_path = exp_data.get("file_path") if file_path: logger.info(f"Data file: {file_path}") # Optimizer selection optimization = self.config.get("optimization", {}) method = optimization.get("method", "nlsq") logger.info(f"Optimizer: {method}") # Log dataset size estimate if available nlsq_config = optimization.get("nlsq", {}) memory_fraction = nlsq_config.get("memory_fraction") if memory_fraction: logger.debug(f"Memory fraction: {memory_fraction}") if not (0 < memory_fraction < 1): logger.warning( "memory_fraction=%s outside valid range (0, 1); should be between 0 and 1", memory_fraction, ) def _log_unusual_settings(self) -> None: """T053: Log unusual but valid settings with impact warnings. Warns about settings that may have unexpected effects. """ if not self.config: return optimization = self.config.get("optimization", {}) # Warn about very high iteration limits nlsq_config = optimization.get("nlsq", {}) or optimization.get("lsq", {}) max_iter = nlsq_config.get("max_iterations", 10000) if max_iter > 50000: logger.warning( f"High max_iterations ({max_iter}) may cause long runtimes. " f"Consider 10000-20000 for most analyses." ) # Warn about very loose tolerance tolerance = nlsq_config.get("tolerance", 1e-8) if tolerance > 1e-4: logger.warning( f"Loose tolerance ({tolerance}) may produce imprecise results. " f"Consider 1e-8 or tighter for production." ) # Warn about very tight tolerance if tolerance < 1e-14: logger.warning( f"Very tight tolerance ({tolerance}) may cause convergence issues. " f"Machine precision limits apply." ) # Warn about force_stratified_ls with large datasets force_stratified = nlsq_config.get("force_stratified_ls", False) if force_stratified: logger.warning( "force_stratified_ls=True enabled. " "This uses full Jacobian (high memory) - ensure sufficient RAM." ) # Warn about disabled anti-degeneracy for laminar_flow mode = self.config.get("analysis_mode", "static") anti_deg = nlsq_config.get("anti_degeneracy", {}) if mode == "laminar_flow": hierarchical = anti_deg.get("hierarchical", {}) if hierarchical.get("enable") is False: logger.warning( "hierarchical.enable=False for laminar_flow may cause " "gradient cancellation issues with many phi angles." ) def _normalize_schema(self) -> None: """Normalize configuration schema for backward compatibility. Handles multiple configuration format versions by converting legacy formats to modern standardized formats transparently. """ if not self.config: return self._normalize_analysis_mode() self._normalize_experimental_data() self._validate_config_version() def _normalize_analysis_mode(self) -> None: """Normalize analysis_mode to canonical lowercase form. Handles case-insensitive input and legacy mode names: - "STATIC", "Static" → "static" - "LAMINAR_FLOW", "Laminar_Flow" → "laminar_flow" - "static_isotropic" → "static" (legacy alias) - "static_anisotropic" → "static" (legacy alias) """ if self.config is None or "analysis_mode" not in self.config: return mode = self.config["analysis_mode"] if not isinstance(mode, str): return original_mode = mode normalized_mode = mode.lower() # Handle legacy aliases if normalized_mode in ("static_isotropic", "static_anisotropic"): normalized_mode = "static" if normalized_mode != original_mode: self.config["analysis_mode"] = normalized_mode logger.debug( f"Normalized analysis_mode: '{original_mode}' -> '{normalized_mode}'" ) def _validate_config_version(self) -> None: """Validate config_version against package version. Warns if config version doesn't match package version, which may indicate incompatible configuration schema. """ if self.config is None or "metadata" not in self.config: return config_version = self.config["metadata"].get("config_version") if not config_version: return # Get package version try: from homodyne import __version__ as package_version # Extract major.minor for comparison (ignore patch) def get_major_minor(version: str) -> str: parts = version.split(".") if len(parts) >= 2: return f"{parts[0]}.{parts[1]}" return version config_mm = get_major_minor(str(config_version)) package_mm = get_major_minor(str(package_version)) if config_mm != package_mm: logger.warning( f"Config version mismatch: config={config_version}, " f"package={package_version}. Configuration schema may be incompatible." ) except ImportError: # Package version not available, skip validation pass def _normalize_experimental_data(self) -> None: """Normalize experimental_data section. Supports two formats: 1. Template/Legacy: data_folder_path + data_file_name 2. Modern: file_path The normalization adds the missing format while preserving the original fields for backward compatibility. """ if self.config is None or "experimental_data" not in self.config: return from pathlib import Path exp_data = self.config["experimental_data"] # Handle legacy composite format (data_folder_path + data_file_name) if "data_folder_path" in exp_data and "data_file_name" in exp_data: folder_path = exp_data["data_folder_path"] filename = exp_data["data_file_name"] # Skip normalization if either value is None if folder_path is None or filename is None: logger.debug( "Skipping normalization: data_folder_path or data_file_name is None", ) return folder = Path(folder_path) # Resolve relative paths for consistency # Note: Keep as-is if already absolute to preserve user intent file_path = folder / filename # Add modern format while preserving legacy fields exp_data["file_path"] = str(file_path) logger.info( f"Normalized legacy config format:\n" f" {folder} + {filename}\n" f" -> file_path: {file_path}", ) # Handle phi angles similarly if "phi_angles_path" in exp_data and "phi_angles_file" in exp_data: phi_folder = Path(exp_data["phi_angles_path"]) phi_file = exp_data["phi_angles_file"] phi_path = phi_folder / phi_file # Add combined path for convenience exp_data["phi_angles_full_path"] = str(phi_path) logger.debug(f"Normalized phi angles path: {phi_path}")
[docs] def load_xpcs_config(config_path: str) -> dict[str, Any]: """Load XPCS configuration from file. Convenience function for loading configuration files. Parameters ---------- config_path : str Path to configuration file Returns ------- dict Configuration dictionary """ manager = ConfigManager(config_path) return manager.config if manager.config is not None else {}