Source code for homodyne.utils.logging

"""Structured logging utilities for the homodyne package.

Provides a lightweight but flexible logging system that matches the CMC
reimplementation requirements: contextual log prefixes, configurable console
and rotating file handlers, and helpers for performance monitoring.
"""

from __future__ import annotations

import functools
import inspect
import logging
import threading
import time
import traceback
from collections.abc import Callable, Generator, Mapping
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Any, TypeVar

# Type variable used by the decorator factories below so that a decorated
# callable is annotated as keeping its own type.
F = TypeVar("F", bound=Callable[..., Any])

# Type alias covering both plain loggers and adapters that wrap a logger.
LoggerType = logging.Logger | logging.LoggerAdapter[logging.Logger]

# Record layouts: "detailed" adds a timestamp and the logger name, "simple"
# shows only the (padded) level name and the message.
DEFAULT_FORMAT_DETAILED = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
DEFAULT_FORMAT_SIMPLE = "%(levelname)-8s | %(message)s"


def _resolve_level(level: str | int | None) -> int | None:
    """Convert a string/int log level to a numeric ``logging`` level.

    Args:
        level: Level name (matched case-insensitively, surrounding whitespace
            ignored), a numeric level, or ``None``.

    Returns:
        ``None`` when ``level`` is ``None``; integer inputs unchanged (this
        includes the ``False`` sentinel some callers pass to disable a
        handler); the matching ``logging`` constant for a known level name;
        ``logging.INFO`` for anything else.
    """
    if level is None:
        return None
    if isinstance(level, int):
        return level
    candidate = getattr(logging, str(level).strip().upper(), None)
    # The logging module also exposes upper-case attributes that are not
    # levels (e.g. BASIC_FORMAT is a str), so accept integer constants only.
    if isinstance(candidate, int):
        return candidate
    return logging.INFO


class _ColorFormatter(logging.Formatter):
    """Console formatter that can wrap the level name in ANSI color codes."""

    # ANSI escape sequence used for each standard level name.
    COLOR_MAP = {
        "DEBUG": "\033[36m",  # Cyan
        "INFO": "\033[32m",  # Green
        "WARNING": "\033[33m",  # Yellow
        "ERROR": "\033[31m",  # Red
        "CRITICAL": "\033[35m",  # Magenta
    }
    # Sequence that switches the terminal back to its default color.
    RESET = "\033[0m"

    def __init__(self, fmt: str, datefmt: str | None, use_color: bool) -> None:
        """Store the layout and whether level names should be colorized."""
        super().__init__(fmt=fmt, datefmt=datefmt)
        self.use_color = use_color

    def format(self, record: logging.LogRecord) -> str:
        """Format ``record``, temporarily coloring its level name if enabled."""
        plain_name = record.levelname
        if not (self.use_color and plain_name in self.COLOR_MAP):
            return super().format(record)

        record.levelname = f"{self.COLOR_MAP[plain_name]}{plain_name}{self.RESET}"
        try:
            return super().format(record)
        finally:
            # Other handlers receive the same record object, so put the
            # uncolored name back before returning.
            record.levelname = plain_name


class _ContextAdapter(logging.LoggerAdapter):
    """Logger adapter that prepends ``[key=value ...]`` context to messages."""

    def process(self, msg: str, kwargs: Any) -> tuple[str, Any]:
        """Return ``msg`` prefixed with the adapter's context, plus ``kwargs``.

        Context entries whose value is ``None`` or an empty string are left
        out; when nothing remains the message is returned unchanged.
        """
        extra = self.extra
        if not extra:
            return msg, kwargs

        rendered = " ".join(
            f"{key}={value}"
            for key, value in extra.items()
            if value is not None and value != ""
        )
        if not rendered:
            return msg, kwargs
        return f"[{rendered}] {msg}", kwargs


