- Added retry logic for webhook event delivery in `retry.py` with exponential backoff. - Created example webhook server implementations in `server.py` for Flask and FastAPI. - Developed comprehensive examples for using the Elytra webhooks subpackage in `webhook_examples.py`. - Introduced unit tests for webhook functionality, including event handling, authentication, and retry logic in `test_webhooks.py`.
372 lines
12 KiB
Python
372 lines
12 KiB
Python
"""Comprehensive examples of using the Elytra webhooks subpackage."""
|
|
|
|
# ============================================================================
|
|
# Example 1: Simple Webhook Parsing Without Authentication
|
|
# ============================================================================
|
|
|
|
from elytra_client.webhooks import CloudEvent, EventType, Operation, parse_webhook_payload
|
|
|
|
|
|
def example_simple_parsing():
    """Demonstrate parsing an Elytra webhook payload with no auth check.

    A sample payload is handed to ``parse_webhook_payload``, the main fields
    of the resulting event are printed, and a modified product attribute
    value gets an extra message showing the new value.
    """
    # Assemble the sample payload from the inside out.
    attribute_value = {
        "type": 2,
        "value": "example value",
        "attributeID": 50728,
    }
    event_data = {
        "objectData": attribute_value,
        "objectId": 80194,
        "objectType": "DAOAttributeValue",
    }
    webhook_payload = {
        "specversion": "1.0",
        "id": "385d27ef-ac6d-4d93-904b-ea9801fc1bde",
        "source": "elytra-pim",
        "type": "com.elytra.product.attribute.value",
        "datacontenttype": "application/json",
        "time": "2025-12-31T04:19:30.860984129+01:00",
        "eventtype": "product_attribute_value",
        "operation": "modify",
        "data": event_data,
    }

    event: CloudEvent = parse_webhook_payload(webhook_payload)

    # Report the basic properties exposed by the parsed event.
    print(f"Event ID: {event.id}")
    print(f"Event Type: {event.get_event_type()}")
    print(f"Operation: {event.get_operation()}")
    print(f"Object ID: {event.get_object_id()}")
    print(f"Object Type: {event.get_object_type()}")

    # React only to a modification of a product attribute value. The
    # operation is looked up only when the event type already matches.
    is_attribute_value = event.get_event_type() == EventType.PRODUCT_ATTRIBUTE_VALUE
    if is_attribute_value and event.get_operation() == Operation.MODIFY:
        print("Product attribute value was modified")
        changed = event.get_object_data()
        if changed:
            print(f"New value: {changed.get('value')}")
|
|
|
|
|
|
# ============================================================================
|
|
# Example 2: Webhook with Basic Authentication
|
|
# ============================================================================
|
|
|
|
from elytra_client.webhooks import BasicAuth, WebhookValidationError, WebhookValidator
|
|
|
|
|
|
def example_basic_auth():
    """Demonstrate validating a webhook that uses HTTP Basic authentication.

    A ``WebhookValidator`` configured with ``BasicAuth`` credentials checks a
    sample payload against sample request headers. The outcome (success or
    ``WebhookValidationError``) is printed.
    """
    credentials = BasicAuth(username="admin", password="secret")
    validator = WebhookValidator(auth_type="basic", auth=credentials)

    # Sample incoming event body.
    payload = {
        "specversion": "1.0",
        "id": "test-event-id",
        "source": "elytra-pim",
        "type": "com.elytra.product",
        "eventtype": "product",
        "operation": "add",
    }

    # Sample request headers; the value is base64 of "admin:secret".
    headers = {"Authorization": "Basic YWRtaW46c2VjcmV0"}

    try:
        # One call performs both the validation and the parsing.
        event = validator.validate_and_parse(payload, headers)
        print(f"Webhook validated successfully. Event ID: {event.id}")
    except WebhookValidationError as err:
        print(f"Validation failed: {err}")
|
|
|
|
|
|
# ============================================================================
|
|
# Example 3: Webhook with Bearer Token Authentication
|
|
# ============================================================================
|
|
|
|
from elytra_client.webhooks import BearerAuth
|
|
|
|
|
|
def example_bearer_auth():
    """Demonstrate validating a webhook that uses Bearer token authentication.

    A ``WebhookValidator`` configured with ``BearerAuth`` checks a sample
    payload whose ``Authorization`` header carries the same token. The
    outcome is printed.
    """
    validator = WebhookValidator(
        auth_type="bearer",
        auth=BearerAuth(token="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."),
    )

    # Sample incoming event body.
    payload = {
        "specversion": "1.0",
        "id": "test-event-id",
        "source": "elytra-pim",
        "type": "com.elytra.product",
        "eventtype": "product",
        "operation": "add",
    }

    # The header repeats the token the validator was configured with.
    headers = {
        "Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
    }

    try:
        event = validator.validate_and_parse(payload, headers)
        print(f"Bearer token validated. Event ID: {event.id}")
    except WebhookValidationError as err:
        print(f"Validation failed: {err}")
