"""Structured logging utilities for the homodyne package.
Provides a lightweight but flexible logging system that matches the CMC
reimplementation requirements: contextual log prefixes, configurable console
and rotating file handlers, and helpers for performance monitoring.
"""
from __future__ import annotations
import functools
import inspect
import logging
import threading
import time
import traceback
from collections.abc import Callable, Generator, Mapping
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Any, TypeVar
# Type variable for decorators: bound to "any callable" so a decorator can be
# annotated as returning the same callable type it was given.
F = TypeVar("F", bound=Callable[..., Any])
# Type alias for values that may be either a plain logger or an adapter
# wrapping one; used by the helpers that accept an optional logger argument.
LoggerType = logging.Logger | logging.LoggerAdapter[logging.Logger]
# Record layouts. The detailed layout adds the timestamp and the logger name;
# the simple layout shows only the padded level name and the message.
DEFAULT_FORMAT_DETAILED = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
DEFAULT_FORMAT_SIMPLE = "%(levelname)-8s | %(message)s"
def _resolve_level(level: str | int | None) -> int | None:
    """Convert a string/int log level to a numeric logging level.

    Args:
        level: ``None``, an integer level, a level name such as ``"debug"``
            (case-insensitive, surrounding whitespace ignored) or a numeric
            string such as ``"10"``.

    Returns:
        ``None`` when *level* is ``None``; integers are returned unchanged;
        recognised names map to the matching ``logging`` constant; anything
        unrecognised falls back to ``logging.INFO``.
    """
    if level is None:
        return None
    if isinstance(level, int):
        return level

    text = str(level).strip()
    candidate = getattr(logging, text.upper(), None)
    # The logging module also exposes non-level upper-case attributes (for
    # example the BASIC_FORMAT string); only accept genuine integer levels.
    if isinstance(candidate, int) and not isinstance(candidate, bool):
        return candidate
    if text.isdigit():
        return int(text)
    return logging.INFO
class _ColorFormatter(logging.Formatter):
    """Formatter that can wrap the level name in ANSI colour codes."""

    # ANSI escape sequence per standard level name.
    COLOR_MAP = {
        "DEBUG": "\033[36m",  # Cyan
        "INFO": "\033[32m",  # Green
        "WARNING": "\033[33m",  # Yellow
        "ERROR": "\033[31m",  # Red
        "CRITICAL": "\033[35m",  # Magenta
    }
    # Sequence that restores the terminal's default colour.
    RESET = "\033[0m"

    def __init__(self, fmt: str, datefmt: str | None, use_color: bool) -> None:
        """Create the formatter.

        Args:
            fmt: Record format string.
            datefmt: Date format string, or None for the logging default.
            use_color: When True, level names found in ``COLOR_MAP`` are coloured.
        """
        super().__init__(fmt=fmt, datefmt=datefmt)
        self.use_color = use_color

    def format(self, record: logging.LogRecord) -> str:
        """Format *record*, temporarily colouring its level name if enabled.

        The record's ``levelname`` is always restored afterwards so that other
        handlers receiving the same record see the plain name.
        """
        plain_name = record.levelname
        tint = self.COLOR_MAP.get(plain_name) if self.use_color else None
        if tint is not None:
            record.levelname = f"{tint}{plain_name}{self.RESET}"
        try:
            return super().format(record)
        finally:
            record.levelname = plain_name
class _ContextAdapter(logging.LoggerAdapter):
    """Logger adapter that puts its ``extra`` mapping in front of each message."""

    def process(self, msg: str, kwargs: Any) -> tuple[str, Any]:
        """Prefix *msg* with ``[key=value ...]`` built from the adapter's context.

        Entries whose value is ``None`` or the empty string are skipped. When
        nothing remains (or there is no context at all) the message is returned
        untouched. ``kwargs`` is passed through unchanged.
        """
        context = self.extra
        if context:
            rendered = " ".join(
                f"{key}={value}"
                for key, value in context.items()
                if value is not None and value != ""
            )
            if rendered:
                msg = f"[{rendered}] {msg}"
        return msg, kwargs
