- Added retry logic for webhook event delivery in `retry.py` with exponential backoff. - Created example webhook server implementations in `server.py` for Flask and FastAPI. - Developed comprehensive examples for using the Elytra webhooks subpackage in `webhook_examples.py`. - Introduced unit tests for webhook functionality, including event handling, authentication, and retry logic in `test_webhooks.py`.
332 lines
10 KiB
Python
332 lines
10 KiB
Python
"""Example webhook server implementations for handling Elytra Event API webhooks.
|
|
|
|
This module provides example implementations of webhook servers that can receive
|
|
CloudEvents from the Elytra PIM Event API. Choose the framework that best fits
|
|
your application.
|
|
|
|
Note: These are example implementations. Production deployments should add
|
|
additional error handling, logging, and security measures.
|
|
"""
|
|
|
|
import json
import logging
from typing import Any, Callable, Dict, Optional

from .handlers import WebhookValidationError, WebhookValidator, parse_webhook_payload
from .models import CloudEvent, EventType, Operation
|
|
|
|
|
|
class EventHandler:
    """Base class for objects that react to Elytra CloudEvents.

    Every hook defined here is a no-op; subclasses override the ones they
    care about. Note that ``handle_event`` does not forward to the
    product-specific hooks on its own, so each hook has to be wired up
    explicitly (for example by registering it with a dispatcher).
    """

    def handle_event(self, event: CloudEvent) -> None:
        """Generic entry point for any CloudEvent; does nothing by default.

        Args:
            event: The CloudEvent to handle
        """

    def handle_product_added(self, event: CloudEvent) -> None:
        """Hook for product add events; does nothing by default.

        Args:
            event: The CloudEvent to handle
        """

    def handle_product_modified(self, event: CloudEvent) -> None:
        """Hook for product modify events; does nothing by default.

        Args:
            event: The CloudEvent to handle
        """

    def handle_product_removed(self, event: CloudEvent) -> None:
        """Hook for product remove events; does nothing by default.

        Args:
            event: The CloudEvent to handle
        """

    def handle_attribute_value_changed(self, event: CloudEvent) -> None:
        """Hook for product attribute value change events; no-op by default.

        Args:
            event: The CloudEvent to handle
        """
|
|
|
|
|
|
class SimpleWebhookEventDispatcher:
    """Simple event dispatcher that routes events to registered handlers.

    Routes events based on event type and operation. A handler can filter on
    both, on either one, or on neither (a catch-all that receives every
    event). Default handlers run only for events no registered handler matched.

    Example:
        >>> dispatcher = SimpleWebhookEventDispatcher()
        >>> dispatcher.register_handler(
        ...     event_type=EventType.PRODUCT,
        ...     operation=Operation.ADD,
        ...     callback=my_handler.handle_product_added,
        ... )
        >>> dispatcher.dispatch(event)
    """

    def __init__(self):
        """Initialize the event dispatcher with no handlers registered."""
        # Keyed by (event type value, operation value). None in a slot means
        # "no filter on this dimension".
        self.handlers: Dict[
            tuple[Optional[str], Optional[str]], list[Callable[[CloudEvent], None]]
        ] = {}
        self.default_handlers: list[Callable[[CloudEvent], None]] = []

    def register_handler(
        self,
        callback: Callable[[CloudEvent], None],
        event_type: Optional[EventType] = None,
        operation: Optional[Operation] = None,
    ) -> None:
        """Register a handler for a specific event type and/or operation.

        Args:
            callback: Callable that accepts a CloudEvent
            event_type: Filter by event type (None = all types)
            operation: Filter by operation (None = all operations)

        Example:
            >>> dispatcher.register_handler(
            ...     callback=handle_product_changes,
            ...     event_type=EventType.PRODUCT,
            ... )
            >>> dispatcher.register_handler(
            ...     callback=handle_all_events,
            ... )
        """
        key = (
            event_type.value if event_type is not None else None,
            operation.value if operation is not None else None,
        )
        self.handlers.setdefault(key, []).append(callback)

    def register_default_handler(
        self, callback: Callable[[CloudEvent], None]
    ) -> None:
        """Register a default handler for events not matching any registered handler.

        Args:
            callback: Callable that accepts a CloudEvent
        """
        self.default_handlers.append(callback)

    def dispatch(self, event: CloudEvent) -> None:
        """Dispatch an event to all matching handlers.

        Handlers are called in the following order:
        1. Handlers matching both event type and operation
        2. Handlers matching event type only
        3. Handlers matching operation only
        4. Catch-all handlers registered without any filter
        5. Default handlers (if no others matched)

        Each registration bucket is visited at most once per event, so a
        handler is not invoked repeatedly when the event has no type or no
        operation. An exception raised by a handler propagates to the caller
        and stops the dispatch.

        Args:
            event: The CloudEvent to dispatch
        """
        event_type = event.get_event_type()
        operation = event.get_operation()

        event_type_value = event_type.value if event_type is not None else None
        operation_value = operation.value if operation is not None else None

        # Lookup keys from most to least specific. When the event lacks a type
        # or an operation, several of these tuples are equal; dict.fromkeys
        # drops the repeats while preserving the priority order.
        lookup_keys = dict.fromkeys(
            (
                (event_type_value, operation_value),
                (event_type_value, None),
                (None, operation_value),
                (None, None),
            )
        )

        handlers_called = False
        for key in lookup_keys:
            for handler in self.handlers.get(key, ()):
                handler(event)
                handlers_called = True

        # Call default handlers only if nothing matched.
        if not handlers_called:
            for handler in self.default_handlers:
                handler(event)
