Utilities¶
This page documents the cross-cutting utility modules used throughout the Homodyne package: structured logging, CPU/NUMA detection, path validation, and asynchronous I/O.
homodyne.utils.logging¶
Lightweight structured logging system built on Python’s standard logging
module. Provides contextual log prefixes, configurable handlers (console +
rotating file), and helpers for performance monitoring and call tracing.
Key exports:
| Symbol | Purpose |
|---|---|
| get_logger | Returns a logger (or a context-aware logger adapter when context is supplied) |
| log_phase | Context manager that logs entry/exit and elapsed time for a phase |
| log_performance | Decorator that logs execution time when it exceeds threshold |
| log_calls | Decorator that logs every function call, optionally with arguments |
| log_exception | Logs exception with full traceback at ERROR level |
| with_context | Wraps a logger in an adapter that prefixes key–value context to all its log messages |
| LogConfiguration | Dataclass for configuring the global logging setup from CLI args |
Structured logging utilities for the homodyne package.
Provides a lightweight but flexible logging system that matches the CMC reimplementation requirements: contextual log prefixes, configurable console and rotating file handlers, and helpers for performance monitoring.
- class homodyne.utils.logging.LogConfiguration[source]
Bases: object
Programmatic logging configuration.
Alternative to configure_logging() for programmatic control over logging settings.
- console_level
Console log level (default “INFO”).
- console_format
Console format (“simple” or “detailed”).
- console_colors
Enable ANSI colors in console (default False).
- file_enabled
Enable file logging (default True).
- file_path
Log file path (None = auto-generate).
- file_level
File log level (default “DEBUG”).
- file_format
File format (“simple” or “detailed”).
- file_rotation_mb
Max file size before rotation (default 10).
- file_backup_count
Number of backup files to keep (default 5).
- module_overrides
Per-module log level overrides.
Example
>>> config = LogConfiguration.from_cli_args(verbose=True, log_file="analysis.log")
>>> config.apply()

>>> config = LogConfiguration(
...     console_level="INFO",
...     file_level="DEBUG",
...     module_overrides={"jax": "WARNING", "homodyne.optimization": "DEBUG"}
... )
>>> config.apply()
- console_level: str = 'INFO'
- console_format: str = 'simple'
- console_colors: bool = False
- file_enabled: bool = True
- file_level: str = 'DEBUG'
- file_format: str = 'detailed'
- file_rotation_mb: int = 10
- file_backup_count: int = 5
- apply()[source]
Apply this configuration to the logging system.
- classmethod from_dict(config)[source]
Create configuration from dictionary.
- classmethod from_cli_args(verbose=False, quiet=False, log_file=None)[source]
Create configuration from CLI flags.
- __init__(console_level='INFO', console_format='simple', console_colors=False, file_enabled=True, file_path=None, file_level='DEBUG', file_format='detailed', file_rotation_mb=10, file_backup_count=5, module_overrides=<factory>)
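The fields above map naturally onto the standard library's handler classes. The following is a minimal sketch of that mapping, not the body of `LogConfiguration.apply()`: the helper names `build_handlers` and `apply_module_overrides` and the format strings are assumptions made for illustration.

```python
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path


def build_handlers(log_path: Path, console_level: str = "INFO",
                   file_level: str = "DEBUG", rotation_mb: int = 10,
                   backup_count: int = 5) -> list[logging.Handler]:
    """Hypothetical helper: one console handler plus one rotating file handler."""
    console = logging.StreamHandler()
    console.setLevel(console_level)
    console.setFormatter(logging.Formatter("%(levelname)s | %(message)s"))

    # Rotate once the file reaches rotation_mb megabytes; keep backup_count old files.
    # delay=True postpones opening the file until the first record is written.
    file_handler = RotatingFileHandler(
        log_path, maxBytes=rotation_mb * 1024 * 1024,
        backupCount=backup_count, delay=True,
    )
    file_handler.setLevel(file_level)
    file_handler.setFormatter(
        logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s")
    )
    return [console, file_handler]


def apply_module_overrides(overrides: dict[str, str]) -> None:
    """Hypothetical helper: set a per-module level, e.g. {"jax": "WARNING"}."""
    for name, level in overrides.items():
        logging.getLogger(name).setLevel(level)
```

Giving the console and the file separate levels keeps terminal output short while the log file retains DEBUG detail.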
- class homodyne.utils.logging.AnalysisSummaryLogger[source]
Bases: object
Structured logging for analysis completion summaries.
Tracks phase timings, metrics, output files, and convergence status for logging a structured summary at analysis completion.
Example
>>> summary = AnalysisSummaryLogger(run_id="analysis_001", analysis_mode="laminar_flow")
>>> summary.start_phase("loading")
>>> data = load_data(config)
>>> summary.end_phase("loading", memory_peak_gb=2.1)
>>> summary.record_metric("chi_squared", result.chi_squared)
>>> summary.set_convergence_status("converged")
>>> summary.log_summary(logger)
- __init__(run_id, analysis_mode)[source]
Initialize summary logger for an analysis run.
- start_phase(name)[source]
Mark phase start for timing.
- end_phase(name, memory_peak_gb=None)[source]
Mark phase completion.
- record_metric(name, value)[source]
Record a named metric (e.g., chi_squared).
- add_output_file(path)[source]
Record an output file path.
- set_convergence_status(status)[source]
Set final convergence status.
- set_config_summary(optimizer=None, n_params=None, n_data_points=None, n_phi_angles=None, data_file=None, **kwargs)[source]
T054: Set configuration summary for logging.
- log_summary(logger)[source]
Log the complete analysis summary.
- Parameters:
logger (Logger | LoggerAdapter) – Logger to use for output.
- Return type:
- class homodyne.utils.logging.MinimalLogger[source]
Bases: object
Configurable logger manager for the homodyne package.
Thread-safe singleton for managing homodyne logging configuration.
- static __new__(cls)[source]
- Return type:
MinimalLogger
- __init__()[source]
- configure(level='INFO', *, console_level=None, console_format='detailed', console_colors=False, file_path=None, file_level=None, max_size_mb=10, backup_count=5, module_levels=None, force=False)[source]
Configure homodyne logging.
Thread-safe configuration of the logging system. Returns the file path if a file handler is created.
- configure_from_dict(logging_config, *, verbose=False, quiet=False, output_dir=None, run_id=None)[source]
Configure logging from a logging: config section.
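A thread-safe singleton with a `__new__` override is commonly written with double-checked locking. The sketch below illustrates that pattern only; the class name `SingletonManager`, the `_configured` flag, and the exact meaning given to `force` are assumptions, not the `MinimalLogger` source.

```python
import threading


class SingletonManager:
    """Hypothetical thread-safe singleton; not the MinimalLogger source."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking: the lock is only taken on first construction.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._configured = False
                    cls._instance = instance
        return cls._instance

    def configure(self, level: str = "INFO", *, force: bool = False) -> bool:
        """Apply settings once; return True if this call (re)configured."""
        with self._lock:
            if self._configured and not force:
                return False  # assumed behaviour: later calls are no-ops
            self.level = level
            self._configured = True
            return True
```

Every call to `SingletonManager()` returns the same object, so configuration applied in one module is visible everywhere else in the process.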
- homodyne.utils.logging.configure_logging(logging_config, *, verbose=False, quiet=False, output_dir=None, run_id=None)[source]
Public helper to configure logging from config + CLI flags.
- homodyne.utils.logging.get_logger(name=None, *, context=None)[source]
Get a logger instance with automatic naming and optional context.
- Return type:
- homodyne.utils.logging.with_context(logger, **context)[source]
Create a contextual logger with key-value prefixes.
Context is formatted as [key=value][key2=value2] message. Nested calls merge contexts (inner overrides outer on key conflicts). Thread-safe for use in multiprocessing.
- Parameters:
logger (Logger | LoggerAdapter[Logger]) – Base logger or existing contextual adapter to wrap.
**context (Any) – Key-value pairs to include as prefix.
- Return type:
- Returns:
A logger adapter that prefixes all messages with context.
Example
>>> logger = get_logger(__name__)
>>> ctx_logger = with_context(logger, run_id="abc123", mode="laminar_flow")
>>> ctx_logger.info("Starting analysis")
# Output: [run_id=abc123 mode=laminar_flow] Starting analysis

>>> # Nested context
>>> shard_logger = with_context(ctx_logger, shard=5)
>>> shard_logger.info("Processing shard")
# Output: [run_id=abc123 mode=laminar_flow shard=5] Processing shard
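The merge behaviour described above (nested contexts combine, inner keys win) can be illustrated with `logging.LoggerAdapter` from the standard library. This is a sketch under assumptions: `ContextAdapter`, `wrap_with_context`, and `demo` are hypothetical names, and the single-bracket prefix follows the example output rather than any confirmed implementation detail.

```python
import io
import logging
from typing import Any


class ContextAdapter(logging.LoggerAdapter):
    """Hypothetical adapter that prefixes messages with key=value context."""

    def process(self, msg, kwargs):
        prefix = " ".join(f"{k}={v}" for k, v in self.extra.items())
        return (f"[{prefix}] {msg}" if prefix else msg), kwargs


def wrap_with_context(logger, **context: Any) -> ContextAdapter:
    """Wrap a logger or an existing ContextAdapter, merging contexts."""
    if isinstance(logger, ContextAdapter):
        merged = {**logger.extra, **context}  # inner keys override outer ones
        return ContextAdapter(logger.logger, merged)
    return ContextAdapter(logger, dict(context))


def demo() -> list[str]:
    """Emit two messages into an in-memory stream and return the lines."""
    stream = io.StringIO()
    base = logging.getLogger("context_sketch")
    base.setLevel(logging.INFO)
    base.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(message)s"))
    base.addHandler(handler)
    try:
        run_logger = wrap_with_context(base, run_id="abc123", mode="laminar_flow")
        run_logger.info("Starting analysis")
        # The nested call adds shard and overrides mode.
        shard_logger = wrap_with_context(run_logger, shard=5, mode="static")
        shard_logger.info("Processing shard")
    finally:
        base.removeHandler(handler)
    return stream.getvalue().splitlines()
```

Because each wrap returns a new adapter, the outer logger keeps its own context and can still be used after a nested logger has been created.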
- class homodyne.utils.logging.PhaseContext[source]
Bases: object
Context object returned by log_phase() with timing and memory info.
- name: str
- duration: float = 0.0
- __init__(name, duration=0.0, memory_peak_gb=None, memory_delta_gb=None)
- homodyne.utils.logging.log_phase(name, logger=None, level=20, track_memory=False, threshold_s=0.0)[source]
Context manager for phase-level timing with optional memory tracking.
- Parameters:
name (str) – Phase name for logging.
logger (Logger | LoggerAdapter[Logger] | None) – Logger to use. If None, uses module logger.
level (int) – Log level for phase messages.
track_memory (bool) – Track memory usage during phase.
threshold_s (float) – Only log if duration > threshold (0 = always log).
- Yields:
PhaseContext with name, duration, memory_peak_gb, memory_delta_gb. Duration and memory values are populated after the context exits.
Example
>>> with log_phase("optimization", track_memory=True) as phase:
...     result = run_optimization(data)
>>> print(f"Took {phase.duration:.1f}s")
# Logs: Phase 'optimization' completed in 45.3s (peak memory: 12.4 GB)
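To make the "populated after the context exits" behaviour concrete, here is a minimal sketch of a phase timer built on `contextlib.contextmanager`. The names `PhaseInfo` and `timed_phase` are hypothetical, and using `tracemalloc` (which only sees Python-level allocations) for the peak-memory figure is an assumption; the package may measure memory differently.

```python
import logging
import time
import tracemalloc
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class PhaseInfo:
    """Hypothetical stand-in for the object yielded by a phase timer."""
    name: str
    duration: float = 0.0
    memory_peak_gb: Optional[float] = None


@contextmanager
def timed_phase(name: str, logger: Optional[logging.Logger] = None,
                level: int = logging.INFO, track_memory: bool = False,
                threshold_s: float = 0.0) -> Iterator[PhaseInfo]:
    log = logger or logging.getLogger(__name__)
    info = PhaseInfo(name)
    # Only start (and later stop) tracing if nobody else is already tracing.
    started_tracing = track_memory and not tracemalloc.is_tracing()
    if started_tracing:
        tracemalloc.start()
    start = time.perf_counter()
    try:
        yield info
    finally:
        # Fields are filled in here, i.e. only once the with-block has exited.
        info.duration = time.perf_counter() - start
        if track_memory:
            _, peak_bytes = tracemalloc.get_traced_memory()
            info.memory_peak_gb = peak_bytes / 1024**3
            if started_tracing:
                tracemalloc.stop()
        if info.duration >= threshold_s:
            log.log(level, "Phase '%s' completed in %.3fs", name, info.duration)


with timed_phase("summation", track_memory=True) as phase:
    total = sum(range(100_000))
print(f"{phase.name} took {phase.duration:.4f}s")
```

Reading `phase.duration` inside the `with` block would still return the default of 0.0, which is why the print sits after the block.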
- homodyne.utils.logging.log_exception(logger, exc, context=None, level=logging.ERROR, include_traceback=True)[source]
Log an exception with full context for debugging.
Extracts module, function, and line number from exception traceback. Formats context as key-value pairs in the message.
- Parameters:
logger (Logger | LoggerAdapter[Logger]) – Logger to use.
exc (BaseException) – Exception to log.
context (dict[str, Any] | None) – Additional context (e.g., parameter values).
level (int) – Log level (default ERROR).
include_traceback (bool) – Include full traceback (default True).
- Return type:
Example
>>> try:
...     result = compute_jacobian(params)
... except ValueError as e:
...     log_exception(logger, e, context={
...         "iteration": 45,
...         "params": params.tolist()[:5]
...     })
...     raise
# Logs:
# ERROR | homodyne.optimization.nlsq.core | Exception in compute_jacobian:
#   ValueError: invalid value
#   Context: iteration=45, params=[1.2e-11, 0.85, ...]
#   Traceback (most recent call last):
#   ...
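Extracting the failing function and line number from an exception's traceback can be done with the standard `traceback` module. The sketch below shows one way to do it; `describe_exception` and `log_exc` are hypothetical helpers and the message layout is an assumption, not the package's exact format.

```python
import logging
import traceback
from typing import Any, Optional


def describe_exception(exc: BaseException,
                       context: Optional[dict[str, Any]] = None) -> str:
    """Build a message naming where exc was raised, plus key=value context."""
    frames = traceback.extract_tb(exc.__traceback__)
    # The last frame is the innermost one, i.e. where the exception was raised.
    where = f"{frames[-1].name} (line {frames[-1].lineno})" if frames else "<unknown>"
    message = f"Exception in {where}: {type(exc).__name__}: {exc}"
    if context:
        pairs = ", ".join(f"{k}={v}" for k, v in context.items())
        message += f" | Context: {pairs}"
    return message


def log_exc(logger: logging.Logger, exc: BaseException,
            context: Optional[dict[str, Any]] = None,
            level: int = logging.ERROR, include_traceback: bool = True) -> None:
    """Log the description; exc_info attaches the full traceback when requested."""
    logger.log(level, describe_exception(exc, context),
               exc_info=exc if include_traceback else None)
```

Passing the exception instance as `exc_info` lets the logging module format the traceback, so the helper works outside an `except` block as well.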
- homodyne.utils.logging.log_calls(logger=None, level=logging.DEBUG, include_args=False, include_result=False)[source]
Decorator to log function calls.
- homodyne.utils.logging.log_performance(logger=None, level=logging.INFO, threshold=0.1)[source]
Decorator to log function performance.
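A threshold-gated timing decorator of this kind can be sketched as follows. `log_if_slow` is a hypothetical name and the message format is an assumption; only the parameter names `logger`, `level`, and `threshold` are taken from the signature above.

```python
import functools
import logging
import time
from typing import Any, Callable, Optional


def log_if_slow(logger: Optional[logging.Logger] = None,
                level: int = logging.INFO,
                threshold: float = 0.1) -> Callable:
    """Hypothetical decorator: log elapsed time only above threshold seconds."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        log = logger or logging.getLogger(func.__module__)

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Runs on both normal return and exception.
                elapsed = time.perf_counter() - start
                if elapsed > threshold:
                    log.log(level, "%s took %.3fs", func.__qualname__, elapsed)

        return wrapper

    return decorator
```

Gating on a threshold keeps fast, frequently called functions out of the log while still surfacing the slow calls.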
- homodyne.utils.logging.log_operation(operation_name, logger=None, level=20)[source]
Context manager for logging operations.
- Parameters:
- Return type:
Usage Examples¶
Basic logger¶
from homodyne.utils.logging import get_logger
logger = get_logger(__name__)
logger.info("Starting analysis")
logger.debug("Parameter D0=%.4g", d0)
logger.warning("High divergence rate: %.1f%%", rate * 100)
logger.error("Solver failed after %d attempts", n_attempts)
Phase timing¶
from homodyne.utils.logging import get_logger, log_phase
logger = get_logger(__name__)
with log_phase("NLSQ optimisation", logger=logger):
    result = fit_nlsq_jax(data, config)
# Logs: "NLSQ optimisation complete in 12.34 s"
Performance decorator¶
from homodyne.utils.logging import log_performance
@log_performance(threshold=0.5) # log if call takes > 0.5 s
def compute_jacobian(params):
    ...
Contextual logging (CMC shard)¶
from homodyne.utils.logging import get_logger, with_context
logger = get_logger(__name__)
# with_context returns a logger adapter; it is not a context manager
ctx_logger = with_context(logger, shard_id=42, n_pts=5000)
ctx_logger.info("Starting NUTS sampling")
# Logs: "[shard_id=42 n_pts=5000] Starting NUTS sampling"
Exception logging¶
from homodyne.utils.logging import get_logger, log_exception
logger = get_logger(__name__)
try:
    result = risky_operation()
except Exception as exc:
    log_exception(logger, exc)
    raise
CLI log configuration¶
from homodyne.utils.logging import LogConfiguration
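# Configured from --verbose / --quiet flags
# (args is assumed to be the parsed CLI namespace with verbose and quiet attributes)
log_cfg = LogConfiguration.from_cli_args(verbose=args.verbose, quiet=args.quiet)
log_cfg.apply()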
homodyne.device¶
CPU architecture detection and JAX/XLA configuration utilities for HPC environments. Detects physical and logical core counts, NUMA topology, and processor architecture (Intel/AMD) to inform thread and device allocation.
detect_cpu_info¶
- homodyne.device.cpu.detect_cpu_info()[source]
Detect CPU architecture and capabilities for optimization.
Usage Example¶
from homodyne.device.cpu import detect_cpu_info
info = detect_cpu_info()
print(f"Physical cores: {info['physical_cores']}")
print(f"Logical cores: {info['logical_cores']}")
print(f"Architecture: {info['architecture']}")
print(f"NUMA nodes: {info['numa_nodes']}")
print(f"Processor: {info['processor']}")
# Optimal worker count for CMC multiprocessing backend
n_workers = max(1, info['physical_cores'] // 2 - 1)
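As a worked example of the worker-count expression above, the snippet below evaluates it for a few core counts. The function name `cmc_worker_count` is hypothetical; the arithmetic is exactly the expression from the usage example.

```python
def cmc_worker_count(physical_cores: int) -> int:
    """Half the physical cores minus one, but never fewer than one worker."""
    return max(1, physical_cores // 2 - 1)


for cores in (2, 36, 128):
    print(cores, cmc_worker_count(cores))
# 2 -> 1, 36 -> 17, 128 -> 63
```

The `max(1, ...)` guard matters on small machines: with 2 physical cores the raw expression gives 0.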
HPC CPU Optimization for Homodyne¶
CPU-primary optimization strategies for high-performance computing environments. Optimized for 36/128-core HPC nodes with intelligent thread management and JAX CPU configuration.
Key Features:
- CPU core detection and optimal thread allocation
- JAX CPU-specific optimizations for HPC environments
- Memory-efficient processing strategies
- NUMA-aware configuration
- Intel/AMD architecture detection and optimization

HPC Environment Support:
- 36-core HPC nodes (typical cluster setup)
- 128-core HPC nodes (high-end clusters)
- Multi-socket NUMA systems
- Intel Xeon and AMD EPYC processors
- homodyne.device.cpu.configure_cpu_hpc(num_threads=None, enable_hyperthreading=False, numa_policy='auto', memory_optimization='standard', enable_onednn=False)[source]
Configure JAX and system for HPC CPU optimization.
Optimizes thread allocation, memory usage, and computational efficiency for HPC environments with 36/128-core nodes.
- Parameters:
num_threads (int | None) – Number of threads to use. If None, auto-detects optimal count.
enable_hyperthreading (bool) – Whether to use hyperthreading. Usually disabled for HPC.
numa_policy (str) – NUMA memory policy (“auto”, “local”, “interleave”).
memory_optimization (str) – Memory optimization level (“minimal”, “standard”, “aggressive”).
enable_onednn (bool) – Enable Intel oneDNN optimizations for matrix operations. Only recommended for Intel CPUs with matrix-heavy workloads. XPCS analysis is element-wise dominated, so benefit is minimal. Set to True to benchmark potential improvements.
- Returns:
Configuration summary and performance hints
- Return type:
- homodyne.device.cpu.configure_cpu_threading(num_threads=None)[source]
Configure CPU threading for NLSQ optimization.
Performance Optimization (Spec 001 - FR-005, T024): Simplified threading configuration for NLSQ initialization. Calls configure_cpu_hpc() with sensible defaults for optimization workloads.
- homodyne.device.cpu.get_optimal_batch_size(data_size, available_memory_gb=None, target_memory_usage=0.7)[source]
Calculate optimal batch size for CPU processing.
- homodyne.device.cpu.benchmark_cpu_performance(test_size=10000, num_iterations=5)[source]
Benchmark CPU performance for optimization planning.
homodyne.utils.path_validation¶
Path validation utilities for save paths: they reject path traversal, can restrict file extensions, and can require that the parent directory exists, raising informative errors on failure. Used at CLI entry points and data loading boundaries to provide clear diagnostics.
Path validation utilities for secure file operations.
This module provides path validation functions to prevent path traversal attacks and ensure safe file operations for save_path parameters.
Security fixes implemented as part of code review remediation (Dec 2025). Addresses CVSS 7.5 path traversal vulnerability (VUL-001).
- exception homodyne.utils.path_validation.PathValidationError[source]
Bases: ValueError
Raised when path validation fails due to security concerns.
- homodyne.utils.path_validation.validate_save_path(path, *, allowed_extensions=None, require_parent_exists=True, allow_absolute=True, base_dir=None)[source]
Validate and sanitize a file save path.
Prevents path traversal attacks and ensures the path is safe for file operations.
- Parameters:
path (str | Path | None) – Path to validate. If None, returns None.
allowed_extensions (tuple[str, ...] | None) – Allowed file extensions (e.g., (‘.png’, ‘.pdf’)). If None, all extensions are allowed.
require_parent_exists (bool) – If True, validates that the parent directory exists.
allow_absolute (bool) – If True, absolute paths are allowed. If False, only relative paths are allowed.
base_dir (Path | None) – Base directory for relative paths. If provided, the resolved path must be within this directory (prevents path traversal). Defaults to current working directory.
- Returns:
Validated and resolved Path object, or None if path is None.
- Return type:
- Raises:
PathValidationError – If path validation fails due to security concerns.
ValueError – If path has invalid extension or parent doesn’t exist.
Examples
>>> validate_save_path("output/results.png")
PosixPath('/current/dir/output/results.png')

>>> validate_save_path("../../../etc/passwd")
PathValidationError: Path traversal detected

>>> validate_save_path("/tmp/test.png", allow_absolute=False)
PathValidationError: Absolute paths not allowed
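The traversal check can be illustrated with `pathlib`: resolve the candidate against the base directory, then confirm the result is still inside it. This is a sketch under assumptions, not the `validate_save_path` source; `UnsafePathError` and `check_save_path` are hypothetical names, the parent-directory check is omitted, and `Path.is_relative_to` requires Python 3.9 or later.

```python
from pathlib import Path
from typing import Optional, Union


class UnsafePathError(ValueError):
    """Hypothetical stand-in for PathValidationError."""


def check_save_path(path: Union[str, Path], *,
                    base_dir: Optional[Path] = None,
                    allowed_extensions: Optional[tuple[str, ...]] = None,
                    allow_absolute: bool = True) -> Path:
    candidate = Path(path)
    base = (base_dir or Path.cwd()).resolve()
    if candidate.is_absolute():
        if not allow_absolute:
            raise UnsafePathError("Absolute paths not allowed")
        resolved = candidate.resolve()
    else:
        # resolve() collapses ".." segments, so an escape becomes visible.
        resolved = (base / candidate).resolve()
        if not resolved.is_relative_to(base):
            raise UnsafePathError("Path traversal detected")
    if allowed_extensions is not None and resolved.suffix.lower() not in allowed_extensions:
        raise ValueError(f"Invalid extension: {resolved.suffix!r}")
    return resolved
```

Checking the resolved path, rather than searching the input string for "..", also covers escapes that pass through symlinks.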
- homodyne.utils.path_validation.validate_plot_save_path(path, *, require_parent_exists=True)[source]
Validate a save path for plot files.
Convenience wrapper for validate_save_path with plot-specific defaults.
- Parameters:
- Returns:
Validated Path object or None.
- Return type:
- Raises:
PathValidationError – If path validation fails.
ValueError – If extension is not a valid image format.
Examples
>>> validate_plot_save_path("results/trace_plot.png")
PosixPath('/current/dir/results/trace_plot.png')
- homodyne.utils.path_validation.get_safe_output_dir(output_dir=None, default_subdir='homodyne_output')[source]
Get a safe output directory, creating it if necessary.
- Parameters:
- Returns:
Validated and existing output directory.
- Return type:
- Raises:
PathValidationError – If the path is invalid or unsafe.
PermissionError – If directory cannot be created due to permissions.
homodyne.utils.async_io¶
Asynchronous I/O utilities for overlapping data loading and processing. Used internally by the CMC multiprocessing backend to prefetch shards while the current shard is being sampled.
Async I/O utilities for pipeline overlap.
Thread-based prefetching and background writing to hide I/O latency. GIL-safe since HDF5 and numpy release the GIL during I/O.
- class homodyne.utils.async_io.PrefetchLoader[source]
Bases: Iterator[R]
Thread-based prefetch iterator.
Loads the next item in a background thread while the current item is being processed.
- Parameters:
- __init__(source, load_fn)[source]
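The one-item-ahead prefetch idea can be sketched with a single-worker `ThreadPoolExecutor`: while the caller processes item n, the worker thread is already loading item n+1. `PrefetchSketch` is a hypothetical class written for illustration; only the `source` and `load_fn` parameter names come from the signature above.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Iterable, Optional


class PrefetchSketch:
    """Hypothetical one-item-ahead prefetch iterator (not homodyne's source)."""

    def __init__(self, source: Iterable[Any], load_fn: Callable[[Any], Any]) -> None:
        self._keys = iter(source)
        self._load_fn = load_fn
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._pending: Optional[Future] = None
        self._schedule_next()  # start loading the first item immediately

    def _schedule_next(self) -> None:
        try:
            key = next(self._keys)
        except StopIteration:
            self._pending = None
            self._pool.shutdown(wait=False)  # nothing left to load
            return
        self._pending = self._pool.submit(self._load_fn, key)

    def __iter__(self) -> "PrefetchSketch":
        return self

    def __next__(self) -> Any:
        if self._pending is None:
            raise StopIteration
        item = self._pending.result()  # blocks only if the load is unfinished
        self._schedule_next()          # begin loading the following item
        return item


squares = list(PrefetchSketch(range(4), lambda i: i * i))
print(squares)  # [0, 1, 4, 9]
```

The overlap only pays off when `load_fn` spends its time in I/O or in code that releases the GIL, as the module description notes.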
- class homodyne.utils.async_io.AsyncWriter[source]
Bases: object
Background thread pool for result serialization.
- Parameters:
max_workers (int) – Maximum concurrent write threads.
- __init__(max_workers=2)[source]
- submit_task(fn, *args, **kwargs)[source]
Submit an arbitrary callable for background execution.
- Return type:
- wait_all(timeout=60.0)[source]
Wait for all pending writes. Returns list of errors.
TimeoutError is not treated as a failure — the write is still in progress and will complete during shutdown().
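The "timeout is not a failure" behaviour can be sketched with `concurrent.futures.wait`, which returns the finished and unfinished futures instead of raising when the timeout elapses. `WriterSketch` is a hypothetical class; the method names mirror those listed above, but the bodies are assumptions rather than the `AsyncWriter` source.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable


class WriterSketch:
    """Hypothetical background writer built on a thread pool."""

    def __init__(self, max_workers: int = 2) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._futures: list[Future] = []

    def submit_task(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Future:
        future = self._pool.submit(fn, *args, **kwargs)
        self._futures.append(future)
        return future

    def wait_all(self, timeout: float = 60.0) -> list[BaseException]:
        # wait() does not raise on timeout; unfinished futures land in not_done.
        done, not_done = wait(self._futures, timeout=timeout)
        errors = [f.exception() for f in done if f.exception() is not None]
        self._futures = list(not_done)  # still-running writes stay pending
        return errors

    def shutdown(self) -> None:
        # Blocks until every submitted task, including timed-out ones, finishes.
        self._pool.shutdown(wait=True)
```

Returning errors as a list, rather than raising on the first one, lets the caller report every failed write after the analysis itself has finished.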