elytra_client/examples/webhook_examples.py
claudi b8f889f224 feat: Implement webhook retry logic and server examples
- Added retry logic for webhook event delivery in `retry.py` with exponential backoff.
- Created example webhook server implementations in `server.py` for Flask and FastAPI.
- Developed comprehensive examples for using the Elytra webhooks subpackage in `webhook_examples.py`.
- Introduced unit tests for webhook functionality, including event handling, authentication, and retry logic in `test_webhooks.py`.
2026-02-20 10:08:07 +01:00

372 lines
12 KiB
Python

"""Comprehensive examples of using the Elytra webhooks subpackage."""
# ============================================================================
# Example 1: Simple Webhook Parsing Without Authentication
# ============================================================================
from elytra_client.webhooks import CloudEvent, EventType, Operation, parse_webhook_payload
def example_simple_parsing():
    """Turn a raw webhook dict into a CloudEvent and inspect it (no auth)."""
    # Sample payload, assembled inside-out: the attribute record first,
    # then the envelope that carries it under "data".
    attribute_record = {
        "type": 2,
        "value": "example value",
        "attributeID": 50728,
    }
    webhook_payload = {
        "specversion": "1.0",
        "id": "385d27ef-ac6d-4d93-904b-ea9801fc1bde",
        "source": "elytra-pim",
        "type": "com.elytra.product.attribute.value",
        "datacontenttype": "application/json",
        "time": "2025-12-31T04:19:30.860984129+01:00",
        "eventtype": "product_attribute_value",
        "operation": "modify",
        "data": {
            "objectData": attribute_record,
            "objectId": 80194,
            "objectType": "DAOAttributeValue",
        },
    }

    event: CloudEvent = parse_webhook_payload(webhook_payload)

    # Print the headline fields exposed by the parsed event.
    print(f"Event ID: {event.id}")
    print(f"Event Type: {event.get_event_type()}")
    print(f"Operation: {event.get_operation()}")
    print(f"Object ID: {event.get_object_id()}")
    print(f"Object Type: {event.get_object_type()}")

    # Only a modified product attribute value gets the extra reporting below.
    is_attribute_modification = (
        event.get_event_type() == EventType.PRODUCT_ATTRIBUTE_VALUE
        and event.get_operation() == Operation.MODIFY
    )
    if not is_attribute_modification:
        return

    print("Product attribute value was modified")
    object_data = event.get_object_data()
    if object_data:
        print(f"New value: {object_data.get('value')}")
# ============================================================================
# Example 2: Webhook with Basic Authentication
# ============================================================================
from elytra_client.webhooks import BasicAuth, WebhookValidationError, WebhookValidator
def example_basic_auth():
    """Check a sample webhook against HTTP Basic credentials and report the result."""
    validator = WebhookValidator(
        auth_type="basic",
        auth=BasicAuth(username="admin", password="secret"),
    )

    # Simulated request headers; the value is base64 of "admin:secret".
    headers = {
        "Authorization": "Basic YWRtaW46c2VjcmV0"
    }

    # Simulated request body.
    payload = {
        "specversion": "1.0",
        "id": "test-event-id",
        "source": "elytra-pim",
        "type": "com.elytra.product",
        "eventtype": "product",
        "operation": "add",
    }

    # validate_and_parse performs the header check and the parsing in one call.
    try:
        event = validator.validate_and_parse(payload, headers)
    except WebhookValidationError as exc:
        print(f"Validation failed: {exc}")
    else:
        print(f"Webhook validated successfully. Event ID: {event.id}")
# ============================================================================
# Example 3: Webhook with Bearer Token Authentication
# ============================================================================
from elytra_client.webhooks import BearerAuth
def example_bearer_auth():
    """Check a sample webhook against a Bearer token and report the result."""
    # One sample token feeds both the validator and the simulated request header.
    token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
    validator = WebhookValidator(auth_type="bearer", auth=BearerAuth(token=token))

    headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "specversion": "1.0",
        "id": "test-event-id",
        "source": "elytra-pim",
        "type": "com.elytra.product",
        "eventtype": "product",
        "operation": "add",
    }

    try:
        event = validator.validate_and_parse(payload, headers)
    except WebhookValidationError as exc:
        print(f"Validation failed: {exc}")
    else:
        print(f"Bearer token validated. Event ID: {event.id}")
