Batch Processing Multiple Datasets

Learning Objectives

By the end of this section you will understand:

  • How to process multiple HDF5 files in sequence

  • Script patterns for batch NLSQ and CMC analysis

  • How to aggregate results across datasets

  • Strategies for parallel processing on HPC clusters

Overview

XPCS experiments often produce many datasets: multiple samples, multiple q-values, or multiple time points. Homodyne provides no built-in batch runner, but the Python API makes it straightforward to build one.

Sequential Batch Script

A minimal script to process a list of HDF5 files:

"""
batch_nlsq.py — Process multiple XPCS files with NLSQ.
Usage: uv run python batch_nlsq.py --config base_config.yaml --files data/*.h5
"""

import argparse
import json
from pathlib import Path
import numpy as np

from homodyne.config import ConfigManager
from homodyne.data import load_xpcs_data, XPCSDataLoader, XPCSDataFormatError
from homodyne.optimization.nlsq import fit_nlsq_jax
from homodyne.utils.logging import get_logger

logger = get_logger(__name__)


def process_file(h5_path: Path, base_config_path: Path, output_dir: Path):
    """Process one HDF5 file and save results."""
    stem = h5_path.stem

    # Override file_path in config for this specific file
    config = ConfigManager(str(base_config_path))
    config.data.file_path = str(h5_path)

    try:
        data = load_xpcs_data(config=config)
    except XPCSDataFormatError as e:
        logger.error(f"Data loading failed for {h5_path.name}: {e}")
        return None

    result = fit_nlsq_jax(data, config)

    if result.success:
        # Build result dictionary
        result_dict = {
            "file": str(h5_path),
            "convergence_status": result.convergence_status,
            "reduced_chi_squared": result.reduced_chi_squared,
            "parameters": result.parameters.tolist(),
            "uncertainties": result.uncertainties.tolist(),
            "execution_time": result.execution_time,
        }
        # Save to JSON
        out_json = output_dir / f"{stem}_nlsq.json"
        with open(out_json, "w") as f:
            json.dump(result_dict, f, indent=2)
        logger.info(f"Saved {out_json}")
        return result_dict
    else:
        logger.warning(f"Fit failed for {h5_path.name}: {result.message}")
        return None


def main():
    parser = argparse.ArgumentParser(description="Batch NLSQ processing")
    parser.add_argument("--config", required=True, help="Base YAML config")
    parser.add_argument("--files", nargs="+", required=True, help="HDF5 files")
    parser.add_argument("--output", default="./results", help="Output directory")
    args = parser.parse_args()

    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    results = []
    for h5_file in args.files:
        h5_path = Path(h5_file)
        logger.info(f"Processing {h5_path.name}")
        result = process_file(h5_path, Path(args.config), output_dir)
        if result:
            results.append(result)

    # Save aggregate summary
    summary_path = output_dir / "batch_summary.json"
    with open(summary_path, "w") as f:
        json.dump(results, f, indent=2)
    logger.info(f"Batch complete: {len(results)}/{len(args.files)} succeeded")
    logger.info(f"Summary saved to {summary_path}")


if __name__ == "__main__":
    main()

Run the batch script:

uv run python batch_nlsq.py \
  --config base_config.yaml \
  --files data/sample_*.h5 \
  --output results/

Multiple q-Values

If each HDF5 file contains data at multiple q-values, iterate over q:

import numpy as np
from homodyne.config import ConfigManager
from homodyne.data import load_xpcs_data
from homodyne.optimization.nlsq import fit_nlsq_jax

q_values = [0.020, 0.030, 0.054, 0.080, 0.110]  # Å⁻¹

results_by_q = {}
for q in q_values:
    config = ConfigManager("config_template.yaml")
    config.data.q_value = q

    data = load_xpcs_data(config=config)
    result = fit_nlsq_jax(data, config)

    results_by_q[q] = {
        "D0": result.parameters[0],
        "D0_err": result.uncertainties[0],
        "chi2_nu": result.reduced_chi_squared,
    }

# Check q-dependence of D0 (should be constant if Stokes-Einstein applies)
for q, res in sorted(results_by_q.items()):
    print(f"q={q:.3f}: D0 = {res['D0']:.2f} ± {res['D0_err']:.2f}")

Result Aggregation

After batch processing, aggregate results for analysis:

import json
import numpy as np
from pathlib import Path

results_dir = Path("results/")
all_results = []

for json_file in sorted(results_dir.glob("*_nlsq.json")):
    with open(json_file) as f:
        all_results.append(json.load(f))

# Extract D0 values across samples
D0_values = np.array([r['parameters'][0] for r in all_results
                       if r['convergence_status'] == 'converged'])
D0_errors = np.array([r['uncertainties'][0] for r in all_results
                       if r['convergence_status'] == 'converged'])

print(f"D0 mean:   {D0_values.mean():.2f} Ų/s")
print(f"D0 std:    {D0_values.std():.2f} Ų/s")
print(f"D0 range:  [{D0_values.min():.2f}, {D0_values.max():.2f}] Ų/s")

HPC Parallel Processing

On HPC clusters, use job arrays to process files in parallel:

SLURM job array example (batch_slurm.sh):

#!/bin/bash
#SBATCH --job-name=homodyne_batch
#SBATCH --array=0-99           # 100 files, one per job
#SBATCH --cpus-per-task=8
#SBATCH --mem=32G
#SBATCH --time=02:00:00
#SBATCH --output=logs/job_%A_%a.out

# Get file for this array index
FILE_LIST=files.txt              # One file path per line
FILE=$(sed -n "$((SLURM_ARRAY_TASK_ID + 1))p" "$FILE_LIST")

echo "Processing: $FILE"
uv run python process_one.py \
  --config config.yaml \
  --input "$FILE" \
  --output "results/job_${SLURM_ARRAY_TASK_ID}/"

process_one.py:

"""process_one.py — Single file processor for SLURM array jobs."""

import argparse
import json
from pathlib import Path

from homodyne.config import ConfigManager
from homodyne.data import load_xpcs_data
from homodyne.optimization.nlsq import fit_nlsq_jax

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--config")
    parser.add_argument("--input")
    parser.add_argument("--output")
    args = parser.parse_args()

    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    config = ConfigManager(args.config)
    config.data.file_path = args.input

    data = load_xpcs_data(config=config)
    result = fit_nlsq_jax(data, config)

    result_dict = {
        "file": args.input,
        "parameters": result.parameters.tolist(),
        "uncertainties": result.uncertainties.tolist(),
        "chi2_nu": result.reduced_chi_squared,
        "converged": result.success,
    }

    out_path = output_dir / "result.json"
    with open(out_path, "w") as f:
        json.dump(result_dict, f, indent=2)

if __name__ == "__main__":
    main()

Collecting SLURM Results

After all array jobs complete, collect results:

import json
from pathlib import Path
import numpy as np

results = []
for job_dir in sorted(Path("results").glob("job_*")):
    result_file = job_dir / "result.json"
    if result_file.exists():
        with open(result_file) as f:
            results.append(json.load(f))

converged = [r for r in results if r['converged']]
print(f"Converged: {len(converged)}/{len(results)}")

D0_array = np.array([r['parameters'][0] for r in converged])
print(f"D0: {D0_array.mean():.2f} ± {D0_array.std():.2f} Ų/s")

See Also