336 lines
10 KiB
Python
336 lines
10 KiB
Python
"""Unit tests for logging module."""
|
|
|
|
import json
|
|
import logging
|
|
import logging.handlers
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from webdrop_bridge.utils.logging import (
|
|
JSONFormatter,
|
|
PerformanceTracker,
|
|
get_logger,
|
|
setup_logging,
|
|
)
|
|
|
|
|
|
class TestSetupLogging:
    """Tests for ``setup_logging``: handlers, levels, files, and error cases."""

    def test_setup_logging_console_only(self):
        """Without a log file, exactly one stream handler is expected."""
        logger = setup_logging(name="test_console", level="DEBUG")

        assert logger.name == "test_console"
        assert logger.level == logging.DEBUG
        assert len(logger.handlers) == 1
        assert isinstance(logger.handlers[0], logging.StreamHandler)

    def test_setup_logging_with_file(self, tmp_path):
        """With a log file, a rotating file handler is added and the file exists."""
        log_file = tmp_path / "test.log"

        logger = setup_logging(
            name="test_file",
            level="INFO",
            log_file=log_file,
        )

        assert logger.name == "test_file"
        assert len(logger.handlers) == 2

        # First rotating file handler attached to the logger, or None.
        file_handler = next(
            (
                handler
                for handler in logger.handlers
                if isinstance(handler, logging.handlers.RotatingFileHandler)
            ),
            None,
        )

        assert file_handler is not None
        assert log_file.exists()

    def test_setup_logging_invalid_level(self):
        """An unknown level name must raise ``KeyError``."""
        with pytest.raises(KeyError, match="Invalid logging level"):
            setup_logging(level="INVALID")

    def test_setup_logging_invalid_file_path(self):
        """An unwritable log file location must raise ``ValueError``.

        The test is skipped when running as root on Unix: root bypasses
        directory permissions, so ``/root`` is writable and the failure this
        test needs cannot be provoked.
        """
        # Function-scope import: only this test needs ``os``.
        import os

        if Path.cwd().drive:  # Windows
            # CON is a reserved device name, so it cannot act as a directory.
            invalid_path = Path("CON") / "invalid.log"
        else:  # Unix - use root-only directory
            if hasattr(os, "geteuid") and os.geteuid() == 0:
                pytest.skip("running as root: /root is writable, cannot force a write failure")
            invalid_path = Path("/root") / "invalid.log"

        with pytest.raises(ValueError, match="Cannot write to log file"):
            setup_logging(log_file=invalid_path)

    def test_setup_logging_custom_format(self, tmp_path):
        """A custom ``fmt`` string is applied to records written to the file."""
        log_file = tmp_path / "test.log"
        custom_fmt = "%(levelname)s:%(name)s:%(message)s"

        logger = setup_logging(
            name="test_format",
            level="INFO",
            log_file=log_file,
            fmt=custom_fmt,
        )

        # Emit one record, then verify how it was rendered on disk.
        logger.info("test message")

        content = log_file.read_text()
        assert "INFO:test_format:test message" in content

    def test_setup_logging_creates_parent_dirs(self, tmp_path):
        """Missing parent directories of the log file are created."""
        log_file = tmp_path / "nested" / "dir" / "test.log"

        logger = setup_logging(
            name="test_nested",
            level="INFO",
            log_file=log_file,
        )

        logger.info("test")

        assert log_file.exists()
        assert log_file.parent.exists()

    def test_setup_logging_removes_duplicates(self):
        """Calling ``setup_logging`` twice for one name must not stack handlers."""
        logger_name = "test_duplicates"

        # First setup establishes the baseline handler count.
        logger1 = setup_logging(name=logger_name, level="DEBUG")
        initial_handler_count = len(logger1.handlers)

        # Second setup for the same name is expected to replace, not add.
        logger2 = setup_logging(name=logger_name, level="INFO")

        assert logger2 is logger1  # Same logger object
        assert len(logger2.handlers) == initial_handler_count
|
|
|
|
|
|
class TestGetLogger:
    """Tests for the ``get_logger`` convenience wrapper."""

    def test_get_logger(self):
        """An explicit name yields a ``logging.Logger`` carrying that name."""
        requested_name = "test.module"

        result = get_logger(requested_name)

        assert isinstance(result, logging.Logger)
        assert result.name == "test.module"

    def test_get_logger_default_name(self):
        """Calling with no argument falls back to the logging module's own name."""
        result = get_logger()

        # With no name given, the expected logger is named after the module
        # that defines get_logger.
        assert isinstance(result, logging.Logger)
        assert result.name == "webdrop_bridge.utils.logging"
|
|
|
|
|
|
class TestLogRotation:
    """Tests for the rotation settings of the file handler."""

    def test_rotating_file_handler_configured(self, tmp_path):
        """The file handler must rotate at 10 MB and keep 5 backups."""
        target = tmp_path / "test.log"

        logger = setup_logging(
            name="test_rotation",
            level="INFO",
            log_file=target,
        )

        # Pick the first rotating handler attached to the logger, if any.
        rotating_handlers = [
            candidate
            for candidate in logger.handlers
            if isinstance(candidate, logging.handlers.RotatingFileHandler)
        ]
        rotating_handler = rotating_handlers[0] if rotating_handlers else None

        assert rotating_handler is not None

        # Expected defaults: 10 MB max size, 5 backup files.
        expected_max_bytes = 10 * 1024 * 1024
        expected_backups = 5
        assert rotating_handler.maxBytes == expected_max_bytes
        assert rotating_handler.backupCount == expected_backups