[docs]
@dataclass
class LogConfiguration:
    """Declarative bundle of logging settings.

    Build an instance directly, from a dictionary (``from_dict``) or from CLI
    flags (``from_cli_args``), then call ``apply()`` to hand the settings to
    the module-level logger manager.

    Attributes:
        console_level: Console log level (default "INFO").
        console_format: Console format ("simple" or "detailed").
        console_colors: Enable ANSI colors in console (default False).
        file_enabled: Enable file logging (default True).
        file_path: Log file path (None = auto-generate a timestamped name).
        file_level: File log level (default "DEBUG").
        file_format: File format ("simple" or "detailed"). Stored on the
            instance only; ``apply()`` does not forward it to the manager.
        file_rotation_mb: Max file size in MB before rotation (default 10).
        file_backup_count: Number of backup files to keep (default 5).
        module_overrides: Per-module log level overrides.

    Example:
        >>> config = LogConfiguration.from_cli_args(verbose=True, log_file="analysis.log")
        >>> config.apply()
        >>> config = LogConfiguration(
        ...     console_level="INFO",
        ...     file_level="DEBUG",
        ...     module_overrides={"jax": "WARNING", "homodyne.optimization": "DEBUG"}
        ... )
        >>> config.apply()
    """

    console_level: str = "INFO"
    console_format: str = "simple"
    console_colors: bool = False
    file_enabled: bool = True
    file_path: str | Path | None = None
    file_level: str = "DEBUG"
    file_format: str = "detailed"
    file_rotation_mb: int = 10
    file_backup_count: int = 5
    module_overrides: dict[str, str] = field(default_factory=dict)

    def apply(self) -> Path | None:
        """Push this configuration into the logger manager.

        Existing managed handlers are replaced (``force=True``).

        Returns:
            Whatever the manager's ``configure`` returns: the log file path
            when file logging is set up, otherwise None.
        """
        # Quiet noisy third-party libraries first; user overrides replace
        # these entries when the same module name is given.
        module_levels: dict[str, str] = {
            "jax": "WARNING",
            "numpy": "WARNING",
            "numba": "WARNING",
            "h5py": "WARNING",
        }
        module_levels.update(self.module_overrides)

        if not self.file_enabled:
            target = None
        elif self.file_path is not None:
            target = Path(self.file_path)
        else:
            # No explicit path: fall back to a timestamped file name.
            stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            target = Path(f"homodyne_{stamp}.log")

        # False (rather than None) tells the manager that file logging is off.
        effective_file_level = self.file_level if self.file_enabled else False

        return _logger_manager.configure(
            level="DEBUG",  # keep the package logger wide open; handlers filter
            console_level=self.console_level,
            console_format=self.console_format,
            console_colors=self.console_colors,
            file_path=target,
            file_level=effective_file_level,
            max_size_mb=self.file_rotation_mb,
            backup_count=self.file_backup_count,
            module_levels=module_levels,
            force=True,
        )

    @classmethod
    def from_dict(cls, config: dict[str, Any]) -> LogConfiguration:
        """Build a configuration from a dictionary.

        Args:
            config: Mapping of field names to values. Missing keys take the
                fallback values listed below; unknown keys are ignored.

        Returns:
            LogConfiguration instance.
        """
        fallbacks: dict[str, Any] = {
            "console_level": "INFO",
            "console_format": "simple",
            "console_colors": False,
            "file_enabled": True,
            "file_path": None,
            "file_level": "DEBUG",
            "file_format": "detailed",
            "file_rotation_mb": 10,
            "file_backup_count": 5,
            "module_overrides": {},
        }
        chosen = {key: config.get(key, default) for key, default in fallbacks.items()}
        return cls(**chosen)

    @classmethod
    def from_cli_args(
        cls,
        verbose: bool = False,
        quiet: bool = False,
        log_file: str | None = None,
    ) -> LogConfiguration:
        """Build a configuration from CLI flags.

        Args:
            verbose: Use DEBUG console level and the detailed console format.
            quiet: Use ERROR console level; takes precedence over *verbose*
                for the level.
            log_file: Path to log file (None = auto-generate on ``apply()``).

        Returns:
            LogConfiguration instance with file logging enabled at DEBUG.
        """
        console_level = "INFO"
        if verbose:
            console_level = "DEBUG"
        if quiet:
            console_level = "ERROR"

        console_format = "detailed" if verbose else "simple"
        return cls(
            console_level=console_level,
            console_format=console_format,
            console_colors=False,
            file_enabled=True,
            file_path=log_file,
            file_level="DEBUG",
            file_format="detailed",
        )