@dataclass
class LogConfiguration:
    """Programmatic logging configuration.

    Alternative to configure_logging() for programmatic control over
    logging settings.

    Attributes:
        console_level: Console log level (default "INFO").
        console_format: Console format ("simple" or "detailed").
        console_colors: Enable ANSI colors in console (default False).
        file_enabled: Enable file logging (default True).
        file_path: Log file path (None = auto-generate).
        file_level: File log level (default "DEBUG").
        file_format: File format ("simple" or "detailed"). Stored on the
            configuration but not forwarded by apply().
        file_rotation_mb: Max file size before rotation (default 10).
        file_backup_count: Number of backup files to keep (default 5).
        module_overrides: Per-module log level overrides.

    Example:
        >>> config = LogConfiguration.from_cli_args(verbose=True, log_file="analysis.log")
        >>> config.apply()

        >>> config = LogConfiguration(
        ...     console_level="INFO",
        ...     file_level="DEBUG",
        ...     module_overrides={"jax": "WARNING", "homodyne.optimization": "DEBUG"}
        ... )
        >>> config.apply()
    """

    console_level: str = "INFO"
    console_format: str = "simple"
    console_colors: bool = False
    file_enabled: bool = True
    file_path: str | Path | None = None
    file_level: str = "DEBUG"
    file_format: str = "detailed"
    file_rotation_mb: int = 10
    file_backup_count: int = 5
    module_overrides: dict[str, str] = field(default_factory=dict)

    def apply(self) -> Path | None:
        """Apply this configuration to the logging system.

        Returns:
            Whatever the logger manager's ``configure`` returns: the log file
            path when a file handler was set up, None otherwise.
        """
        # Suppress external library logging by default
        default_suppressions = {
            "jax": "WARNING",
            "numpy": "WARNING",
            "numba": "WARNING",
            "h5py": "WARNING",
        }
        # Merge default suppressions with user overrides (user overrides win).
        # ``or {}`` keeps a directly-assigned None from breaking the unpacking.
        merged_overrides = {**default_suppressions, **(self.module_overrides or {})}

        # Determine file path
        file_path: Path | None = None
        if self.file_enabled:
            if self.file_path is not None:
                file_path = Path(self.file_path)
            else:
                # Auto-generate timestamped log file
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                file_path = Path(f"homodyne_{timestamp}.log")

        return _logger_manager.configure(
            level="DEBUG",  # Root level should be lowest to allow filtering
            console_level=self.console_level,
            console_format=self.console_format,
            console_colors=self.console_colors,
            file_path=file_path,
            # False is the "no file handler" sentinel understood by configure()
            file_level=self.file_level if self.file_enabled else False,
            max_size_mb=self.file_rotation_mb,
            backup_count=self.file_backup_count,
            module_levels=merged_overrides,
            force=True,
        )

    @classmethod
    def from_dict(cls, config: dict[str, Any]) -> LogConfiguration:
        """Create configuration from dictionary.

        Missing keys fall back to the dataclass defaults. ``module_overrides``
        is copied so later edits do not leak into the caller's dict, and an
        explicit ``None`` (e.g. an empty YAML key) is treated as "no overrides".

        Args:
            config: Dictionary with configuration values.

        Returns:
            LogConfiguration instance.
        """
        return cls(
            console_level=config.get("console_level", "INFO"),
            console_format=config.get("console_format", "simple"),
            console_colors=config.get("console_colors", False),
            file_enabled=config.get("file_enabled", True),
            file_path=config.get("file_path"),
            file_level=config.get("file_level", "DEBUG"),
            file_format=config.get("file_format", "detailed"),
            file_rotation_mb=config.get("file_rotation_mb", 10),
            file_backup_count=config.get("file_backup_count", 5),
            module_overrides=dict(config.get("module_overrides") or {}),
        )

    @classmethod
    def from_cli_args(
        cls,
        verbose: bool = False,
        quiet: bool = False,
        log_file: str | None = None,
    ) -> LogConfiguration:
        """Create configuration from CLI flags.

        Args:
            verbose: Enable DEBUG level console logging.
            quiet: Enable ERROR-only console logging (wins over ``verbose``).
            log_file: Path to log file (None = auto-generate if file logging enabled).

        Returns:
            LogConfiguration instance.
        """
        # Determine console level from flags
        if quiet:
            console_level = "ERROR"
        elif verbose:
            console_level = "DEBUG"
        else:
            console_level = "INFO"

        return cls(
            console_level=console_level,
            console_format="detailed" if verbose else "simple",
            console_colors=False,
            file_enabled=True,
            file_path=log_file,
            file_level="DEBUG",
            file_format="detailed",
        )
@dataclass
class _PhaseRecord:
    """Timing and memory bookkeeping for one named phase (internal)."""

    name: str
    start_time: float | None = None
    end_time: float | None = None
    memory_peak_gb: float | None = None

    @property
    def duration(self) -> float | None:
        """End time minus start time, or None while either stamp is missing."""
        began, finished = self.start_time, self.end_time
        if began is not None and finished is not None:
            return finished - began
        return None