|
|
|
|
|
|
# ============================================================================
|
|
# Example 4: Webhook with API Key Authentication
|
|
# ============================================================================
|
|
|
|
from elytra_client.webhooks import APIKeyAuth
|
|
|
|
|
|
def example_api_key_auth():
    """Demonstrate validating a webhook that uses API key authentication.

    A ``WebhookValidator`` configured with ``APIKeyAuth`` (key expected in the
    ``X-API-Key`` header) checks a sample payload and headers. The outcome is
    printed.
    """
    validator = WebhookValidator(
        auth_type="apikey",
        auth=APIKeyAuth(api_key="sk_live_abc123xyz789", header_name="X-API-Key"),
    )

    # Sample incoming event body.
    payload = {
        "specversion": "1.0",
        "id": "test-event-id",
        "source": "elytra-pim",
        "type": "com.elytra.product",
        "eventtype": "product",
        "operation": "add",
    }

    # The key travels in the header named when the auth object was built.
    headers = {"X-API-Key": "sk_live_abc123xyz789"}

    try:
        event = validator.validate_and_parse(payload, headers)
        print(f"API key validated. Event ID: {event.id}")
    except WebhookValidationError as err:
        print(f"Validation failed: {err}")
|
|
|
|
|
|
# ============================================================================
|
|
# Example 5: Event Filtering and Routing
|
|
# ============================================================================
|
|
|
|
from elytra_client.webhooks.server import EventHandler, SimpleWebhookEventDispatcher
|
|
|
|
|
|
class MyEventHandler(EventHandler):
    """Example handler whose methods print a message for selected events."""

    def handle_product_added(self, event: CloudEvent) -> None:
        """Print a notice naming the object id of a newly added product."""
        product_id = event.get_object_id()
        print(f"🎉 New product added: {product_id}")

    def handle_attribute_value_changed(self, event: CloudEvent) -> None:
        """Print a notice naming the object id of a changed attribute value."""
        product_id = event.get_object_id()
        print(f"📝 Attribute value changed for product: {product_id}")
|
|
|
|
|
|
def example_event_routing():
    """Demonstrate routing events to handlers by event type and operation.

    Two ``MyEventHandler`` methods and a catch-all callback are registered on
    a ``SimpleWebhookEventDispatcher``; two hand-built ``CloudEvent`` objects
    are then dispatched through it.
    """
    router = SimpleWebhookEventDispatcher()
    listener = MyEventHandler()

    # Handler bound to both an event type and an operation.
    router.register_handler(
        callback=listener.handle_product_added,
        event_type=EventType.PRODUCT,
        operation=Operation.ADD,
    )

    # Handler bound to an event type only (no operation given).
    router.register_handler(
        callback=listener.handle_attribute_value_changed,
        event_type=EventType.PRODUCT_ATTRIBUTE_VALUE,
    )

    # Catch-all callback registered as the default handler.
    router.register_default_handler(
        lambda event: print(f"Other event received: {event.eventtype}")
    )

    # A "product added" event: expected to reach handle_product_added.
    product_added = CloudEvent(
        id="1",
        source="elytra-pim",
        type="com.elytra.product",
        eventtype="product",
        operation="add",
    )
    router.dispatch(product_added)

    # An "attribute value modified" event: expected to reach
    # handle_attribute_value_changed.
    value_modified = CloudEvent(
        id="2",
        source="elytra-pim",
        type="com.elytra.product.attribute.value",
        eventtype="product_attribute_value",
        operation="modify",
    )
    router.dispatch(value_modified)
|
|
|
|
|
|
# ============================================================================
|
|
# Example 6: Retry Configuration and Execution
|
|
# ============================================================================
|
|
|
|
import requests
|
|
|
|
from elytra_client.webhooks import ExponentialBackoffRetry, RetryConfig, execute_with_retry
|
|
|
|
|
|
def example_retry_with_webhook_delivery():
    """Deliver a webhook over HTTP, retrying with exponential backoff.

    Builds a ``RetryConfig``, wraps it in an ``ExponentialBackoffRetry``
    policy and runs the delivery through ``execute_with_retry``. The outcome
    is printed; nothing is returned.
    """
    # Configure retry policy
    retry_config = RetryConfig(
        max_retries=5,
        initial_delay=1000,  # 1 second
        backoff_multiplier=2.0,
        max_delay=300000,  # 5 minutes
    )
    retry_policy = ExponentialBackoffRetry(retry_config)

    def send_webhook(url: str, data: dict) -> requests.Response:
        """POST ``data`` as JSON to ``url`` and return the response.

        Raises:
            requests.exceptions.RequestException: On connection problems,
                timeouts, or a 4xx/5xx response status.
        """
        response = requests.post(url, json=data, timeout=10)
        # requests does not raise for 4xx/5xx on its own. Without this call
        # a failed delivery would be returned as an ordinary response and
        # reported as "sent successfully" below.
        # NOTE(review): presumably execute_with_retry retries when the
        # callable raises — confirm against its implementation.
        response.raise_for_status()
        return response

    try:
        # This will retry with exponential backoff on failure
        response = execute_with_retry(
            send_webhook,
            retry_policy,
            url="https://example.com/webhook",
            data={"event": "product_added"},
        )
        print(f"Webhook sent successfully: {response.status_code}")
    except requests.exceptions.RequestException as e:
        # HTTPError from raise_for_status() is a RequestException as well.
        print(f"Webhook delivery failed after retries: {e}")