@dataclass
class _PhaseRecord:
    """Timing bookkeeping for a single named phase (internal)."""

    name: str
    start_time: float | None = None
    end_time: float | None = None
    memory_peak_gb: float | None = None

    @property
    def duration(self) -> float | None:
        """Elapsed time between start and end, or None if either is missing."""
        begin, finish = self.start_time, self.end_time
        if begin is not None and finish is not None:
            return finish - begin
        return None
[docs]
class AnalysisSummaryLogger:
    """Structured logging for analysis completion summaries.

    Tracks phase timings, metrics, output files, and convergence status
    for logging a structured summary at analysis completion.

    Example:
        >>> summary = AnalysisSummaryLogger(run_id="analysis_001", analysis_mode="laminar_flow")
        >>> summary.start_phase("loading")
        >>> data = load_data(config)
        >>> summary.end_phase("loading", memory_peak_gb=2.1)
        >>> summary.record_metric("chi_squared", result.chi_squared)
        >>> summary.set_convergence_status("converged")
        >>> summary.log_summary(logger)
    """

    def __init__(self, run_id: str, analysis_mode: str) -> None:
        """Initialize summary logger for an analysis run.

        The total-runtime clock starts here.

        Args:
            run_id: Unique identifier for this analysis run.
            analysis_mode: Analysis mode (e.g., "static_isotropic", "laminar_flow").
        """
        self.run_id = run_id
        self.analysis_mode = analysis_mode
        self._phases: dict[str, _PhaseRecord] = {}
        self._metrics: dict[str, float] = {}
        self._output_files: list[Path] = []
        self._convergence_status: str | None = None
        self._start_time = time.perf_counter()
        self._warning_count = 0
        self._error_count = 0
        # T054: Configuration summary for logging
        self._config_summary: dict[str, Any] = {}

    def start_phase(self, name: str) -> None:
        """Mark phase start for timing.

        Starting a phase that already exists replaces its record.

        Args:
            name: Phase name (e.g., "loading", "optimization").
        """
        self._phases[name] = _PhaseRecord(name=name, start_time=time.perf_counter())

    def end_phase(self, name: str, memory_peak_gb: float | None = None) -> None:
        """Mark phase completion.

        Unknown phase names are ignored.

        Args:
            name: Phase name that was started.
            memory_peak_gb: Optional peak memory usage during phase.
        """
        record = self._phases.get(name)
        if record is not None:
            record.end_time = time.perf_counter()
            record.memory_peak_gb = memory_peak_gb

    def record_metric(self, name: str, value: float) -> None:
        """Record a named metric (e.g., chi_squared).

        Args:
            name: Metric name.
            value: Metric value.
        """
        self._metrics[name] = value

    def add_output_file(self, path: Path | str) -> None:
        """Record an output file path.

        Args:
            path: Path to output file.
        """
        self._output_files.append(Path(path))

    def set_convergence_status(self, status: str) -> None:
        """Set final convergence status.

        Args:
            status: Convergence status (e.g., "converged", "max_iter", "failed").
        """
        self._convergence_status = status

    def increment_warning_count(self) -> None:
        """Increment warning counter."""
        self._warning_count += 1

    def increment_error_count(self) -> None:
        """Increment error counter."""
        self._error_count += 1

    def set_config_summary(
        self,
        optimizer: str | None = None,
        n_params: int | None = None,
        n_data_points: int | None = None,
        n_phi_angles: int | None = None,
        data_file: str | None = None,
        **kwargs: Any,
    ) -> None:
        """T054: Set configuration summary for logging.

        Named arguments left as None are not stored; values from earlier
        calls are kept unless overwritten.

        Args:
            optimizer: Optimizer used (e.g., "nlsq", "cmc").
            n_params: Number of parameters being optimized.
            n_data_points: Total number of data points.
            n_phi_angles: Number of phi angles.
            data_file: Path to data file.
            **kwargs: Additional key-value pairs to include (stored as given).
        """
        named = {
            "optimizer": optimizer,
            "n_params": n_params,
            "n_data_points": n_data_points,
            "n_phi_angles": n_phi_angles,
            "data_file": data_file,
        }
        for key, value in named.items():
            if value is not None:
                self._config_summary[key] = value
        # Add any additional kwargs
        self._config_summary.update(kwargs)

    def log_summary(self, logger: logging.Logger | logging.LoggerAdapter) -> None:
        """Log the complete analysis summary as one multi-line INFO message.

        Args:
            logger: Logger to use for output.
        """
        total_runtime = time.perf_counter() - self._start_time
        rule = "=" * 60
        lines = [
            rule,
            "ANALYSIS SUMMARY",
            rule,
            f"Run ID: {self.run_id}",
            f"Mode: {self.analysis_mode}",
            f"Status: {self._convergence_status or 'unknown'}",
            f"Total runtime: {total_runtime:.2f}s",
        ]

        # T054: Add configuration summary
        if self._config_summary:
            lines.append("")
            lines.append("Configuration:")
            for key, value in self._config_summary.items():
                # Large integers get thousands separators for readability.
                if isinstance(value, int) and value > 1000:
                    lines.append(f" {key}: {value:,}")
                else:
                    lines.append(f" {key}: {value}")

        # Phases that were never ended have no duration and are skipped.
        if self._phases:
            lines.append("")
            lines.append("Phase Timings:")
            for name, record in self._phases.items():
                duration = record.duration
                if duration is not None:
                    mem_str = (
                        f" (peak: {record.memory_peak_gb:.1f} GB)"
                        if record.memory_peak_gb is not None
                        else ""
                    )
                    lines.append(f" {name}: {duration:.2f}s{mem_str}")

        if self._metrics:
            lines.append("")
            lines.append("Metrics:")
            for name, value in self._metrics.items():
                lines.append(f" {name}: {value:.6g}")

        if self._output_files:
            lines.append("")
            lines.append("Output files:")
            for path in self._output_files:
                lines.append(f" {path}")

        if self._warning_count > 0 or self._error_count > 0:
            lines.append("")
            lines.append(
                f"Warnings: {self._warning_count}, Errors: {self._error_count}"
            )

        lines.append(rule)
        logger.info("\n".join(lines))

    def as_dict(self) -> dict[str, Any]:
        """Export summary as dictionary for JSON serialization.

        Metrics are passed through a JSON-safety filter so NaN/Inf floats
        (which can occur in degenerate runs) do not break ``json.dump``.

        Returns:
            Dictionary with all summary data.
        """
        total_runtime = time.perf_counter() - self._start_time
        phases_dict = {
            name: {
                "duration_s": record.duration,
                "memory_peak_gb": record.memory_peak_gb,
            }
            for name, record in self._phases.items()
        }

        # Import lazily to avoid circular dependency (logging <- json_utils).
        try:
            from homodyne.io.json_utils import json_safe as _json_safe
        except ImportError:
            import math

            def _json_safe(value: Any) -> Any:
                """Minimal fallback when io module unavailable.

                Recurses into dicts, lists and tuples so that it actually
                reaches the float values inside the metrics mapping.
                """
                if isinstance(value, dict):
                    return {key: _json_safe(item) for key, item in value.items()}
                if isinstance(value, (list, tuple)):
                    return [_json_safe(item) for item in value]
                if isinstance(value, float):
                    if math.isnan(value):
                        return None
                    if math.isinf(value):
                        return str(value)
                return value

        return {
            "run_id": self.run_id,
            "analysis_mode": self.analysis_mode,
            "convergence_status": self._convergence_status,
            "total_runtime_s": total_runtime,
            "config_summary": self._config_summary,  # T054
            "phases": phases_dict,
            "metrics": _json_safe(self._metrics),
            "output_files": [str(p) for p in self._output_files],
            "warning_count": self._warning_count,
            "error_count": self._error_count,
        }
