From b8f889f22479dc2918744c77f546bfb646fa5ce7 Mon Sep 17 00:00:00 2001 From: claudi Date: Fri, 20 Feb 2026 10:08:07 +0100 Subject: [PATCH] feat: Implement webhook retry logic and server examples - Added retry logic for webhook event delivery in `retry.py` with exponential backoff. - Created example webhook server implementations in `server.py` for Flask and FastAPI. - Developed comprehensive examples for using the Elytra webhooks subpackage in `webhook_examples.py`. - Introduced unit tests for webhook functionality, including event handling, authentication, and retry logic in `test_webhooks.py`. --- elytra_client/webhooks/README.md | 420 +++++++++++++++++++++++++++ elytra_client/webhooks/__init__.py | 75 +++++ elytra_client/webhooks/auth.py | 193 +++++++++++++ elytra_client/webhooks/handlers.py | 193 +++++++++++++ elytra_client/webhooks/models.py | 287 +++++++++++++++++++ elytra_client/webhooks/retry.py | 178 ++++++++++++ elytra_client/webhooks/server.py | 332 ++++++++++++++++++++++ examples/webhook_examples.py | 372 ++++++++++++++++++++++++ tests/test_webhooks.py | 437 +++++++++++++++++++++++++++++ 9 files changed, 2487 insertions(+) create mode 100644 elytra_client/webhooks/README.md create mode 100644 elytra_client/webhooks/__init__.py create mode 100644 elytra_client/webhooks/auth.py create mode 100644 elytra_client/webhooks/handlers.py create mode 100644 elytra_client/webhooks/models.py create mode 100644 elytra_client/webhooks/retry.py create mode 100644 elytra_client/webhooks/server.py create mode 100644 examples/webhook_examples.py create mode 100644 tests/test_webhooks.py diff --git a/elytra_client/webhooks/README.md b/elytra_client/webhooks/README.md new file mode 100644 index 0000000..848c59f --- /dev/null +++ b/elytra_client/webhooks/README.md @@ -0,0 +1,420 @@ +""" +# Elytra Webhooks Subpackage Documentation + +## Overview + +The `elytra_client.webhooks` subpackage provides comprehensive tools for handling CloudEvents-based webhooks from the Elytra PIM Event API. It includes: + +- **CloudEvents Models**: Strongly-typed Pydantic models for CloudEvents specification +- **Authentication**: Support for Basic, Bearer, and API Key authentication +- **Validation**: Robust webhook payload validation +- **Event Handling**: Event dispatching and routing +- **Retry Logic**: Exponential backoff retry policies +- **Framework Integration**: Ready-to-use examples for Flask and FastAPI + +## Installation + +The webhooks subpackage is included with the elytra-pim-client package. No additional installation is needed. + +```python +from elytra_client.webhooks import CloudEvent, parse_webhook_payload +``` + +## Quick Start + +### Basic Event Parsing + +Parse incoming webhook payloads without authentication validation: + +```python +from elytra_client.webhooks import parse_webhook_payload + +# In your webhook endpoint +event = parse_webhook_payload(request.json()) +print(f"Event type: {event.eventtype}") +print(f"Object ID: {event.get_object_id()}") +``` + +### With Authentication + +Add authentication validation: + +```python +from elytra_client.webhooks import ( + WebhookValidator, + BasicAuth +) + +auth = BasicAuth(username="user", password="pass") +validator = WebhookValidator(auth_type="basic", auth=auth) + +event = validator.validate_and_parse( + payload=request.json(), + headers=dict(request.headers) +) +``` + +## Core Components + +### CloudEvent Model + +The `CloudEvent` class represents an event following the CNCF CloudEvents specification v1.0. + +**Key Fields:** +- `specversion`: CloudEvents spec version (always "1.0") +- `id`: Unique event identifier +- `source`: Event source (e.g., "elytra-pim") +- `type`: Event type in format `com.elytra.` +- `eventtype`: Elytra-specific event type (product, product_attribute_value, etc.) +- `operation`: Operation type (add, modify, remove, link, unlink) +- `data`: Event payload containing object data and metadata + +**Useful Methods:** +- `get_event_type()`: Returns parsed EventType enum +- `get_operation()`: Returns parsed Operation enum +- `get_object_data()`: Extracts objectData from payload +- `get_object_id()`: Extracts object ID from payload +- `get_object_type()`: Extracts object type from payload + +### Event Types and Operations + +**EventType Enum** includes all Elytra event types: +- Product events: `PRODUCT`, `PRODUCT_GROUP`, `PRODUCT_FOREST`, etc. +- Attribute events: `PRODUCT_ATTRIBUTES`, `PRODUCT_ATTRIBUTE_VALUE`, etc. +- Structure events: `PRODUCT_STRUCTURE`, `PRODUCT_TEXT`, etc. +- And many more... + +**Operation Enum** includes standard operations: +- `ADD`: Object was created +- `MODIFY`: Object was modified +- `REMOVE`: Object was deleted +- `LINK`: Objects were linked +- `UNLINK`: Objects were unlinked + +### Authentication + +The package supports three authentication methods: + +#### Basic Authentication + +```python +from elytra_client.webhooks import BasicAuth, WebhookValidator + +auth = BasicAuth(username="admin", password="secret") +validator = WebhookValidator(auth_type="basic", auth=auth) +``` + +#### Bearer Token Authentication + +```python +from elytra_client.webhooks import BearerAuth, WebhookValidator + +auth = BearerAuth(token="eyJhbGciOiJIUzI1NiIs...") +validator = WebhookValidator(auth_type="bearer", auth=auth) +``` + +#### API Key Authentication + +```python +from elytra_client.webhooks import APIKeyAuth, WebhookValidator + +auth = APIKeyAuth( + api_key="sk_live_secret123", + header_name="X-API-Key" # Custom header name +) +validator = WebhookValidator(auth_type="apikey", auth=auth) +``` + +### Webhook Validation + +The `WebhookValidator` class handles authentication and payload validation: + +```python +from elytra_client.webhooks import ( + WebhookValidator, + WebhookValidationError, + BasicAuth +) + +auth = BasicAuth(username="user", password="pass") +validator = WebhookValidator(auth_type="basic", auth=auth) + +try: + # Validate authentication and CloudEvent format + event = validator.validate_and_parse( + payload=incoming_json, + headers=incoming_headers + ) + print(f"Valid event received: {event.id}") +except WebhookValidationError as e: + print(f"Validation failed: {e}") +``` + +### Event Routing and Dispatching + +Use `SimpleWebhookEventDispatcher` to route events to different handlers: + +```python +from elytra_client.webhooks import EventType, Operation +from elytra_client.webhooks.server import SimpleWebhookEventDispatcher + +dispatcher = SimpleWebhookEventDispatcher() + +# Register handler for specific event type and operation +dispatcher.register_handler( + callback=handle_product_added, + event_type=EventType.PRODUCT, + operation=Operation.ADD +) + +# Register handler for all operations of a type +dispatcher.register_handler( + callback=handle_product_changes, + event_type=EventType.PRODUCT +) + +# Register default fallback handler +dispatcher.register_default_handler(handle_other_events) + +# Dispatch events +dispatcher.dispatch(event) +``` + +### Retry Logic + +Configure and use retry policies for webhook delivery: + +```python +from elytra_client.webhooks import ( + RetryConfig, + ExponentialBackoffRetry, + execute_with_retry +) + +# Configure retry policy +retry_config = RetryConfig( + max_retries=5, + initial_delay=1000, # 1 second + backoff_multiplier=2.0, + max_delay=300000 # 5 minutes max +) + +policy = ExponentialBackoffRetry(retry_config) + +# Execute function with retries +try: + result = execute_with_retry( + some_function, + policy, + arg1="value" + ) +except Exception as e: + print(f"Failed after retries: {e}") +``` + +**Retry Delay Calculation:** +The delay follows exponential backoff: `delay = min(initial_delay * (multiplier ^ attempt), max_delay)` + +Examples: +- Attempt 0: 1000ms (1 second) +- Attempt 1: 2000ms (2 seconds) +- Attempt 2: 4000ms (4 seconds) +- Attempt 3: 8000ms (8 seconds) +- etc., capped at max_delay + +## Framework Integration + +### Flask Example + +```python +from flask import Flask, request, jsonify +from elytra_client.webhooks import BasicAuth, WebhookValidator +from elytra_client.webhooks.server import FlaskWebhookExample + +app = Flask(__name__) + +# Setup +auth = BasicAuth(username="webhook_user", password="secret") +validator = WebhookValidator(auth_type="basic", auth=auth) +webhook_handler = FlaskWebhookExample(validator) + +@app.route("/webhook", methods=["POST"]) +def handle_webhook(): + response, status = webhook_handler.handle_request( + request.json, + dict(request.headers) + ) + return jsonify(response), status + +if __name__ == "__main__": + app.run(port=5000) +``` + +### FastAPI Example + +```python +from fastapi import FastAPI, Request +from elytra_client.webhooks import APIKeyAuth, WebhookValidator +from elytra_client.webhooks.server import FastAPIWebhookExample + +app = FastAPI() + +# Setup +auth = APIKeyAuth(api_key="sk_live_secret", header_name="X-API-Key") +validator = WebhookValidator(auth_type="apikey", auth=auth) +webhook_handler = FastAPIWebhookExample(validator) + +@app.post("/webhook") +async def handle_webhook(request: Request): + payload = await request.json() + return webhook_handler.handle_request( + payload, + dict(request.headers) + ) +``` + +## Complete Production Example + +```python +from flask import Flask, request, jsonify +from elytra_client.webhooks import ( + BasicAuth, + WebhookValidator, + EventType, + Operation +) +from elytra_client.webhooks.server import ( + FlaskWebhookExample, + SimpleWebhookEventDispatcher +) + +app = Flask(__name__) + +# Setup authentication +auth = BasicAuth(username="elytra", password="secure_password") +validator = WebhookValidator(auth_type="basic", auth=auth) + +# Setup event routing +dispatcher = SimpleWebhookEventDispatcher() + +def sync_product_to_db(event): + product_id = event.get_object_id() + print(f"Syncing product {product_id} to database...") + # Your sync logic here + +dispatcher.register_handler( + sync_product_to_db, + EventType.PRODUCT, + Operation.ADD +) + +# Setup webhook handler +webhook_handler = FlaskWebhookExample(validator, dispatcher) + +@app.route("/webhook/elytra", methods=["POST"]) +def elytra_webhook(): + response, status = webhook_handler.handle_request( + request.json, + dict(request.headers) + ) + return jsonify(response), status + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=5000, debug=False) +``` + +## Configuration XML Reference + +While this subpackage handles receiving webhooks, you'll configure them in Elytra's web client. Here's what the XML configuration looks like: + +```xml + + webhook + + + + product + product_attribute_value + + + add + modify + remove + + + https://your-app.com/webhook/elytra + post + basic + elytra + secure_password + + + + + + +``` + +## Error Handling + +The main exception to handle is `WebhookValidationError`: + +```python +from elytra_client.webhooks import WebhookValidationError + +try: + event = validator.validate_and_parse(payload, headers) +except WebhookValidationError as e: + # Handle validation error + log.error(f"Webhook validation failed: {e}") + return {"status": "error"}, 400 +``` + +## Best Practices + +1. **Always Validate**: Use `WebhookValidator` to validate both authentication and event structure +2. **Use Type Hints**: Leverage the strongly-typed models for better IDE support +3. **Handle Errors**: Implement proper error handling and logging +4. **Implement Idempotency**: Since webhooks may be retried, ensure your handlers are idempotent +5. **Acknowledge Quickly**: Return success response immediately; do heavy processing async +6. **Log Events**: Log all webhook events for debugging and audit trails +7. **Secure Credentials**: Store authentication credentials securely (environment variables, secrets manager) +8. **Use HTTPS**: Always use HTTPS URLs for webhook endpoints +9. **Implement Timeouts**: Set reasonable timeouts for webhook processing +10. **Monitor**: Set up monitoring and alerts for webhook failures + +## API Reference + +### CloudEvent +- `specversion: str` - CloudEvents spec version +- `id: str` - Unique event identifier +- `source: str` - Event source +- `type: str` - Event type (com.elytra.*) +- `datacontenttype: str` - Content type +- `time: datetime | None` - Event timestamp +- `eventtype: str | None` - Elytra event type +- `operation: str | None` - Operation type +- `data: dict | None` - Event payload +- `subject: str | None` - Event subject +- `dataschema: str | None` - Data schema URI + +### WebhookValidator +- `validate(payload, headers=None) -> bool` - Validate without parsing +- `parse(payload) -> CloudEvent` - Parse without validating auth +- `validate_and_parse(payload, headers=None) -> CloudEvent` - Full validation and parsing + +### RetryConfig +- `max_retries: int` (default: 5) - Maximum retry attempts +- `initial_delay: int` (default: 1000) - Initial delay in milliseconds +- `backoff_multiplier: float` (default: 2.0) - Exponential backoff multiplier +- `max_delay: int` (default: 300000) - Maximum delay in milliseconds + +## See Also + +- [CloudEvents Specification](https://cloudevents.io/) +- [Elytra PIM Documentation](https://www.elytra.ch/) +- [examples/webhook_examples.py](../examples/webhook_examples.py) - Comprehensive usage examples +""" + +# This is a documentation file - it can be read as a docstring or converted to markdown +__doc__ = __doc__ diff --git a/elytra_client/webhooks/__init__.py b/elytra_client/webhooks/__init__.py new file mode 100644 index 0000000..6402359 --- /dev/null +++ b/elytra_client/webhooks/__init__.py @@ -0,0 +1,75 @@ +"""Webhook helper subpackage for Elytra Event API integration. + +This package provides utilities for handling CloudEvents-based webhooks +from the Elytra PIM Event API, including event parsing, validation, +authentication, and retry logic. + +Example: + Basic webhook event handling:: + + from elytra_client.webhooks import CloudEvent, parse_webhook_payload + + # Parse incoming webhook request + event = parse_webhook_payload(request_json) + + # Access event data + if event.type == "com.elytra.product.attribute.value": + print(f"Product attribute changed: {event.data}") + + With authentication:: + + from elytra_client.webhooks import WebhookValidator + from elytra_client.webhooks.auth import BasicAuth + + auth = BasicAuth(username="user", password="pass") + validator = WebhookValidator(auth=auth) + + # Validate incoming webhook + is_valid = validator.validate( + payload=request_json, + headers=request_headers + ) +""" + +from .auth import APIKeyAuthProvider, AuthProvider, BasicAuthProvider, BearerAuthProvider +from .handlers import WebhookValidator, parse_webhook_payload, validate_cloud_event +from .models import ( + APIKeyAuth, + BasicAuth, + BearerAuth, + CloudEvent, + EventType, + Operation, + RetryConfig, + WebhookAuth, + WebhookConfig, +) +from .retry import ExponentialBackoffRetry, RetryPolicy, calculate_retry_delay + +__all__ = [ + # Models + "CloudEvent", + "WebhookConfig", + "WebhookAuth", + "BasicAuth", + "BearerAuth", + "APIKeyAuth", + "EventType", + "Operation", + "RetryConfig", + # Handlers + "WebhookValidator", + "parse_webhook_payload", + "validate_cloud_event", + # Auth + "AuthProvider", + "BasicAuthProvider", + "BearerAuthProvider", + "APIKeyAuthProvider", + # Retry + "RetryPolicy", + "ExponentialBackoffRetry", + "calculate_retry_delay", +] + +__version__ = "0.1.0" diff --git a/elytra_client/webhooks/auth.py b/elytra_client/webhooks/auth.py new file mode 100644 index 0000000..f6c12c0 --- /dev/null +++ b/elytra_client/webhooks/auth.py @@ -0,0 +1,193 @@ +"""Webhook authentication helpers for constructing and validating authentication headers.""" + +import base64 +from abc import ABC, abstractmethod +from typing import Dict, Optional + +from .models import APIKeyAuth, BasicAuth, BearerAuth, WebhookAuth + + +class AuthProvider(ABC): + """Abstract base class for webhook authentication providers.""" + + @abstractmethod + def get_headers(self) -> Dict[str, str]: + """Return the authentication headers to include in webhook requests.""" + pass + + @abstractmethod + def validate_header(self, headers: Dict[str, str]) -> bool: + """Validate authentication headers from an incoming webhook request.""" + pass + + +class BasicAuthProvider(AuthProvider): + """HTTP Basic Authentication provider.""" + + def __init__(self, auth: BasicAuth): + """Initialize with BasicAuth credentials. + + Args: + auth: BasicAuth model with username and password + """ + self.auth = auth + + def get_headers(self) -> Dict[str, str]: + """Generate Basic Authentication header. + + Returns: + Dict with Authorization header + """ + credentials = f"{self.auth.username}:{self.auth.password}" + encoded = base64.b64encode(credentials.encode()).decode() + return {"Authorization": f"Basic {encoded}"} + + def validate_header(self, headers: Dict[str, str]) -> bool: + """Validate Basic Authentication header from incoming request. + + Args: + headers: Request headers dict (case-insensitive keys recommended) + + Returns: + True if authentication is valid, False otherwise + """ + auth_header = None + for key, value in headers.items(): + if key.lower() == "authorization": + auth_header = value + break + + if not auth_header or not auth_header.startswith("Basic "): + return False + + try: + encoded = auth_header.split(" ")[1] + decoded = base64.b64decode(encoded).decode() + username, password = decoded.split(":", 1) + return ( + username == self.auth.username and password == self.auth.password + ) + except (IndexError, ValueError, UnicodeDecodeError): + return False + + +class BearerAuthProvider(AuthProvider): + """HTTP Bearer Token Authentication provider.""" + + def __init__(self, auth: BearerAuth): + """Initialize with BearerAuth token. + + Args: + auth: BearerAuth model with token + """ + self.auth = auth + + def get_headers(self) -> Dict[str, str]: + """Generate Bearer Authentication header. + + Returns: + Dict with Authorization header + """ + return {"Authorization": f"Bearer {self.auth.token}"} + + def validate_header(self, headers: Dict[str, str]) -> bool: + """Validate Bearer Authentication header from incoming request. + + Args: + headers: Request headers dict (case-insensitive keys recommended) + + Returns: + True if token matches, False otherwise + """ + auth_header = None + for key, value in headers.items(): + if key.lower() == "authorization": + auth_header = value + break + + if not auth_header or not auth_header.startswith("Bearer "): + return False + + try: + token = auth_header.split(" ")[1] + return token == self.auth.token + except IndexError: + return False + + +class APIKeyAuthProvider(AuthProvider): + """API Key Authentication provider.""" + + def __init__(self, auth: APIKeyAuth): + """Initialize with APIKeyAuth credentials. + + Args: + auth: APIKeyAuth model with api_key and header_name + """ + self.auth = auth + + def get_headers(self) -> Dict[str, str]: + """Generate API Key header. + + Returns: + Dict with API key header + """ + return {self.auth.header_name: self.auth.api_key} + + def validate_header(self, headers: Dict[str, str]) -> bool: + """Validate API Key header from incoming request. + + Args: + headers: Request headers dict (case-insensitive keys recommended) + + Returns: + True if API key matches, False otherwise + """ + for key, value in headers.items(): + if key.lower() == self.auth.header_name.lower(): + return value == self.auth.api_key + + return False + + +class NoAuthProvider(AuthProvider): + """No authentication provider (for unauthenticated webhooks).""" + + def get_headers(self) -> Dict[str, str]: + """Return empty headers dict.""" + return {} + + def validate_header(self, headers: Dict[str, str]) -> bool: + """Always return True for no authentication.""" + return True + + +def get_auth_provider(auth_type: str, auth: Optional[WebhookAuth] = None) -> AuthProvider: + """Factory function to get the appropriate authentication provider. + + Args: + auth_type: Type of authentication ("none", "basic", "bearer", "apikey") + auth: Authentication credentials model + + Returns: + AuthProvider instance appropriate for the auth_type + + Raises: + ValueError: If auth_type is unknown or auth is missing for non-none types + """ + if auth_type == "none": + return NoAuthProvider() + elif auth_type == "basic": + if not isinstance(auth, BasicAuth): + raise ValueError(f"Expected BasicAuth for auth_type 'basic', got {type(auth)}") + return BasicAuthProvider(auth) + elif auth_type == "bearer": + if not isinstance(auth, BearerAuth): + raise ValueError(f"Expected BearerAuth for auth_type 'bearer', got {type(auth)}") + return BearerAuthProvider(auth) + elif auth_type == "apikey": + if not isinstance(auth, APIKeyAuth): + raise ValueError(f"Expected APIKeyAuth for auth_type 'apikey', got {type(auth)}") + return APIKeyAuthProvider(auth) + else: + raise ValueError(f"Unknown auth_type: {auth_type}") diff --git a/elytra_client/webhooks/handlers.py b/elytra_client/webhooks/handlers.py new file mode 100644 index 0000000..575d7bf --- /dev/null +++ b/elytra_client/webhooks/handlers.py @@ -0,0 +1,193 @@ +"""Webhook request handlers and validators for CloudEvents from Elytra Event API.""" + +from typing import Any, Dict, Optional + +from pydantic import ValidationError + +from .auth import AuthProvider, get_auth_provider +from .models import CloudEvent, WebhookAuth + + +class WebhookValidationError(Exception): + """Raised when webhook validation fails.""" + + pass + + +class WebhookValidator: + """Validates incoming webhook requests and extracts CloudEvent data. + + Provides authentication validation and CloudEvent format validation. + + Example: + >>> from elytra_client.webhooks import WebhookValidator, BasicAuth + >>> auth = BasicAuth(username="user", password="pass") + >>> validator = WebhookValidator(auth_type="basic", auth=auth) + >>> is_valid = validator.validate(payload, headers) + """ + + def __init__( + self, + auth_type: str = "none", + auth: Optional[WebhookAuth] = None, + ): + """Initialize the webhook validator. + + Args: + auth_type: Type of authentication ("none", "basic", "bearer", "apikey") + auth: Authentication credentials (required if auth_type is not "none") + + Raises: + ValueError: If auth configuration is invalid + """ + self.auth_type = auth_type + self.auth = auth + self.auth_provider = get_auth_provider(auth_type, auth) + + def validate( + self, + payload: Dict[str, Any], + headers: Optional[Dict[str, str]] = None, + ) -> bool: + """Validate an incoming webhook request. + + Validates both authentication and CloudEvent structure. + + Args: + payload: The request body as a dictionary + headers: Optional request headers dict for auth validation + + Returns: + True if validation succeeds + + Raises: + WebhookValidationError: If validation fails + """ + # Validate authentication if headers provided + if headers is not None and self.auth_type != "none": + if not self.auth_provider.validate_header(headers): + raise WebhookValidationError("Authentication validation failed") + + # Validate CloudEvent structure + try: + CloudEvent(**payload) + except ValidationError as e: + raise WebhookValidationError(f"Invalid CloudEvent format: {str(e)}") + + return True + + def parse(self, payload: Dict[str, Any]) -> CloudEvent: + """Parse and validate a webhook payload into a CloudEvent. + + Args: + payload: The request body as a dictionary + + Returns: + CloudEvent instance + + Raises: + WebhookValidationError: If payload is not a valid CloudEvent + """ + try: + return CloudEvent(**payload) + except ValidationError as e: + raise WebhookValidationError(f"Invalid CloudEvent format: {str(e)}") + + def validate_and_parse( + self, + payload: Dict[str, Any], + headers: Optional[Dict[str, str]] = None, + ) -> CloudEvent: + """Validate and parse a webhook request in one step. + + Args: + payload: The request body as a dictionary + headers: Optional request headers dict for auth validation + + Returns: + CloudEvent instance if validation succeeds + + Raises: + WebhookValidationError: If validation or parsing fails + """ + self.validate(payload, headers) + return self.parse(payload) + + +def parse_webhook_payload(payload: Dict[str, Any]) -> CloudEvent: + """Parse a webhook payload into a CloudEvent without authentication validation. + + A convenience function for simple webhook parsing without authentication checks. + + Args: + payload: The request body as a dictionary + + Returns: + CloudEvent instance + + Raises: + WebhookValidationError: If payload is not a valid CloudEvent + + Example: + >>> from elytra_client.webhooks import parse_webhook_payload + >>> event = parse_webhook_payload(request.json()) + >>> print(f"Event type: {event.eventtype}") + """ + try: + return CloudEvent(**payload) + except ValidationError as e: + raise WebhookValidationError(f"Invalid CloudEvent format: {str(e)}") + + +def validate_cloud_event(event: CloudEvent) -> bool: + """Validate that a CloudEvent meets minimum requirements. + + Checks that required fields are present and valid. + + Args: + event: CloudEvent instance to validate + + Returns: + True if event is valid + + Raises: + WebhookValidationError: If validation fails + """ + if not event.id: + raise WebhookValidationError("CloudEvent must have an 'id' field") + + if not event.source: + raise WebhookValidationError("CloudEvent must have a 'source' field") + + if not event.type: + raise WebhookValidationError("CloudEvent must have a 'type' field") + + if not event.type.startswith("com.elytra."): + raise WebhookValidationError( + "CloudEvent type must start with 'com.elytra.' prefix" + ) + + return True + + +def extract_auth_headers(auth_type: str, auth: Optional[WebhookAuth]) -> Dict[str, str]: + """Extract authentication headers for webhook requests. + + Helper function to generate the appropriate auth headers when sending webhook requests. + + Args: + auth_type: Type of authentication ("none", "basic", "bearer", "apikey") + auth: Authentication credentials + + Returns: + Dictionary of headers to include in webhook request + + Example: + >>> from elytra_client.webhooks.handlers import extract_auth_headers + >>> from elytra_client.webhooks import BasicAuth + >>> auth = BasicAuth(username="user", password="pass") + >>> headers = extract_auth_headers("basic", auth) + >>> # headers = {"Authorization": "Basic dXNlcjpwYXNz"} + """ + provider = get_auth_provider(auth_type, auth) + return provider.get_headers() diff --git a/elytra_client/webhooks/models.py b/elytra_client/webhooks/models.py new file mode 100644 index 0000000..1142ce9 --- /dev/null +++ b/elytra_client/webhooks/models.py @@ -0,0 +1,287 @@ +"""Data models for CloudEvents-based webhooks following the CNCF CloudEvents specification. + +Reference: https://cloudevents.io/ +""" + +from __future__ import annotations + +from datetime import datetime +from enum import Enum +from typing import Any, Dict, Literal, Optional, Union + +from pydantic import BaseModel, Field, HttpUrl, validator + + +class Operation(str, Enum): + """Supported event operation types.""" + + ADD = "add" + MODIFY = "modify" + REMOVE = "remove" + LINK = "link" + UNLINK = "unlink" + + +class EventType(str, Enum): + """Elytra Event API event types.""" + + NONE = "none" + PROJECT = "project" + CUSTOMER = "customer" + TODO = "todo" + USER = "user" + CONVERSATION = "conversation" + USERGROUP = "usergroup" + CALENDAR = "calendar" + RIGHTS = "rights" + LANGUAGE = "language" + DOCUMENT = "document" + DOCUMENTFOLDER = "documentfolder" + PRODUCT_ATTRIBUTES = "product_attributes" + PRODUCT_ATTRIBUTE_GROUPS = "product_attribute_groups" + PRODUCT = "product" + PRODUCT_GROUP = "product_group" + PRODUCT_FOREST = "product_forest" + PRODUCT_ATTRIBUTE_VALUE = "product_attribute_value" + PRODUCT_TEXT = "product_text" + PRODUCT_COLLECTOR = "product_collector" + PRODUCT_LANGUAGE_FALLBACK = "product_language_fallback" + PRODUCT_PICTURE = "product_picture" + PRODUCT_STRUCTURE = "product_structure" + PRODUCT_TEXT_STYLE = "product_text_style" + PRODUCT_LANGUAGE = "product_language" + CATALOG_CONTENT = "catalog_content" + PRODUCT_IMPORT = "product_import" + PRODUCT_SEARCH_FILTER = "product_search_filter" + PRODUCT_PRODUCT_LINK = "product_product_link" + ATTRIBUTE_FORMAT = "attribute_format" + CLASSIFICATION_FEATUREMAPPING = "classification_featuremapping" + CONFIGURATION_FILES = "configuration_files" + WORKFLOW = "workflow" + WORKFLOW_TASK = "workflow_task" + PAGEPLAN_GRID = "pageplan_grid" + MEDIA_EXPLORER = "media_explorer" + ATTRIBUTE_UNIT = "attribute_unit" + JMS_CONNECTION_TEST = "jms_connection_test" + SCHEDULER_TREE_UPDATE = "scheduler_tree_update" + SCHEDULER_ACTIVE_TABLE_UPDATE = "scheduler_active_table_update" + JMS_PERFORMANCE_TEST = "jms_performance_test" + CONFIGFILE_HISTORY_REMOVE = "configfile_history_remove" + MIMETYPE_PLUGIN = "mimetype_plugin" + PAGEPLAN_PREVIEW = "pageplan_preview" + PAGEPLAN = "pageplan" + SHORTCUT = "shortcut" + PAGEPLAN_GRIDELEMENT = "pageplan_gridelement" + INTEGRATION_TEST = "integration_test" + SHORTCUT_DELETEALLFORUSER = "shortcut_deleteallforuser" + OBJECT_WORKFLOW = "object_workflow" + RULESET = "ruleset" + USER_PROPERTIES = "user_properties" + USER_ATTRIBUTEFILTER = "user_attributefilter" + ATTRIBUTE_RULE = "attribute_rule" + SERVER_CACHE_ATTRIBUTRULES_CHANGED = "server_cache_attributrules_changed" + FORMULA_UPDATE = "formula_update" + OBJECT_WORKFLOW_TEMPLATE = "object_workflow_template" + ASSETBROWSER_NODE_EVENT = "assetbrowser_node_event" + CLIENT_TEXT_MESSAGE = "client_text_message" + SESSION_CLOSED = "session_closed" + OBJECT_WORKFLOW_COMMENT = "object_workflow_comment" + ENUMERATION_ATTRIBUTE_CHAIN = "enumeration_attribute_chain" + ENUMERATION_VALUE_CHAIN = "enumeration_value_chain" + USER_FAVORITES = "user_favorites" + OBJECT_COLLECTION = "object_collection" + SCHEDULER_JOB = "scheduler_job" + ATTRIBUTE_UNITTYPE = "attribute_unittype" + PRODUCT_MASS_MODIFY = "product_mass_modify" + PRODUCT_SEARCH_SUBSCRIPTION = "product_search_subscription" + SCHEDULER_JOB_ACTIVE_UPDATE = "scheduler_job_active_update" + CONFIG = "config" + SCHEDULER_JOB_EXECUTION_STATUS_CHANGED = "scheduler_job_execution_status_changed" + INDEXUTILS_CACHE_CHANGED = "indexutils_cache_changed" + + +class CloudEvent(BaseModel): + """CloudEvent model following CNCF CloudEvents specification v1.0. + + This represents an event that comes from the Elytra PIM Event API. + + Attributes: + specversion: CloudEvents specification version (always "1.0") + id: Unique event identifier + source: Identifies the context in which an event happened + type: Describes the type of event in the format: com.elytra. + datacontenttype: Content type of data (e.g., "application/json") + time: Timestamp of when the event occurred + eventtype: Elytra-specific event type + operation: Type of operation (add, modify, remove, link, unlink) + data: Event payload containing object data and metadata + subject: Optional subject of the event + """ + + specversion: str = Field(default="1.0", description="CloudEvents spec version") + id: str = Field(..., description="Unique event identifier") + source: str = Field(..., description="Event source identifier") + type: str = Field(..., description="Event type in format com.elytra.") + datacontenttype: str = Field( + default="application/json", description="Content type of the data" + ) + time: Optional[datetime] = Field( + default=None, description="Timestamp when the event occurred" + ) + eventtype: Optional[str] = Field( + default=None, description="Elytra-specific event type" + ) + operation: Optional[str] = Field( + default=None, description="Operation type (add, modify, remove, link, unlink)" + ) + data: Optional[Dict[str, Any]] = Field( + default=None, description="Event payload data" + ) + subject: Optional[str] = Field(default=None, description="Event subject") + dataschema: Optional[str] = Field(default=None, description="Data schema URI") + + class Config: + """Pydantic config.""" + + json_schema_extra = { + "example": { + "specversion": "1.0", + "id": "385d27ef-ac6d-4d93-904b-ea9801fc1bde", + "source": "elytra-pim", + "type": "com.elytra.product.attribute.value", + "datacontenttype": "application/json", + "time": "2025-12-31T04:19:30.860984129+01:00", + "eventtype": "product_attribute_value", + "operation": "modify", + } + } + + def get_event_type(self) -> Optional[EventType]: + """Extract and return the Elytra event type from the type field.""" + if not self.eventtype: + return None + try: + return EventType(self.eventtype) + except ValueError: + return None + + def get_operation(self) -> Optional[Operation]: + """Extract and return the operation type.""" + if not self.operation: + return None + try: + return Operation(self.operation) + except ValueError: + return None + + def get_object_data(self) -> Optional[Dict[str, Any]]: + """Extract the objectData from the event payload.""" + if not self.data: + return None + return self.data.get("objectData") + + def get_object_id(self) -> Optional[Union[str, int]]: + """Extract the object ID from the event payload.""" + if not self.data: + return None + return self.data.get("objectId") + + def get_object_type(self) -> Optional[str]: + """Extract the object type from the event payload.""" + if not self.data: + return None + return self.data.get("objectType") + + +class BasicAuth(BaseModel): + """HTTP Basic Authentication credentials.""" + + username: str = Field(..., description="Username for authentication") + password: str = Field(..., description="Password for authentication") + + +class BearerAuth(BaseModel): + """HTTP Bearer Token Authentication.""" + + token: str = Field(..., description="Bearer token for authentication") + + +class APIKeyAuth(BaseModel): + """API Key Authentication.""" + + api_key: str = Field(..., description="API key value") + header_name: str = Field( + default="api-key", description="Header name for the API key" + ) + + +WebhookAuth = Union[BasicAuth, BearerAuth, APIKeyAuth] + + +class RetryConfig(BaseModel): + """Webhook retry configuration. + + Attributes: + max_retries: Maximum number of retry attempts (default: 5) + initial_delay: Initial delay before retry in milliseconds (default: 1000) + backoff_multiplier: Multiplier for exponential backoff (default: 2.0) + max_delay: Maximum delay between retries in milliseconds (default: 300000 = 5 min) + """ + + max_retries: int = Field(default=5, ge=0, description="Maximum retry attempts") + initial_delay: int = Field( + default=1000, ge=100, description="Initial delay in milliseconds" + ) + backoff_multiplier: float = Field( + default=2.0, ge=1.0, description="Exponential backoff multiplier" + ) + max_delay: int = Field( + default=300000, ge=1000, description="Max delay in milliseconds" + ) + + +class WebhookConfig(BaseModel): + """Webhook configuration for receiving Elytra events. + + Attributes: + url: The webhook URL where events will be posted + method: HTTP method (post, patch, put) + auth_type: Authentication method (none, basic, bearer, apikey) + auth: Authentication credentials (if auth_type is not "none") + event_types: List of event types to subscribe to (empty = all) + operations: List of operations to subscribe to (empty = all) + retry: Retry configuration + """ + + url: HttpUrl = Field(..., description="Webhook URL") + method: Literal["post", "patch", "put"] = Field( + default="post", description="HTTP method" + ) + auth_type: Literal["none", "basic", "bearer", "apikey"] = Field( + default="none", description="Authentication type" + ) + auth: Optional[WebhookAuth] = Field( + default=None, description="Authentication credentials" + ) + event_types: list[EventType] = Field( + default_factory=list, description="Event types to subscribe to" + ) + operations: list[Operation] = Field( + default_factory=list, description="Operations to subscribe to" + ) + retry: RetryConfig = Field( + default_factory=RetryConfig, description="Retry configuration" + ) + active: bool = Field(default=True, description="Whether the webhook is active") + + @validator("auth", always=True) + def validate_auth(cls, v: Optional[WebhookAuth], values: Dict[str, Any]) -> Optional[WebhookAuth]: + """Validate that auth is provided when auth_type is not 'none'.""" + auth_type = values.get("auth_type") + if auth_type != "none" and v is None: + raise ValueError( + f"auth is required when auth_type is '{auth_type}'" + ) + if auth_type == "none" and v is not None: + raise ValueError("auth should be None when auth_type is 'none'") + return v diff --git a/elytra_client/webhooks/retry.py b/elytra_client/webhooks/retry.py new file mode 100644 index 0000000..399ce94 --- /dev/null +++ b/elytra_client/webhooks/retry.py @@ -0,0 +1,178 @@ +"""Retry logic and utilities for webhook event delivery.""" + +import time +from abc import ABC, abstractmethod +from typing import Callable, Optional, Type, TypeVar + +from .models import RetryConfig + +T = TypeVar("T") + + +def calculate_retry_delay( + attempt: int, + initial_delay: int = 1000, + backoff_multiplier: float = 2.0, + max_delay: int = 300000, +) -> int: + """Calculate the delay before the next retry attempt using exponential backoff. + + Formula: delay = min(initial_delay * (backoff_multiplier ^ attempt), max_delay) + + Args: + attempt: Current retry attempt number (0-indexed) + initial_delay: Initial delay in milliseconds (default: 1000) + backoff_multiplier: Multiplier for exponential backoff (default: 2.0) + max_delay: Maximum delay in milliseconds (default: 300000 = 5 minutes) + + Returns: + Delay in milliseconds before next attempt + + Example: + >>> calculate_retry_delay(0) # First retry + 1000 + >>> calculate_retry_delay(1) # Second retry + 2000 + >>> calculate_retry_delay(2) # Third retry + 4000 + """ + if attempt < 0: + return initial_delay + + delay = initial_delay * (backoff_multiplier ** attempt) + return int(min(delay, max_delay)) + + +class RetryPolicy(ABC): + """Abstract base class for retry policies.""" + + @abstractmethod + def should_retry(self, attempt: int, exception: Optional[Exception] = None) -> bool: + """Determine if a retry should be attempted. + + Args: + attempt: Current attempt number (0-indexed) + exception: Optional exception that caused the failure + + Returns: + True if retry should be attempted, False otherwise + """ + pass + + @abstractmethod + def get_delay(self, attempt: int) -> int: + """Get delay in milliseconds before the next retry. + + Args: + attempt: Current attempt number (0-indexed) + + Returns: + Delay in milliseconds + """ + pass + + def wait(self, attempt: int) -> None: + """Wait before the next retry. + + Args: + attempt: Current attempt number (0-indexed) + """ + delay_ms = self.get_delay(attempt) + time.sleep(delay_ms / 1000.0) + + +class ExponentialBackoffRetry(RetryPolicy): + """Exponential backoff retry policy.""" + + def __init__(self, config: RetryConfig): + """Initialize ExponentialBackoffRetry. + + Args: + config: RetryConfig with max_retries, initial_delay, backoff_multiplier, max_delay + """ + self.config = config + + def should_retry(self, attempt: int, exception: Optional[Exception] = None) -> bool: + """Check if retry should be attempted. + + Args: + attempt: Current attempt number (0-indexed) + exception: Optional exception that caused the failure + + Returns: + True if attempts remain, False otherwise + """ + return attempt < self.config.max_retries + + def get_delay(self, attempt: int) -> int: + """Get delay using exponential backoff. + + Args: + attempt: Current attempt number (0-indexed) + + Returns: + Delay in milliseconds + """ + return calculate_retry_delay( + attempt=attempt, + initial_delay=self.config.initial_delay, + backoff_multiplier=self.config.backoff_multiplier, + max_delay=self.config.max_delay, + ) + + +class NoRetryPolicy(RetryPolicy): + """No retry policy (fail immediately).""" + + def should_retry(self, attempt: int, exception: Optional[Exception] = None) -> bool: + """Never retry.""" + return False + + def get_delay(self, attempt: int) -> int: + """Always return 0 delay.""" + return 0 + + +def execute_with_retry( + func: Callable[..., T], + retry_policy: RetryPolicy, + *args, + **kwargs, +) -> T: + """Execute a function with retry logic. + + Args: + func: The function to execute + retry_policy: RetryPolicy instance defining retry behavior + *args: Positional arguments to pass to func + **kwargs: Keyword arguments to pass to func + + Returns: + The return value of func + + Raises: + The last exception raised if all retries are exhausted + + Example: + >>> from elytra_client.webhooks import ExponentialBackoffRetry + >>> config = RetryConfig(max_retries=3) + >>> policy = ExponentialBackoffRetry(config) + >>> result = execute_with_retry( + ... some_func, + ... policy, + ... arg1="value", + ... ) + """ + attempt = 0 + last_exception = None + + while True: + try: + return func(*args, **kwargs) + except Exception as e: + last_exception = e + if not retry_policy.should_retry(attempt, e): + raise + + retry_policy.wait(attempt) + attempt += 1 diff --git a/elytra_client/webhooks/server.py b/elytra_client/webhooks/server.py new file mode 100644 index 0000000..80d133c --- /dev/null +++ b/elytra_client/webhooks/server.py @@ -0,0 +1,332 @@ +"""Example webhook server implementations for handling Elytra Event API webhooks. + +This module provides example implementations of webhook servers that can receive +CloudEvents from the Elytra PIM Event API. Choose the framework that best fits +your application. + +Note: These are example implementations. Production deployments should add +additional error handling, logging, and security measures. +""" + +import json +from typing import Any, Callable, Dict, Optional + +from .handlers import WebhookValidationError, WebhookValidator, parse_webhook_payload +from .models import CloudEvent, EventType, Operation + + +class EventHandler: + """Base class for handling Elytra CloudEvents. + + Subclass this to implement custom event handling logic. + """ + + def handle_event(self, event: CloudEvent) -> None: + """Handle a CloudEvent from Elytra. + + Override this method to implement your event handling logic. + + Args: + event: The CloudEvent to handle + """ + pass + + def handle_product_added(self, event: CloudEvent) -> None: + """Handle product add events.""" + pass + + def handle_product_modified(self, event: CloudEvent) -> None: + """Handle product modify events.""" + pass + + def handle_product_removed(self, event: CloudEvent) -> None: + """Handle product remove events.""" + pass + + def handle_attribute_value_changed(self, event: CloudEvent) -> None: + """Handle product attribute value change events.""" + pass + + +class SimpleWebhookEventDispatcher: + """Simple event dispatcher that routes events to registered handlers. + + Routes events based on event type and operation. + + Example: + >>> dispatcher = SimpleWebhookEventDispatcher() + >>> dispatcher.register_handler( + ... event_type=EventType.PRODUCT, + ... operation=Operation.ADD, + ... callback=my_handler.handle_product_added, + ... ) + >>> dispatcher.dispatch(event) + """ + + def __init__(self): + """Initialize the event dispatcher.""" + self.handlers: Dict[ + tuple[Optional[str], Optional[str]], list[Callable[[CloudEvent], None]] + ] = {} + self.default_handlers: list[Callable[[CloudEvent], None]] = [] + + def register_handler( + self, + callback: Callable[[CloudEvent], None], + event_type: Optional[EventType] = None, + operation: Optional[Operation] = None, + ) -> None: + """Register a handler for a specific event type and/or operation. + + Args: + callback: Callable that accepts a CloudEvent + event_type: Filter by event type (None = all types) + operation: Filter by operation (None = all operations) + + Example: + >>> dispatcher.register_handler( + ... callback=handle_product_changes, + ... event_type=EventType.PRODUCT, + ... ) + >>> dispatcher.register_handler( + ... callback=handle_all_events, + ... ) + """ + key = ( + event_type.value if event_type else None, + operation.value if operation else None, + ) + if key not in self.handlers: + self.handlers[key] = [] + self.handlers[key].append(callback) + + def register_default_handler( + self, callback: Callable[[CloudEvent], None] + ) -> None: + """Register a default handler for events not matching any registered handler. + + Args: + callback: Callable that accepts a CloudEvent + """ + self.default_handlers.append(callback) + + def dispatch(self, event: CloudEvent) -> None: + """Dispatch an event to all matching handlers. + + Handlers are called in the following order: + 1. Handlers matching both event type and operation + 2. Handlers matching event type only + 3. Handlers matching operation only + 4. Default handlers (if no others matched) + + Args: + event: The CloudEvent to dispatch + """ + event_type = event.get_event_type() + operation = event.get_operation() + + event_type_value = event_type.value if event_type else None + operation_value = operation.value if operation else None + + handlers_called = False + + # Try exact match (event type + operation) + key = (event_type_value, operation_value) + if key in self.handlers: + for handler in self.handlers[key]: + handler(event) + handlers_called = True + + # Try event type only + key = (event_type_value, None) + if key in self.handlers: + for handler in self.handlers[key]: + handler(event) + handlers_called = True + + # Try operation only + key = (None, operation_value) + if key in self.handlers: + for handler in self.handlers[key]: + handler(event) + handlers_called = True + + # Call default handlers if nothing matched + if not handlers_called: + for handler in self.default_handlers: + handler(event) + + +class FlaskWebhookExample: + """Example Flask webhook endpoint implementation. + + This is a minimal example. Production code should add logging, monitoring, + error handling, and proper HTTP status codes. + + Example: + >>> from flask import Flask, request + >>> app = Flask(__name__) + >>> webhook = FlaskWebhookExample(validator) + >>> + >>> @app.route("/webhook", methods=["POST"]) + >>> def handle_webhook(): + ... return webhook.handle_request(request.json, request.headers) + """ + + def __init__( + self, + validator: WebhookValidator, + dispatcher: Optional[SimpleWebhookEventDispatcher] = None, + ): + """Initialize the Flask webhook handler. + + Args: + validator: WebhookValidator instance for authentication and validation + dispatcher: Optional event dispatcher for routing events + """ + self.validator = validator + self.dispatcher = dispatcher + + def handle_request( + self, + payload: Dict[str, Any], + headers: Optional[Dict[str, str]] = None, + ) -> tuple[Dict[str, Any], int]: + """Handle an incoming webhook request from Flask. + + Args: + payload: Request JSON body + headers: Request headers + + Returns: + Tuple of (response_dict, http_status_code) + + Example: + >>> @app.route("/webhook", methods=["POST"]) + >>> def webhook(): + ... response, status = webhook_handler.handle_request( + ... request.json, dict(request.headers) + ... ) + ... return response, status + """ + try: + # Validate and parse the webhook + event = self.validator.validate_and_parse(payload, headers) + + # Dispatch to handlers if provided + if self.dispatcher: + self.dispatcher.dispatch(event) + + return {"status": "success", "event_id": event.id}, 200 + + except WebhookValidationError as e: + return {"status": "error", "message": str(e)}, 400 + except Exception as e: + return {"status": "error", "message": "Internal server error"}, 500 + + +class FastAPIWebhookExample: + """Example FastAPI webhook endpoint implementation. + + This is a minimal example. Production code should add logging, monitoring, + error handling, and proper HTTP status codes. + + Example: + >>> from fastapi import FastAPI, Request + >>> app = FastAPI() + >>> webhook = FastAPIWebhookExample(validator) + >>> + >>> @app.post("/webhook") + >>> async def handle_webhook(request: Request): + ... payload = await request.json() + ... return webhook.handle_request( + ... payload, + ... dict(request.headers) + ... ) + """ + + def __init__( + self, + validator: WebhookValidator, + dispatcher: Optional[SimpleWebhookEventDispatcher] = None, + ): + """Initialize the FastAPI webhook handler. + + Args: + validator: WebhookValidator instance for authentication and validation + dispatcher: Optional event dispatcher for routing events + """ + self.validator = validator + self.dispatcher = dispatcher + + def handle_request( + self, + payload: Dict[str, Any], + headers: Optional[Dict[str, str]] = None, + ) -> Dict[str, Any]: + """Handle an incoming webhook request from FastAPI. + + Args: + payload: Request JSON body + headers: Request headers + + Returns: + Response dictionary + + Example: + >>> @app.post("/webhook") + >>> async def webhook(request: Request): + ... payload = await request.json() + ... return webhook_handler.handle_request( + ... payload, + ... dict(request.headers) + ... ) + """ + try: + # Validate and parse the webhook + event = self.validator.validate_and_parse(payload, headers) + + # Dispatch to handlers if provided + if self.dispatcher: + self.dispatcher.dispatch(event) + + return {"status": "success", "event_id": event.id} + + except WebhookValidationError as e: + return {"status": "error", "message": str(e)} + + +# Example usage functions + +def example_basic_event_handler(event: CloudEvent) -> None: + """Simple example event handler that logs event info. + + Args: + event: The CloudEvent to handle + """ + print(f"Received event: {event.id}") + print(f"Event type: {event.eventtype}") + print(f"Operation: {event.operation}") + if event.data: + print(f"Object type: {event.get_object_type()}") + print(f"Object ID: {event.get_object_id()}") + + +def example_product_change_handler(event: CloudEvent) -> None: + """Example handler for product-related events. + + Args: + event: The CloudEvent to handle + """ + operation = event.get_operation() + object_data = event.get_object_data() + + if operation == Operation.ADD: + print(f"Product added: {event.get_object_id()}") + elif operation == Operation.MODIFY: + print(f"Product modified: {event.get_object_id()}") + elif operation == Operation.REMOVE: + print(f"Product removed: {event.get_object_id()}") + + if object_data: + print(f"Object data: {json.dumps(object_data, indent=2)}") diff --git a/examples/webhook_examples.py b/examples/webhook_examples.py new file mode 100644 index 0000000..0852bbc --- /dev/null +++ b/examples/webhook_examples.py @@ -0,0 +1,372 @@ +"""Comprehensive examples of using the Elytra webhooks subpackage.""" + +# ============================================================================ +# Example 1: Simple Webhook Parsing Without Authentication +# ============================================================================ + +from elytra_client.webhooks import CloudEvent, EventType, Operation, parse_webhook_payload + + +def example_simple_parsing(): + """Parse a webhook payload without authentication validation.""" + + # Typical webhook payload from Elytra + webhook_payload = { + "specversion": "1.0", + "id": "385d27ef-ac6d-4d93-904b-ea9801fc1bde", + "source": "elytra-pim", + "type": "com.elytra.product.attribute.value", + "datacontenttype": "application/json", + "time": "2025-12-31T04:19:30.860984129+01:00", + "eventtype": "product_attribute_value", + "operation": "modify", + "data": { + "objectData": { + "type": 2, + "value": "example value", + "attributeID": 50728, + }, + "objectId": 80194, + "objectType": "DAOAttributeValue", + }, + } + + # Parse the webhook + event: CloudEvent = parse_webhook_payload(webhook_payload) + + # Access event properties + print(f"Event ID: {event.id}") + print(f"Event Type: {event.get_event_type()}") + print(f"Operation: {event.get_operation()}") + print(f"Object ID: {event.get_object_id()}") + print(f"Object Type: {event.get_object_type()}") + + # Conditional handling based on event type + if event.get_event_type() == EventType.PRODUCT_ATTRIBUTE_VALUE: + if event.get_operation() == Operation.MODIFY: + print("Product attribute value was modified") + object_data = event.get_object_data() + if object_data: + print(f"New value: {object_data.get('value')}") + + +# ============================================================================ +# Example 2: Webhook with Basic Authentication +# ============================================================================ + +from elytra_client.webhooks import BasicAuth, WebhookValidationError, WebhookValidator + + +def example_basic_auth(): + """Validate webhook with HTTP Basic Authentication.""" + + # Create validator with basic auth + auth = BasicAuth(username="admin", password="secret") + validator = WebhookValidator(auth_type="basic", auth=auth) + + # Incoming webhook payload + payload = { + "specversion": "1.0", + "id": "test-event-id", + "source": "elytra-pim", + "type": "com.elytra.product", + "eventtype": "product", + "operation": "add", + } + + # Headers from the incoming request + headers = { + "Authorization": "Basic YWRtaW46c2VjcmV0" # base64 encoded admin:secret + } + + try: + # Validate and parse in one call + event = validator.validate_and_parse(payload, headers) + print(f"Webhook validated successfully. Event ID: {event.id}") + except WebhookValidationError as e: + print(f"Validation failed: {e}") + + +# ============================================================================ +# Example 3: Webhook with Bearer Token Authentication +# ============================================================================ + +from elytra_client.webhooks import BearerAuth + + +def example_bearer_auth(): + """Validate webhook with Bearer token authentication.""" + + auth = BearerAuth(token="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...") + validator = WebhookValidator(auth_type="bearer", auth=auth) + + payload = { + "specversion": "1.0", + "id": "test-event-id", + "source": "elytra-pim", + "type": "com.elytra.product", + "eventtype": "product", + "operation": "add", + } + + headers = {"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."} + + try: + event = validator.validate_and_parse(payload, headers) + print(f"Bearer token validated. Event ID: {event.id}") + except WebhookValidationError as e: + print(f"Validation failed: {e}") + + +# ============================================================================ +# Example 4: Webhook with API Key Authentication +# ============================================================================ + +from elytra_client.webhooks import APIKeyAuth + + +def example_api_key_auth(): + """Validate webhook with API key authentication.""" + + auth = APIKeyAuth(api_key="sk_live_abc123xyz789", header_name="X-API-Key") + validator = WebhookValidator(auth_type="apikey", auth=auth) + + payload = { + "specversion": "1.0", + "id": "test-event-id", + "source": "elytra-pim", + "type": "com.elytra.product", + "eventtype": "product", + "operation": "add", + } + + headers = {"X-API-Key": "sk_live_abc123xyz789"} + + try: + event = validator.validate_and_parse(payload, headers) + print(f"API key validated. Event ID: {event.id}") + except WebhookValidationError as e: + print(f"Validation failed: {e}") + + +# ============================================================================ +# Example 5: Event Filtering and Routing +# ============================================================================ + +from elytra_client.webhooks.server import EventHandler, SimpleWebhookEventDispatcher + + +class MyEventHandler(EventHandler): + """Custom event handler for specific event types.""" + + def handle_product_added(self, event: CloudEvent) -> None: + """Handle product added events.""" + print(f"🎉 New product added: {event.get_object_id()}") + + def handle_attribute_value_changed(self, event: CloudEvent) -> None: + """Handle attribute value changes.""" + print(f"📝 Attribute value changed for product: {event.get_object_id()}") + + +def example_event_routing(): + """Route events to different handlers based on type and operation.""" + + # Create dispatcher and handler + dispatcher = SimpleWebhookEventDispatcher() + handler = MyEventHandler() + + # Register specific handlers + dispatcher.register_handler( + callback=handler.handle_product_added, + event_type=EventType.PRODUCT, + operation=Operation.ADD, + ) + + dispatcher.register_handler( + callback=handler.handle_attribute_value_changed, + event_type=EventType.PRODUCT_ATTRIBUTE_VALUE, + ) + + # Register a catch-all handler + dispatcher.register_default_handler( + lambda event: print(f"Other event received: {event.eventtype}") + ) + + # Dispatch events + event1 = CloudEvent( + id="1", + source="elytra-pim", + type="com.elytra.product", + eventtype="product", + operation="add", + ) + dispatcher.dispatch(event1) # Calls handle_product_added + + event2 = CloudEvent( + id="2", + source="elytra-pim", + type="com.elytra.product.attribute.value", + eventtype="product_attribute_value", + operation="modify", + ) + dispatcher.dispatch(event2) # Calls handle_attribute_value_changed + + +# ============================================================================ +# Example 6: Retry Configuration and Execution +# ============================================================================ + +import requests + +from elytra_client.webhooks import ExponentialBackoffRetry, RetryConfig, execute_with_retry + + +def example_retry_with_webhook_delivery(): + """Retry webhook delivery with exponential backoff.""" + + # Configure retry policy + retry_config = RetryConfig( + max_retries=5, + initial_delay=1000, # 1 second + backoff_multiplier=2.0, + max_delay=300000, # 5 minutes + ) + + retry_policy = ExponentialBackoffRetry(retry_config) + + def send_webhook(url: str, data: dict) -> requests.Response: + """Send webhook with retry on failure.""" + return requests.post(url, json=data, timeout=10) + + try: + # This will retry with exponential backoff on failure + response = execute_with_retry( + send_webhook, + retry_policy, + url="https://example.com/webhook", + data={"event": "product_added"}, + ) + print(f"Webhook sent successfully: {response.status_code}") + except requests.exceptions.RequestException as e: + print(f"Webhook delivery failed after retries: {e}") + + +# ============================================================================ +# Example 7: Flask Integration +# ============================================================================ + +def example_flask_integration(): + """Flask webhook endpoint with authentication and validation.""" + + from flask import Flask, jsonify, request + + from elytra_client.webhooks.server import FlaskWebhookExample + + app = Flask(__name__) + + # Setup webhook handler with authentication + auth = BasicAuth(username="webhook_user", password="webhook_pass") + validator = WebhookValidator(auth_type="basic", auth=auth) + webhook_handler = FlaskWebhookExample(validator) + + @app.route("/webhook/elytra", methods=["POST"]) + def handle_elytra_webhook(): + """Handle incoming Elytra webhook events.""" + response, status_code = webhook_handler.handle_request( + request.json, dict(request.headers) + ) + return jsonify(response), status_code + + return app + + +# ============================================================================ +# Example 8: FastAPI Integration +# ============================================================================ + +def example_fastapi_integration(): + """FastAPI webhook endpoint with authentication and validation.""" + + from fastapi import FastAPI, Request + + from elytra_client.webhooks.server import FastAPIWebhookExample + + app = FastAPI() + + # Setup webhook handler with API key authentication + auth = APIKeyAuth(api_key="sk_live_secret123", header_name="X-Webhook-Key") + validator = WebhookValidator(auth_type="apikey", auth=auth) + dispatcher = SimpleWebhookEventDispatcher() + webhook_handler = FastAPIWebhookExample(validator, dispatcher) + + @app.post("/webhook/elytra") + async def handle_elytra_webhook(request: Request): + """Handle incoming Elytra webhook events.""" + payload = await request.json() + return webhook_handler.handle_request(payload, dict(request.headers)) + + return app + + +# ============================================================================ +# Example 9: Complete Production Example +# ============================================================================ + +def example_complete_setup(): + """Complete production-ready setup with all features.""" + + from elytra_client.webhooks import BasicAuth, EventType, Operation, WebhookValidator + from elytra_client.webhooks.server import FlaskWebhookExample, SimpleWebhookEventDispatcher + + # 1. Setup authentication + auth = BasicAuth(username="elytra_webhook", password="secure_password_here") + + # 2. Create validator + validator = WebhookValidator(auth_type="basic", auth=auth) + + # 3. Create event dispatcher + dispatcher = SimpleWebhookEventDispatcher() + + # 4. Register handlers for different event types + def on_product_added(event: CloudEvent) -> None: + print(f"Product {event.get_object_id()} was added") + # TODO: Implement your product sync logic + + def on_attribute_changed(event: CloudEvent) -> None: + print(f"Attribute changed for product {event.get_object_id()}") + # TODO: Implement your attribute sync logic + + dispatcher.register_handler( + on_product_added, EventType.PRODUCT, Operation.ADD + ) + dispatcher.register_handler( + on_attribute_changed, EventType.PRODUCT_ATTRIBUTE_VALUE + ) + + # 5. Create Flask webhook handler + webhook_handler = FlaskWebhookExample(validator, dispatcher) + + # 6. Use in Flask + from flask import Flask, jsonify, request + + app = Flask(__name__) + + @app.route("/webhook", methods=["POST"]) + def webhook(): + response, status = webhook_handler.handle_request( + request.json, dict(request.headers) + ) + return jsonify(response), status + + return app + + +if __name__ == "__main__": + print("Elytra Webhooks Examples") + print("=" * 60) + + print("\n1. Simple Parsing:") + example_simple_parsing() + + print("\n2. Event Routing:") + example_event_routing() diff --git a/tests/test_webhooks.py b/tests/test_webhooks.py new file mode 100644 index 0000000..7fdd012 --- /dev/null +++ b/tests/test_webhooks.py @@ -0,0 +1,437 @@ +"""Unit tests for the Elytra webhooks subpackage.""" + +from datetime import datetime +from unittest.mock import Mock, patch + +import pytest + +from elytra_client.webhooks import ( + APIKeyAuth, + BasicAuth, + BearerAuth, + CloudEvent, + EventType, + ExponentialBackoffRetry, + Operation, + RetryConfig, + calculate_retry_delay, +) +from elytra_client.webhooks.auth import APIKeyAuthProvider, BasicAuthProvider, BearerAuthProvider +from elytra_client.webhooks.handlers import ( + WebhookValidationError, + WebhookValidator, + parse_webhook_payload, + validate_cloud_event, +) +from elytra_client.webhooks.server import EventHandler, SimpleWebhookEventDispatcher + + +class TestCloudEvent: + """Test CloudEvent model.""" + + def test_cloud_event_creation(self): + """Test creating a CloudEvent.""" + event = CloudEvent( + id="test-id", + source="elytra-pim", + type="com.elytra.product", + eventtype="product", + operation="add", + ) + assert event.id == "test-id" + assert event.source == "elytra-pim" + assert event.specversion == "1.0" + + def test_get_event_type(self): + """Test parsing event type.""" + event = CloudEvent( + id="test-id", + source="elytra-pim", + type="com.elytra.product", + eventtype="product", + ) + assert event.get_event_type() == EventType.PRODUCT + + def test_get_operation(self): + """Test parsing operation.""" + event = CloudEvent( + id="test-id", + source="elytra-pim", + type="com.elytra.product", + operation="modify", + ) + assert event.get_operation() == Operation.MODIFY + + def test_get_event_type_invalid(self): + """Test invalid event type returns None.""" + event = CloudEvent( + id="test-id", + source="elytra-pim", + type="com.elytra.unknown", + eventtype="invalid_type", + ) + assert event.get_event_type() is None + + def test_get_object_data(self): + """Test extracting object data.""" + event = CloudEvent( + id="test-id", + source="elytra-pim", + type="com.elytra.product", + data={"objectData": {"id": 123, "name": "Product"}, "objectId": 123}, + ) + assert event.get_object_data() == {"id": 123, "name": "Product"} + + def test_get_object_id(self): + """Test extracting object ID.""" + event = CloudEvent( + id="test-id", + source="elytra-pim", + type="com.elytra.product", + data={"objectId": 456}, + ) + assert event.get_object_id() == 456 + + def test_get_object_type(self): + """Test extracting object type.""" + event = CloudEvent( + id="test-id", + source="elytra-pim", + type="com.elytra.product", + data={"objectType": "Product"}, + ) + assert event.get_object_type() == "Product" + + +class TestWebhookValidation: + """Test webhook validation.""" + + def test_parse_valid_webhook(self): + """Test parsing valid webhook payload.""" + payload = { + "specversion": "1.0", + "id": "test-id", + "source": "elytra-pim", + "type": "com.elytra.product", + "eventtype": "product", + "operation": "add", + } + event = parse_webhook_payload(payload) + assert event.id == "test-id" + assert event.eventtype == "product" + + def test_parse_invalid_webhook(self): + """Test parsing invalid webhook raises error.""" + payload = { + "id": "test-id", + # Missing required 'source' and 'type' + } + with pytest.raises(WebhookValidationError): + parse_webhook_payload(payload) + + def test_validate_cloud_event_success(self): + """Test successful cloud event validation.""" + event = CloudEvent( + id="test-id", + source="elytra-pim", + type="com.elytra.product", + ) + assert validate_cloud_event(event) is True + + def test_validate_cloud_event_missing_id(self): + """Test validation fails with missing ID.""" + event = CloudEvent( + id="", + source="elytra-pim", + type="com.elytra.product", + ) + with pytest.raises(WebhookValidationError): + validate_cloud_event(event) + + def test_validate_cloud_event_invalid_type_prefix(self): + """Test validation fails with invalid type prefix.""" + event = CloudEvent( + id="test-id", + source="elytra-pim", + type="com.other.product", # Wrong prefix + ) + with pytest.raises(WebhookValidationError): + validate_cloud_event(event) + + +class TestBasicAuth: + """Test Basic authentication.""" + + def test_basic_auth_get_headers(self): + """Test Basic auth header generation.""" + auth = BasicAuth(username="admin", password="secret") + provider = BasicAuthProvider(auth) + headers = provider.get_headers() + assert "Authorization" in headers + assert headers["Authorization"].startswith("Basic ") + + def test_basic_auth_validate_valid(self): + """Test Basic auth validation with valid credentials.""" + auth = BasicAuth(username="admin", password="secret") + provider = BasicAuthProvider(auth) + headers = provider.get_headers() + assert provider.validate_header(headers) is True + + def test_basic_auth_validate_invalid(self): + """Test Basic auth validation with invalid credentials.""" + auth = BasicAuth(username="admin", password="secret") + provider = BasicAuthProvider(auth) + headers = {"Authorization": "Basic dGVzdDp3cm9uZw=="} # test:wrong + assert provider.validate_header(headers) is False + + def test_basic_auth_validate_missing(self): + """Test Basic auth validation with missing header.""" + auth = BasicAuth(username="admin", password="secret") + provider = BasicAuthProvider(auth) + assert provider.validate_header({}) is False + + +class TestBearerAuth: + """Test Bearer token authentication.""" + + def test_bearer_auth_get_headers(self): + """Test Bearer auth header generation.""" + auth = BearerAuth(token="my_token_123") + provider = BearerAuthProvider(auth) + headers = provider.get_headers() + assert headers["Authorization"] == "Bearer my_token_123" + + def test_bearer_auth_validate_valid(self): + """Test Bearer auth validation with valid token.""" + auth = BearerAuth(token="my_token_123") + provider = BearerAuthProvider(auth) + headers = {"Authorization": "Bearer my_token_123"} + assert provider.validate_header(headers) is True + + def test_bearer_auth_validate_invalid(self): + """Test Bearer auth validation with invalid token.""" + auth = BearerAuth(token="my_token_123") + provider = BearerAuthProvider(auth) + headers = {"Authorization": "Bearer wrong_token"} + assert provider.validate_header(headers) is False + + +class TestAPIKeyAuth: + """Test API Key authentication.""" + + def test_api_key_auth_get_headers(self): + """Test API Key header generation.""" + auth = APIKeyAuth(api_key="sk_live_123", header_name="X-API-Key") + provider = APIKeyAuthProvider(auth) + headers = provider.get_headers() + assert headers["X-API-Key"] == "sk_live_123" + + def test_api_key_auth_default_header_name(self): + """Test API Key with default header name.""" + auth = APIKeyAuth(api_key="sk_live_123") + provider = APIKeyAuthProvider(auth) + headers = provider.get_headers() + assert "api-key" in headers + + def test_api_key_auth_validate_valid(self): + """Test API Key validation with valid key.""" + auth = APIKeyAuth(api_key="sk_live_123", header_name="X-API-Key") + provider = APIKeyAuthProvider(auth) + headers = {"X-API-Key": "sk_live_123"} + assert provider.validate_header(headers) is True + + def test_api_key_auth_validate_invalid(self): + """Test API Key validation with invalid key.""" + auth = APIKeyAuth(api_key="sk_live_123", header_name="X-API-Key") + provider = APIKeyAuthProvider(auth) + headers = {"X-API-Key": "wrong_key"} + assert provider.validate_header(headers) is False + + +class TestWebhookValidator: + """Test WebhookValidator class.""" + + def test_validator_no_auth(self): + """Test validator without authentication.""" + validator = WebhookValidator(auth_type="none") + payload = { + "specversion": "1.0", + "id": "test-id", + "source": "elytra-pim", + "type": "com.elytra.product", + } + event = validator.validate_and_parse(payload) + assert event.id == "test-id" + + def test_validator_basic_auth(self): + """Test validator with basic auth.""" + auth = BasicAuth(username="user", password="pass") + validator = WebhookValidator(auth_type="basic", auth=auth) + payload = { + "specversion": "1.0", + "id": "test-id", + "source": "elytra-pim", + "type": "com.elytra.product", + } + # Valid header + headers = {"Authorization": "Basic dXNlcjpwYXNz"} + event = validator.validate_and_parse(payload, headers) + assert event.id == "test-id" + + def test_validator_auth_validation_fails(self): + """Test validator fails on invalid auth.""" + auth = BasicAuth(username="user", password="pass") + validator = WebhookValidator(auth_type="basic", auth=auth) + payload = { + "specversion": "1.0", + "id": "test-id", + "source": "elytra-pim", + "type": "com.elytra.product", + } + # Invalid header + headers = {"Authorization": "Basic invalid"} + with pytest.raises(WebhookValidationError): + validator.validate_and_parse(payload, headers) + + +class TestRetryLogic: + """Test retry logic.""" + + def test_calculate_retry_delay_exponential(self): + """Test exponential backoff delay calculation.""" + # First attempt: 1000ms + assert calculate_retry_delay(0, initial_delay=1000) == 1000 + # Second attempt: 2000ms + assert calculate_retry_delay(1, initial_delay=1000, backoff_multiplier=2.0) == 2000 + # Third attempt: 4000ms + assert calculate_retry_delay(2, initial_delay=1000, backoff_multiplier=2.0) == 4000 + + def test_calculate_retry_delay_max_cap(self): + """Test retry delay is capped at max.""" + # Should be capped at max_delay + delay = calculate_retry_delay( + 10, + initial_delay=1000, + backoff_multiplier=2.0, + max_delay=300000, + ) + assert delay <= 300000 + + def test_exponential_backoff_retry_should_retry(self): + """Test ExponentialBackoffRetry.should_retry.""" + config = RetryConfig(max_retries=3) + policy = ExponentialBackoffRetry(config) + assert policy.should_retry(0) is True + assert policy.should_retry(1) is True + assert policy.should_retry(2) is True + assert policy.should_retry(3) is False + + def test_exponential_backoff_retry_delay(self): + """Test ExponentialBackoffRetry.get_delay.""" + config = RetryConfig( + max_retries=3, + initial_delay=1000, + backoff_multiplier=2.0, + ) + policy = ExponentialBackoffRetry(config) + assert policy.get_delay(0) == 1000 + assert policy.get_delay(1) == 2000 + assert policy.get_delay(2) == 4000 + + +class TestEventDispatcher: + """Test event dispatcher.""" + + def test_register_and_dispatch_exact_match(self): + """Test handler is called on exact event type and operation match.""" + dispatcher = SimpleWebhookEventDispatcher() + called_with = [] + + def handler(event: CloudEvent): + called_with.append(event) + + dispatcher.register_handler( + handler, EventType.PRODUCT, Operation.ADD + ) + + event = CloudEvent( + id="test", + source="elytra-pim", + type="com.elytra.product", + eventtype="product", + operation="add", + ) + + dispatcher.dispatch(event) + assert len(called_with) == 1 + assert called_with[0].id == "test" + + def test_register_and_dispatch_type_only(self): + """Test handler is called on event type match.""" + dispatcher = SimpleWebhookEventDispatcher() + called_with = [] + + def handler(event: CloudEvent): + called_with.append(event) + + dispatcher.register_handler(handler, EventType.PRODUCT) + + event = CloudEvent( + id="test", + source="elytra-pim", + type="com.elytra.product", + eventtype="product", + operation="modify", + ) + + dispatcher.dispatch(event) + assert len(called_with) == 1 + + def test_default_handler_called(self): + """Test default handler is called when no specific match.""" + dispatcher = SimpleWebhookEventDispatcher() + called_with = [] + + def handler(event: CloudEvent): + called_with.append(event) + + dispatcher.register_default_handler(handler) + + event = CloudEvent( + id="test", + source="elytra-pim", + type="com.elytra.unknown", + eventtype="unknown", + ) + + dispatcher.dispatch(event) + assert len(called_with) == 1 + + def test_multiple_handlers_called(self): + """Test multiple handlers are called for same event.""" + dispatcher = SimpleWebhookEventDispatcher() + called = [] + + def handler1(event: CloudEvent): + called.append("handler1") + + def handler2(event: CloudEvent): + called.append("handler2") + + dispatcher.register_handler(handler1, EventType.PRODUCT) + dispatcher.register_default_handler(handler2) + + event = CloudEvent( + id="test", + source="elytra-pim", + type="com.elytra.product", + eventtype="product", + ) + + dispatcher.dispatch(event) + assert "handler1" in called + assert "handler2" not in called # Only called if no matching handler + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])