elytra_client/elytra_client/webhooks/server.py
claudi b8f889f224 feat: Implement webhook retry logic and server examples
- Added retry logic for webhook event delivery in `retry.py` with exponential backoff.
- Created example webhook server implementations in `server.py` for Flask and FastAPI.
- Developed comprehensive examples for using the Elytra webhooks subpackage in `webhook_examples.py`.
- Introduced unit tests for webhook functionality, including event handling, authentication, and retry logic in `test_webhooks.py`.
2026-02-20 10:08:07 +01:00

332 lines
10 KiB
Python

"""Example webhook server implementations for handling Elytra Event API webhooks.
This module provides example implementations of webhook servers that can receive
CloudEvents from the Elytra PIM Event API. Choose the framework that best fits
your application.
Note: These are example implementations. Production deployments should add
additional error handling, logging, and security measures.
"""
import json
from typing import Any, Callable, Dict, Optional
from .handlers import WebhookValidationError, WebhookValidator, parse_webhook_payload
from .models import CloudEvent, EventType, Operation
class EventHandler:
"""Base class for handling Elytra CloudEvents.
Subclass this to implement custom event handling logic.
"""
def handle_event(self, event: CloudEvent) -> None:
"""Handle a CloudEvent from Elytra.
Override this method to implement your event handling logic.
Args:
event: The CloudEvent to handle
"""
pass
def handle_product_added(self, event: CloudEvent) -> None:
"""Handle product add events."""
pass
def handle_product_modified(self, event: CloudEvent) -> None:
"""Handle product modify events."""
pass
def handle_product_removed(self, event: CloudEvent) -> None:
"""Handle product remove events."""
pass
def handle_attribute_value_changed(self, event: CloudEvent) -> None:
"""Handle product attribute value change events."""
pass
class SimpleWebhookEventDispatcher:
"""Simple event dispatcher that routes events to registered handlers.
Routes events based on event type and operation.
Example:
>>> dispatcher = SimpleWebhookEventDispatcher()
>>> dispatcher.register_handler(
... event_type=EventType.PRODUCT,
... operation=Operation.ADD,
... callback=my_handler.handle_product_added,
... )
>>> dispatcher.dispatch(event)
"""
def __init__(self):
"""Initialize the event dispatcher."""
self.handlers: Dict[
tuple[Optional[str], Optional[str]], list[Callable[[CloudEvent], None]]
] = {}
self.default_handlers: list[Callable[[CloudEvent], None]] = []
def register_handler(
self,
callback: Callable[[CloudEvent], None],
event_type: Optional[EventType] = None,
operation: Optional[Operation] = None,
) -> None:
"""Register a handler for a specific event type and/or operation.
Args:
callback: Callable that accepts a CloudEvent
event_type: Filter by event type (None = all types)
operation: Filter by operation (None = all operations)
Example:
>>> dispatcher.register_handler(
... callback=handle_product_changes,
... event_type=EventType.PRODUCT,
... )
>>> dispatcher.register_handler(
... callback=handle_all_events,
... )
"""
key = (
event_type.value if event_type else None,
operation.value if operation else None,
)
if key not in self.handlers:
self.handlers[key] = []
self.handlers[key].append(callback)
def register_default_handler(
self, callback: Callable[[CloudEvent], None]
) -> None:
"""Register a default handler for events not matching any registered handler.
Args:
callback: Callable that accepts a CloudEvent
"""
self.default_handlers.append(callback)
def dispatch(self, event: CloudEvent) -> None:
"""Dispatch an event to all matching handlers.
Handlers are called in the following order:
1. Handlers matching both event type and operation
2. Handlers matching event type only
3. Handlers matching operation only
4. Default handlers (if no others matched)
Args:
event: The CloudEvent to dispatch
"""
event_type = event.get_event_type()
operation = event.get_operation()
event_type_value = event_type.value if event_type else None
operation_value = operation.value if operation else None
handlers_called = False
# Try exact match (event type + operation)
key = (event_type_value, operation_value)
if key in self.handlers:
for handler in self.handlers[key]:
handler(event)
handlers_called = True
# Try event type only
key = (event_type_value, None)
if key in self.handlers:
for handler in self.handlers[key]:
handler(event)
handlers_called = True
# Try operation only
key = (None, operation_value)
if key in self.handlers:
for handler in self.handlers[key]:
handler(event)
handlers_called = True
# Call default handlers if nothing matched
if not handlers_called:
for handler in self.default_handlers:
handler(event)
class FlaskWebhookExample:
"""Example Flask webhook endpoint implementation.
This is a minimal example. Production code should add logging, monitoring,
error handling, and proper HTTP status codes.
Example:
>>> from flask import Flask, request
>>> app = Flask(__name__)
>>> webhook = FlaskWebhookExample(validator)
>>>
>>> @app.route("/webhook", methods=["POST"])
>>> def handle_webhook():
... return webhook.handle_request(request.json, request.headers)
"""
def __init__(
self,
validator: WebhookValidator,
dispatcher: Optional[SimpleWebhookEventDispatcher] = None,
):
"""Initialize the Flask webhook handler.
Args:
validator: WebhookValidator instance for authentication and validation
dispatcher: Optional event dispatcher for routing events
"""
self.validator = validator
self.dispatcher = dispatcher
def handle_request(
self,
payload: Dict[str, Any],
headers: Optional[Dict[str, str]] = None,
) -> tuple[Dict[str, Any], int]:
"""Handle an incoming webhook request from Flask.
Args:
payload: Request JSON body
headers: Request headers
Returns:
Tuple of (response_dict, http_status_code)
Example:
>>> @app.route("/webhook", methods=["POST"])
>>> def webhook():
... response, status = webhook_handler.handle_request(
... request.json, dict(request.headers)
... )
... return response, status
"""
try:
# Validate and parse the webhook
event = self.validator.validate_and_parse(payload, headers)
# Dispatch to handlers if provided
if self.dispatcher:
self.dispatcher.dispatch(event)
return {"status": "success", "event_id": event.id}, 200
except WebhookValidationError as e:
return {"status": "error", "message": str(e)}, 400
except Exception as e:
return {"status": "error", "message": "Internal server error"}, 500
class FastAPIWebhookExample:
"""Example FastAPI webhook endpoint implementation.
This is a minimal example. Production code should add logging, monitoring,
error handling, and proper HTTP status codes.
Example:
>>> from fastapi import FastAPI, Request
>>> app = FastAPI()
>>> webhook = FastAPIWebhookExample(validator)
>>>
>>> @app.post("/webhook")
>>> async def handle_webhook(request: Request):
... payload = await request.json()
... return webhook.handle_request(
... payload,
... dict(request.headers)
... )
"""
def __init__(
self,
validator: WebhookValidator,
dispatcher: Optional[SimpleWebhookEventDispatcher] = None,
):
"""Initialize the FastAPI webhook handler.
Args:
validator: WebhookValidator instance for authentication and validation
dispatcher: Optional event dispatcher for routing events
"""
self.validator = validator
self.dispatcher = dispatcher
def handle_request(
self,
payload: Dict[str, Any],
headers: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""Handle an incoming webhook request from FastAPI.
Args:
payload: Request JSON body
headers: Request headers
Returns:
Response dictionary
Example:
>>> @app.post("/webhook")
>>> async def webhook(request: Request):
... payload = await request.json()
... return webhook_handler.handle_request(
... payload,
... dict(request.headers)
... )
"""
try:
# Validate and parse the webhook
event = self.validator.validate_and_parse(payload, headers)
# Dispatch to handlers if provided
if self.dispatcher:
self.dispatcher.dispatch(event)
return {"status": "success", "event_id": event.id}
except WebhookValidationError as e:
return {"status": "error", "message": str(e)}
# Example usage functions
def example_basic_event_handler(event: CloudEvent) -> None:
"""Simple example event handler that logs event info.
Args:
event: The CloudEvent to handle
"""
print(f"Received event: {event.id}")
print(f"Event type: {event.eventtype}")
print(f"Operation: {event.operation}")
if event.data:
print(f"Object type: {event.get_object_type()}")
print(f"Object ID: {event.get_object_id()}")
def example_product_change_handler(event: CloudEvent) -> None:
"""Example handler for product-related events.
Args:
event: The CloudEvent to handle
"""
operation = event.get_operation()
object_data = event.get_object_data()
if operation == Operation.ADD:
print(f"Product added: {event.get_object_id()}")
elif operation == Operation.MODIFY:
print(f"Product modified: {event.get_object_id()}")
elif operation == Operation.REMOVE:
print(f"Product removed: {event.get_object_id()}")
if object_data:
print(f"Object data: {json.dumps(object_data, indent=2)}")