[docs]
class MinimalLogger:
    """Configurable logger manager for the homodyne package.

    A single shared instance is handed out by ``__new__``. Calls to
    ``configure`` are serialised by an internal lock.
    """

    _instance: MinimalLogger | None = None
    _initialized: bool
    _configured: bool
    _root_logger_name: str
    _lock: threading.Lock

    def __new__(cls) -> MinimalLogger:
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            instance = super().__new__(cls)
            instance._initialized = False
            instance._lock = threading.Lock()
            cls._instance = instance
        return cls._instance

    def __init__(self) -> None:
        """Initialise state once; later constructions leave it untouched."""
        if self._initialized:
            return
        self._configured = False
        self._root_logger_name = "homodyne"
        self._initialized = True

    @staticmethod
    def _build_formatter(
        format_name: str = "detailed",
        use_color: bool = False,
    ) -> logging.Formatter:
        """Build a formatter; any name other than "simple" means detailed."""
        fmt = (
            DEFAULT_FORMAT_SIMPLE
            if format_name == "simple"
            else DEFAULT_FORMAT_DETAILED
        )
        return _ColorFormatter(
            fmt=fmt, datefmt="%Y-%m-%d %H:%M:%S", use_color=use_color
        )

    def _clear_managed_handlers(self, logger: logging.Logger) -> None:
        """Remove and close handlers this manager attached to *logger*."""
        # Iterate over a copy: removeHandler mutates logger.handlers.
        for handler in list(logger.handlers):
            if getattr(handler, "_homodyne_managed", False):
                logger.removeHandler(handler)
                handler.close()

    def configure(
        self,
        level: str | int = "INFO",
        *,
        console_level: str | int | None = None,
        console_format: str = "detailed",
        console_colors: bool = False,
        file_path: str | Path | None = None,
        file_level: str | int | None = None,
        max_size_mb: int = 10,
        backup_count: int = 5,
        module_levels: Mapping[str, str | int] | None = None,
        force: bool = False,
    ) -> Path | None:
        """Configure logging for the package logger.

        Holds the instance lock for the duration of the call so concurrent
        configuration attempts run one after another.

        Args:
            level: Base level for the package logger.
            console_level: Console handler level; None uses the effective
                package level, False disables console setup.
            console_format: "simple" or "detailed".
            console_colors: Colour level names with ANSI codes.
            file_path: Log file path; falsy values skip file logging.
            file_level: File handler level; None uses the effective package
                level, False excludes it from the level computation.
            max_size_mb: Rotation threshold in MB; values <= 0 use a plain,
                non-rotating file handler.
            backup_count: Number of rotated files to keep.
            module_levels: Per-logger level overrides keyed by logger name.
            force: Remove previously managed handlers first.

        Returns:
            Path of the log file when a file handler was attached, else None.
        """
        with self._lock:
            return self._configure_impl(
                level,
                console_level=console_level,
                console_format=console_format,
                console_colors=console_colors,
                file_path=file_path,
                file_level=file_level,
                max_size_mb=max_size_mb,
                backup_count=backup_count,
                module_levels=module_levels,
                force=force,
            )

    def _configure_impl(
        self,
        level: str | int = "INFO",
        *,
        console_level: str | int | None = None,
        console_format: str = "detailed",
        console_colors: bool = False,
        file_path: str | Path | None = None,
        file_level: str | int | None = None,
        max_size_mb: int = 10,
        backup_count: int = 5,
        module_levels: Mapping[str, str | int] | None = None,
        force: bool = False,
    ) -> Path | None:
        """Internal implementation of configure (called under lock)."""
        root_logger = logging.getLogger(self._root_logger_name)
        if force:
            self._clear_managed_handlers(root_logger)

        # The package logger must be at least as permissive as its most
        # verbose handler. False is the "disabled" sentinel for a handler
        # level and is therefore left out of the computation.
        root_level_candidates = [_resolve_level(level)]
        if console_level is not False:
            root_level_candidates.append(_resolve_level(console_level))
        if file_level is not False:
            root_level_candidates.append(_resolve_level(file_level))
        root_level = min(
            (lvl for lvl in root_level_candidates if lvl is not None),
            default=logging.INFO,
        )
        root_logger.setLevel(root_level)

        # Console handler: reuse an existing non-file stream handler if any.
        console_handler: logging.Handler | None = None
        for handler in root_logger.handlers:
            if isinstance(handler, logging.StreamHandler) and not isinstance(
                handler,
                logging.FileHandler,
            ):
                console_handler = handler
                break
        if console_level is not False:
            if console_handler is None:
                console_handler = logging.StreamHandler()
                console_handler._homodyne_managed = True  # type: ignore[attr-defined]
                root_logger.addHandler(console_handler)
            console_handler.setLevel(_resolve_level(console_level) or root_level)
            console_handler.setFormatter(
                self._build_formatter(console_format, use_color=console_colors)
            )

        # File handler
        created_file: Path | None = None
        if file_path:
            file_path = Path(file_path)
            try:
                file_path.parent.mkdir(parents=True, exist_ok=True)
            except OSError as e:
                root_logger.warning(
                    "Cannot create log directory %s: %s. File logging disabled.",
                    file_path.parent,
                    e,
                )
                file_path = None  # Skip file handler, continue with console-only
            if file_path is not None:
                created_file = file_path
                max_bytes = int(max_size_mb * 1024 * 1024)
                if max_bytes > 0:
                    file_handler: logging.Handler = RotatingFileHandler(
                        file_path,
                        maxBytes=max_bytes,
                        backupCount=backup_count,
                    )
                else:
                    file_handler = logging.FileHandler(file_path)
                file_handler._homodyne_managed = True  # type: ignore[attr-defined]
                file_handler.setLevel(_resolve_level(file_level) or root_level)
                file_handler.setFormatter(
                    logging.Formatter(
                        DEFAULT_FORMAT_DETAILED,
                        datefmt="%Y-%m-%d %H:%M:%S",
                    )
                )
                root_logger.addHandler(file_handler)

        # Default suppression for external libraries (FR-005)
        # These are applied first, then user overrides can override them
        default_suppressions = {
            "jax": "WARNING",
            "numpy": "WARNING",
            "numba": "WARNING",
            "h5py": "WARNING",
        }
        for lib_name, lib_level in default_suppressions.items():
            lib_logger = logging.getLogger(lib_name)
            # Only set if not already configured by user
            if lib_logger.level == logging.NOTSET:
                lib_logger.setLevel(_resolve_level(lib_level) or logging.WARNING)

        # Module-specific overrides (user overrides win over defaults)
        if module_levels:
            for module_name, module_level in module_levels.items():
                logging.getLogger(module_name).setLevel(
                    _resolve_level(module_level) or root_level
                )

        self._configured = True
        return created_file

    def get_logger(self, name: str) -> logging.Logger:
        """Get or create a logger placed under the package logger.

        Names equal to the package name or starting with "<package>." are
        used as-is, ``"__main__"`` becomes "<package>.main", and every other
        name is prefixed with "<package>.". Default configuration is applied
        on first use if ``configure`` has not run yet.
        """
        root = self._root_logger_name
        # Match on the dotted prefix so that e.g. "homodyne_tools" is not
        # mistaken for a member of the "homodyne" hierarchy.
        if name == root or name.startswith(f"{root}."):
            full_name = name
        elif name == "__main__":
            full_name = f"{root}.main"
        else:
            full_name = f"{root}.{name}"

        if not self._configured:
            self.configure()
        return logging.getLogger(full_name)