class AnalysisSummaryLogger:
    """Structured logging for analysis completion summaries.

    Tracks phase timings, metrics, output files, and convergence status
    for logging a structured summary at analysis completion.

    Example:
        >>> summary = AnalysisSummaryLogger(run_id="analysis_001", analysis_mode="laminar_flow")
        >>> summary.start_phase("loading")
        >>> data = load_data(config)
        >>> summary.end_phase("loading", memory_peak_gb=2.1)
        >>> summary.record_metric("chi_squared", result.chi_squared)
        >>> summary.set_convergence_status("converged")
        >>> summary.log_summary(logger)
    """

    def __init__(self, run_id: str, analysis_mode: str) -> None:
        """Initialize summary logger for an analysis run.

        Args:
            run_id: Unique identifier for this analysis run.
            analysis_mode: Analysis mode (e.g., "static_isotropic", "laminar_flow").
        """
        self.run_id = run_id
        self.analysis_mode = analysis_mode
        self._phases: dict[str, _PhaseRecord] = {}
        self._metrics: dict[str, float] = {}
        self._output_files: list[Path] = []
        self._convergence_status: str | None = None
        # Total runtime is measured from construction of this object.
        self._start_time = time.perf_counter()
        self._warning_count = 0
        self._error_count = 0
        # T054: Configuration summary for logging
        self._config_summary: dict[str, Any] = {}

    def start_phase(self, name: str) -> None:
        """Mark phase start for timing.

        Starting a name that already exists replaces its previous record.

        Args:
            name: Phase name (e.g., "loading", "optimization").
        """
        self._phases[name] = _PhaseRecord(name=name, start_time=time.perf_counter())

    def end_phase(self, name: str, memory_peak_gb: float | None = None) -> None:
        """Mark phase completion.

        Names that were never started are ignored.

        Args:
            name: Phase name that was started.
            memory_peak_gb: Optional peak memory usage during phase.
        """
        if name in self._phases:
            self._phases[name].end_time = time.perf_counter()
            self._phases[name].memory_peak_gb = memory_peak_gb

    def record_metric(self, name: str, value: float) -> None:
        """Record a named metric (e.g., chi_squared).

        Args:
            name: Metric name.
            value: Metric value.
        """
        self._metrics[name] = value

    def add_output_file(self, path: Path | str) -> None:
        """Record an output file path.

        Args:
            path: Path to output file.
        """
        self._output_files.append(Path(path))

    def set_convergence_status(self, status: str) -> None:
        """Set final convergence status.

        Args:
            status: Convergence status (e.g., "converged", "max_iter", "failed").
        """
        self._convergence_status = status

    def increment_warning_count(self) -> None:
        """Increment warning counter."""
        self._warning_count += 1

    def increment_error_count(self) -> None:
        """Increment error counter."""
        self._error_count += 1

    def set_config_summary(
        self,
        optimizer: str | None = None,
        n_params: int | None = None,
        n_data_points: int | None = None,
        n_phi_angles: int | None = None,
        data_file: str | None = None,
        **kwargs: Any,
    ) -> None:
        """T054: Set configuration summary for logging.

        Named arguments left as None are not stored; extra keyword arguments
        are stored as given.

        Args:
            optimizer: Optimizer used (e.g., "nlsq", "cmc").
            n_params: Number of parameters being optimized.
            n_data_points: Total number of data points.
            n_phi_angles: Number of phi angles.
            data_file: Path to data file.
            **kwargs: Additional key-value pairs to include.
        """
        if optimizer is not None:
            self._config_summary["optimizer"] = optimizer
        if n_params is not None:
            self._config_summary["n_params"] = n_params
        if n_data_points is not None:
            self._config_summary["n_data_points"] = n_data_points
        if n_phi_angles is not None:
            self._config_summary["n_phi_angles"] = n_phi_angles
        if data_file is not None:
            self._config_summary["data_file"] = data_file
        # Add any additional kwargs
        self._config_summary.update(kwargs)

    def log_summary(self, logger: logging.Logger | logging.LoggerAdapter) -> None:
        """Log the complete analysis summary as one multi-line INFO message.

        Args:
            logger: Logger to use for output.
        """
        total_runtime = time.perf_counter() - self._start_time

        # Build summary message
        lines = [
            "=" * 60,
            "ANALYSIS SUMMARY",
            "=" * 60,
            f"Run ID: {self.run_id}",
            f"Mode: {self.analysis_mode}",
            f"Status: {self._convergence_status or 'unknown'}",
            f"Total runtime: {total_runtime:.2f}s",
        ]

        # T054: Add configuration summary
        if self._config_summary:
            lines.append("")
            lines.append("Configuration:")
            for key, value in self._config_summary.items():
                # Large integers get thousands separators for readability
                if isinstance(value, int) and value > 1000:
                    lines.append(f" {key}: {value:,}")
                else:
                    lines.append(f" {key}: {value}")

        # Add phase timings (phases that never ended have no duration and are skipped)
        if self._phases:
            lines.append("")
            lines.append("Phase Timings:")
            for name, record in self._phases.items():
                duration = record.duration
                if duration is not None:
                    mem_str = (
                        f" (peak: {record.memory_peak_gb:.1f} GB)"
                        if record.memory_peak_gb is not None
                        else ""
                    )
                    lines.append(f" {name}: {duration:.2f}s{mem_str}")

        # Add metrics
        if self._metrics:
            lines.append("")
            lines.append("Metrics:")
            for name, value in self._metrics.items():
                lines.append(f" {name}: {value:.6g}")

        # Add output files
        if self._output_files:
            lines.append("")
            lines.append("Output files:")
            for path in self._output_files:
                lines.append(f" {path}")

        # Add warning/error counts
        if self._warning_count > 0 or self._error_count > 0:
            lines.append("")
            lines.append(
                f"Warnings: {self._warning_count}, Errors: {self._error_count}"
            )

        lines.append("=" * 60)
        logger.info("\n".join(lines))

    def as_dict(self) -> dict[str, Any]:
        """Export summary as dictionary for JSON serialization.

        Returns:
            Dictionary with all summary data.
        """
        total_runtime = time.perf_counter() - self._start_time

        phases_dict = {}
        for name, record in self._phases.items():
            phases_dict[name] = {
                "duration_s": record.duration,
                "memory_peak_gb": record.memory_peak_gb,
            }

        # Sanitize metrics so NaN/Inf floats (which can occur in degenerate
        # runs) do not cause json.dump to fail or produce invalid JSON output.
        # Import lazily to avoid circular dependency (logging ← json_utils).
        try:
            from homodyne.io.json_utils import json_safe as _json_safe
        except ImportError:
            import math

            def _json_safe(value: Any) -> Any:
                """Minimal fallback when io module unavailable.

                Recurses into containers: it is called with the whole metrics
                dict, so a scalar-only check would sanitize nothing.
                """
                if isinstance(value, float):
                    if math.isnan(value):
                        return None
                    if math.isinf(value):
                        return str(value)
                    return value
                if isinstance(value, dict):
                    return {key: _json_safe(item) for key, item in value.items()}
                if isinstance(value, (list, tuple)):
                    return [_json_safe(item) for item in value]
                return value

        return {
            "run_id": self.run_id,
            "analysis_mode": self.analysis_mode,
            "convergence_status": self._convergence_status,
            "total_runtime_s": total_runtime,
            "config_summary": self._config_summary,  # T054
            "phases": phases_dict,
            "metrics": _json_safe(self._metrics),
            "output_files": [str(p) for p in self._output_files],
            "warning_count": self._warning_count,
            "error_count": self._error_count,
        }
