- Added retry logic for webhook event delivery in `retry.py` with exponential backoff. - Created example webhook server implementations in `server.py` for Flask and FastAPI. - Developed comprehensive examples for using the Elytra webhooks subpackage in `webhook_examples.py`. - Introduced unit tests for webhook functionality, including event handling, authentication, and retry logic in `test_webhooks.py`.
437 lines
14 KiB
Python
437 lines
14 KiB
Python
"""Unit tests for the Elytra webhooks subpackage."""
|
|
|
|
from datetime import datetime
|
|
from unittest.mock import Mock, patch
|
|
|
|
import pytest
|
|
|
|
from elytra_client.webhooks import (
|
|
APIKeyAuth,
|
|
BasicAuth,
|
|
BearerAuth,
|
|
CloudEvent,
|
|
EventType,
|
|
ExponentialBackoffRetry,
|
|
Operation,
|
|
RetryConfig,
|
|
calculate_retry_delay,
|
|
)
|
|
from elytra_client.webhooks.auth import APIKeyAuthProvider, BasicAuthProvider, BearerAuthProvider
|
|
from elytra_client.webhooks.handlers import (
|
|
WebhookValidationError,
|
|
WebhookValidator,
|
|
parse_webhook_payload,
|
|
validate_cloud_event,
|
|
)
|
|
from elytra_client.webhooks.server import EventHandler, SimpleWebhookEventDispatcher
|
|
|
|
|
|
class TestCloudEvent:
|
|
"""Test CloudEvent model."""
|
|
|
|
def test_cloud_event_creation(self):
|
|
"""Test creating a CloudEvent."""
|
|
event = CloudEvent(
|
|
id="test-id",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
eventtype="product",
|
|
operation="add",
|
|
)
|
|
assert event.id == "test-id"
|
|
assert event.source == "elytra-pim"
|
|
assert event.specversion == "1.0"
|
|
|
|
def test_get_event_type(self):
|
|
"""Test parsing event type."""
|
|
event = CloudEvent(
|
|
id="test-id",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
eventtype="product",
|
|
)
|
|
assert event.get_event_type() == EventType.PRODUCT
|
|
|
|
def test_get_operation(self):
|
|
"""Test parsing operation."""
|
|
event = CloudEvent(
|
|
id="test-id",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
operation="modify",
|
|
)
|
|
assert event.get_operation() == Operation.MODIFY
|
|
|
|
def test_get_event_type_invalid(self):
|
|
"""Test invalid event type returns None."""
|
|
event = CloudEvent(
|
|
id="test-id",
|
|
source="elytra-pim",
|
|
type="com.elytra.unknown",
|
|
eventtype="invalid_type",
|
|
)
|
|
assert event.get_event_type() is None
|
|
|
|
def test_get_object_data(self):
|
|
"""Test extracting object data."""
|
|
event = CloudEvent(
|
|
id="test-id",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
data={"objectData": {"id": 123, "name": "Product"}, "objectId": 123},
|
|
)
|
|
assert event.get_object_data() == {"id": 123, "name": "Product"}
|
|
|
|
def test_get_object_id(self):
|
|
"""Test extracting object ID."""
|
|
event = CloudEvent(
|
|
id="test-id",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
data={"objectId": 456},
|
|
)
|
|
assert event.get_object_id() == 456
|
|
|
|
def test_get_object_type(self):
|
|
"""Test extracting object type."""
|
|
event = CloudEvent(
|
|
id="test-id",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
data={"objectType": "Product"},
|
|
)
|
|
assert event.get_object_type() == "Product"
|
|
|
|
|
|
class TestWebhookValidation:
|
|
"""Test webhook validation."""
|
|
|
|
def test_parse_valid_webhook(self):
|
|
"""Test parsing valid webhook payload."""
|
|
payload = {
|
|
"specversion": "1.0",
|
|
"id": "test-id",
|
|
"source": "elytra-pim",
|
|
"type": "com.elytra.product",
|
|
"eventtype": "product",
|
|
"operation": "add",
|
|
}
|
|
event = parse_webhook_payload(payload)
|
|
assert event.id == "test-id"
|
|
assert event.eventtype == "product"
|
|
|
|
def test_parse_invalid_webhook(self):
|
|
"""Test parsing invalid webhook raises error."""
|
|
payload = {
|
|
"id": "test-id",
|
|
# Missing required 'source' and 'type'
|
|
}
|
|
with pytest.raises(WebhookValidationError):
|
|
parse_webhook_payload(payload)
|
|
|
|
def test_validate_cloud_event_success(self):
|
|
"""Test successful cloud event validation."""
|
|
event = CloudEvent(
|
|
id="test-id",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
)
|
|
assert validate_cloud_event(event) is True
|
|
|
|
def test_validate_cloud_event_missing_id(self):
|
|
"""Test validation fails with missing ID."""
|
|
event = CloudEvent(
|
|
id="",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
)
|
|
with pytest.raises(WebhookValidationError):
|
|
validate_cloud_event(event)
|
|
|
|
def test_validate_cloud_event_invalid_type_prefix(self):
|
|
"""Test validation fails with invalid type prefix."""
|
|
event = CloudEvent(
|
|
id="test-id",
|
|
source="elytra-pim",
|
|
type="com.other.product", # Wrong prefix
|
|
)
|
|
with pytest.raises(WebhookValidationError):
|
|
validate_cloud_event(event)
|
|
|
|
|
|
class TestBasicAuth:
|
|
"""Test Basic authentication."""
|
|
|
|
def test_basic_auth_get_headers(self):
|
|
"""Test Basic auth header generation."""
|
|
auth = BasicAuth(username="admin", password="secret")
|
|
provider = BasicAuthProvider(auth)
|
|
headers = provider.get_headers()
|
|
assert "Authorization" in headers
|
|
assert headers["Authorization"].startswith("Basic ")
|
|
|
|
def test_basic_auth_validate_valid(self):
|
|
"""Test Basic auth validation with valid credentials."""
|
|
auth = BasicAuth(username="admin", password="secret")
|
|
provider = BasicAuthProvider(auth)
|
|
headers = provider.get_headers()
|
|
assert provider.validate_header(headers) is True
|
|
|
|
def test_basic_auth_validate_invalid(self):
|
|
"""Test Basic auth validation with invalid credentials."""
|
|
auth = BasicAuth(username="admin", password="secret")
|
|
provider = BasicAuthProvider(auth)
|
|
headers = {"Authorization": "Basic dGVzdDp3cm9uZw=="} # test:wrong
|
|
assert provider.validate_header(headers) is False
|
|
|
|
def test_basic_auth_validate_missing(self):
|
|
"""Test Basic auth validation with missing header."""
|
|
auth = BasicAuth(username="admin", password="secret")
|
|
provider = BasicAuthProvider(auth)
|
|
assert provider.validate_header({}) is False
|
|
|
|
|
|
class TestBearerAuth:
|
|
"""Test Bearer token authentication."""
|
|
|
|
def test_bearer_auth_get_headers(self):
|
|
"""Test Bearer auth header generation."""
|
|
auth = BearerAuth(token="my_token_123")
|
|
provider = BearerAuthProvider(auth)
|
|
headers = provider.get_headers()
|
|
assert headers["Authorization"] == "Bearer my_token_123"
|
|
|
|
def test_bearer_auth_validate_valid(self):
|
|
"""Test Bearer auth validation with valid token."""
|
|
auth = BearerAuth(token="my_token_123")
|
|
provider = BearerAuthProvider(auth)
|
|
headers = {"Authorization": "Bearer my_token_123"}
|
|
assert provider.validate_header(headers) is True
|
|
|
|
def test_bearer_auth_validate_invalid(self):
|
|
"""Test Bearer auth validation with invalid token."""
|
|
auth = BearerAuth(token="my_token_123")
|
|
provider = BearerAuthProvider(auth)
|
|
headers = {"Authorization": "Bearer wrong_token"}
|
|
assert provider.validate_header(headers) is False
|
|
|
|
|
|
class TestAPIKeyAuth:
|
|
"""Test API Key authentication."""
|
|
|
|
def test_api_key_auth_get_headers(self):
|
|
"""Test API Key header generation."""
|
|
auth = APIKeyAuth(api_key="sk_live_123", header_name="X-API-Key")
|
|
provider = APIKeyAuthProvider(auth)
|
|
headers = provider.get_headers()
|
|
assert headers["X-API-Key"] == "sk_live_123"
|
|
|
|
def test_api_key_auth_default_header_name(self):
|
|
"""Test API Key with default header name."""
|
|
auth = APIKeyAuth(api_key="sk_live_123")
|
|
provider = APIKeyAuthProvider(auth)
|
|
headers = provider.get_headers()
|
|
assert "api-key" in headers
|
|
|
|
def test_api_key_auth_validate_valid(self):
|
|
"""Test API Key validation with valid key."""
|
|
auth = APIKeyAuth(api_key="sk_live_123", header_name="X-API-Key")
|
|
provider = APIKeyAuthProvider(auth)
|
|
headers = {"X-API-Key": "sk_live_123"}
|
|
assert provider.validate_header(headers) is True
|
|
|
|
def test_api_key_auth_validate_invalid(self):
|
|
"""Test API Key validation with invalid key."""
|
|
auth = APIKeyAuth(api_key="sk_live_123", header_name="X-API-Key")
|
|
provider = APIKeyAuthProvider(auth)
|
|
headers = {"X-API-Key": "wrong_key"}
|
|
assert provider.validate_header(headers) is False
|
|
|
|
|
|
class TestWebhookValidator:
|
|
"""Test WebhookValidator class."""
|
|
|
|
def test_validator_no_auth(self):
|
|
"""Test validator without authentication."""
|
|
validator = WebhookValidator(auth_type="none")
|
|
payload = {
|
|
"specversion": "1.0",
|
|
"id": "test-id",
|
|
"source": "elytra-pim",
|
|
"type": "com.elytra.product",
|
|
}
|
|
event = validator.validate_and_parse(payload)
|
|
assert event.id == "test-id"
|
|
|
|
def test_validator_basic_auth(self):
|
|
"""Test validator with basic auth."""
|
|
auth = BasicAuth(username="user", password="pass")
|
|
validator = WebhookValidator(auth_type="basic", auth=auth)
|
|
payload = {
|
|
"specversion": "1.0",
|
|
"id": "test-id",
|
|
"source": "elytra-pim",
|
|
"type": "com.elytra.product",
|
|
}
|
|
# Valid header
|
|
headers = {"Authorization": "Basic dXNlcjpwYXNz"}
|
|
event = validator.validate_and_parse(payload, headers)
|
|
assert event.id == "test-id"
|
|
|
|
def test_validator_auth_validation_fails(self):
|
|
"""Test validator fails on invalid auth."""
|
|
auth = BasicAuth(username="user", password="pass")
|
|
validator = WebhookValidator(auth_type="basic", auth=auth)
|
|
payload = {
|
|
"specversion": "1.0",
|
|
"id": "test-id",
|
|
"source": "elytra-pim",
|
|
"type": "com.elytra.product",
|
|
}
|
|
# Invalid header
|
|
headers = {"Authorization": "Basic invalid"}
|
|
with pytest.raises(WebhookValidationError):
|
|
validator.validate_and_parse(payload, headers)
|
|
|
|
|
|
class TestRetryLogic:
|
|
"""Test retry logic."""
|
|
|
|
def test_calculate_retry_delay_exponential(self):
|
|
"""Test exponential backoff delay calculation."""
|
|
# First attempt: 1000ms
|
|
assert calculate_retry_delay(0, initial_delay=1000) == 1000
|
|
# Second attempt: 2000ms
|
|
assert calculate_retry_delay(1, initial_delay=1000, backoff_multiplier=2.0) == 2000
|
|
# Third attempt: 4000ms
|
|
assert calculate_retry_delay(2, initial_delay=1000, backoff_multiplier=2.0) == 4000
|
|
|
|
def test_calculate_retry_delay_max_cap(self):
|
|
"""Test retry delay is capped at max."""
|
|
# Should be capped at max_delay
|
|
delay = calculate_retry_delay(
|
|
10,
|
|
initial_delay=1000,
|
|
backoff_multiplier=2.0,
|
|
max_delay=300000,
|
|
)
|
|
assert delay <= 300000
|
|
|
|
def test_exponential_backoff_retry_should_retry(self):
|
|
"""Test ExponentialBackoffRetry.should_retry."""
|
|
config = RetryConfig(max_retries=3)
|
|
policy = ExponentialBackoffRetry(config)
|
|
assert policy.should_retry(0) is True
|
|
assert policy.should_retry(1) is True
|
|
assert policy.should_retry(2) is True
|
|
assert policy.should_retry(3) is False
|
|
|
|
def test_exponential_backoff_retry_delay(self):
|
|
"""Test ExponentialBackoffRetry.get_delay."""
|
|
config = RetryConfig(
|
|
max_retries=3,
|
|
initial_delay=1000,
|
|
backoff_multiplier=2.0,
|
|
)
|
|
policy = ExponentialBackoffRetry(config)
|
|
assert policy.get_delay(0) == 1000
|
|
assert policy.get_delay(1) == 2000
|
|
assert policy.get_delay(2) == 4000
|
|
|
|
|
|
class TestEventDispatcher:
|
|
"""Test event dispatcher."""
|
|
|
|
def test_register_and_dispatch_exact_match(self):
|
|
"""Test handler is called on exact event type and operation match."""
|
|
dispatcher = SimpleWebhookEventDispatcher()
|
|
called_with = []
|
|
|
|
def handler(event: CloudEvent):
|
|
called_with.append(event)
|
|
|
|
dispatcher.register_handler(
|
|
handler, EventType.PRODUCT, Operation.ADD
|
|
)
|
|
|
|
event = CloudEvent(
|
|
id="test",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
eventtype="product",
|
|
operation="add",
|
|
)
|
|
|
|
dispatcher.dispatch(event)
|
|
assert len(called_with) == 1
|
|
assert called_with[0].id == "test"
|
|
|
|
def test_register_and_dispatch_type_only(self):
|
|
"""Test handler is called on event type match."""
|
|
dispatcher = SimpleWebhookEventDispatcher()
|
|
called_with = []
|
|
|
|
def handler(event: CloudEvent):
|
|
called_with.append(event)
|
|
|
|
dispatcher.register_handler(handler, EventType.PRODUCT)
|
|
|
|
event = CloudEvent(
|
|
id="test",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
eventtype="product",
|
|
operation="modify",
|
|
)
|
|
|
|
dispatcher.dispatch(event)
|
|
assert len(called_with) == 1
|
|
|
|
def test_default_handler_called(self):
|
|
"""Test default handler is called when no specific match."""
|
|
dispatcher = SimpleWebhookEventDispatcher()
|
|
called_with = []
|
|
|
|
def handler(event: CloudEvent):
|
|
called_with.append(event)
|
|
|
|
dispatcher.register_default_handler(handler)
|
|
|
|
event = CloudEvent(
|
|
id="test",
|
|
source="elytra-pim",
|
|
type="com.elytra.unknown",
|
|
eventtype="unknown",
|
|
)
|
|
|
|
dispatcher.dispatch(event)
|
|
assert len(called_with) == 1
|
|
|
|
def test_multiple_handlers_called(self):
|
|
"""Test multiple handlers are called for same event."""
|
|
dispatcher = SimpleWebhookEventDispatcher()
|
|
called = []
|
|
|
|
def handler1(event: CloudEvent):
|
|
called.append("handler1")
|
|
|
|
def handler2(event: CloudEvent):
|
|
called.append("handler2")
|
|
|
|
dispatcher.register_handler(handler1, EventType.PRODUCT)
|
|
dispatcher.register_default_handler(handler2)
|
|
|
|
event = CloudEvent(
|
|
id="test",
|
|
source="elytra-pim",
|
|
type="com.elytra.product",
|
|
eventtype="product",
|
|
)
|
|
|
|
dispatcher.dispatch(event)
|
|
assert "handler1" in called
|
|
assert "handler2" not in called # Only called if no matching handler
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"])
|