# ============================================================================
# Example 4: Webhook with API Key Authentication
# ============================================================================
from elytra_client.webhooks import APIKeyAuth
def example_api_key_auth():
    """Check a sample webhook against an API key header and report the result."""
    # The key and header name are shared by the validator and the simulated request.
    api_key = "sk_live_abc123xyz789"
    header_name = "X-API-Key"
    validator = WebhookValidator(
        auth_type="apikey",
        auth=APIKeyAuth(api_key=api_key, header_name=header_name),
    )

    headers = {header_name: api_key}
    payload = {
        "specversion": "1.0",
        "id": "test-event-id",
        "source": "elytra-pim",
        "type": "com.elytra.product",
        "eventtype": "product",
        "operation": "add",
    }

    try:
        event = validator.validate_and_parse(payload, headers)
    except WebhookValidationError as exc:
        print(f"Validation failed: {exc}")
    else:
        print(f"API key validated. Event ID: {event.id}")
# ============================================================================
# Example 5: Event Filtering and Routing
# ============================================================================
from elytra_client.webhooks.server import EventHandler, SimpleWebhookEventDispatcher
class MyEventHandler(EventHandler):
    """Sample EventHandler subclass whose callbacks just print what happened."""

    def handle_product_added(self, event: CloudEvent) -> None:
        """Print a notice carrying the object id of the added product."""
        product_id = event.get_object_id()
        print(f"🎉 New product added: {product_id}")

    def handle_attribute_value_changed(self, event: CloudEvent) -> None:
        """Print a notice carrying the object id reported by the event."""
        product_id = event.get_object_id()
        print(f"📝 Attribute value changed for product: {product_id}")
def example_event_routing():
    """Register callbacks on a dispatcher and push two sample events through it."""
    dispatcher = SimpleWebhookEventDispatcher()
    handler = MyEventHandler()

    # Callback for one event type / operation pair.
    dispatcher.register_handler(
        handler.handle_product_added,
        EventType.PRODUCT,
        Operation.ADD,
    )
    # Callback registered for an event type only, with no operation given.
    dispatcher.register_handler(
        handler.handle_attribute_value_changed,
        EventType.PRODUCT_ATTRIBUTE_VALUE,
    )
    # Fallback callback registered as the dispatcher's default handler.
    dispatcher.register_default_handler(
        lambda event: print(f"Other event received: {event.eventtype}")
    )

    # Sample 1: product "add" -> expected to reach handle_product_added.
    product_added = CloudEvent(
        id="1",
        source="elytra-pim",
        type="com.elytra.product",
        eventtype="product",
        operation="add",
    )
    dispatcher.dispatch(product_added)

    # Sample 2: attribute value "modify" -> expected to reach
    # handle_attribute_value_changed.
    attribute_modified = CloudEvent(
        id="2",
        source="elytra-pim",
        type="com.elytra.product.attribute.value",
        eventtype="product_attribute_value",
        operation="modify",
    )
    dispatcher.dispatch(attribute_modified)
# ============================================================================
# Example 6: Retry Configuration and Execution
# ============================================================================
import requests
from elytra_client.webhooks import ExponentialBackoffRetry, RetryConfig, execute_with_retry
def example_retry_with_webhook_delivery():
    """Deliver a webhook over HTTP through ``execute_with_retry``.

    Builds an ``ExponentialBackoffRetry`` policy from a ``RetryConfig`` and
    passes a small sender function to ``execute_with_retry``. The sender
    raises for transport failures *and* for HTTP error statuses, so a
    rejected delivery is reported as a failure instead of being printed as
    a success.

    NOTE(review): presumably ``execute_with_retry`` retries when the callable
    raises and re-raises the last error once retries are exhausted -- confirm
    against the retry module.
    """
    # Configure retry policy
    retry_config = RetryConfig(
        max_retries=5,
        initial_delay=1000,  # 1 second
        backoff_multiplier=2.0,
        max_delay=300000,  # 5 minutes
    )
    retry_policy = ExponentialBackoffRetry(retry_config)

    def send_webhook(url: str, data: dict) -> requests.Response:
        """POST *data* as JSON to *url*; raise on transport or HTTP errors."""
        response = requests.post(url, json=data, timeout=10)
        # requests.post only raises by itself for transport-level problems
        # (connection errors, timeouts). A 4xx/5xx reply comes back as an
        # ordinary Response, so convert it into requests.HTTPError here;
        # otherwise a failed delivery would look like a success.
        response.raise_for_status()
        return response

    try:
        response = execute_with_retry(
            send_webhook,
            retry_policy,
            url="https://example.com/webhook",
            data={"event": "product_added"},
        )
        print(f"Webhook sent successfully: {response.status_code}")
    except requests.exceptions.RequestException as e:
        # HTTPError is a RequestException subclass, so rejected deliveries
        # end up here as well as connection errors and timeouts.
        print(f"Webhook delivery failed after retries: {e}")