class MinimalLogger:
    """Configurable logger manager for the homodyne package.

    Singleton: every ``MinimalLogger()`` call returns the same instance.
    ``configure()`` runs under an instance lock; creation of the singleton
    itself is not guarded.
    """

    _instance: MinimalLogger | None = None
    _initialized: bool
    _configured: bool
    _root_logger_name: str
    _lock: threading.Lock

    def __new__(cls) -> MinimalLogger:
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
            cls._instance._lock = threading.Lock()
        return cls._instance

    def __init__(self) -> None:
        """Initialize state once; later constructions leave it untouched."""
        if self._initialized:
            return
        self._configured = False
        self._root_logger_name = "homodyne"
        self._initialized = True

    @staticmethod
    def _build_formatter(
        format_name: str = "detailed",
        use_color: bool = False,
    ) -> logging.Formatter:
        """Build a console formatter; any name other than "simple" is detailed."""
        fmt = (
            DEFAULT_FORMAT_SIMPLE
            if format_name == "simple"
            else DEFAULT_FORMAT_DETAILED
        )
        return _ColorFormatter(
            fmt=fmt, datefmt="%Y-%m-%d %H:%M:%S", use_color=use_color
        )

    def _clear_managed_handlers(self, logger: logging.Logger) -> None:
        """Remove and close the handlers this manager attached to ``logger``."""
        for handler in list(logger.handlers):
            if getattr(handler, "_homodyne_managed", False):
                logger.removeHandler(handler)
                handler.close()

    def configure(
        self,
        level: str | int = "INFO",
        *,
        console_level: str | int | None = None,
        console_format: str = "detailed",
        console_colors: bool = False,
        file_path: str | Path | None = None,
        file_level: str | int | None = None,
        max_size_mb: int = 10,
        backup_count: int = 5,
        module_levels: Mapping[str, str | int] | None = None,
        force: bool = False,
    ) -> Path | None:
        """Configure homodyne logging.

        Thread-safe configuration of the logging system (serialised by the
        instance lock).

        Args:
            level: Base level for the package root logger.
            console_level: Console handler level; None falls back to the
                root level, and ``False`` skips console handler setup.
            console_format: "simple" or "detailed" console layout.
            console_colors: Colorize level names on the console.
            file_path: Log file location; falsy means no file handler.
            file_level: File handler level; None falls back to the root
                level, and ``False`` excludes it from the root-level choice.
            max_size_mb: Rotation size; a non-positive size disables rotation.
            backup_count: Rotated files kept.
            module_levels: Per-logger level overrides keyed by logger name.
            force: Remove previously managed handlers before configuring.

        Returns:
            The file path if a file handler is created, otherwise None.
        """
        with self._lock:
            return self._configure_impl(
                level=level,
                console_level=console_level,
                console_format=console_format,
                console_colors=console_colors,
                file_path=file_path,
                file_level=file_level,
                max_size_mb=max_size_mb,
                backup_count=backup_count,
                module_levels=module_levels,
                force=force,
            )

    def _configure_impl(
        self,
        level: str | int = "INFO",
        *,
        console_level: str | int | None = None,
        console_format: str = "detailed",
        console_colors: bool = False,
        file_path: str | Path | None = None,
        file_level: str | int | None = None,
        max_size_mb: int = 10,
        backup_count: int = 5,
        module_levels: Mapping[str, str | int] | None = None,
        force: bool = False,
    ) -> Path | None:
        """Internal implementation of configure (called under lock)."""
        root_logger = logging.getLogger(self._root_logger_name)

        if force:
            self._clear_managed_handlers(root_logger)

        # The root logger must be at least as verbose as every enabled
        # handler, otherwise records are dropped before reaching it.
        root_level_candidates = [_resolve_level(level)]
        if console_level is not False:
            root_level_candidates.append(_resolve_level(console_level))
        if file_level is not False:
            root_level_candidates.append(_resolve_level(file_level))
        root_level = min(lvl for lvl in root_level_candidates if lvl is not None)
        root_logger.setLevel(root_level)

        # Console handler: reuse an existing stream (non-file) handler if any
        console_handler: logging.Handler | None = None
        for handler in root_logger.handlers:
            if isinstance(handler, logging.StreamHandler) and not isinstance(
                handler,
                logging.FileHandler,
            ):
                console_handler = handler
                break

        if console_level is not False:
            if console_handler is None:
                console_handler = logging.StreamHandler()
                console_handler._homodyne_managed = True  # type: ignore[attr-defined]
                root_logger.addHandler(console_handler)
            console_handler.setLevel(_resolve_level(console_level) or root_level)
            console_handler.setFormatter(
                self._build_formatter(console_format, use_color=console_colors)
            )

        # File handler
        created_file: Path | None = None
        if file_path:
            file_path = Path(file_path)
            try:
                file_path.parent.mkdir(parents=True, exist_ok=True)
            except OSError as e:
                logger = logging.getLogger(self._root_logger_name)
                logger.warning(
                    f"Cannot create log directory {file_path.parent}: {e}. "
                    "File logging disabled."
                )
                file_path = None  # Skip file handler, continue with console-only

        if file_path is not None:
            created_file = file_path
            max_bytes = int(max_size_mb * 1024 * 1024)
            if max_bytes > 0:
                file_handler: logging.Handler = RotatingFileHandler(
                    file_path,
                    maxBytes=max_bytes,
                    backupCount=backup_count,
                )
            else:
                file_handler = logging.FileHandler(file_path)
            file_handler._homodyne_managed = True  # type: ignore[attr-defined]
            file_handler.setLevel(_resolve_level(file_level) or root_level)
            # The file always uses the detailed, uncolored layout
            file_handler.setFormatter(
                logging.Formatter(
                    DEFAULT_FORMAT_DETAILED,
                    datefmt="%Y-%m-%d %H:%M:%S",
                )
            )
            root_logger.addHandler(file_handler)

        # Default suppression for external libraries (FR-005)
        # These are applied first, then user overrides can override them
        default_suppressions = {
            "jax": "WARNING",
            "numpy": "WARNING",
            "numba": "WARNING",
            "h5py": "WARNING",
        }
        for lib_name, lib_level in default_suppressions.items():
            lib_logger = logging.getLogger(lib_name)
            # Only set if not already configured by user
            if lib_logger.level == logging.NOTSET:
                lib_logger.setLevel(_resolve_level(lib_level) or logging.WARNING)

        # Module-specific overrides (user overrides win over defaults)
        if module_levels:
            for module_name, module_level in module_levels.items():
                logging.getLogger(module_name).setLevel(
                    _resolve_level(module_level) or root_level
                )

        self._configured = True
        return created_file

    def configure_from_dict(
        self,
        logging_config: Mapping[str, Any] | None,
        *,
        verbose: bool = False,
        quiet: bool = False,
        output_dir: Path | str | None = None,
        run_id: str | None = None,
    ) -> Path | None:
        """Configure logging from a `logging:` config section.

        Args:
            logging_config: Mapping with optional "enabled", "level",
                "console", "file" and "modules" entries. Nothing is done
                when it is empty/None or has ``enabled`` set to false.
            verbose: Force the console level to DEBUG (if console enabled).
            quiet: Force the console level to ERROR; wins over ``verbose``.
            output_dir: Base directory for ``logs/`` when the file section
                has no "path" entry.
            run_id: Suffix for the log filename; a timestamp is used when
                it is not given.

        Returns:
            Path of the log file if one was set up, otherwise None.
        """
        if not logging_config or not logging_config.get("enabled", True):
            return None

        level = logging_config.get("level", "INFO")
        console_cfg: Mapping[str, Any] = logging_config.get("console", {}) or {}
        file_cfg: Mapping[str, Any] = logging_config.get("file", {}) or {}

        console_enabled = console_cfg.get("enabled", True)
        # False is the sentinel configure() uses for "no console handler"
        console_level: str | int | None = (
            console_cfg.get("level", level) if console_enabled else False
        )
        if console_enabled:
            if quiet:
                console_level = "ERROR"
            elif verbose:
                console_level = "DEBUG"

        file_path: Path | None = None
        if file_cfg.get("enabled", False):
            if "path" in file_cfg:
                base_dir = Path(file_cfg.get("path", "./logs/"))
                if not base_dir.is_absolute():
                    base_dir = base_dir.resolve()
            else:
                base_dir = Path(output_dir) / "logs" if output_dir else Path("./logs")
                base_dir = base_dir.resolve()

            filename = file_cfg.get("filename", "homodyne_analysis.log")
            run_suffix = run_id or datetime.now().strftime("%Y%m%d_%H%M%S")
            if "{run_id}" in filename:
                # Plain substitution: str.format would raise on any other
                # brace placeholder that happens to be in the filename.
                filename = filename.replace("{run_id}", run_suffix)
            else:
                stem = Path(filename).stem or "homodyne_analysis"
                suffix = Path(filename).suffix or ".log"
                filename = f"{stem}_{run_suffix}{suffix}"
            file_path = base_dir / filename

        return self.configure(
            level=level,
            console_level=console_level,
            console_format=console_cfg.get("format", "detailed"),
            console_colors=bool(console_cfg.get("colors", False)),
            file_path=file_path,
            file_level=file_cfg.get("level", "DEBUG"),
            max_size_mb=int(file_cfg.get("max_size_mb", 10)),
            backup_count=int(file_cfg.get("backup_count", 5)),
            module_levels=logging_config.get("modules"),
            force=True,
        )

    def get_logger(self, name: str) -> logging.Logger:
        """Get or create a logger with hierarchical naming.

        Names that are the package root or dotted children of it are used
        as-is, ``"__main__"`` maps to ``"<root>.main"``, and anything else is
        nested under the root so it inherits the managed handlers. Default
        configuration is applied first if none has happened yet.
        """
        root = self._root_logger_name
        # Compare on the dotted boundary: a bare prefix test would treat a
        # name such as "homodyne_extras" as part of the hierarchy.
        if name == root or name.startswith(f"{root}."):
            full_name = name
        elif name == "__main__":
            full_name = f"{root}.main"
        else:
            full_name = f"{root}.{name}"

        if not self._configured:
            self.configure()

        return logging.getLogger(full_name)