|
|
|
|
|
|
class FlaskWebhookExample:
    """Example Flask webhook endpoint implementation.

    This is a minimal example. Production code should add logging, monitoring,
    error handling, and proper HTTP status codes.

    Example:
        >>> from flask import Flask, request
        >>> app = Flask(__name__)
        >>> webhook = FlaskWebhookExample(validator)
        >>>
        >>> @app.route("/webhook", methods=["POST"])
        ... def handle_webhook():
        ...     return webhook.handle_request(request.json, dict(request.headers))
    """

    def __init__(
        self,
        validator: WebhookValidator,
        dispatcher: Optional[SimpleWebhookEventDispatcher] = None,
    ):
        """Initialize the Flask webhook handler.

        Args:
            validator: WebhookValidator instance for authentication and validation
            dispatcher: Optional event dispatcher for routing events
        """
        self.validator = validator
        self.dispatcher = dispatcher

    def handle_request(
        self,
        payload: Dict[str, Any],
        headers: Optional[Dict[str, str]] = None,
    ) -> tuple[Dict[str, Any], int]:
        """Handle an incoming webhook request from Flask.

        Args:
            payload: Request JSON body
            headers: Request headers

        Returns:
            Tuple of (response_dict, http_status_code):
            ``200`` with the event id on success, ``400`` with the validation
            message when the validator raises WebhookValidationError, and
            ``500`` with a generic message for any other exception (including
            one raised by a dispatched handler).

        Example:
            >>> @app.route("/webhook", methods=["POST"])
            ... def webhook():
            ...     response, status = webhook_handler.handle_request(
            ...         request.json, dict(request.headers)
            ...     )
            ...     return response, status
        """
        try:
            # Validate and parse the webhook
            event = self.validator.validate_and_parse(payload, headers)

            # Dispatch to handlers if provided
            if self.dispatcher is not None:
                self.dispatcher.dispatch(event)

            return {"status": "success", "event_id": event.id}, 200

        except WebhookValidationError as exc:
            return {"status": "error", "message": str(exc)}, 400
        except Exception:
            # Top-level boundary: the sender only gets a generic message, so
            # record the traceback here or the failure would leave no trace.
            logging.getLogger(__name__).exception(
                "Unhandled error while processing webhook request"
            )
            return {"status": "error", "message": "Internal server error"}, 500
