"""
Elytra Webhooks Subpackage Documentation
Overview
The elytra_client.webhooks subpackage provides comprehensive tools for handling CloudEvents-based webhooks from the Elytra PIM Event API. It includes:
- CloudEvents Models: Strongly-typed Pydantic models for CloudEvents specification
- Authentication: Support for Basic, Bearer, and API Key authentication
- Validation: Robust webhook payload validation
- Event Handling: Event dispatching and routing
- Retry Logic: Exponential backoff retry policies
- Framework Integration: Ready-to-use examples for Flask and FastAPI
Installation
The webhooks subpackage is included with the elytra-pim-client package. No additional installation is needed.
from elytra_client.webhooks import CloudEvent, parse_webhook_payload
Quick Start
Basic Event Parsing
Parse incoming webhook payloads without authentication validation:
from elytra_client.webhooks import parse_webhook_payload
# In your webhook endpoint: pass the decoded JSON body as a dict
# (request.json in Flask, await request.json() in FastAPI)
event = parse_webhook_payload(request.json())
print(f"Event type: {event.eventtype}")
print(f"Object ID: {event.get_object_id()}")
With Authentication
Add authentication validation:
from elytra_client.webhooks import (
    WebhookValidator,
    BasicAuth
)
auth = BasicAuth(username="user", password="pass")
validator = WebhookValidator(auth_type="basic", auth=auth)
event = validator.validate_and_parse(
    payload=request.json(),
    headers=dict(request.headers)
)
Core Components
CloudEvent Model
The CloudEvent class represents an event following the CNCF CloudEvents specification v1.0.
Key Fields:
- specversion: CloudEvents spec version (always "1.0")
- id: Unique event identifier
- source: Event source (e.g., "elytra-pim")
- type: Event type in the format com.elytra.<event_type>
- eventtype: Elytra-specific event type (product, product_attribute_value, etc.)
- operation: Operation type (add, modify, remove, link, unlink)
- data: Event payload containing object data and metadata
Useful Methods:
- get_event_type(): Returns parsed EventType enum
- get_operation(): Returns parsed Operation enum
- get_object_data(): Extracts objectData from payload
- get_object_id(): Extracts object ID from payload
- get_object_type(): Extracts object type from payload
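To make the field list concrete, here is a minimal standard-library sketch of a payload and a check for the four attributes the CloudEvents v1.0 specification requires (specversion, id, source, type). It does not use or reproduce the CloudEvent class. The top-level names follow the list above; the keys inside data (objectData, objectId, objectType) and the helper check_required are illustrative assumptions, not the documented Elytra payload layout.

```python
import json

# Hypothetical payload: top-level names follow the field list above;
# the keys inside "data" are assumptions made for illustration only.
raw = json.dumps({
    "specversion": "1.0",
    "id": "evt-0001",
    "source": "elytra-pim",
    "type": "com.elytra.product",
    "eventtype": "product",
    "operation": "add",
    "data": {
        "objectData": {"name": "Sample product"},
        "objectId": 42,
        "objectType": "product",
    },
})

# Attributes that CloudEvents v1.0 marks as required.
REQUIRED = ("specversion", "id", "source", "type")


def check_required(payload: dict) -> list:
    # Return the names of required attributes missing from the payload.
    return [name for name in REQUIRED if name not in payload]


payload = json.loads(raw)
missing = check_required(payload)
print("missing attributes:", missing)
```

A check like this can be useful in tests for building fixtures, before handing the payload to parse_webhook_payload.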
Event Types and Operations
EventType Enum includes all Elytra event types:
- Product events: PRODUCT, PRODUCT_GROUP, PRODUCT_FOREST, etc.
- Attribute events: PRODUCT_ATTRIBUTES, PRODUCT_ATTRIBUTE_VALUE, etc.
- Structure events: PRODUCT_STRUCTURE, PRODUCT_TEXT, etc.
- And many more...
Operation Enum includes standard operations:
- ADD: Object was created
- MODIFY: Object was modified
- REMOVE: Object was deleted
- LINK: Objects were linked
- UNLINK: Objects were unlinked
Authentication
The package supports three authentication methods:
Basic Authentication
from elytra_client.webhooks import BasicAuth, WebhookValidator
auth = BasicAuth(username="admin", password="secret")
validator = WebhookValidator(auth_type="basic", auth=auth)
Bearer Token Authentication
from elytra_client.webhooks import BearerAuth, WebhookValidator
auth = BearerAuth(token="eyJhbGciOiJIUzI1NiIs...")
validator = WebhookValidator(auth_type="bearer", auth=auth)
API Key Authentication
from elytra_client.webhooks import APIKeyAuth, WebhookValidator
auth = APIKeyAuth(
    api_key="sk_live_secret123",
    header_name="X-API-Key"  # Custom header name
)
validator = WebhookValidator(auth_type="apikey", auth=auth)
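For readers curious what a Basic authentication check involves at the HTTP level, the following is a minimal standard-library sketch. It is not the implementation used by BasicAuth or WebhookValidator; the function name basic_auth_matches is hypothetical, and the approach (case-insensitive header lookup plus constant-time comparison) is one reasonable way to do it.

```python
import base64
import hmac


def basic_auth_matches(headers: dict, username: str, password: str) -> bool:
    # HTTP header names are case-insensitive, so normalise before lookup.
    normalised = {key.lower(): value for key, value in headers.items()}
    value = normalised.get("authorization", "")

    # Expected shape: "Basic <base64(username:password)>"
    scheme, _, encoded = value.partition(" ")
    if scheme.lower() != "basic" or not encoded:
        return False

    expected = base64.b64encode(f"{username}:{password}".encode("utf-8"))
    # Compare as bytes in constant time to avoid timing side channels.
    return hmac.compare_digest(encoded.strip().encode("utf-8"), expected)


header = {"Authorization": "Basic " + base64.b64encode(b"admin:secret").decode("ascii")}
print(basic_auth_matches(header, "admin", "secret"))
```

Bearer and API key checks follow the same pattern: read one header, then compare its value against the expected secret in constant time.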
Webhook Validation
The WebhookValidator class handles authentication and payload validation:
from elytra_client.webhooks import (
    WebhookValidator,
    WebhookValidationError,
    BasicAuth
)
auth = BasicAuth(username="user", password="pass")
validator = WebhookValidator(auth_type="basic", auth=auth)
try:
    # Validate authentication and CloudEvent format
    event = validator.validate_and_parse(
        payload=incoming_json,
        headers=incoming_headers
    )
    print(f"Valid event received: {event.id}")
except WebhookValidationError as e:
    print(f"Validation failed: {e}")
Event Routing and Dispatching
Use SimpleWebhookEventDispatcher to route events to different handlers:
from elytra_client.webhooks import EventType, Operation
from elytra_client.webhooks.server import SimpleWebhookEventDispatcher
dispatcher = SimpleWebhookEventDispatcher()
# Register handler for specific event type and operation
dispatcher.register_handler(
    callback=handle_product_added,
    event_type=EventType.PRODUCT,
    operation=Operation.ADD
)
# Register handler for all operations of a type
dispatcher.register_handler(
    callback=handle_product_changes,
    event_type=EventType.PRODUCT
)
# Register default fallback handler
dispatcher.register_default_handler(handle_other_events)
# Dispatch events
dispatcher.dispatch(event)
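The routing idea can be illustrated with a small stand-alone sketch. MiniDispatcher below is a hypothetical stand-in, not SimpleWebhookEventDispatcher: it takes the event type and operation as plain strings, and it assumes that both the exact (type, operation) handlers and the type-wide handlers run, with the default handler used only when nothing else matches. The real dispatcher's precedence rules may differ.

```python
from collections import defaultdict


class MiniDispatcher:
    # Hypothetical stand-in showing one possible routing order.

    def __init__(self):
        # Maps (event_type, operation) to a list of callbacks;
        # operation None means "any operation of this type".
        self._handlers = defaultdict(list)
        self._default = None

    def register_handler(self, callback, event_type, operation=None):
        self._handlers[(event_type, operation)].append(callback)

    def register_default_handler(self, callback):
        self._default = callback

    def dispatch(self, event_type, operation, event):
        # Exact matches first, then handlers registered for the whole type.
        callbacks = list(self._handlers.get((event_type, operation), []))
        if operation is not None:
            callbacks += self._handlers.get((event_type, None), [])
        # Fall back to the default handler only when nothing matched.
        if not callbacks and self._default is not None:
            callbacks = [self._default]
        for callback in callbacks:
            callback(event)
        return len(callbacks)


calls = []
demo = MiniDispatcher()
demo.register_handler(lambda e: calls.append(("added", e)), "product", "add")
demo.register_handler(lambda e: calls.append(("changed", e)), "product")
demo.register_default_handler(lambda e: calls.append(("other", e)))

demo.dispatch("product", "add", "e1")
demo.dispatch("product", "modify", "e2")
demo.dispatch("product_text", "add", "e3")
print(calls)
```

If handlers should be mutually exclusive, return after the first non-empty match instead of concatenating the two lists.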
Retry Logic
Configure and use retry policies for webhook delivery:
from elytra_client.webhooks import (
    RetryConfig,
    ExponentialBackoffRetry,
    execute_with_retry
)
# Configure retry policy
retry_config = RetryConfig(
    max_retries=5,
    initial_delay=1000,  # 1 second
    backoff_multiplier=2.0,
    max_delay=300000  # 5 minutes max
)
policy = ExponentialBackoffRetry(retry_config)
# Execute function with retries
try:
    result = execute_with_retry(
        some_function,
        policy,
        arg1="value"
    )
except Exception as e:
    print(f"Failed after retries: {e}")
Retry Delay Calculation:
The delay follows exponential backoff: delay = min(initial_delay * (multiplier ^ attempt), max_delay)
Examples:
- Attempt 0: 1000ms (1 second)
- Attempt 1: 2000ms (2 seconds)
- Attempt 2: 4000ms (4 seconds)
- Attempt 3: 8000ms (8 seconds)
- etc., capped at max_delay
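The formula and the listed values can be reproduced with a few lines of plain Python. This is a sketch of the arithmetic only, not the code inside ExponentialBackoffRetry; the function name backoff_delay_ms is hypothetical, and its defaults mirror the RetryConfig values shown above.

```python
def backoff_delay_ms(attempt, initial_delay=1000, multiplier=2.0, max_delay=300000):
    # delay = min(initial_delay * multiplier ** attempt, max_delay)
    return int(min(initial_delay * multiplier ** attempt, max_delay))


# Delays for attempts 0 through 9, in milliseconds.
delays = [backoff_delay_ms(attempt) for attempt in range(10)]
print(delays)
```

With these defaults the cap first applies at attempt 9, where the uncapped value would be 512000 ms.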
Framework Integration
Flask Example
from flask import Flask, request, jsonify
from elytra_client.webhooks import BasicAuth, WebhookValidator
from elytra_client.webhooks.server import FlaskWebhookExample
app = Flask(__name__)
# Setup
auth = BasicAuth(username="webhook_user", password="secret")
validator = WebhookValidator(auth_type="basic", auth=auth)
webhook_handler = FlaskWebhookExample(validator)
@app.route("/webhook", methods=["POST"])
def handle_webhook():
    response, status = webhook_handler.handle_request(
        request.json,
        dict(request.headers)
    )
    return jsonify(response), status
if __name__ == "__main__":
    app.run(port=5000)
FastAPI Example
from fastapi import FastAPI, Request
from elytra_client.webhooks import APIKeyAuth, WebhookValidator
from elytra_client.webhooks.server import FastAPIWebhookExample
app = FastAPI()
# Setup
auth = APIKeyAuth(api_key="sk_live_secret", header_name="X-API-Key")
validator = WebhookValidator(auth_type="apikey", auth=auth)
webhook_handler = FastAPIWebhookExample(validator)
@app.post("/webhook")
async def handle_webhook(request: Request):
    payload = await request.json()
    return webhook_handler.handle_request(
        payload,
        dict(request.headers)
    )
Complete Production Example
from flask import Flask, request, jsonify
from elytra_client.webhooks import (
    BasicAuth,
    WebhookValidator,
    EventType,
    Operation
)
from elytra_client.webhooks.server import (
    FlaskWebhookExample,
    SimpleWebhookEventDispatcher
)
app = Flask(__name__)
# Setup authentication
auth = BasicAuth(username="elytra", password="secure_password")
validator = WebhookValidator(auth_type="basic", auth=auth)
# Setup event routing
dispatcher = SimpleWebhookEventDispatcher()
def sync_product_to_db(event):
    product_id = event.get_object_id()
    print(f"Syncing product {product_id} to database...")
    # Your sync logic here
dispatcher.register_handler(
    sync_product_to_db,
    EventType.PRODUCT,
    Operation.ADD
)
# Setup webhook handler
webhook_handler = FlaskWebhookExample(validator, dispatcher)
@app.route("/webhook/elytra", methods=["POST"])
def elytra_webhook():
    response, status = webhook_handler.handle_request(
        request.json,
        dict(request.headers)
    )
    return jsonify(response), status
if __name__ == "__main__":
    # Flask's built-in server is meant for development;
    # in production, serve the app through a WSGI server instead.
    app.run(host="0.0.0.0", port=5000, debug=False)
Configuration XML Reference
While this subpackage handles receiving webhooks, you'll configure them in Elytra's web client. Here's what the XML configuration looks like:
<event-consumer>
  <type>webhook</type>
  <events>
    <event>
      <types>
        <type>product</type>
        <type>product_attribute_value</type>
      </types>
      <operations>
        <operation>add</operation>
        <operation>modify</operation>
        <operation>remove</operation>
      </operations>
      <webhook>
        <url>https://your-app.com/webhook/elytra</url>
        <method>post</method>
        <auth-type>basic</auth-type>
        <username>elytra</username>
        <password>secure_password</password>
      </webhook>
    </event>
  </events>
  <retry maxRetries="5" initialDelay="1000" backoffMultiplier="2.0"/>
  <cloudevents source="elytra-pim" typePrefix="com.elytra"/>
</event-consumer>
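If the receiving application needs to know which event types and operations to expect, a fragment like the one above can be read with the standard library. This is a minimal sketch, assuming the configuration is available as a string; CONFIG is an abbreviated copy of the example, and nothing here is part of elytra_client.

```python
import xml.etree.ElementTree as ET

# Abbreviated copy of the configuration example above.
CONFIG = '''<event-consumer>
  <events>
    <event>
      <types>
        <type>product</type>
        <type>product_attribute_value</type>
      </types>
      <operations>
        <operation>add</operation>
        <operation>modify</operation>
        <operation>remove</operation>
      </operations>
    </event>
  </events>
  <retry maxRetries="5" initialDelay="1000" backoffMultiplier="2.0"/>
</event-consumer>'''

root = ET.fromstring(CONFIG)

# Collect the subscribed event types and operations.
types = [el.text for el in root.findall("./events/event/types/type")]
operations = [el.text for el in root.findall("./events/event/operations/operation")]

# Attribute values are strings and need converting before use.
retry = root.find("retry").attrib
max_retries = int(retry["maxRetries"])

print(types, operations, max_retries)
```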
Error Handling
The main exception to handle is WebhookValidationError:
import logging
from elytra_client.webhooks import WebhookValidationError
log = logging.getLogger(__name__)
# Inside your request handler:
try:
    event = validator.validate_and_parse(payload, headers)
except WebhookValidationError as e:
    # Handle validation error
    log.error(f"Webhook validation failed: {e}")
    return {"status": "error"}, 400
Best Practices
- Always Validate: Use WebhookValidator to validate both authentication and event structure
- Use Type Hints: Leverage the strongly-typed models for better IDE support
- Handle Errors: Implement proper error handling and logging
- Implement Idempotency: Since webhooks may be retried, ensure your handlers are idempotent
- Acknowledge Quickly: Return success response immediately; do heavy processing async
- Log Events: Log all webhook events for debugging and audit trails
- Secure Credentials: Store authentication credentials securely (environment variables, secrets manager)
- Use HTTPS: Always use HTTPS URLs for webhook endpoints
- Implement Timeouts: Set reasonable timeouts for webhook processing
- Monitor: Set up monitoring and alerts for webhook failures
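The idempotency advice can be sketched with a small de-duplication helper keyed on the CloudEvent id. This is an in-memory illustration only: SeenEvents and handle_once are hypothetical names, and a real deployment would more likely keep the seen IDs in a database or cache shared between workers.

```python
import threading


class SeenEvents:
    # In-memory record of processed event IDs (hypothetical helper).

    def __init__(self):
        self._seen = set()
        self._lock = threading.Lock()

    def first_time(self, event_id):
        # Return True only the first time a given ID is presented.
        with self._lock:
            if event_id in self._seen:
                return False
            self._seen.add(event_id)
            return True


seen = SeenEvents()
processed = []


def handle_once(event_id, payload):
    # Skip redelivered events so retries do not repeat side effects.
    if not seen.first_time(event_id):
        return False
    processed.append(payload)
    return True


print(handle_once("evt-1", {"objectId": 42}))  # first delivery
print(handle_once("evt-1", {"objectId": 42}))  # retried delivery
```

Note that this sketch records the ID before the work is done; if processing can fail, record the ID only after success so that a retry is not silently dropped.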
API Reference
CloudEvent
- specversion: str - CloudEvents spec version
- id: str - Unique event identifier
- source: str - Event source
- type: str - Event type (com.elytra.*)
- datacontenttype: str - Content type
- time: datetime | None - Event timestamp
- eventtype: str | None - Elytra event type
- operation: str | None - Operation type
- data: dict | None - Event payload
- subject: str | None - Event subject
- dataschema: str | None - Data schema URI
WebhookValidator
- validate(payload, headers=None) -> bool - Validate without parsing
- parse(payload) -> CloudEvent - Parse without validating auth
- validate_and_parse(payload, headers=None) -> CloudEvent - Full validation and parsing
RetryConfig
- max_retries: int (default: 5) - Maximum retry attempts
- initial_delay: int (default: 1000) - Initial delay in milliseconds
- backoff_multiplier: float (default: 2.0) - Exponential backoff multiplier
- max_delay: int (default: 300000) - Maximum delay in milliseconds
See Also
- CloudEvents Specification
- Elytra PIM Documentation
- Elytra Event API Documentation
- examples/webhook_examples.py - Comprehensive usage examples """