# Global logger manager instance _logger_manager = MinimalLogger()
def configure_logging(
    logging_config: Mapping[str, Any] | None,
    *,
    verbose: bool = False,
    quiet: bool = False,
    output_dir: Path | str | None = None,
    run_id: str | None = None,
) -> Path | None:
    """Configure logging from a config section combined with CLI flags.

    Delegates to the global logger manager's ``configure_from_dict``.

    Args:
        logging_config: The ``logging:`` configuration mapping, or None.
        verbose: CLI flag forwarded to the manager.
        quiet: CLI flag forwarded to the manager.
        output_dir: Output directory forwarded to the manager.
        run_id: Run identifier forwarded to the manager.

    Returns:
        The value returned by the manager (a log file path or None).
    """
    cli_options: dict[str, Any] = {
        "verbose": verbose,
        "quiet": quiet,
        "output_dir": output_dir,
        "run_id": run_id,
    }
    return _logger_manager.configure_from_dict(logging_config, **cli_options)
def get_logger(
    name: str | None = None,
    *,
    context: Mapping[str, Any] | None = None,
) -> logging.Logger | logging.LoggerAdapter[logging.Logger]:
    """Return a package logger, optionally wrapped with a context prefix.

    Args:
        name: Logger name. When omitted, the ``__name__`` found in the
            globals of the calling frame is used ("unknown" if it cannot
            be determined).
        context: Key/value pairs to prefix to every message; when empty or
            None the plain logger is returned.

    Returns:
        The logger from the global manager, or a context adapter around it.
    """
    if name is None:
        current = inspect.currentframe()
        try:
            if current is not None and current.f_back is not None:
                name = current.f_back.f_globals.get("__name__", "unknown")
        finally:
            # Drop the frame reference promptly
            del current

    resolved = _logger_manager.get_logger(name or "unknown")
    if not context:
        return resolved
    return _ContextAdapter(resolved, dict(context))
