feat: Implement structured JSON logging and performance tracking features

claudi 2026-01-29 10:25:54 +01:00
parent db3799a643
commit 5dc988005c
3 changed files with 370 additions and 12 deletions
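
A rough usage sketch of the new features, inferred from the tests in this diff. The API names and keyword arguments follow the test file; the log path and the timed operation are illustrative placeholders only, not part of the commit:

# Hypothetical usage sketch, not part of the commit itself.
import time
from pathlib import Path

from webdrop_bridge.utils.logging import PerformanceTracker, get_logger, setup_logging

logger = setup_logging(
    name="webdrop",
    level="INFO",
    log_file=Path("logs/webdrop.log"),  # illustrative path
    json_format=True,  # emit one JSON object per log line
)
logger.info("Upload finished", extra={"user_id": 123})  # extra fields land in the JSON output

with PerformanceTracker("database_query", logger=get_logger("webdrop.db")):
    time.sleep(0.05)  # stand-in for the operation being timed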


@@ -1,12 +1,19 @@
"""Unit tests for logging module."""
import json
import logging
import logging.handlers
import time
from pathlib import Path
import pytest
from webdrop_bridge.utils.logging import get_logger, setup_logging
from webdrop_bridge.utils.logging import (
    JSONFormatter,
    PerformanceTracker,
    get_logger,
    setup_logging,
)
class TestSetupLogging:
@@ -152,3 +159,178 @@ class TestLogRotation:
        # Default: 10 MB max, 5 backups
        assert rotating_handler.maxBytes == 10 * 1024 * 1024
        assert rotating_handler.backupCount == 5


class TestJSONFormatter:
    """Test structured JSON logging."""

    def test_json_formatter_creates_valid_json(self):
        """Test that JSONFormatter produces valid JSON."""
        formatter = JSONFormatter()
        record = logging.LogRecord(
            name="test.module",
            level=logging.INFO,
            pathname="test.py",
            lineno=42,
            msg="Test message",
            args=(),
            exc_info=None,
        )
        output = formatter.format(record)

        # Should be valid JSON
        data = json.loads(output)
        assert data["message"] == "Test message"
        assert data["level"] == "INFO"
        assert data["logger"] == "test.module"
        assert data["line"] == 42

    def test_json_formatter_includes_timestamp(self):
        """Test that JSON output includes ISO format timestamp."""
        formatter = JSONFormatter()
        record = logging.LogRecord(
            name="test",
            level=logging.INFO,
            pathname="test.py",
            lineno=1,
            msg="Test",
            args=(),
            exc_info=None,
        )
        output = formatter.format(record)
        data = json.loads(output)
        assert "timestamp" in data
        # Should be ISO format like "2026-01-29T12:34:56.789000"
        assert "T" in data["timestamp"]

    def test_json_formatter_with_exception(self):
        """Test JSON formatter handles exceptions."""
        formatter = JSONFormatter()
        try:
            raise ValueError("Test error")
        except ValueError:
            import sys

            record = logging.LogRecord(
                name="test",
                level=logging.ERROR,
                pathname="test.py",
                lineno=1,
                msg="Error occurred",
                args=(),
                exc_info=sys.exc_info(),
            )
        output = formatter.format(record)
        data = json.loads(output)
        assert "exception" in data
        assert "ValueError" in data["exception"]
        assert "Test error" in data["exception"]

    def test_setup_logging_with_json_format(self, tmp_path):
        """Test setup_logging with JSON format enabled."""
        log_file = tmp_path / "test.log"
        logger = setup_logging(
            name="test_json",
            level="INFO",
            log_file=log_file,
            json_format=True,
        )
        logger.info("Test JSON message", extra={"user_id": 123})

        # Read and parse log file
        content = log_file.read_text()
        data = json.loads(content)
        assert data["message"] == "Test JSON message"
        assert data["level"] == "INFO"
        assert data["user_id"] == 123


class TestLogArchival:
    """Test log file archival and rotation."""

    def test_setup_logging_with_log_file_created(self, tmp_path):
        """Test that log file is created by setup_logging."""
        log_file = tmp_path / "test.log"
        logger = setup_logging(
            name="test_file_creation",
            level="INFO",
            log_file=log_file,
        )
        logger.info("Test message")

        # Check that log file was created
        assert log_file.exists()
        assert "Test message" in log_file.read_text()

    def test_archive_old_logs_with_nonexistent_directory(self, tmp_path):
        """Test that archive function handles nonexistent directories."""
        from webdrop_bridge.utils.logging import _archive_old_logs

        nonexistent_log = tmp_path / "nonexistent" / "test.log"
        # Should not raise even if directory doesn't exist
        _archive_old_logs(nonexistent_log, retention_days=30)
        assert True  # Function completed without error


class TestPerformanceTracker:
    """Test performance metrics collection."""

    def test_performance_tracker_context_manager(self):
        """Test PerformanceTracker context manager."""
        tracker = PerformanceTracker("test_operation")
        with tracker as t:
            time.sleep(0.01)  # Sleep for 10ms
            assert t.start_time is not None
        assert tracker.elapsed_time >= 0.01
        assert tracker.get_elapsed() >= 0.01

    def test_performance_tracker_logs_timing(self, caplog):
        """Test that PerformanceTracker logs elapsed time."""
        logger = get_logger("test.perf")
        caplog.set_level(logging.DEBUG)
        with PerformanceTracker("database_query", logger=logger):
            time.sleep(0.01)
        # Should have logged the operation
        assert "database_query" in caplog.text

    def test_performance_tracker_logs_errors(self, caplog):
        """Test that PerformanceTracker logs errors."""
        logger = get_logger("test.perf.error")
        caplog.set_level(logging.WARNING)
        try:
            with PerformanceTracker("failing_operation", logger=logger):
                raise ValueError("Test error")
        except ValueError:
            pass
        # Should have logged the error
        assert "failing_operation" in caplog.text
        assert "error" in caplog.text.lower()

    def test_performance_tracker_get_elapsed_before_exit(self):
        """Test getting elapsed time before context exit."""
        tracker = PerformanceTracker("test")
        with tracker:
            elapsed = tracker.get_elapsed()
            assert elapsed >= 0  # Should return time since start
        # After exit, should have final time
        assert tracker.elapsed_time >= elapsed
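
The diff above covers only the test file. For orientation, here is a minimal sketch of the JSONFormatter and PerformanceTracker behaviour these tests assume. It is an illustration based on the interface implied by the assertions, not the implementation shipped in webdrop_bridge/utils/logging.py in this commit; setup_logging, get_logger, and _archive_old_logs are omitted:

# Illustrative sketch only; the real module in this commit may differ.
import json
import logging
import time
from datetime import datetime
from typing import Optional

# Attributes present on every LogRecord, used to detect fields passed via `extra={...}`.
_DEFAULT_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}


class JSONFormatter(logging.Formatter):
    """Format each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "line": record.lineno,
            "message": record.getMessage(),
        }
        # Carry through `extra={...}` fields (e.g. user_id in the tests).
        for key, value in record.__dict__.items():
            if key not in _DEFAULT_ATTRS and key not in payload:
                payload[key] = value
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


class PerformanceTracker:
    """Context manager that times a named operation and logs the result."""

    def __init__(self, operation: str, logger: Optional[logging.Logger] = None):
        self.operation = operation
        self.logger = logger or logging.getLogger(__name__)
        self.start_time: Optional[float] = None
        self.elapsed_time: Optional[float] = None

    def __enter__(self) -> "PerformanceTracker":
        self.start_time = time.monotonic()
        return self

    def get_elapsed(self) -> float:
        """Elapsed seconds so far, or the final time once the block has exited."""
        if self.elapsed_time is not None:
            return self.elapsed_time
        if self.start_time is None:
            return 0.0
        return time.monotonic() - self.start_time

    def __exit__(self, exc_type, exc, tb) -> None:
        self.elapsed_time = time.monotonic() - self.start_time
        if exc_type is not None:
            self.logger.warning(
                "%s failed with error after %.3fs: %s",
                self.operation, self.elapsed_time, exc,
            )
        else:
            self.logger.debug("%s completed in %.3fs", self.operation, self.elapsed_time)
        # Returning None lets any exception propagate, which the error-handling test relies on.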