|
|
|
|
|
|
# ============================================================================
|
|
# Example 7: Flask Integration
|
|
# ============================================================================
|
|
|
|
def example_flask_integration():
    """Build a Flask app exposing an authenticated Elytra webhook endpoint.

    Returns:
        The configured ``Flask`` application, with a POST route at
        ``/webhook/elytra``.
    """
    # Imported locally so Flask is needed only when this example is used.
    from flask import Flask, jsonify, request

    from elytra_client.webhooks.server import FlaskWebhookExample

    app = Flask(__name__)

    # Basic-auth validator wrapped by the example request handler.
    validator = WebhookValidator(
        auth_type="basic",
        auth=BasicAuth(username="webhook_user", password="webhook_pass"),
    )
    webhook_handler = FlaskWebhookExample(validator)

    @app.route("/webhook/elytra", methods=["POST"])
    def handle_elytra_webhook():
        """Pass the request body and headers on, and reply with JSON."""
        incoming_headers = dict(request.headers)
        body, status = webhook_handler.handle_request(request.json, incoming_headers)
        return jsonify(body), status

    return app
|
|
|
|
|
|
# ============================================================================
|
|
# Example 8: FastAPI Integration
|
|
# ============================================================================
|
|
|
|
def example_fastapi_integration():
    """Build a FastAPI app exposing an authenticated Elytra webhook endpoint.

    Returns:
        The configured ``FastAPI`` application, with a POST route at
        ``/webhook/elytra``.
    """
    # Imported locally so FastAPI is needed only when this example is used.
    from fastapi import FastAPI, Request

    from elytra_client.webhooks.server import FastAPIWebhookExample

    app = FastAPI()

    # API-key validator plus a dispatcher, both given to the example handler.
    validator = WebhookValidator(
        auth_type="apikey",
        auth=APIKeyAuth(api_key="sk_live_secret123", header_name="X-Webhook-Key"),
    )
    event_dispatcher = SimpleWebhookEventDispatcher()
    webhook_handler = FastAPIWebhookExample(validator, event_dispatcher)

    @app.post("/webhook/elytra")
    async def handle_elytra_webhook(request: Request):
        """Decode the JSON body and pass it on with the request headers."""
        body = await request.json()
        # NOTE(review): Starlette appears to expose header names lower-cased
        # when converted to a dict — confirm the validator matches
        # "X-Webhook-Key" case-insensitively.
        incoming_headers = dict(request.headers)
        return webhook_handler.handle_request(body, incoming_headers)

    return app
|
|
|
|
|
|
# ============================================================================
|
|
# Example 9: Complete Production Example
|
|
# ============================================================================
|
|
|
|
def example_complete_setup():
    """Wire authentication, validation, dispatching and a Flask route together.

    Returns:
        The configured ``Flask`` application, with a POST route at
        ``/webhook``.
    """
    from elytra_client.webhooks import BasicAuth, EventType, Operation, WebhookValidator
    from elytra_client.webhooks.server import FlaskWebhookExample, SimpleWebhookEventDispatcher

    # Steps 1 and 2: credentials and the validator that checks them.
    validator = WebhookValidator(
        auth_type="basic",
        auth=BasicAuth(username="elytra_webhook", password="secure_password_here"),
    )

    # Step 3: dispatcher that routes events to the callbacks below.
    event_dispatcher = SimpleWebhookEventDispatcher()

    # Step 4: callbacks for the event types of interest.
    def on_product_added(event: CloudEvent) -> None:
        """Report a product that was added."""
        print(f"Product {event.get_object_id()} was added")
        # TODO: Implement your product sync logic

    def on_attribute_changed(event: CloudEvent) -> None:
        """Report a product whose attribute changed."""
        print(f"Attribute changed for product {event.get_object_id()}")
        # TODO: Implement your attribute sync logic

    event_dispatcher.register_handler(on_product_added, EventType.PRODUCT, Operation.ADD)
    event_dispatcher.register_handler(on_attribute_changed, EventType.PRODUCT_ATTRIBUTE_VALUE)

    # Step 5: example request handler combining validator and dispatcher.
    webhook_handler = FlaskWebhookExample(validator, event_dispatcher)

    # Step 6: expose the handler through a Flask route.
    from flask import Flask, jsonify, request

    app = Flask(__name__)

    @app.route("/webhook", methods=["POST"])
    def webhook():
        """Pass the request body and headers on, and reply with JSON."""
        incoming_headers = dict(request.headers)
        body, status_code = webhook_handler.handle_request(request.json, incoming_headers)
        return jsonify(body), status_code

    return app
|
|
|
|
|
|
if __name__ == "__main__":
    # Run only the examples that need neither a web framework nor network.
    print("Elytra Webhooks Examples")
    print("=" * 60)

    demos = (
        ("\n1. Simple Parsing:", example_simple_parsing),
        ("\n2. Event Routing:", example_event_routing),
    )
    for heading, run_demo in demos:
        print(heading)
        run_demo()
|