# Global logger manager instance: the single MinimalLogger shared by the
# module-level helpers in this file (e.g. get_logger, LogConfiguration.apply).
_logger_manager = MinimalLogger()
[docs]
def get_logger(
    name: str | None = None,
    *,
    context: Mapping[str, Any] | None = None,
) -> logging.Logger | logging.LoggerAdapter[logging.Logger]:
    """Return a package logger, optionally wrapped with a context prefix.

    Args:
        name: Logger name. When None, the ``__name__`` of the calling
            frame's module is used ("unknown" if it cannot be determined).
        context: Optional key/value pairs; when non-empty the logger is
            wrapped in an adapter that prefixes every message with them.

    Returns:
        The plain logger, or a context adapter around it.
    """
    resolved = name
    if resolved is None:
        frame = inspect.currentframe()
        try:
            if frame is not None and frame.f_back is not None:
                resolved = frame.f_back.f_globals.get("__name__", "unknown")
        finally:
            # Drop the frame reference promptly to avoid a reference cycle.
            del frame

    base = _logger_manager.get_logger(resolved or "unknown")
    if not context:
        return base
    return _ContextAdapter(base, dict(context))
[docs]
def with_context(
    logger: logging.Logger | logging.LoggerAdapter[logging.Logger],
    **context: Any,
) -> logging.LoggerAdapter[logging.Logger]:
    """Create a contextual logger with key-value prefixes.

    Messages are rendered as ``[key=value key2=value2] message``. Wrapping a
    logger that already came from this function merges the two contexts
    (new keys win on conflict) instead of nesting adapters. For any other
    ``LoggerAdapter`` only its underlying logger is reused.

    Args:
        logger: Base logger or existing adapter to wrap.
        **context: Key-value pairs to include as prefix; None values are dropped.

    Returns:
        A logger adapter that prefixes all messages with context.

    Example:
        >>> logger = get_logger(__name__)
        >>> ctx_logger = with_context(logger, run_id="abc123", mode="laminar_flow")
        >>> ctx_logger.info("Starting analysis")
        # Output: [run_id=abc123 mode=laminar_flow] Starting analysis
        >>> shard_logger = with_context(ctx_logger, shard=5)
        >>> shard_logger.info("Processing shard")
        # Output: [run_id=abc123 mode=laminar_flow shard=5] Processing shard
    """
    fresh = {key: value for key, value in context.items() if value is not None}

    if isinstance(logger, _ContextAdapter):
        # Start from the outer context, then let the new keys override it.
        combined = dict(logger.extra) if logger.extra else {}
        combined.update(fresh)
        return _ContextAdapter(logger.logger, combined)

    # Unwrap foreign adapters so the result wraps a plain logger directly.
    target = logger.logger if isinstance(logger, logging.LoggerAdapter) else logger
    return _ContextAdapter(target, fresh)