# ============================================================================
# Example 7: Flask Integration
# ============================================================================
def example_flask_integration():
    """Build a Flask app exposing a Basic-auth protected Elytra webhook route.

    Returns the configured Flask application; the server is not started here.
    """
    from flask import Flask, jsonify, request
    from elytra_client.webhooks.server import FlaskWebhookExample

    # Basic-auth validator wrapped by the Flask example handler.
    validator = WebhookValidator(
        auth_type="basic",
        auth=BasicAuth(username="webhook_user", password="webhook_pass"),
    )
    webhook_handler = FlaskWebhookExample(validator)

    app = Flask(__name__)

    @app.route("/webhook/elytra", methods=["POST"])
    def handle_elytra_webhook():
        """Handle incoming Elytra webhook events."""
        # Pass the JSON body and a plain dict of the headers to the handler,
        # then send its (body, status) pair back to the caller.
        body, status_code = webhook_handler.handle_request(
            request.json,
            dict(request.headers),
        )
        return jsonify(body), status_code

    return app
# ============================================================================
# Example 8: FastAPI Integration
# ============================================================================
def example_fastapi_integration():
    """Build a FastAPI app exposing an API-key protected Elytra webhook route.

    Returns the configured FastAPI application; the server is not started here.
    """
    from fastapi import FastAPI, Request
    from elytra_client.webhooks.server import FastAPIWebhookExample

    # API-key validator plus a dispatcher, both handed to the FastAPI example handler.
    validator = WebhookValidator(
        auth_type="apikey",
        auth=APIKeyAuth(api_key="sk_live_secret123", header_name="X-Webhook-Key"),
    )
    dispatcher = SimpleWebhookEventDispatcher()
    webhook_handler = FastAPIWebhookExample(validator, dispatcher)

    app = FastAPI()

    @app.post("/webhook/elytra")
    async def handle_elytra_webhook(request: Request):
        """Handle incoming Elytra webhook events."""
        body = await request.json()
        header_map = dict(request.headers)
        return webhook_handler.handle_request(body, header_map)

    return app
# ============================================================================
# Example 9: Complete Production Example
# ============================================================================
def example_complete_setup():
    """Wire authentication, validation, event dispatching and a Flask route together.

    Returns the configured Flask application; the server is not started here.
    """
    from elytra_client.webhooks import BasicAuth, EventType, Operation, WebhookValidator
    from elytra_client.webhooks.server import FlaskWebhookExample, SimpleWebhookEventDispatcher

    # Steps 1-2: Basic-auth credentials wrapped in a validator.
    validator = WebhookValidator(
        auth_type="basic",
        auth=BasicAuth(username="elytra_webhook", password="secure_password_here"),
    )

    # Step 3: dispatcher that will receive the registered callbacks.
    dispatcher = SimpleWebhookEventDispatcher()

    # Step 4: callbacks and their registrations.
    def on_product_added(event: CloudEvent) -> None:
        print(f"Product {event.get_object_id()} was added")
        # TODO: Implement your product sync logic

    def on_attribute_changed(event: CloudEvent) -> None:
        print(f"Attribute changed for product {event.get_object_id()}")
        # TODO: Implement your attribute sync logic

    dispatcher.register_handler(
        callback=on_product_added,
        event_type=EventType.PRODUCT,
        operation=Operation.ADD,
    )
    dispatcher.register_handler(
        callback=on_attribute_changed,
        event_type=EventType.PRODUCT_ATTRIBUTE_VALUE,
    )

    # Step 5: Flask example handler combining validator and dispatcher.
    webhook_handler = FlaskWebhookExample(validator, dispatcher)

    # Step 6: expose the handler through a Flask route.
    from flask import Flask, jsonify, request

    app = Flask(__name__)

    @app.route("/webhook", methods=["POST"])
    def webhook():
        body, status = webhook_handler.handle_request(
            request.json,
            dict(request.headers),
        )
        return jsonify(body), status

    return app
if __name__ == "__main__":
    # Script entry point: run the two examples that need neither a web
    # framework nor network access, each under its own heading.
    print("Elytra Webhooks Examples")
    print("=" * 60)

    for heading, run_example in (
        ("\n1. Simple Parsing:", example_simple_parsing),
        ("\n2. Event Routing:", example_event_routing),
    ):
        print(heading)
        run_example()