def with_context(
    logger: logging.Logger | logging.LoggerAdapter[logging.Logger],
    **context: Any,
) -> logging.LoggerAdapter[logging.Logger]:
    """Create a contextual logger with key-value prefixes.

    Messages are emitted as ``[key=value key2=value2] message``. Wrapping an
    existing context adapter merges the contexts (new values override old
    ones on key conflicts) and never stacks adapters: the result always wraps
    the underlying logger directly. Wrapping any other ``LoggerAdapter`` uses
    its underlying logger and only the new context.

    Args:
        logger: Base logger or existing contextual adapter to wrap.
        **context: Key-value pairs to include as prefix; None values are
            dropped.

    Returns:
        A logger adapter that prefixes all messages with context.

    Example:
        >>> logger = get_logger(__name__)
        >>> ctx_logger = with_context(logger, run_id="abc123", mode="laminar_flow")
        >>> ctx_logger.info("Starting analysis")
        # Output: [run_id=abc123 mode=laminar_flow] Starting analysis

        >>> # Nested context
        >>> shard_logger = with_context(ctx_logger, shard=5)
        >>> shard_logger.info("Processing shard")
        # Output: [run_id=abc123 mode=laminar_flow shard=5] Processing shard
    """
    additions = {key: value for key, value in context.items() if value is not None}

    # Context is inherited only from our own adapter type
    inherited: dict[str, Any] = {}
    if isinstance(logger, _ContextAdapter) and logger.extra:
        inherited = dict(logger.extra)

    # Any adapter is unwrapped to the logger it decorates
    target = logger.logger if isinstance(logger, logging.LoggerAdapter) else logger
    return _ContextAdapter(target, {**inherited, **additions})