[docs]
@dataclass
class PhaseContext:
    """Result holder yielded by ``log_phase()``.

    Only ``name`` is known up front; the remaining fields keep their
    defaults until they are filled in by whoever owns the instance.
    """

    # Phase label.
    name: str
    # Elapsed time; 0.0 until measured.
    duration: float = 0.0
    # Memory reading in GB, or None when not tracked/unavailable.
    memory_peak_gb: float | None = None
    # Difference between two memory readings in GB, or None.
    memory_delta_gb: float | None = None
def _get_memory_gb() -> float | None:
    """Return the process's maximum resident set size in GB.

    The value comes from ``ru_maxrss`` and is therefore a high-water mark,
    not the current usage. Returns None when the ``resource`` module (or the
    needed attribute) is unavailable.
    """
    try:
        import resource
        import sys

        max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        # ru_maxrss is reported in bytes on macOS and in KB elsewhere.
        divisor = (1024**3) if sys.platform == "darwin" else (1024**2)
        return max_rss / divisor
    except (ImportError, AttributeError):
        return None
[docs]
@contextmanager
def log_phase(
    name: str,
    logger: LoggerType | None = None,
    level: int = logging.INFO,
    track_memory: bool = False,
    threshold_s: float = 0.0,
) -> Generator[PhaseContext]:
    """Context manager for phase-level timing with optional memory tracking.

    A "started" line is logged only when ``threshold_s <= 0``. On exit
    (including when the body raises) the duration is recorded and a
    "completed" line is logged if the duration is at least ``threshold_s``.

    Args:
        name: Phase name for logging.
        logger: Logger to use. If None, ``get_logger()`` is called from
            inside this function, so the logger is named after this module.
        level: Log level for phase messages.
        track_memory: Record memory readings before and after the phase.
        threshold_s: Minimum duration for the completion line (0 = always log).

    Yields:
        PhaseContext with name, duration, memory_peak_gb, memory_delta_gb.
        Duration and memory values are populated after the context exits.

    Example:
        >>> with log_phase("optimization", track_memory=True) as phase:
        ...     result = run_optimization(data)
        >>> print(f"Took {phase.duration:.1f}s")
        # Logs: Phase 'optimization' completed in 45.3s (peak memory: 12.4 GB)
    """
    active_logger = logger if logger is not None else get_logger()
    phase = PhaseContext(name=name)

    baseline_gb: float | None = _get_memory_gb() if track_memory else None

    if threshold_s <= 0:
        active_logger.log(level, f"Phase '{name}' started")

    started = time.perf_counter()
    try:
        yield phase
    finally:
        phase.duration = time.perf_counter() - started

        if track_memory:
            reading_gb = _get_memory_gb()
            if reading_gb is not None:
                phase.memory_peak_gb = reading_gb
                if baseline_gb is not None:
                    phase.memory_delta_gb = reading_gb - baseline_gb

        if phase.duration >= threshold_s:
            summary = f"Phase '{name}' completed in {phase.duration:.2f}s"
            if phase.memory_peak_gb is not None:
                summary = f"{summary} (peak memory: {phase.memory_peak_gb:.1f} GB)"
            active_logger.log(level, summary)
