Homodyne Data Handler Architecture¶
Complete documentation of the data loading, configuration, and result writing systems in homodyne.
Version: 2.23.2 Last Updated: May 2026
Table of Contents¶
High-Level Architecture¶
┌─────────────────────────────────────────────────────────────────────────────────┐
│ USER ENTRY POINTS │
│ │
│ CLI: homodyne --config my.yaml API: XPCSDataLoader(config_dict=...) │
│ │ │ │
│ ▼ │ │
│ ┌────────────────────────┐ │ │
│ │ ConfigManager │◄───────────────────────┘ │
│ │ (config/manager.py) │ │
│ └──────────┬─────────────┘ │
│ │ │
│ ┌──────────▼─────────────┐ │
│ │ XPCSDataLoader │ HDF5 → Validate → Filter → Preprocess → Cache │
│ │ (data/xpcs_loader.py) │ │
│ └──────────┬─────────────┘ │
│ │ │
│ {wavevector_q_list, phi_angles_list, t1, t2, c2_exp} │
└─────────────────────────────────────────────────────────────────────────────────┘
│
════════════════════════════════════╪══════════════════════════════════════════════
▼
┌─────────────────────────────────────────────────────────────────────────────────┐
│ OPTIMIZATION LAYER │
│ │
│ fit_nlsq_jax(data, config) fit_mcmc_jax(data, config) │
│ │ │ │
│ ▼ ▼ │
│ OptimizationResult CMCResult │
└─────────────────────────────────────────────────────────────────────────────────┘
│
════════════════════════════════════╪══════════════════════════════════════════════
▼
┌─────────────────────────────────────────────────────────────────────────────────┐
│ RESULT WRITING │
│ │
│ save_nlsq_json_files() save_nlsq_npz_file() create_mcmc_*_dict() │
│ (io/nlsq_writers.py) (io/nlsq_writers.py) (io/mcmc_writers.py) │
│ │
│ Output: parameters.json + analysis_results.json + fitted_data.npz + plots │
└─────────────────────────────────────────────────────────────────────────────────┘
1. Configuration System¶
Files: config/manager.py, config/parameter_manager.py,
config/parameter_space.py, config/parameter_names.py,
config/parameter_registry.py, config/types.py
ConfigManager¶
class ConfigManager:
def __init__(
self,
config_file: str = "homodyne_config.yaml",
config_override: dict = None,
):
YAML Configuration Schema¶
# Top-level sections
analysis_mode: "laminar_flow" # "static", "static_isotropic", "laminar_flow"
experimental_data:
data_folder_path: "/path/to/data"
data_file_name: "experiment.hdf5"
cache_file_path: "cache.npz" # Optional NPZ cache
apply_diagonal_correction: true # Mandatory since v2.14.2
analyzer_parameters:
dt: 0.1 # Time step (seconds)
start_frame: 1 # 1-indexed
end_frame: -1 # -1 = all frames
wavevector_q: 0.0123 # Target q-vector (1/A)
stator_rotor_gap: 2000000.0 # Gap length (nm → A conversion)
data_filtering:
enabled: true
phi_range: {min: -90, max: 90} # Degrees, supports wrapping
q_range: {min: 0.01, max: 0.05} # 1/A
parameter_space:
bounds:
- {name: "D0", min: 100, max: 100000, prior_mu: 10000, prior_sigma: 5000}
- {name: "alpha", min: -2.0, max: 2.0}
# ...
initial_parameters:
parameter_names: ["D0", "alpha", "D_offset", ...]
values: [10000, 0.5, 100, ...]
fixed_parameters: {} # {name: fixed_value}
ConfigManager Key Methods¶
┌───────────────────────────────────────────────────────────────────────────┐
│ ConfigManager Public Interface │
│ │
│ config.config → dict Full configuration dictionary │
│ config.get_config() → dict Full config dict (no arguments) │
│ config.get_parameter_bounds() → list[dict] Parameter bounds │
│ config.get_active_parameters() → list[str] Active parameter names │
│ config.get_initial_parameters() → dict Initial values │
│ config.get_cmc_config() → dict CMC-specific configuration │
│ config.get_target_angle_ranges() → list Phi angle ranges │
│ config.is_static_mode_enabled() → bool Static mode check │
│ │
│ Internal: │
│ _normalize_schema() Flat → nested config migration │
│ _normalize_analysis_mode() Alias resolution (static_isotropic → ...) │
│ _validate_config() Schema validation │
│ _get_parameter_manager() → ParameterManager │
└───────────────────────────────────────────────────────────────────────────┘
ParameterManager¶
┌───────────────────────────────────────────────────────────────────────────┐
│ ParameterManager (config/parameter_manager.py) │
│ │
│ Centralized parameter bounds and validation │
│ │
│ get_parameter_bounds(names) → list[{name, min, max, type}] │
│ get_active_parameters() → list[str] │
│ get_optimizable_parameters() → list[str] Active minus fixed │
│ get_bounds_as_tuples() → [(min, max), ...] │
│ validate_physical_constraints(params) → ValidationResult │
│ │
│ Default Bounds (hardcoded): │
│ D0: [1e2, 1e5] │
│ alpha: [-2.0, 2.0] │
│ D_offset: [-1e5, 1e5] │
│ gamma_dot_t0: [1e-6, 1e4] │
│ beta: [-2.0, 2.0] │
│ gamma_dot_t_offset: [-0.1, 0.1] │
│ phi0: [-10, 10] degrees │
│ contrast: [0.0, 1.0] (Note: parameter_registry.py uses │
│ [0.01, 1.5] — inconsistency exists) │
│ offset: [0.5, 1.5] │
└───────────────────────────────────────────────────────────────────────────┘
JSON Configuration Support¶
data/config.py also supports JSON config files with migration utilities:
# Load JSON config (same structure as YAML)
load_json_config(path: str | Path) -> dict
# Migrate legacy JSON config to modern nested YAML structure
migrate_json_to_yaml_config(json_config: dict) -> dict
# Apply default values to partially-specified config dict
apply_config_defaults(config: dict) -> dict
# Validate config structure, return result with issues list
validate_config(config: dict) -> ConfigValidationResult
# Generate a complete example YAML config (for homodyne-config)
create_example_yaml_config(mode: str = "laminar_flow") -> str
ConfigValidationResult reports missing required sections, type errors, and
out-of-range values without raising exceptions—callers decide how to handle issues.
ParameterSpace (for CMC)¶
class ParameterSpace:
@classmethod
def from_config(config_dict) -> ParameterSpace:
# Returns: bounds dict + PriorDistribution dict + parameter_names list
Provides bounds and priors for NumPyro MCMC sampling. Constructed from YAML
parameter_space.bounds section.
ParameterRegistry (Singleton)¶
Consolidates parameter name/metadata duplication across the codebase:
registry.get_param_names(mode="laminar_flow")
# → ["D0", "alpha", "D_offset", "gamma_dot_t0", "beta", "gamma_dot_t_offset", "phi0"]
registry.get_all_param_names(mode="laminar_flow", n_angles=23)
# → ["contrast_0", ..., "contrast_22", "offset_0", ..., "offset_22", "D0", ...]
Parameter Name Constants¶
# config/parameter_names.py
STATIC_ISOTROPIC_PARAMS = ["contrast", "offset", "D0", "alpha", "D_offset"] # 5
LAMINAR_FLOW_PARAMS = STATIC_ISOTROPIC_PARAMS + [
"gamma_dot_t0", "beta", "gamma_dot_t_offset", "phi0"
] # 9
2. Data Loading¶
File: data/xpcs_loader.py (~2107 lines)
XPCSDataLoader¶
class XPCSDataLoader:
def __init__(
self,
config_path: str = None, # Path to YAML/JSON config
config_dict: dict = None, # Or direct config dict
configure_logging: bool = True,
generate_quality_reports: bool = False,
):
load_experimental_data() Flow¶
┌───────────────────────────────────────────────────────────────────────────┐
│ load_experimental_data() [xpcs_loader.py:590-742] │
│ │
│ 1. Check NPZ cache │
│ ├─ _load_from_cache() → validate cache q-vector hash │
│ └─ Cache hit? Return cached data (skip HDF5) │
│ │
│ 2. Load from HDF5 │
│ ├─ _detect_format() → "aps_old" or "aps_u" │
│ ├─ _load_aps_old_format() or _load_aps_u_format() │
│ │ ├─ Read correlation matrices from HDF5 groups │
│ │ └─ Extract q-vectors, phi angles │
│ ├─ _select_optimal_wavevector() → closest to config q │
│ └─ _get_selected_indices() → q-variants near target │
│ │
│ 3. Post-processing │
│ ├─ _reconstruct_full_matrix() → half-triangle to symmetric │
│ ├─ _correct_diagonal() or _correct_diagonal_batch() │
│ │ └─ Optional: _correct_diagonal_batch_jax() (JIT path) │
│ ├─ _apply_frame_slicing_to_selected_q() → [start:end+1] │
│ └─ _calculate_time_arrays() → t1, t2 from dt and n_frames │
│ │
│ 4. Optional stages │
│ ├─ _integrate_with_phi_filtering() → angle selection │
│ ├─ _apply_preprocessing_pipeline() → PreprocessingPipeline │
│ └─ _initialize_quality_control() → DataQualityController │
│ │
│ 5. Cache and return │
│ ├─ _save_to_cache() → NPZ with metadata │
│ ├─ _validate_loaded_data() → shape/dtype checks │
│ └─ Return data dict │
└───────────────────────────────────────────────────────────────────────────┘
Convenience Function: load_xpcs_data()¶
# Module-level convenience wrapper — avoids constructing XPCSDataLoader directly
from homodyne.data import load_xpcs_data
data = load_xpcs_data(
config_path: str | None = None,
config_dict: dict | None = None,
)
# Equivalent to: XPCSDataLoader(...).load_experimental_data()
# Preferred for one-shot scripts; XPCSDataLoader for repeated loads (caches state)
Return Data Structure¶
data = loader.load_experimental_data()
# Returns:
{
"wavevector_q_list": np.ndarray, # (n_q,) - selected q vectors [1/A]
"phi_angles_list": np.ndarray, # (n_phi,) - angles [degrees]
"t1": np.ndarray, # (n_time,) - [0, dt, 2dt, ..., (n-1)*dt]
"t2": np.ndarray, # (n_time,) - [0, dt, 2dt, ..., (n-1)*dt]
"c2_exp": np.ndarray, # (n_phi, n_time, n_time) - correlation
}
Key properties:
t1,t2are always 1D arrays (no 2D meshgrids)c2_expis symmetric, diagonal-correctedFrame slicing:
data[start_frame-1 : end_frame](config is 1-indexed)Time arrays start from 0:
[0, dt, 2*dt, ..., (n_frames-1)*dt]
3. HDF5 Format Detection & Loading¶
File: data/xpcs_loader.py
Format Detection¶
┌───────────────────────────────────────────────────────────────────────────┐
│ _detect_format() [xpcs_loader.py] │
│ │
│ Inspects HDF5 file structure to determine format: │
│ │
│ ┌─ "aps_old": Legacy APS format │
│ │ • Groups: /exchange/C2T_all or similar │
│ │ • Half-triangle storage (upper-tri only) │
│ │ • Multiple q-vectors in single file │
│ │ • Phi angles extracted from group names (regex) │
│ │ │
│ └─ "aps_u": Modern APS Unified format │
│ • Groups: /xpcs/... │
│ • Full matrix storage │
│ • Standardized metadata attributes │
│ │
│ Raises XPCSDataFormatError if format unrecognized │
└───────────────────────────────────────────────────────────────────────────┘
Half-Triangle Reconstruction¶
┌───────────────────────────────────────────────────────────────────────────┐
│ _reconstruct_full_matrix() [xpcs_loader.py] │
│ │
│ APS old format stores only upper triangle of C2(t1, t2): │
│ │
│ Input: Half-triangle array (flattened or upper-tri) │
│ Output: Full symmetric matrix │
│ │
│ ┌─ ─ ─ ─ ─┐ ┌─────────┐ │
│ │ * * * * *│ │ a b c d e│ │
│ │ * * * *│ → │ b f g h i│ │
│ │ * * *│ │ c g j k l│ │
│ │ * *│ │ d h k m n│ │
│ │ *│ │ e i l n o│ │
│ └─ ─ ─ ─ ─┘ └─────────┘ │
│ (upper-tri) (full symmetric) │
│ │
│ C2(t1, t2) = C2(t2, t1) by time-reversal symmetry │
└───────────────────────────────────────────────────────────────────────────┘
Diagonal Correction¶
┌───────────────────────────────────────────────────────────────────────────┐
│ Diagonal Correction (mandatory since v2.14.2) │
│ │
│ The diagonal C2(t, t) contains autocorrelation peaks that are │
│ physically distinct from the off-diagonal correlation signal. │
│ │
│ Method: Replace diagonal with interpolated off-diagonal values │
│ │
│ Three implementations: │
│ 1. _correct_diagonal() - Single matrix, NumPy │
│ 2. _correct_diagonal_batch() - Batch of matrices, NumPy │
│ 3. _correct_diagonal_batch_jax() - Batch, JIT-compiled (if available) │
│ │
│ Applied POST-LOAD to ensure cached + fresh data receive │
│ uniform treatment │
└───────────────────────────────────────────────────────────────────────────┘
Q-Vector Selection¶
┌───────────────────────────────────────────────────────────────────────────┐
│ Q-Vector Selection [xpcs_loader.py] │
│ │
│ 1. _select_optimal_wavevector(): │
│ • Finds q closest to config.wavevector_q │
│ • Selects nearby q-variants (same phi angles) │
│ │
│ 2. _get_selected_indices(): │
│ • Returns indices for all q-vector variants near target │
│ • Typically ~23 entries (different phi angles at same |q|) │
│ │
│ Output: phi_angles_list (n_phi,), c2_exp (n_phi, n_time, n_time) │
└───────────────────────────────────────────────────────────────────────────┘
4. Data Filtering¶
Files: data/filtering_utils.py, data/angle_filtering.py,
data/phi_filtering.py, data/validators.py
Filtering Pipeline¶
┌───────────────────────────────────────────────────────────────────────────┐
│ Data Filtering Stages │
│ │
│ 1. PHI ANGLE FILTERING (data_filtering.phi_range) │
│ ├─ Standard range: min <= phi <= max │
│ ├─ Wrapped range: phi >= min OR phi <= max (when min > max) │
│ │ Handles ranges crossing ±180 degrees │
│ └─ Returns: filtered phi_angles_list, c2_exp │
│ │
│ 2. Q-RANGE FILTERING (data_filtering.q_range) │
│ └─ Filters wavevector_q_list to [q_min, q_max] │
│ │
│ 3. FRAME-BASED FILTERING (analyzer_parameters) │
│ └─ start_frame, end_frame → slice c2 and time arrays │
│ │
│ 4. QUALITY-BASED FILTERING (optional) │
│ └─ Remove angles with low signal quality │
│ │
│ 5. T=0 EXCLUSION (CLI: _exclude_t0_from_analysis) │
│ └─ Remove first time point to prevent D(t)→∞ for alpha < 0 │
│ │
│ CRITICAL: Phi filtering uses OR logic for wrapped ranges │
│ (phi_min > phi_max means range crosses ±180 degrees) │
└───────────────────────────────────────────────────────────────────────────┘
Validators¶
# data/validators.py - Input validation at I/O boundaries
validate_numeric_range(value, name, min_val, max_val)
validate_array_shape(arr, expected_shape, name)
# Supports wrapped phi ranges (min > max)
5. Preprocessing Pipeline¶
File: data/preprocessing.py (~1153 lines)
PreprocessingPipeline¶
┌───────────────────────────────────────────────────────────────────────────┐
│ PreprocessingPipeline [preprocessing.py] │
│ │
│ Stages (executed in order): │
│ │
│ 1. DIAGONAL_CORRECTION (mandatory) │
│ ├─ basic: Average nearest off-diagonal neighbors │
│ ├─ statistical: Median of nearby off-diagonal elements │
│ └─ interpolation: Interpolate from off-diagonal values │
│ │
│ 2. NORMALIZATION (optional) │
│ ├─ mean: Divide by mean │
│ ├─ min_max: Scale to [0, 1] │
│ └─ z_score: (x - mean) / std │
│ │
│ 3. NOISE_REDUCTION (optional) │
│ ├─ median_filter: Spatial median filter │
│ └─ gaussian_filter: Gaussian smoothing │
│ │
│ 4. FORMAT_STANDARDIZATION │
│ └─ Ensure float64, contiguous memory layout │
│ │
│ 5. OUTPUT_VALIDATION │
│ └─ Shape, dtype, NaN/Inf checks │
│ │
│ Each stage records a TransformationRecord in PreprocessingProvenance │
│ for full audit trail │
└───────────────────────────────────────────────────────────────────────────┘
Provenance Tracking¶
@dataclass
class PreprocessingProvenance:
pipeline_id: str
stages: list[TransformationRecord] # Complete audit trail
input_shape: tuple
output_shape: tuple
timestamp: str
def to_dict(self) -> dict: ... # JSON-serializable
@dataclass
class TransformationRecord:
stage: PreprocessingStage
method: str
parameters: dict
input_shape: tuple
output_shape: tuple
duration_ms: float
6. Quality Control¶
File: data/quality_controller.py (~1646 lines)
DataQualityController¶
┌───────────────────────────────────────────────────────────────────────────┐
│ Quality Control System [quality_controller.py] │
│ │
│ Four validation stages (progressive): │
│ │
│ Stage 1: RAW_DATA │
│ ├─ Shape/dtype validation │
│ ├─ NaN/Inf detection │
│ └─ Basic value range checks │
│ │
│ Stage 2: FILTERED_DATA │
│ ├─ Angle coverage assessment │
│ ├─ Data completeness check │
│ └─ Consistency with raw data │
│ │
│ Stage 3: PREPROCESSED_DATA │
│ ├─ Transformation fidelity assessment │
│ ├─ Preprocessing artifact detection │
│ └─ Statistical distribution checks │
│ │
│ Stage 4: FINAL_DATA │
│ ├─ Comprehensive quality assessment │
│ ├─ Analysis readiness evaluation │
│ └─ Overall quality score computation │
│ │
│ Auto-Repair Strategies: │
│ ├─ NaN replacement (interpolation or zero-fill) │
│ ├─ Infinite value capping │
│ ├─ Negative correlation repair │
│ └─ Scaling issue correction │
│ │
│ QualityLevel enum: NONE, BASIC, STANDARD, COMPREHENSIVE │
│ (validation intensity levels, not quality scores) │
│ Quality score thresholds (0-100 scale): │
│ pass_threshold=50.0, warn_threshold=70.0, excellent_threshold=85.0 │
│ │
│ Output: QualityControlResult per stage + DataQualityReport (optional) │
└───────────────────────────────────────────────────────────────────────────┘
Quality Control Result¶
@dataclass
class QualityControlResult:
stage: QualityControlStage
passed: bool
metrics: QualityMetrics
issues: list[ValidationIssue] = field(default_factory=list)
repairs_applied: list[str] = field(default_factory=list)
recommendations: list[str] = field(default_factory=list)
processing_time: float = 0.0
data_shape_before: tuple | None = None
data_shape_after: tuple | None = None
data_modified: bool = False
QualityLevel Enum¶
class QualityLevel(Enum):
NONE = "none"
BASIC = "basic"
STANDARD = "standard"
COMPREHENSIVE = "comprehensive"
These represent validation intensity levels (how much validation to perform), not
quality scores. Quality scores use thresholds from QualityControlConfig:
pass_threshold=50.0, warn_threshold=70.0, excellent_threshold=85.0 (0-100 scale).
7. Caching & Performance¶
File: data/performance_engine.py (~1502 lines)
Multi-Level Cache¶
┌───────────────────────────────────────────────────────────────────────────┐
│ Caching Strategy [performance_engine.py + xpcs_loader.py] │
│ │
│ Level 1: NPZ File Cache (primary) │
│ ├─ Location: config.cache_file_path or auto-generated │
│ ├─ Format: np.savez_compressed (zlib deflate) │
│ ├─ Metadata: q-vector hash for validity check │
│ └─ _validate_cache_q_vector() → reject stale cache │
│ │
│ Level 2: Memory Cache (PerformanceEngine → MultiLevelCache) │
│ ├─ In-memory LRU cache for repeated accesses │
│ ├─ Thread-safe access with RLock │
│ └─ Automatic eviction by access time (LRU) │
│ │
│ Level 3: Memory-Mapped Files (for large datasets) │
│ ├─ MemoryMapManager for files exceeding available RAM │
│ └─ Lazy loading: only accessed regions loaded │
│ │
│ Cache Invalidation: │
│ ├─ Q-vector mismatch → full reload │
│ ├─ Config change (dt, frames) → full reload │
│ └─ No implicit cache: user controls via cache_file_path │
└───────────────────────────────────────────────────────────────────────────┘
AdaptiveChunker¶
Part of performance_engine.py. Automatically sizes data chunks for streaming
operations based on real-time memory pressure feedback:
AdaptiveChunker
├─ Initial chunk_size = available_memory / (3 * element_size)
├─ Shrinks chunk if MemoryPressureMonitor signals warning/critical
├─ Grows chunk if memory is stable across N consecutive chunks
└─ Tracks ChunkInfo per batch for provenance logging
Async I/O Utilities¶
data/performance_engine.py exposes two async helpers for pipeline overlap:
class PrefetchLoader:
"""Background thread prefetches the next HDF5 batch while current is processed."""
def __init__(self, load_fn: Callable, paths: list[Path], buffer_size: int = 2)
def __iter__(self) -> Iterator[np.ndarray]
def shutdown(self) -> None # Safe to call twice
class AsyncWriter:
"""Writes result arrays to disk in a background thread (non-blocking)."""
def __init__(self, write_fn: Callable, max_queue: int = 4)
def submit(self, data: np.ndarray, path: Path) -> None
def wait_all(self) -> list[Exception] # Returns errors rather than raising
def shutdown(self) -> None
Use PrefetchLoader when loading many HDF5 files sequentially; use AsyncWriter
when saving large NPZ result files without blocking the optimization loop.
Cache NPZ Format¶
# Saved via _save_to_cache() [xpcs_loader.py]
np.savez_compressed(cache_path,
wavevector_q_list=..., # (n_q,)
phi_angles_list=..., # (n_phi,)
t1=..., # (n_time,)
t2=..., # (n_time,)
c2_exp=..., # (n_phi, n_time, n_time)
# Metadata stored as a JSON-encoded scalar (NOT object serialization):
cache_metadata_json=np.asarray(json.dumps(cache_metadata)),
# JSON object with keys:
# config_wavevector_q, actual_wavevector_q,
# q_variance, q_count,
# start_frame, end_frame,
# phi_count, cache_version,
# selective_q_caching
)
# Note: q_vector_hash and dt are NOT stored in the cache NPZ.
Safe Cache Loading (since v2.23.2)¶
_load_from_cache() opens every cache with allow_pickle=False. Cache
files live at config-controlled paths, so deserializing arbitrary Python
objects from them is a code-execution risk. Metadata is therefore read from the
JSON-encoded cache_metadata_json scalar and parsed with json.loads(), never
unpickled.
_load_from_cache(cache_path)
├─ np.load(..., allow_pickle=False) # refuses object arrays
├─ "cache_metadata_json" present?
│ ├─ json.loads(scalar) → dict # malformed JSON → ValueError
│ └─ _validate_cache_q_vector(metadata)
├─ legacy "cache_metadata" object array present? → ValueError (refuse + regenerate)
└─ any data key is object-dtype? → ValueError (refuse + regenerate)
Pre-v2.23.2 caches used a cache_metadata object array; they are now
rejected with a clear error rather than loaded. Delete the stale .npz
and it is regenerated transparently on the next load.
8. Memory Management¶
File: data/memory_manager.py (~1030 lines), data/optimization.py (~971 lines)
AdvancedMemoryManager¶
┌───────────────────────────────────────────────────────────────────────────┐
│ Memory Management [memory_manager.py] │
│ │
│ Dynamic monitoring and optimization of memory usage during data │
│ loading and processing │
│ │
│ Features: │
│ ├─ Real-time memory pressure tracking │
│ ├─ Adaptive chunk sizing based on available memory │
│ ├─ Memory trend analysis (increasing/decreasing/stable) │
│ └─ Automatic garbage collection triggering │
│ │
│ Memory Thresholds (MemoryPressureMonitor defaults): │
│ ├─ Normal: < 75% system RAM │
│ ├─ Warning: 75-90% system RAM (warning_threshold=0.75) │
│ └─ Critical: > 90% system RAM (critical_threshold=0.9) │
└───────────────────────────────────────────────────────────────────────────┘
AdvancedDatasetOptimizer¶
┌───────────────────────────────────────────────────────────────────────────┐
│ Dataset Optimization [optimization.py] │
│ │
│ Size-aware processing strategies: │
│ │
│ DatasetInfo: │
│ ├─ total_size_bytes: Estimated memory footprint │
│ ├─ n_elements: Total array elements │
│ └─ recommended_strategy: "standard" | "chunked" | "memory_mapped" │
│ │
│ ProcessingStrategy selection: │
│ ├─ < 1 GB: Standard (load all into memory) │
│ ├─ 1-4 GB: Chunked (process in segments) │
│ └─ > 4 GB: Memory-mapped (mmap-based access) │
│ │
│ Adaptive chunk sizing: │
│ • Initial chunk = available_memory / (3 * element_size) │
│ • Adjusted based on MemoryManager feedback │
│ • Minimum: 1000 elements per chunk │
└───────────────────────────────────────────────────────────────────────────┘
9. Result Writing (NLSQ)¶
File: io/nlsq_writers.py (~171 lines)
save_nlsq_json_files()¶
def save_nlsq_json_files(
param_dict: dict, # {name: {value, uncertainty}}
analysis_dict: dict, # Method, fit_quality, dataset_info
convergence_dict: dict, # Status, iterations, recovery_actions
output_dir: Path,
) -> None
Writes 3 JSON files:
┌───────────────────────────────────────────────────────────────────────────┐
│ NLSQ JSON Output Files │
│ │
│ 1. parameters.json │
│ ├─ timestamp │
│ ├─ analysis_mode │
│ ├─ chi_squared, reduced_chi_squared │
│ ├─ convergence_status │
│ └─ parameters: {D0: {value, uncertainty}, alpha: {...}, ...} │
│ │
│ 2. analysis_results_nlsq.json │
│ ├─ method: "nlsq" │
│ ├─ fit_quality: {chi_squared, reduced_chi_squared, quality_flag} │
│ ├─ dataset_info: {n_angles, n_time_points, total_data_points, q} │
│ └─ optimization_summary: {status, iterations, execution_time} │
│ │
│ 3. convergence_metrics.json │
│ ├─ convergence: {status, iterations, execution_time, chi_squared} │
│ ├─ recovery_actions: [...] │
│ ├─ quality_flag │
│ └─ device_info │
└───────────────────────────────────────────────────────────────────────────┘
save_nlsq_npz_file()¶
def save_nlsq_npz_file(
phi_angles: np.ndarray, # (n_angles,)
c2_exp: np.ndarray, # (n_angles, n_t1, n_t2)
c2_raw: np.ndarray, # (n_angles, n_t1, n_t2)
c2_scaled: np.ndarray, # (n_angles, n_t1, n_t2)
c2_solver: np.ndarray | None, # Optional solver surface
per_angle_scaling: np.ndarray, # (n_angles, 2) [contrast, offset]
per_angle_scaling_solver: np.ndarray,# (n_angles, 2)
residuals: np.ndarray, # (n_angles, n_t1, n_t2)
residuals_norm: np.ndarray, # (n_angles, n_t1, n_t2)
t1: np.ndarray, # (n_t1,)
t2: np.ndarray, # (n_t2,)
q: float, # Wavevector [1/A]
output_dir: Path,
) -> None
Writes fitted_data.npz with 10-11 compressed arrays:
| Array | Shape | Description | |——-|——-|————-| | phi_angles |
(n_angles,) | Scattering angles | | c2_exp | (n_angles, n_t1, n_t2) | Experimental
correlation data | | c2_theoretical_raw | (n_angles, n_t1, n_t2) | Raw unscaled
theoretical fits | | c2_theoretical_scaled | (n_angles, n_t1, n_t2) | Scaled
theoretical fits | | c2_solver_scaled | (n_angles, n_t1, n_t2) | Optional
solver-evaluated surface | | per_angle_scaling | (n_angles, 2) | Final [contrast,
offset] per angle | | per_angle_scaling_solver | (n_angles, 2) | Original solver
values | | residuals | (n_angles, n_t1, n_t2) | Experimental - scaled | |
residuals_normalized | (n_angles, n_t1, n_t2) | Residuals / (0.05 * experimental) |
| t1 | (n_t1,) | Time array 1 (seconds) | | t2 | (n_t2,) | Time array 2
(seconds) | | q | (1,) | Wavevector magnitude [1/A] |
10. Result Writing (CMC)¶
Files: io/mcmc_writers.py (~639 lines), optimization/cmc/io.py (~430 lines)
mcmc_writers.py (High-Level Dictionaries)¶
create_mcmc_parameters_dict(result: CMCResult) -> dict
create_mcmc_analysis_dict(result: CMCResult, data: dict, method_name: str) -> dict
create_mcmc_diagnostics_dict(result: CMCResult) -> dict
CMC JSON Output Files¶
┌───────────────────────────────────────────────────────────────────────────┐
│ CMC JSON Output Files │
│ │
│ 1. parameters.json │
│ ├─ timestamp, analysis_mode, method │
│ ├─ sampling_summary: {n_samples, n_warmup, n_chains, total, time} │
│ ├─ convergence: {all_converged, min/max_r_hat, min_ess, accept_rate}│
│ └─ parameters: {D0: {mean, std}, alpha: {mean, std}, ...} │
│ │
│ 2. analysis_results_cmc.json │
│ ├─ sampling_quality: {convergence_status, quality_flag} │
│ │ ├─ warnings: ["R-hat between 1.05-1.1"] │
│ │ └─ recommendations: ["Increase n_warmup"] │
│ ├─ dataset_info, sampling_summary │
│ └─ parameter_space, initial_values │
│ │
│ 3. diagnostics.json │
│ ├─ convergence: {r_hat_threshold, ess_threshold} │
│ │ └─ per_parameter_diagnostics: [{name, r_hat, ess, converged}] │
│ ├─ sampling_efficiency: {acceptance_rate, divergences, tree_depth} │
│ └─ cmc_specific: {shard_summary, combination_method, num_shards} │
│ │
│ Quality Thresholds: │
│ R-hat < 1.05: "good" │
│ R-hat 1.05-1.1: "acceptable" + warning │
│ R-hat > 1.1: "poor" + warning │
│ ESS < 400: warning + recommendation │
└───────────────────────────────────────────────────────────────────────────┘
cmc/io.py (Lower-Level CMC I/O)¶
save_samples_npz(result, output_path) # Posterior samples
load_samples_npz(input_path) -> dict # Load samples
samples_to_arviz(samples_data) -> az.InferenceData
save_fitted_data_npz(result, c2_exp, c2_fitted, ...) # Fitted data
save_parameters_json(result, output_path) # Posterior statistics
save_diagnostics_json(result, output_path, ...) # Convergence
save_all_results(result, output_dir, ...) # Orchestrator
samples.npz schema:
| Array | Shape | Description | |——-|——-|————-| | posterior_samples |
(n_chains, n_samples, n_params) | Raw posterior samples | | param_names |
(n_params,) | Parameter names in sampling order | | r_hat | (n_params,) |
Per-parameter R-hat | | ess_bulk | (n_params,) | Bulk effective sample size | |
ess_tail | (n_params,) | Tail effective sample size | | divergences | (1,) |
Total divergent transitions | | analysis_mode | (1,) | “static” or “laminar_flow” |
| n_phi | (1,) | Number of phi angles | | n_chains | (1,) | Number of chains | |
n_samples | (1,) | Samples per chain | | schema_version | (2,) | (1, 0) |
JSON Serialization Safety¶
┌───────────────────────────────────────────────────────────────────────────┐
│ JSON Safety Layer (io/json_utils.py) │
│ │
│ json_safe(value) → recursively sanitize: │
│ NaN → None (JSON null) │
│ Inf → "Infinity" (JSON string) │
│ -Inf → "-Infinity" (JSON string) │
│ ndarray → list (recursive) │
│ int64 → int │
│ float64 → float │
│ │
│ json_serializer(obj) → default handler for json.dump() │
│ Handles: np.ndarray, np.integer, np.floating, Path, datetime │
│ │
│ CRITICAL: All writer functions use json_safe() to prevent │
│ invalid JSON tokens (NaN is not valid JSON) │
└───────────────────────────────────────────────────────────────────────────┘
11. CLI Orchestration¶
File: cli/commands.py (~3361 lines)
Result Saving Flow¶
┌───────────────────────────────────────────────────────────────────────────┐
│ CLI Result Saving Orchestration [cli/commands.py] │
│ │
│ dispatch_command(args) │
│ ├─ ConfigManager(args.config_file) │
│ ├─ XPCSDataLoader(config_dict=config.config) │
│ ├─ data = loader.load_experimental_data() │
│ ├─ _apply_angle_filtering_for_optimization(data, config) │
│ ├─ _exclude_t0_from_analysis(data) │
│ │ │
│ ├─ NLSQ path: │
│ │ ├─ result = fit_nlsq_jax(data, config) │
│ │ └─ save_nlsq_results(result, data, config, output_dir) │
│ │ ├─ _extract_nlsq_metadata(config, data) │
│ │ │ └─ Extract: L, dt, q (multi-level fallback) │
│ │ ├─ _prepare_parameter_data(result, mode, n_angles) │
│ │ │ └─ Per-angle scaling detection + legacy format │
│ │ ├─ compute_theoretical_fits() → c2_raw, c2_scaled │
│ │ ├─ save_nlsq_json_files() → 3 JSON files │
│ │ ├─ save_nlsq_npz_file() → fitted_data.npz │
│ │ └─ generate_nlsq_plots() → PNG heatmaps │
│ │ │
│ └─ CMC path: │
│ ├─ nlsq_result = fit_nlsq_jax() (warm-start, unless disabled) │
│ ├─ cmc_result = fit_mcmc_jax(data, config, nlsq_result=...) │
│ └─ save_mcmc_results(result, data, config, output_dir) │
│ ├─ create_mcmc_parameters_dict(result) → parameters.json │
│ ├─ create_mcmc_analysis_dict() → analysis_results_cmc.json │
│ ├─ create_mcmc_diagnostics_dict() → diagnostics.json │
│ ├─ _compute_theoretical_c2_from_mcmc() │
│ │ └─ Per-angle lstsq fitting from posterior means │
│ ├─ save_samples_npz() → samples.npz │
│ ├─ save_fitted_data_npz() → fitted_data.npz │
│ └─ generate_nlsq_plots() → PNG heatmaps (reused) │
└───────────────────────────────────────────────────────────────────────────┘
T=0 Exclusion¶
┌───────────────────────────────────────────────────────────────────────────┐
│ _exclude_t0_from_analysis() [cli/commands.py] │
│ │
│ Physics reason: D(t) = D0 * t^alpha → infinity as t → 0 for alpha < 0 │
│ │
│ Removes first time point from all arrays: │
│ t1[1:], t2[1:], c2_exp[:, 1:, 1:] │
│ │
│ Applied in CLI after loading, before optimization │
└───────────────────────────────────────────────────────────────────────────┘
Metadata Extraction¶
┌───────────────────────────────────────────────────────────────────────────┐
│ _extract_nlsq_metadata(config, data) [cli/commands.py] │
│ │
│ Extracts physics constants with multi-level fallback: │
│ │
│ L (gap length): │
│ 1. config.stator_rotor_gap (flat attribute; actual access is via │
│ nested dict: │
│ config_dict.get("analyzer_parameters", {}).get("geometry", {})) │
│ 2. config.sample_detector_distance │
│ 3. Default: 2000000.0 A │
│ │
│ dt (time step): │
│ 1. config.analyzer_parameters.dt │
│ 2. config.experimental_data.dt │
│ 3. None (inferred from data) │
│ │
│ q (wavevector): │
│ 1. data['wavevector_q_list'][0] │
│ │
│ Returns: {L, dt, q} │
└───────────────────────────────────────────────────────────────────────────┘
Output Directory Structure¶
homodyne_results/
├── nlsq/
│ ├── parameters.json # Parameter values + uncertainties
│ ├── analysis_results_nlsq.json # Fit quality + dataset info
│ ├── convergence_metrics.json # Convergence diagnostics
│ ├── fitted_data.npz # Experimental + theoretical arrays
│ └── c2_heatmaps_phi_*.png # Heatmap plots per angle
│
├── cmc/
│ ├── parameters.json # Posterior mean +/- std
│ ├── analysis_results_cmc.json # Sampling quality + diagnostics
│ ├── diagnostics.json # Convergence metrics
│ ├── samples.npz # Posterior samples (ArviZ-compatible)
│ ├── fitted_data.npz # Experimental + theoretical arrays
│ └── c2_heatmaps_phi_*.png # Heatmap plots per angle
│
└── homodyne_results.{json|yaml|npz} # Legacy format (backward compat)
Complete Data Flow¶
YAML Config HDF5 Data File
│ │
▼ ▼
ConfigManager XPCSDataLoader
├─ load_config() ├─ _detect_format() → "aps_old"|"aps_u"
├─ _normalize_schema() ├─ _load_aps_old_format()
├─ _normalize_analysis_mode() │ ├─ Read correlation matrices
└─ _validate_config() │ └─ Extract q-vectors, phi angles
│ ├─ _select_optimal_wavevector()
▼ ├─ _reconstruct_full_matrix()
config.config dict ├─ _correct_diagonal_batch()
├─ analyzer_parameters ├─ _apply_frame_slicing_to_selected_q()
├─ experimental_data └─ _calculate_time_arrays()
├─ parameter_space │
├─ initial_parameters ▼
└─ data_filtering {wavevector_q_list, phi_angles_list,
│ t1, t2, c2_exp}
▼ │
ParameterManager ▼
├─ get_parameter_bounds() _apply_angle_filtering_for_optimization()
├─ get_active_parameters() │
└─ get_optimizable_parameters() ▼
│ _exclude_t0_from_analysis()
│ │
└───────────┬───────────────────────┘
│
▼
NLSQ Path / CMC Path
│
▼
fit_nlsq_jax() / fit_mcmc_jax()
├─ HomodyneModel (for NLSQ)
├─ ParameterSpace (for CMC)
└─ Optimization
│
▼
OptimizationResult / CMCResult
│
▼
save_nlsq_results() / save_mcmc_results()
├─ _extract_nlsq_metadata() → {L, dt, q}
├─ _prepare_parameter_data() → {param: {value, unc}}
├─ compute_theoretical_fits() → c2 surfaces
├─ save_*_json_files() → JSON
├─ save_*_npz_file() → NPZ
└─ generate_*_plots() → PNG
Quick Reference Tables¶
Data Shapes at Each Stage¶
| Stage | wavevector_q_list | phi_angles_list | t1 | t2 | c2_exp | |——-|——————-|—————–|—-|—-|——–| | Raw loaded | (n_q,) | (n_phi,) | (n_time,) | (n_time,) | (n_phi, n_time, n_time) | | After phi filter | (n_q,) | (n_selected,) | (n_time,) | (n_time,) | (n_selected, n_time, n_time) | | After t=0 excl | (n_q,) | (n_selected,) | (n_time-1,) | (n_time-1,) | (n_selected, n_time-1, n_time-1) |
Configuration Defaults¶
| Parameter | Section | Default | Description | |———–|———|———|————-| | dt | analyzer_parameters | (required) | Time step in seconds | | start_frame | analyzer_parameters | 1 | First frame (1-indexed) | | end_frame | analyzer_parameters | -1 | Last frame (-1 = all) | | apply_diagonal_correction | experimental_data | true | Diagonal correction | | phi_range.min | data_filtering | -180 | Minimum phi angle | | phi_range.max | data_filtering | 180 | Maximum phi angle | | data_filtering.enabled | data_filtering | true | Enable filtering |
Error Types¶
| Exception | Module | Raised When | |———–|——–|————-| |
XPCSConfigurationError | data/xpcs_loader | Invalid config structure | |
XPCSDependencyError | data/xpcs_loader | Missing numpy/h5py | | XPCSDataFormatError
| data/xpcs_loader | Unrecognized HDF5 format | | PreprocessingError |
data/preprocessing | Preprocessing stage failure | | PreprocessingConfigurationError |
data/preprocessing | Invalid preprocessing config | | ValueError | config/manager |
Invalid config values | | OSError | io/nlsq_writers | File write failure |
JSON Output Summary¶
| File | Method | Size | Key Contents | |——|——–|——|————-| | parameters.json (NLSQ) | save_nlsq_json_files | ~2 KB | {value, uncertainty} per param | | parameters.json (CMC) | create_mcmc_parameters_dict | ~3 KB | {mean, std} per param | | analysis_results_.json | save_nlsq/create_mcmc_analysis | ~3 KB | fit_quality, dataset_info | | convergence_metrics.json | save_nlsq_json_files | ~2 KB | convergence status, recovery | | diagnostics.json (CMC) | create_mcmc_diagnostics_dict | ~5 KB | per-param R-hat, ESS, shards | | fitted_data.npz | save__npz_file | 50 KB-500 MB | exp + theoretical + residuals | | samples.npz (CMC) | save_samples_npz | 1-100 MB | posterior (chains x samples x params) |
Key Files Reference¶
Data Loading (homodyne/data/)¶
File |
Lines |
Purpose |
|---|---|---|
xpcs_loader.py |
~2107 |
XPCSDataLoader + |
config.py |
~752 |
YAML/JSON config loading, |
filtering_utils.py |
~613 |
Q-range, phi, quality, and frame-based filtering |
preprocessing.py |
~1153 |
Multi-stage preprocessing pipeline with provenance |
quality_controller.py |
~1646 |
Progressive quality control with auto-repair |
validation.py |
~1115 |
Data quality validation (NaN, shape, range checks) |
performance_engine.py |
~1502 |
Multi-level caching (MultiLevelCache), AdaptiveChunker, PrefetchLoader, AsyncWriter |
memory_manager.py |
~1030 |
Dynamic memory monitoring and pressure management |
optimization.py |
~971 |
Size-aware processing strategies (standard/chunked/mmap) |
angle_filtering.py |
~413 |
Angle normalization and filtering utilities |
phi_filtering.py |
~385 |
Vectorized phi angle filtering |
validators.py |
~296 |
Input validation at I/O boundaries |
types.py |
~44 |
Shared data types (prevents circular imports) |
Configuration (homodyne/config/)¶
| File | Lines | Purpose | |——|——-|———| | manager.py | ~1296 | ConfigManager: YAML loading, section access, CMC config | | parameter_space.py | ~895 | ParameterSpace: bounds + priors for MCMC | | parameter_manager.py | ~809 | ParameterManager: centralized bounds and validation | | parameter_registry.py | ~632 | ParameterRegistry: singleton for parameter metadata | | types.py | ~522 | TypedDict definitions for config structures | | parameter_names.py | ~315 | Parameter name constants and mappings | | physics_validators.py | ~286 | Physics constraint validation |
Result Writing (homodyne/io/)¶
| File | Lines | Purpose | |——|——-|———| | mcmc_writers.py | ~639 | CMC result dict creation (parameters, analysis, diagnostics) | | nlsq_writers.py | ~171 | NLSQ result saving (3 JSON + 1 NPZ) | | json_utils.py | ~114 | NaN/Inf-safe JSON serialization |
CMC-Specific I/O (homodyne/optimization/cmc/)¶
| File | Lines | Purpose | |——|——-|———| | io.py | ~430 | CMC samples NPZ, fitted data NPZ, save_all_results orchestrator |