@dataclass
class PhaseContext:
    """Result holder yielded by log_phase(): phase name, timing and memory."""

    # Phase name as passed to log_phase()
    name: str
    # Elapsed seconds; stays 0.0 until the phase has finished
    duration: float = 0.0
    # Memory reading taken at phase end (None unless memory tracking ran)
    memory_peak_gb: float | None = None
    # End reading minus start reading (None unless both were available)
    memory_delta_gb: float | None = None
def _get_memory_gb() -> float | None:
    """Return this process's maximum resident set size in GB.

    Returns:
        The ``ru_maxrss`` value from ``resource.getrusage`` converted to GB,
        or None when the ``resource`` module (or the attribute) is unavailable.
    """
    try:
        import resource
        import sys

        peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        # ru_maxrss is in bytes on macOS and in KB on Linux
        units_per_gb = (1024**3) if sys.platform == "darwin" else (1024**2)
        return peak_rss / units_per_gb
    except (ImportError, AttributeError):
        return None
@contextmanager
def log_phase(
    name: str,
    logger: LoggerType | None = None,
    level: int = logging.INFO,
    track_memory: bool = False,
    threshold_s: float = 0.0,
) -> Generator[PhaseContext]:
    """Context manager for phase-level timing with optional memory tracking.

    Args:
        name: Phase name for logging.
        logger: Logger to use. If None, one is obtained from get_logger().
        level: Log level for phase messages.
        track_memory: Take memory readings before and after the phase.
        threshold_s: The completion message is logged only when the
            duration is at least this many seconds; the start message is
            logged only when the threshold is zero or negative.

    Yields:
        PhaseContext with name, duration, memory_peak_gb, memory_delta_gb.
        Duration and memory values are populated after the context exits,
        including when the body raises.

    Example:
        >>> with log_phase("optimization", track_memory=True) as phase:
        ...     result = run_optimization(data)
        >>> print(f"Took {phase.duration:.1f}s")
        # Logs: Phase 'optimization' completed in 45.3s (peak memory: 12.4 GB)
    """
    active_logger = logger if logger is not None else get_logger()
    phase = PhaseContext(name=name)

    # Baseline memory reading, only when tracking was requested
    baseline_gb: float | None = _get_memory_gb() if track_memory else None

    if threshold_s <= 0:
        active_logger.log(level, f"Phase '{name}' started")

    began = time.perf_counter()
    try:
        yield phase
    finally:
        phase.duration = time.perf_counter() - began

        if track_memory:
            closing_gb = _get_memory_gb()
            if closing_gb is not None:
                phase.memory_peak_gb = closing_gb
                if baseline_gb is not None:
                    phase.memory_delta_gb = closing_gb - baseline_gb

        if phase.duration >= threshold_s:
            summary = f"Phase '{name}' completed in {phase.duration:.2f}s"
            if phase.memory_peak_gb is not None:
                summary = f"{summary} (peak memory: {phase.memory_peak_gb:.1f} GB)"
            active_logger.log(level, summary)
def log_exception(
    logger: logging.Logger | logging.LoggerAdapter[logging.Logger],
    exc: BaseException,
    context: dict[str, Any] | None = None,
    level: int = logging.ERROR,
    include_traceback: bool = True,
) -> None:
    """Log an exception, where it was raised, and optional debugging context.

    The message is a single multi-line record: a headline with the module,
    function and line of the innermost traceback frame (when the exception
    carries a traceback), an optional ``Context:`` line of ``key=repr(value)``
    pairs, and optionally the formatted traceback.

    Args:
        logger: Logger to use.
        exc: Exception to log.
        context: Additional context (e.g., parameter values).
        level: Log level (default ERROR).
        include_traceback: Include full traceback (default True).

    Example:
        >>> try:
        ...     result = compute_jacobian(params)
        ... except ValueError as e:
        ...     log_exception(logger, e, context={"iteration": 45})
        ...     raise
    """
    origin = ""
    innermost = exc.__traceback__
    if innermost is not None:
        # Follow the chain down to the frame that actually raised
        while innermost.tb_next is not None:
            innermost = innermost.tb_next
        raising_frame = innermost.tb_frame
        module_name = raising_frame.f_globals.get("__name__", "unknown")
        origin = (
            f" in {module_name}.{raising_frame.f_code.co_name}:{innermost.tb_lineno}"
        )

    sections = [f"Exception{origin}: {type(exc).__name__}: {str(exc)}"]

    if context:
        pairs = ", ".join(f"{key}={value!r}" for key, value in context.items())
        sections.append(f"Context: {pairs}")

    if include_traceback:
        rendered = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
        sections.append(f"Traceback:\n{rendered}")

    logger.log(level, "\n".join(sections))