[docs]
def log_exception(
    logger: logging.Logger | logging.LoggerAdapter[logging.Logger],
    exc: BaseException,
    context: dict[str, Any] | None = None,
    level: int = logging.ERROR,
    include_traceback: bool = True,
) -> None:
    """Log an exception together with where it was raised and extra context.

    The message is a single multi-line string:

    * ``Exception in <module>.<function>:<line>: <Type>: <message>``
      (the location part is omitted when the exception has no traceback),
    * ``Context: key=repr(value), ...`` when *context* is non-empty,
    * ``Traceback:`` followed by the formatted traceback when requested.

    Args:
        logger: Logger to use.
        exc: Exception to log.
        context: Additional context (e.g., parameter values).
        level: Log level (default ERROR).
        include_traceback: Include full traceback (default True).

    Example:
        >>> try:
        ...     result = compute_jacobian(params)
        ... except ValueError as e:
        ...     log_exception(logger, e, context={"iteration": 45})
        ...     raise
    """
    where = ""
    innermost = exc.__traceback__
    if innermost is not None:
        # Follow the chain to the frame in which the exception was raised.
        while innermost.tb_next is not None:
            innermost = innermost.tb_next
        origin = innermost.tb_frame
        module_name = origin.f_globals.get("__name__", "unknown")
        where = f" in {module_name}.{origin.f_code.co_name}:{innermost.tb_lineno}"

    kind = type(exc).__name__
    text = str(exc)
    sections = [f"Exception{where}: {kind}: {text}"]

    if context:
        pairs = ", ".join(f"{key}={value!r}" for key, value in context.items())
        sections.append(f"Context: {pairs}")

    if include_traceback:
        rendered = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
        sections.append(f"Traceback:\n{rendered}")

    logger.log(level, "\n".join(sections))