|
|
|
|
|
|
class TestJSONFormatter:
    """Tests for ``JSONFormatter`` and JSON output via ``setup_logging``."""

    @staticmethod
    def _make_record(name, level, lineno, msg, exc_info=None):
        """Build a ``LogRecord`` with the fixed pathname and empty args used here."""
        return logging.LogRecord(
            name=name,
            level=level,
            pathname="test.py",
            lineno=lineno,
            msg=msg,
            args=(),
            exc_info=exc_info,
        )

    def test_json_formatter_creates_valid_json(self):
        """Formatted output parses as JSON and carries the core record fields."""
        record = self._make_record("test.module", logging.INFO, 42, "Test message")

        payload = json.loads(JSONFormatter().format(record))

        assert payload["message"] == "Test message"
        assert payload["level"] == "INFO"
        assert payload["logger"] == "test.module"
        assert payload["line"] == 42

    def test_json_formatter_includes_timestamp(self):
        """Formatted output has a ``timestamp`` field containing a ``T`` separator."""
        record = self._make_record("test", logging.INFO, 1, "Test")

        payload = json.loads(JSONFormatter().format(record))

        assert "timestamp" in payload
        # ISO-style value, e.g. "2026-01-29T12:34:56.789000"
        assert "T" in payload["timestamp"]

    def test_json_formatter_with_exception(self):
        """Exception details appear under the ``exception`` key."""
        import sys

        formatter = JSONFormatter()

        try:
            raise ValueError("Test error")
        except ValueError:
            # Capture exc_info while the exception is still being handled.
            record = self._make_record(
                "test",
                logging.ERROR,
                1,
                "Error occurred",
                exc_info=sys.exc_info(),
            )

        payload = json.loads(formatter.format(record))

        assert "exception" in payload
        assert "ValueError" in payload["exception"]
        assert "Test error" in payload["exception"]

    def test_setup_logging_with_json_format(self, tmp_path):
        """With ``json_format=True`` the log file holds JSON including extras."""
        log_file = tmp_path / "test.log"

        logger = setup_logging(
            name="test_json",
            level="INFO",
            log_file=log_file,
            json_format=True,
        )

        logger.info("Test JSON message", extra={"user_id": 123})

        # The file is expected to hold a single JSON document.
        payload = json.loads(log_file.read_text())

        assert payload["message"] == "Test JSON message"
        assert payload["level"] == "INFO"
        assert payload["user_id"] == 123
|
|
|
|
|
|
class TestLogArchival:
    """Tests for log file creation and the archival helper."""

    def test_setup_logging_with_log_file_created(self, tmp_path):
        """Logging a message creates the log file and writes the message to it."""
        target = tmp_path / "test.log"

        logger = setup_logging(
            name="test_file_creation",
            level="INFO",
            log_file=target,
        )
        logger.info("Test message")

        # The file must exist and contain what was just logged.
        assert target.exists()
        written = target.read_text()
        assert "Test message" in written

    def test_archive_old_logs_with_nonexistent_directory(self, tmp_path):
        """The archive helper must not raise when the log directory is missing."""
        from webdrop_bridge.utils.logging import _archive_old_logs

        missing_log = tmp_path / "nonexistent" / "test.log"

        # Reaching the end of the test without an exception is the pass
        # condition; there is no return value to check.
        _archive_old_logs(missing_log, retention_days=30)
|
|
|
|
|
|
class TestPerformanceTracker:
    """Tests for ``PerformanceTracker`` timing and log output."""

    def test_performance_tracker_context_manager(self):
        """Used as a context manager, the tracker records at least the slept time."""
        tracker = PerformanceTracker("test_operation")
        pause_seconds = 0.01  # 10 ms

        with tracker as active:
            time.sleep(pause_seconds)
            assert active.start_time is not None

        assert tracker.elapsed_time >= 0.01
        assert tracker.get_elapsed() >= 0.01

    def test_performance_tracker_logs_timing(self, caplog):
        """The operation name appears in captured log output."""
        perf_logger = get_logger("test.perf")
        caplog.set_level(logging.DEBUG)

        with PerformanceTracker("database_query", logger=perf_logger):
            time.sleep(0.01)

        captured = caplog.text
        assert "database_query" in captured

    def test_performance_tracker_logs_errors(self, caplog):
        """An exception inside the block is logged with the operation name."""
        perf_logger = get_logger("test.perf.error")
        caplog.set_level(logging.WARNING)

        try:
            with PerformanceTracker("failing_operation", logger=perf_logger):
                raise ValueError("Test error")
        except ValueError:
            # The exception itself is not under test, only the logging.
            pass

        captured = caplog.text
        assert "failing_operation" in captured
        assert "error" in captured.lower()

    def test_performance_tracker_get_elapsed_before_exit(self):
        """``get_elapsed`` works mid-block and the final time is not smaller."""
        tracker = PerformanceTracker("test")

        with tracker:
            midway = tracker.get_elapsed()
            assert midway >= 0  # Should return time since start

        # The final elapsed time must be at least the mid-block reading.
        assert tracker.elapsed_time >= midway
|