|
|
|
|
|
|
class FastAPIWebhookExample:
    """Minimal FastAPI-oriented webhook endpoint helper.

    Validates an incoming payload and optionally forwards the parsed event
    to a dispatcher. Production code should add logging, monitoring,
    error handling, and proper HTTP status codes.

    Example:
        >>> from fastapi import FastAPI, Request
        >>> app = FastAPI()
        >>> webhook = FastAPIWebhookExample(validator)
        >>>
        >>> @app.post("/webhook")
        ... async def handle_webhook(request: Request):
        ...     payload = await request.json()
        ...     return webhook.handle_request(
        ...         payload,
        ...         dict(request.headers)
        ...     )
    """

    def __init__(
        self,
        validator: WebhookValidator,
        dispatcher: Optional[SimpleWebhookEventDispatcher] = None,
    ):
        """Store the collaborators used by ``handle_request``.

        Args:
            validator: WebhookValidator instance for authentication and validation
            dispatcher: Optional event dispatcher for routing events
        """
        self.dispatcher = dispatcher
        self.validator = validator

    def handle_request(
        self,
        payload: Dict[str, Any],
        headers: Optional[Dict[str, str]] = None,
    ) -> Dict[str, Any]:
        """Validate one webhook call and build the response body.

        Only WebhookValidationError is translated into an error body; any
        other exception (from the validator or from a dispatched handler)
        propagates to the caller.

        NOTE(review): the returned dict carries no HTTP status, so a route
        that returns it directly presumably answers 200 even for validation
        failures - confirm, and map the status in the route if needed.

        Args:
            payload: Request JSON body
            headers: Request headers

        Returns:
            Response dictionary with ``status`` plus either ``event_id``
            (success) or ``message`` (validation failure)

        Example:
            >>> @app.post("/webhook")
            ... async def webhook(request: Request):
            ...     payload = await request.json()
            ...     return webhook_handler.handle_request(
            ...         payload,
            ...         dict(request.headers)
            ...     )
        """
        try:
            parsed = self.validator.validate_and_parse(payload, headers)

            # Routing is optional; skip it when no dispatcher was supplied.
            router = self.dispatcher
            if router:
                router.dispatch(parsed)

            response = {"status": "success", "event_id": parsed.id}
        except WebhookValidationError as err:
            response = {"status": "error", "message": str(err)}
        return response
|
|
|
|
|
|
# Example usage functions
|
|
|
|
def example_basic_event_handler(event: CloudEvent) -> None:
    """Print a short summary of a received event to stdout.

    The id, event type and operation are always printed; the object type
    and object id follow only when the event carries a truthy ``data``.

    Args:
        event: The CloudEvent to handle
    """
    print(f"Received event: {event.id}")
    print(f"Event type: {event.eventtype}")
    print(f"Operation: {event.operation}")

    # Nothing object-related to report when the event has no data.
    if not event.data:
        return

    print(f"Object type: {event.get_object_type()}")
    print(f"Object ID: {event.get_object_id()}")
|
|
|
|
|
|
def example_product_change_handler(event: CloudEvent) -> None:
    """Print what happened to a product, followed by its object data.

    One line is printed for an ADD, MODIFY or REMOVE operation (nothing for
    any other operation). If the event has truthy object data, it is then
    printed as indented JSON.

    Args:
        event: The CloudEvent to handle
    """
    operation = event.get_operation()
    object_data = event.get_object_data()

    # Ordered (operation, message prefix) pairs; the first equal one wins.
    prefixes = (
        (Operation.ADD, "Product added: "),
        (Operation.MODIFY, "Product modified: "),
        (Operation.REMOVE, "Product removed: "),
    )
    for candidate, prefix in prefixes:
        if operation == candidate:
            print(f"{prefix}{event.get_object_id()}")
            break

    if object_data:
        print(f"Object data: {json.dumps(object_data, indent=2)}")
|