[docs]
def log_calls(
    logger: LoggerType | None = None,
    level: int = logging.DEBUG,
    include_args: bool = False,
    include_result: bool = False,
) -> Callable[[F], F]:
    """Decorator to log function calls.

    Entry and exit are logged at *level* (skipped entirely when that level
    is disabled). Exceptions derived from ``Exception`` are always logged at
    ERROR and then re-raised.

    Args:
        logger: Logger to use. If None, one is created per decorated
            function from that function's module name.
        level: Logging level to use.
        include_args: Whether to log function arguments.
        include_result: Whether to log function return value.

    Returns:
        A decorator that wraps the target function.
    """

    def decorator(func: F) -> F:
        # Resolve the logger per decorated function. Caching it on the
        # enclosing scope would make every later function decorated with the
        # same decorator object reuse the first function's module logger.
        func_logger: LoggerType = (
            logger if logger is not None else get_logger(func.__module__)
        )

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Guard: skip all formatting if log level is not enabled
            log_enabled = func_logger.isEnabledFor(level)

            if log_enabled:
                func_name = f"{func.__module__}.{func.__qualname__}"
                if include_args:
                    args_str = ", ".join([repr(arg) for arg in args])
                    kwargs_str = ", ".join(
                        [f"{k}={v!r}" for k, v in kwargs.items()]
                    )
                    all_args = ", ".join(filter(None, [args_str, kwargs_str]))
                    func_logger.log(level, "Calling %s(%s)", func_name, all_args)
                else:
                    func_logger.log(level, "Calling %s", func_name)

            try:
                result = func(*args, **kwargs)
                if log_enabled:
                    if include_result:
                        func_logger.log(
                            level, "Completed %s -> %r", func_name, result
                        )
                    else:
                        func_logger.log(level, "Completed %s", func_name)
                return result
            except Exception as e:
                # func_name is only bound when entry logging ran.
                exc_func_name = (
                    func_name
                    if log_enabled
                    else f"{func.__module__}.{func.__qualname__}"
                )
                func_logger.log(
                    logging.ERROR, "Exception in %s: %s", exc_func_name, e
                )
                raise

        return wrapper  # type: ignore[return-value]

    return decorator
[docs]
@contextmanager
def log_operation(
    operation_name: str,
    logger: LoggerType | None = None,
    level: int = logging.INFO,
) -> Generator[LoggerType]:
    """Context manager that logs the start, completion or failure of an operation.

    Start and completion are logged at *level*; a failure (any ``Exception``
    raised by the body) is logged at ERROR with the elapsed time and then
    re-raised.

    Args:
        operation_name: Name of the operation.
        logger: Logger to use. If None, ``get_logger()`` is called from
            inside this function, so the logger is named after this module.
        level: Logging level to use.

    Yields:
        The logger in use, so the body can log through it.
    """
    active_logger = logger if logger is not None else get_logger()
    active_logger.log(level, f"Starting operation: {operation_name}")
    began = time.perf_counter()

    def elapsed() -> float:
        """Seconds since the operation started."""
        return time.perf_counter() - began

    try:
        yield active_logger
        took = elapsed()
        active_logger.log(
            level, f"Completed operation: {operation_name} in {took:.3f}s"
        )
    except Exception as err:
        took = elapsed()
        active_logger.log(
            logging.ERROR,
            f"Failed operation: {operation_name} after {took:.3f}s: {err}",
        )
        raise
# Configure default logging on import: calling configure() with no arguments
# applies the manager's default settings as a side effect of importing this
# module, so loggers obtained later already have handlers attached.
_logger_manager.configure()