def log_calls(
    logger: LoggerType | None = None,
    level: int = logging.DEBUG,
    include_args: bool = False,
    include_result: bool = False,
) -> Callable[[F], F]:
    """Decorator factory that logs entry to and exit from a function.

    Exceptions raised by the wrapped function are logged at ERROR level
    (regardless of ``level``) and re-raised.

    Args:
        logger: Logger to use. If None, one is created from the decorated
            function's module on first decoration.
        level: Logging level for the entry/exit messages.
        include_args: Whether to log the repr of the call arguments.
        include_result: Whether to log the repr of the return value.
    """
    resolved_logger: LoggerType | None = logger

    def decorator(func: F) -> F:
        nonlocal resolved_logger
        if resolved_logger is None:
            resolved_logger = get_logger(func.__module__)

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            assert resolved_logger is not None  # For type narrowing
            # All message formatting is skipped when the level is disabled
            active = resolved_logger.isEnabledFor(level)

            label = ""
            if active:
                label = f"{func.__module__}.{func.__qualname__}"
                if not include_args:
                    resolved_logger.log(level, "Calling %s", label)
                else:
                    positional = ", ".join(map(repr, args))
                    keyword = ", ".join(
                        f"{key}={value!r}" for key, value in kwargs.items()
                    )
                    rendered = ", ".join(
                        part for part in (positional, keyword) if part
                    )
                    resolved_logger.log(level, "Calling %s(%s)", label, rendered)

            try:
                result = func(*args, **kwargs)
                if active:
                    if include_result:
                        resolved_logger.log(
                            level, "Completed %s -> %r", label, result
                        )
                    else:
                        resolved_logger.log(level, "Completed %s", label)
                return result
            except Exception as err:
                if not active:
                    # The name was not built above because logging was off
                    label = f"{func.__module__}.{func.__qualname__}"
                resolved_logger.log(logging.ERROR, "Exception in %s: %s", label, err)
                raise

        return wrapper  # type: ignore[return-value]

    return decorator
def log_performance(
    logger: LoggerType | None = None,
    level: int = logging.INFO,
    threshold: float = 0.1,
) -> Callable[[F], F]:
    """Decorator factory that logs how long a function call took.

    Successful calls are logged only when they last at least ``threshold``
    seconds; failing calls are always logged at ERROR level and re-raised.

    Args:
        logger: Logger to use. If None, one is created from the decorated
            function's module on first decoration.
        level: Logging level for the success message.
        threshold: Minimum duration (seconds) to log.
    """
    resolved_logger: LoggerType | None = logger

    def decorator(func: F) -> F:
        nonlocal resolved_logger
        if resolved_logger is None:
            resolved_logger = get_logger(func.__module__)

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            began = time.perf_counter()
            label = f"{func.__module__}.{func.__qualname__}"
            assert resolved_logger is not None  # For type narrowing

            try:
                outcome = func(*args, **kwargs)
                elapsed = time.perf_counter() - began
                if elapsed >= threshold:
                    report = f"Performance: {label} completed in {elapsed:.3f}s"
                    resolved_logger.log(level, report)
                return outcome
            except Exception as err:
                elapsed = time.perf_counter() - began
                report = f"Performance: {label} failed after {elapsed:.3f}s: {err}"
                resolved_logger.log(logging.ERROR, report)
                raise

        return wrapper  # type: ignore[return-value]

    return decorator
@contextmanager
def log_operation(
    operation_name: str,
    logger: LoggerType | None = None,
    level: int = logging.INFO,
) -> Generator[LoggerType]:
    """Context manager that logs the start, end and duration of an operation.

    A failure inside the block is logged at ERROR level with the elapsed
    time and then re-raised.

    Args:
        operation_name: Name of the operation.
        logger: Logger to use. If None, one is obtained from get_logger().
        level: Logging level for the start/completion messages.

    Yields:
        The logger in use, so the body can log through it.
    """
    active_logger = logger if logger is not None else get_logger()

    def elapsed_since_start() -> float:
        return time.perf_counter() - began

    active_logger.log(level, f"Starting operation: {operation_name}")
    began = time.perf_counter()

    try:
        yield active_logger
        elapsed = elapsed_since_start()
        active_logger.log(
            level, f"Completed operation: {operation_name} in {elapsed:.3f}s"
        )
    except Exception as err:
        elapsed = elapsed_since_start()
        active_logger.log(
            logging.ERROR,
            f"Failed operation: {operation_name} after {elapsed:.3f}s: {err}",
        )
        raise
# Configure default logging on import _logger_manager.configure()