Utilities

This page documents the cross-cutting utility modules used throughout the Homodyne package: structured logging, CPU/NUMA detection, path validation, and asynchronous I/O.


homodyne.utils.logging

Lightweight structured logging system built on Python’s standard logging module. Provides contextual log prefixes, configurable handlers (console + rotating file), and helpers for performance monitoring and call tracing.

Key exports:

  • get_logger(name) – Returns a logger (or context-aware LoggerAdapter) for name.

  • log_phase(name) – Context manager that logs entry/exit and elapsed time for a phase.

  • log_performance(threshold) – Decorator that logs execution time when it exceeds threshold seconds.

  • log_calls – Decorator that logs function calls, optionally with arguments and return values.

  • log_exception(logger, exc) – Logs an exception with its full traceback, at ERROR level by default.

  • with_context(logger, **context) – Returns a LoggerAdapter that prefixes all log messages with key-value context.

  • LogConfiguration – Dataclass for configuring the global logging setup programmatically or from CLI flags.

Structured logging utilities for the homodyne package.

Provides a lightweight but flexible logging system that matches the CMC reimplementation requirements: contextual log prefixes, configurable console and rotating file handlers, and helpers for performance monitoring.

class homodyne.utils.logging.LogConfiguration[source]

Bases: object

Programmatic logging configuration.

Alternative to configure_logging() for programmatic control over logging settings.

console_level

Console log level (default “INFO”).

console_format

Console format (“simple” or “detailed”).

console_colors

Enable ANSI colors in console (default False).

file_enabled

Enable file logging (default True).

file_path

Log file path (None = auto-generate).

file_level

File log level (default “DEBUG”).

file_format

File format (“simple” or “detailed”).

file_rotation_mb

Max file size before rotation (default 10).

file_backup_count

Number of backup files to keep (default 5).

module_overrides

Per-module log level overrides.

Example

>>> config = LogConfiguration.from_cli_args(verbose=True, log_file="analysis.log")
>>> config.apply()
>>> config = LogConfiguration(
...     console_level="INFO",
...     file_level="DEBUG",
...     module_overrides={"jax": "WARNING", "homodyne.optimization": "DEBUG"}
... )
>>> config.apply()
console_level: str = 'INFO'
console_format: str = 'simple'
console_colors: bool = False
file_enabled: bool = True
file_path: str | Path | None = None
file_level: str = 'DEBUG'
file_format: str = 'detailed'
file_rotation_mb: int = 10
file_backup_count: int = 5
module_overrides: dict[str, str]
apply()[source]

Apply this configuration to the logging system.

Return type:

Path | None

Returns:

Path to log file if file logging is enabled, None otherwise.
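As a rough illustration of how the rotation fields could map onto the standard library, the sketch below builds a RotatingFileHandler from a size in megabytes and a backup count. The helper name make_rotating_handler and the log file name are hypothetical, and this is not the package's actual apply() implementation.

```python
import logging
import tempfile
from logging.handlers import RotatingFileHandler
from pathlib import Path


def make_rotating_handler(path, rotation_mb=10, backup_count=5, level="DEBUG"):
    """Hypothetical helper: build a size-rotated file handler."""
    handler = RotatingFileHandler(
        path,
        maxBytes=rotation_mb * 1024 * 1024,  # megabytes -> bytes
        backupCount=backup_count,
        delay=True,  # do not open the file until the first record is emitted
    )
    handler.setLevel(getattr(logging, level))
    return handler


log_path = Path(tempfile.gettempdir()) / "homodyne_example.log"
handler = make_rotating_handler(log_path)
handler.close()
```

With the defaults shown, the file would roll over at 10 MiB and keep five numbered backups.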

classmethod from_dict(config)[source]

Create configuration from dictionary.

Parameters:

config (dict[str, Any]) – Dictionary with configuration values.

Return type:

LogConfiguration

Returns:

LogConfiguration instance.

classmethod from_cli_args(verbose=False, quiet=False, log_file=None)[source]

Create configuration from CLI flags.

Parameters:
  • verbose (bool) – Enable DEBUG level console logging.

  • quiet (bool) – Enable ERROR-only console logging.

  • log_file (str | None) – Path to log file (None = auto-generate if file logging enabled).

Return type:

LogConfiguration

Returns:

LogConfiguration instance.
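The flag descriptions above suggest a simple mapping from CLI flags to a console level. A minimal sketch of that mapping follows; the function name is hypothetical, and the precedence when both flags are set (quiet wins here) is an assumption rather than documented behaviour.

```python
def console_level_from_flags(verbose=False, quiet=False):
    """Hypothetical mapping from CLI flags to a console level name."""
    if quiet:
        return "ERROR"  # assumption: quiet takes precedence over verbose
    if verbose:
        return "DEBUG"
    return "INFO"


default_level = console_level_from_flags()
verbose_level = console_level_from_flags(verbose=True)
quiet_level = console_level_from_flags(quiet=True)
```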

__init__(console_level='INFO', console_format='simple', console_colors=False, file_enabled=True, file_path=None, file_level='DEBUG', file_format='detailed', file_rotation_mb=10, file_backup_count=5, module_overrides=<factory>)
class homodyne.utils.logging.AnalysisSummaryLogger[source]

Bases: object

Structured logging for analysis completion summaries.

Tracks phase timings, metrics, output files, and convergence status for logging a structured summary at analysis completion.

Example

>>> summary = AnalysisSummaryLogger(run_id="analysis_001", analysis_mode="laminar_flow")
>>> summary.start_phase("loading")
>>> data = load_data(config)
>>> summary.end_phase("loading", memory_peak_gb=2.1)
>>> summary.record_metric("chi_squared", result.chi_squared)
>>> summary.set_convergence_status("converged")
>>> summary.log_summary(logger)
__init__(run_id, analysis_mode)[source]

Initialize summary logger for an analysis run.

Parameters:
  • run_id (str) – Unique identifier for this analysis run.

  • analysis_mode (str) – Analysis mode (e.g., “static_isotropic”, “laminar_flow”).

start_phase(name)[source]

Mark phase start for timing.

Parameters:

name (str) – Phase name (e.g., “loading”, “optimization”).

Return type:

None

end_phase(name, memory_peak_gb=None)[source]

Mark phase completion.

Parameters:
  • name (str) – Phase name that was started.

  • memory_peak_gb (float | None) – Optional peak memory usage during phase.

Return type:

None

record_metric(name, value)[source]

Record a named metric (e.g., chi_squared).

Parameters:
  • name (str) – Metric name.

  • value (float) – Metric value.

Return type:

None

add_output_file(path)[source]

Record an output file path.

Parameters:

path (Path | str) – Path to output file.

Return type:

None

set_convergence_status(status)[source]

Set final convergence status.

Parameters:

status (str) – Convergence status (e.g., “converged”, “max_iter”, “failed”).

Return type:

None

increment_warning_count()[source]

Increment warning counter.

Return type:

None

increment_error_count()[source]

Increment error counter.

Return type:

None

set_config_summary(optimizer=None, n_params=None, n_data_points=None, n_phi_angles=None, data_file=None, **kwargs)[source]

Set configuration summary for logging.

Parameters:
  • optimizer (str | None) – Optimizer used (e.g., “nlsq”, “cmc”).

  • n_params (int | None) – Number of parameters being optimized.

  • n_data_points (int | None) – Total number of data points.

  • n_phi_angles (int | None) – Number of phi angles.

  • data_file (str | None) – Path to data file.

  • **kwargs (Any) – Additional key-value pairs to include.

Return type:

None

log_summary(logger)[source]

Log the complete analysis summary.

Parameters:

logger (Logger | LoggerAdapter) – Logger to use for output.

Return type:

None

as_dict()[source]

Export summary as dictionary for JSON serialization.

Return type:

dict[str, Any]

Returns:

Dictionary with all summary data.

class homodyne.utils.logging.MinimalLogger[source]

Bases: object

Configurable logger manager for the homodyne package.

Thread-safe singleton for managing homodyne logging configuration.

static __new__(cls)[source]
Return type:

MinimalLogger

__init__()[source]
configure(level='INFO', *, console_level=None, console_format='detailed', console_colors=False, file_path=None, file_level=None, max_size_mb=10, backup_count=5, module_levels=None, force=False)[source]

Configure homodyne logging.

Thread-safe configuration of the logging system. Returns the file path if a file handler is created.

Return type:

Path | None

configure_from_dict(logging_config, *, verbose=False, quiet=False, output_dir=None, run_id=None)[source]

Configure logging from a logging: config section.

Return type:

Path | None

get_logger(name)[source]

Get or create a logger with hierarchical naming.

Return type:

Logger
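For readers unfamiliar with the pattern, a thread-safe singleton is commonly written with a class-level lock in __new__. The sketch below shows one generic way to do this; SingletonManager is a hypothetical stand-in and does not reproduce MinimalLogger's internals.

```python
import threading


class SingletonManager:
    """Hypothetical stand-in for a thread-safe singleton manager."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking: skip the lock once the instance exists.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._configured = False
                    cls._instance = instance
        return cls._instance


first = SingletonManager()
second = SingletonManager()
```

Every call returns the same object, so configuration applied through one reference is visible through all others.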

homodyne.utils.logging.configure_logging(logging_config, *, verbose=False, quiet=False, output_dir=None, run_id=None)[source]

Public helper to configure logging from config + CLI flags.

Return type:

Path | None

homodyne.utils.logging.get_logger(name=None, *, context=None)[source]

Get a logger instance with automatic naming and optional context.

Return type:

Logger | LoggerAdapter[Logger]

homodyne.utils.logging.with_context(logger, **context)[source]

Create a contextual logger with key-value prefixes.

Context is formatted as [key=value][key2=value2] message. Nested calls merge contexts (inner overrides outer on key conflicts). Thread-safe for use in multiprocessing.

Parameters:
  • logger (Logger | LoggerAdapter[Logger]) – Base logger or existing contextual adapter to wrap.

  • **context (Any) – Key-value pairs to include as prefix.

Return type:

LoggerAdapter[Logger]

Returns:

A logger adapter that prefixes all messages with context.

Example

>>> logger = get_logger(__name__)
>>> ctx_logger = with_context(logger, run_id="abc123", mode="laminar_flow")
>>> ctx_logger.info("Starting analysis")
# Output: [run_id=abc123 mode=laminar_flow] Starting analysis
>>> # Nested context
>>> shard_logger = with_context(ctx_logger, shard=5)
>>> shard_logger.info("Processing shard")
# Output: [run_id=abc123 mode=laminar_flow shard=5] Processing shard
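The merge behaviour described above can be sketched with the standard library's LoggerAdapter. ContextAdapter and add_context are hypothetical names, and the prefix layout follows the example output shown above; the package's real adapter may differ.

```python
import logging


class ContextAdapter(logging.LoggerAdapter):
    """Hypothetical adapter that prefixes messages with key=value context."""

    def process(self, msg, kwargs):
        prefix = " ".join(f"{k}={v}" for k, v in self.extra.items())
        return f"[{prefix}] {msg}", kwargs


def add_context(logger, **context):
    """Wrap a logger or an existing adapter, merging contexts (inner wins)."""
    if isinstance(logger, ContextAdapter):
        merged = {**logger.extra, **context}
        return ContextAdapter(logger.logger, merged)
    return ContextAdapter(logger, context)


base = logging.getLogger("context_sketch")
outer = add_context(base, run_id="abc123", mode="laminar_flow")
inner = add_context(outer, shard=5, mode="static")  # overrides outer "mode"
message, _ = inner.process("Processing shard", {})
```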
class homodyne.utils.logging.PhaseContext[source]

Bases: object

Context object returned by log_phase() with timing and memory info.

name: str
duration: float = 0.0
memory_peak_gb: float | None = None
memory_delta_gb: float | None = None
__init__(name, duration=0.0, memory_peak_gb=None, memory_delta_gb=None)
homodyne.utils.logging.log_phase(name, logger=None, level=20, track_memory=False, threshold_s=0.0)[source]

Context manager for phase-level timing with optional memory tracking.

Parameters:
  • name (str) – Phase name for logging.

  • logger (Logger | LoggerAdapter[Logger] | None) – Logger to use. If None, uses module logger.

  • level (int) – Log level for phase messages (default 20, i.e. logging.INFO).

  • track_memory (bool) – Track memory usage during phase.

  • threshold_s (float) – Only log if duration > threshold (0 = always log).

Yields:

PhaseContext with name, duration, memory_peak_gb, memory_delta_gb. Duration and memory values are populated after the context exits.

Example

>>> with log_phase("optimization", track_memory=True) as phase:
...     result = run_optimization(data)
>>> print(f"Took {phase.duration:.1f}s")
# Logs: Phase 'optimization' completed in 45.3s (peak memory: 12.4 GB)
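The note that duration is populated only after the context exits follows from how such a context manager is typically built. A minimal sketch, with the hypothetical names PhaseInfo and timed_phase and without memory tracking:

```python
import logging
import time
from contextlib import contextmanager
from dataclasses import dataclass


@dataclass
class PhaseInfo:
    """Hypothetical stand-in for the yielded context object."""
    name: str
    duration: float = 0.0


@contextmanager
def timed_phase(name, logger=None, level=logging.INFO, threshold_s=0.0):
    logger = logger or logging.getLogger("phase_sketch")
    info = PhaseInfo(name)
    start = time.perf_counter()
    try:
        yield info
    finally:
        # Filled in on exit, so callers read it after the with-block.
        info.duration = time.perf_counter() - start
        if info.duration > threshold_s:
            logger.log(level, "Phase %r completed in %.3fs", name, info.duration)


with timed_phase("demo") as phase:
    time.sleep(0.01)
```

Inside the with-block phase.duration is still 0.0; it holds the elapsed time once the block has finished.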
homodyne.utils.logging.log_exception(logger, exc, context=None, level=logging.ERROR, include_traceback=True)[source]

Log an exception with full context for debugging.

Extracts module, function, and line number from exception traceback. Formats context as key-value pairs in the message.

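Parameters:
  • logger – Logger to use for output.

  • exc – Exception to log.

  • context – Optional mapping of key-value pairs to include in the message.

  • level – Log level (default logging.ERROR).

  • include_traceback – Whether to include the full traceback (default True).
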
Return type:

None

Example

>>> try:
...     result = compute_jacobian(params)
... except ValueError as e:
...     log_exception(logger, e, context={
...         "iteration": 45,
...         "params": params.tolist()[:5]
...     })
...     raise
# Logs:
# ERROR | homodyne.optimization.nlsq.core | Exception in compute_jacobian:
# ValueError: invalid value
# Context: iteration=45, params=[1.2e-11, 0.85, ...]
# Traceback (most recent call last):
#   ...
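Extracting the failing function and line from an exception can be done with the standard traceback module. The sketch below illustrates the idea only; describe_exception is a hypothetical helper and its message layout is an assumption loosely modelled on the example output above.

```python
import traceback


def describe_exception(exc, context=None):
    """Hypothetical helper: summarise where an exception was raised."""
    frames = traceback.extract_tb(exc.__traceback__)
    location = "unknown"
    if frames:
        last = frames[-1]  # innermost frame, where the exception was raised
        location = f"{last.name}:{last.lineno}"
    parts = [f"Exception in {location}", f"{type(exc).__name__}: {exc}"]
    if context:
        pairs = ", ".join(f"{k}={v}" for k, v in context.items())
        parts.append(f"Context: {pairs}")
    return "\n".join(parts)


def failing_step():
    raise ValueError("invalid value")


try:
    failing_step()
except ValueError as exc:
    summary = describe_exception(exc, {"iteration": 45})
```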
homodyne.utils.logging.log_calls(logger=None, level=logging.DEBUG, include_args=False, include_result=False)[source]

Decorator to log function calls.

Parameters:
  • logger (Logger | LoggerAdapter[Logger] | None) – Logger to use. If None, creates one for the module.

  • level (int) – Logging level to use.

  • include_args (bool) – Whether to log function arguments.

  • include_result (bool) – Whether to log function return value.

Return type:

Callable[[F], F], where F is a TypeVar bound to Callable[..., Any]

homodyne.utils.logging.log_performance(logger=None, level=logging.INFO, threshold=0.1)[source]

Decorator to log function performance.

Parameters:
  • logger (Logger | LoggerAdapter[Logger] | None) – Logger to use. If None, creates one for the module.

  • level (int) – Logging level to use.

  • threshold (float) – Minimum duration (seconds) to log.

Return type:

Callable[[F], F], where F is a TypeVar bound to Callable[..., Any]
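A threshold-gated timing decorator of this kind can be sketched in a few lines. The name timed is hypothetical, and whether the real decorator compares with > or >= at the threshold is not stated in the documentation; >= is assumed here.

```python
import functools
import logging
import time


def timed(logger=None, level=logging.INFO, threshold=0.1):
    """Hypothetical decorator: log calls lasting at least `threshold` seconds."""
    def decorator(func):
        log = logger or logging.getLogger(func.__module__)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                elapsed = time.perf_counter() - start
                if elapsed >= threshold:  # assumption: inclusive comparison
                    log.log(level, "%s took %.3fs", func.__qualname__, elapsed)
        return wrapper
    return decorator


@timed(threshold=0.0)
def add(a, b):
    return a + b


result = add(2, 3)
```

Because the timing runs in a finally clause, slow calls that raise are logged as well, and functools.wraps keeps the wrapped function's name and docstring.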

homodyne.utils.logging.log_operation(operation_name, logger=None, level=20)[source]

Context manager for logging operations.

Parameters:
  • operation_name (str) – Name of the operation.

  • logger (Logger | LoggerAdapter[Logger] | None) – Logger to use. If None, creates one for caller’s module.

  • level (int) – Logging level to use (default 20, i.e. logging.INFO).

Return type:

Generator[Logger | LoggerAdapter[Logger]]

Usage Examples

Basic logger

from homodyne.utils.logging import get_logger

logger = get_logger(__name__)

logger.info("Starting analysis")
logger.debug("Parameter D0=%.4g", d0)
logger.warning("High divergence rate: %.1f%%", rate * 100)
logger.error("Solver failed after %d attempts", n_attempts)

Phase timing

from homodyne.utils.logging import get_logger, log_phase

logger = get_logger(__name__)

with log_phase("NLSQ optimisation", logger=logger):
    result = fit_nlsq_jax(data, config)
# Logs something like: Phase 'NLSQ optimisation' completed in 12.3s

Performance decorator

from homodyne.utils.logging import log_performance

@log_performance(threshold=0.5)   # log if call takes > 0.5 s
def compute_jacobian(params):
    ...

Contextual logging (CMC shard)

from homodyne.utils.logging import get_logger, with_context

logger = get_logger(__name__)

# with_context() returns a LoggerAdapter; it is not a context manager
shard_logger = with_context(logger, shard_id=42, n_pts=5000)
shard_logger.info("Starting NUTS sampling")
# Logs: "[shard_id=42 n_pts=5000] Starting NUTS sampling"

Exception logging

from homodyne.utils.logging import get_logger, log_exception

logger = get_logger(__name__)

try:
    result = risky_operation()
except Exception as exc:
    log_exception(logger, exc)
    raise

CLI log configuration

from homodyne.utils.logging import LogConfiguration

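# Configured from --verbose / --quiet flags
# (assumes args is a parsed namespace with verbose and quiet attributes)
log_cfg = LogConfiguration.from_cli_args(verbose=args.verbose, quiet=args.quiet)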
log_cfg.apply()

homodyne.device

CPU architecture detection and JAX/XLA configuration utilities for HPC environments. Detects physical and logical core counts, NUMA topology, and processor architecture (Intel/AMD) to inform thread and device allocation.

detect_cpu_info

homodyne.device.cpu.detect_cpu_info()[source]

Detect CPU architecture and capabilities for optimization.

Returns:

CPU information including cores, architecture, and optimization hints

Return type:

dict[str, Any]

Usage Example

from homodyne.device.cpu import detect_cpu_info

info = detect_cpu_info()
print(f"Physical cores: {info['physical_cores']}")
print(f"Logical cores:  {info['logical_cores']}")
print(f"Architecture:   {info['architecture']}")
print(f"NUMA nodes:     {info['numa_nodes']}")
print(f"Processor:      {info['processor']}")

# Optimal worker count for CMC multiprocessing backend
n_workers = max(1, info['physical_cores'] // 2 - 1)
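For comparison, the subset of this information that the standard library exposes directly can be gathered as below. This is only a sketch: basic_cpu_info is a hypothetical name, and physical core counts and NUMA topology are not available from the standard library alone, so they are omitted.

```python
import os
import platform


def basic_cpu_info():
    """Hypothetical stdlib-only probe; no physical-core or NUMA detection."""
    logical = os.cpu_count() or 1  # cpu_count() may return None
    return {
        "logical_cores": logical,
        "architecture": platform.machine(),
        "processor": platform.processor(),
    }


info = basic_cpu_info()
```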

HPC CPU Optimization for Homodyne

CPU-primary optimization strategies for high-performance computing environments. Optimized for 36/128-core HPC nodes with intelligent thread management and JAX CPU configuration.

Key Features:
  • CPU core detection and optimal thread allocation

  • JAX CPU-specific optimizations for HPC environments

  • Memory-efficient processing strategies

  • NUMA-aware configuration

  • Intel/AMD architecture detection and optimization

HPC Environment Support:
  • 36-core HPC nodes (typical cluster setup)

  • 128-core HPC nodes (high-end clusters)

  • Multi-socket NUMA systems

  • Intel Xeon and AMD EPYC processors

homodyne.device.cpu.configure_cpu_hpc(num_threads=None, enable_hyperthreading=False, numa_policy='auto', memory_optimization='standard', enable_onednn=False)[source]

Configure JAX and system for HPC CPU optimization.

Optimizes thread allocation, memory usage, and computational efficiency for HPC environments with 36/128-core nodes.

Parameters:
  • num_threads (int | None) – Number of threads to use. If None, auto-detects optimal count.

  • enable_hyperthreading (bool) – Whether to use hyperthreading. Usually disabled for HPC.

  • numa_policy (str) – NUMA memory policy (“auto”, “local”, “interleave”)

  • memory_optimization (str) – Memory optimization level (“minimal”, “standard”, “aggressive”)

  • enable_onednn (bool) – Enable Intel oneDNN optimizations for matrix operations. Only recommended for Intel CPUs with matrix-heavy workloads. XPCS analysis is element-wise dominated, so benefit is minimal. Set to True to benchmark potential improvements.

Returns:

Configuration summary and performance hints

Return type:

dict[str, Any]
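Thread allocation for numerical libraries is usually controlled through environment variables set before those libraries are imported. The sketch below shows that general technique; thread_env is a hypothetical helper, the halving rule for disabled hyperthreading is an assumption, and the variables homodyne actually sets are not listed in this page.

```python
import os


def thread_env(num_threads=None, enable_hyperthreading=False):
    """Hypothetical helper: choose a thread count and the env vars to export."""
    logical = os.cpu_count() or 1
    if num_threads is None:
        # Assumption: two hardware threads per physical core when SMT is on.
        num_threads = logical if enable_hyperthreading else max(1, logical // 2)
    value = str(num_threads)
    return {
        "OMP_NUM_THREADS": value,
        "MKL_NUM_THREADS": value,
        "OPENBLAS_NUM_THREADS": value,
    }


env = thread_env(num_threads=8)
auto_env = thread_env()
# os.environ.update(env)  # apply before importing numerical libraries
```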

homodyne.device.cpu.configure_cpu_threading(num_threads=None)[source]

Configure CPU threading for NLSQ optimization.

Performance Optimization (Spec 001 - FR-005, T024): Simplified threading configuration for NLSQ initialization. Calls configure_cpu_hpc() with sensible defaults for optimization workloads.

Parameters:

num_threads (int | None) – Number of threads to use. If None, auto-detects optimal count based on physical cores.

Returns:

Configuration summary including thread count and XLA settings.

Return type:

dict[str, Any]

homodyne.device.cpu.get_optimal_batch_size(data_size, available_memory_gb=None, target_memory_usage=0.7)[source]

Calculate optimal batch size for CPU processing.

Parameters:
  • data_size (int) – Total size of data to process

  • available_memory_gb (float | None) – Available memory in GB. If None, auto-detects.

  • target_memory_usage (float) – Target fraction of memory to use

Returns:

Optimal batch size for processing

Return type:

int
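One plausible sizing rule behind such a function is to divide the memory budget by the per-item footprint and cap the result at the data size. The sketch below works through that arithmetic; batch_size_sketch and its bytes_per_item parameter are hypothetical and not part of the documented signature.

```python
def batch_size_sketch(data_size, available_memory_gb, target_memory_usage=0.7,
                      bytes_per_item=8):
    """Hypothetical sizing rule: fit as many items as the memory budget allows."""
    budget_bytes = available_memory_gb * 1024**3 * target_memory_usage
    max_items = int(budget_bytes // bytes_per_item)
    return max(1, min(data_size, max_items))


small = batch_size_sketch(1_000, available_memory_gb=1.0)
large = batch_size_sketch(10**10, available_memory_gb=1.0)
```

With 1 GiB available and a 0.7 target, the budget is about 751.6 million bytes, so a dataset of 1,000 eight-byte items fits in a single batch while a very large one is capped at roughly 94 million items per batch.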

homodyne.device.cpu.benchmark_cpu_performance(test_size=10000, num_iterations=5)[source]

Benchmark CPU performance for optimization planning.

Parameters:
  • test_size (int) – Size of test computation

  • num_iterations (int) – Number of benchmark iterations

Returns:

Benchmark results with timing information

Return type:

dict[str, float]


homodyne.utils.path_validation

Path validation utilities for save paths and output directories. They guard against path traversal, optionally check file extensions and parent-directory existence, and raise informative errors that give clear diagnostics.

Path validation utilities for secure file operations.

This module provides path validation functions to prevent path traversal attacks and ensure safe file operations for save_path parameters.

Security fixes implemented as part of code review remediation (Dec 2025). Addresses CVSS 7.5 path traversal vulnerability (VUL-001).

exception homodyne.utils.path_validation.PathValidationError[source]

Bases: ValueError

Raised when path validation fails due to security concerns.

homodyne.utils.path_validation.validate_save_path(path, *, allowed_extensions=None, require_parent_exists=True, allow_absolute=True, base_dir=None)[source]

Validate and sanitize a file save path.

Prevents path traversal attacks and ensures the path is safe for file operations.

Parameters:
  • path (str | Path | None) – Path to validate. If None, returns None.

  • allowed_extensions (tuple[str, ...] | None) – Allowed file extensions (e.g., (‘.png’, ‘.pdf’)). If None, all extensions are allowed.

  • require_parent_exists (bool) – If True, validates that the parent directory exists.

  • allow_absolute (bool) – If True, absolute paths are allowed. If False, only relative paths are allowed.

  • base_dir (Path | None) – Base directory for relative paths. If provided, the resolved path must be within this directory (prevents path traversal). Defaults to current working directory.

Returns:

Validated and resolved Path object, or None if path is None.

Return type:

Path | None

Raises:
  • PathValidationError – If path validation fails due to security concerns.

  • ValueError – If path has invalid extension or parent doesn’t exist.

Examples

>>> validate_save_path("output/results.png")
PosixPath('/current/dir/output/results.png')
>>> validate_save_path("../../../etc/passwd")
PathValidationError: Path traversal detected
>>> validate_save_path("/tmp/test.png", allow_absolute=False)
PathValidationError: Absolute paths not allowed
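The containment check at the heart of path traversal prevention can be sketched with pathlib: resolve the candidate against the base directory and verify that the result still lies inside it. UnsafePathError and resolve_within are hypothetical names, and this sketch omits the extension and parent-directory checks described above.

```python
import tempfile
from pathlib import Path


class UnsafePathError(ValueError):
    """Hypothetical stand-in for the module's validation error."""


def resolve_within(path, base_dir):
    """Resolve `path` against `base_dir` and reject anything that escapes it."""
    base = Path(base_dir).resolve()
    candidate = (base / path).resolve()  # an absolute `path` replaces `base`
    if not candidate.is_relative_to(base):  # requires Python 3.9+
        raise UnsafePathError(f"Path traversal detected: {path}")
    return candidate


base = Path(tempfile.gettempdir()) / "homodyne_sketch"
safe = resolve_within("output/results.png", base)
try:
    resolve_within("../../../etc/passwd", base)
    rejected = False
except UnsafePathError:
    rejected = True
```

Resolving before comparing matters: a purely textual prefix check would miss ".." segments and symlinks that lead outside the base directory.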
homodyne.utils.path_validation.validate_plot_save_path(path, *, require_parent_exists=True)[source]

Validate a save path for plot files.

Convenience wrapper for validate_save_path with plot-specific defaults.

Parameters:
  • path (str | Path | None) – Path to validate.

  • require_parent_exists (bool) – If True, validates that the parent directory exists.

Returns:

Validated Path object or None.

Return type:

Path | None

Raises:
  • PathValidationError – If path validation fails.

  • ValueError – If extension is not a valid image format.

Examples

>>> validate_plot_save_path("results/trace_plot.png")
PosixPath('/current/dir/results/trace_plot.png')
homodyne.utils.path_validation.get_safe_output_dir(output_dir=None, default_subdir='homodyne_output')[source]

Get a safe output directory, creating it if necessary.

Parameters:
  • output_dir (str | Path | None) – Requested output directory. If None, uses cwd/default_subdir.

  • default_subdir (str) – Default subdirectory name if output_dir is None.

Returns:

Validated and existing output directory.

Return type:

Path

Raises:
  • PathValidationError – If the path is invalid or unsafe.

  • PermissionError – If directory cannot be created due to permissions.


homodyne.utils.async_io

Asynchronous I/O utilities for overlapping data loading and processing. Used internally by the CMC multiprocessing backend to prefetch shards while the current shard is being sampled.

Async I/O utilities for pipeline overlap.

Thread-based prefetching and background writing to hide I/O latency. GIL-safe since HDF5 and numpy release the GIL during I/O.

class homodyne.utils.async_io.PrefetchLoader[source]

Bases: Iterator[R]

Thread-based prefetch iterator.

Loads the next item in a background thread while the current item is being processed.

Parameters:
  • source – Iterable of items to load.

  • load_fn – Callable applied to each item to load it.

__init__(source, load_fn)[source]
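The one-item-ahead prefetch pattern described above can be sketched with a single-worker thread pool. PrefetchSketch is a hypothetical illustration of the technique and makes no claim about how PrefetchLoader is implemented.

```python
from concurrent.futures import ThreadPoolExecutor


class PrefetchSketch:
    """Hypothetical one-item-ahead prefetch iterator."""

    def __init__(self, source, load_fn):
        self._items = iter(source)
        self._load_fn = load_fn
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._pending = self._submit_next()

    def _submit_next(self):
        try:
            item = next(self._items)
        except StopIteration:
            return None
        return self._pool.submit(self._load_fn, item)

    def __iter__(self):
        return self

    def __next__(self):
        if self._pending is None:
            self._pool.shutdown(wait=False)
            raise StopIteration
        current = self._pending
        # Start loading the following item before handing back this one.
        self._pending = self._submit_next()
        return current.result()


loaded = list(PrefetchSketch([1, 2, 3], lambda x: x * 10))
```

While the caller processes one result, the worker thread is already loading the next, which hides I/O latency whenever the load function releases the GIL.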
class homodyne.utils.async_io.AsyncWriter[source]

Bases: object

Background thread pool for result serialization.

Parameters:

max_workers (int) – Maximum concurrent write threads.

__init__(max_workers=2)[source]
submit_npz(path, data)[source]

Write NPZ file in background.

Return type:

None

submit_json(path, data)[source]

Write JSON file in background.

Return type:

None

submit_task(fn, *args, **kwargs)[source]

Submit an arbitrary callable for background execution.

Return type:

None

wait_all(timeout=60.0)[source]

Wait for all pending writes. Returns list of errors.

TimeoutError is not treated as a failure — the write is still in progress and will complete during shutdown().

Return type:

list[Exception]

shutdown()[source]

Wait for pending writes and shut down. Idempotent.

Return type:

None
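A background writer with the error-collecting semantics described for wait_all() might look like the sketch below. WriterSketch is a hypothetical class built on ThreadPoolExecutor; it covers JSON only and is not the package's AsyncWriter.

```python
import json
import tempfile
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from pathlib import Path


class WriterSketch:
    """Hypothetical background writer that collects errors instead of raising."""

    def __init__(self, max_workers=2):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._futures = []

    def submit_json(self, path, data):
        task = self._pool.submit(lambda: Path(path).write_text(json.dumps(data)))
        self._futures.append(task)

    def wait_all(self, timeout=60.0):
        errors = []
        for future in self._futures:
            try:
                future.result(timeout=timeout)
            except FutureTimeout:
                continue  # still running; left for shutdown()
            except Exception as exc:
                errors.append(exc)
        self._futures = [f for f in self._futures if not f.done()]
        return errors

    def shutdown(self):
        self._pool.shutdown(wait=True)  # safe to call more than once


with tempfile.TemporaryDirectory() as tmp:
    writer = WriterSketch()
    target = Path(tmp) / "result.json"
    writer.submit_json(target, {"chi_squared": 1.02})
    errors = writer.wait_all()
    writer.shutdown()
    written = json.loads(target.read_text())
```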