elytra_client/elytra_client/webhooks
2026-02-20 10:23:53 +01:00
..
__init__.py feat: Implement webhook retry logic and server examples 2026-02-20 10:08:07 +01:00
auth.py feat: Implement webhook retry logic and server examples 2026-02-20 10:08:07 +01:00
handlers.py feat: Implement webhook retry logic and server examples 2026-02-20 10:08:07 +01:00
models.py feat: Implement webhook retry logic and server examples 2026-02-20 10:08:07 +01:00
README.md docs: Update README with new API documentation links and contact email 2026-02-20 10:23:53 +01:00
retry.py feat: Implement webhook retry logic and server examples 2026-02-20 10:08:07 +01:00
server.py feat: Implement webhook retry logic and server examples 2026-02-20 10:08:07 +01:00

"""

Elytra Webhooks Subpackage Documentation

Overview

The elytra_client.webhooks subpackage provides comprehensive tools for handling CloudEvents-based webhooks from the Elytra PIM Event API. It includes:

  • CloudEvents Models: Strongly-typed Pydantic models for CloudEvents specification
  • Authentication: Support for Basic, Bearer, and API Key authentication
  • Validation: Robust webhook payload validation
  • Event Handling: Event dispatching and routing
  • Retry Logic: Exponential backoff retry policies
  • Framework Integration: Ready-to-use examples for Flask and FastAPI

Installation

The webhooks subpackage is included with the elytra-pim-client package. No additional installation is needed.

from elytra_client.webhooks import CloudEvent, parse_webhook_payload

Quick Start

Basic Event Parsing

Parse incoming webhook payloads without authentication validation:

from elytra_client.webhooks import parse_webhook_payload

# In your webhook endpoint
event = parse_webhook_payload(request.json())
print(f"Event type: {event.eventtype}")
print(f"Object ID: {event.get_object_id()}")

With Authentication

Add authentication validation:

from elytra_client.webhooks import (
    WebhookValidator,
    BasicAuth
)

auth = BasicAuth(username="user", password="pass")
validator = WebhookValidator(auth_type="basic", auth=auth)

event = validator.validate_and_parse(
    payload=request.json(),
    headers=dict(request.headers)
)

Core Components

CloudEvent Model

The CloudEvent class represents an event following the CNCF CloudEvents specification v1.0.

Key Fields:

  • specversion: CloudEvents spec version (always "1.0")
  • id: Unique event identifier
  • source: Event source (e.g., "elytra-pim")
  • type: Event type in format com.elytra.<event_type>
  • eventtype: Elytra-specific event type (product, product_attribute_value, etc.)
  • operation: Operation type (add, modify, remove, link, unlink)
  • data: Event payload containing object data and metadata

Useful Methods:

  • get_event_type(): Returns parsed EventType enum
  • get_operation(): Returns parsed Operation enum
  • get_object_data(): Extracts objectData from payload
  • get_object_id(): Extracts object ID from payload
  • get_object_type(): Extracts object type from payload

Event Types and Operations

EventType Enum includes all Elytra event types:

  • Product events: PRODUCT, PRODUCT_GROUP, PRODUCT_FOREST, etc.
  • Attribute events: PRODUCT_ATTRIBUTES, PRODUCT_ATTRIBUTE_VALUE, etc.
  • Structure events: PRODUCT_STRUCTURE, PRODUCT_TEXT, etc.
  • And many more...

Operation Enum includes standard operations:

  • ADD: Object was created
  • MODIFY: Object was modified
  • REMOVE: Object was deleted
  • LINK: Objects were linked
  • UNLINK: Objects were unlinked

Authentication

The package supports three authentication methods:

Basic Authentication

from elytra_client.webhooks import BasicAuth, WebhookValidator

auth = BasicAuth(username="admin", password="secret")
validator = WebhookValidator(auth_type="basic", auth=auth)

Bearer Token Authentication

from elytra_client.webhooks import BearerAuth, WebhookValidator

auth = BearerAuth(token="eyJhbGciOiJIUzI1NiIs...")
validator = WebhookValidator(auth_type="bearer", auth=auth)

API Key Authentication

from elytra_client.webhooks import APIKeyAuth, WebhookValidator

auth = APIKeyAuth(
    api_key="sk_live_secret123",
    header_name="X-API-Key"  # Custom header name
)
validator = WebhookValidator(auth_type="apikey", auth=auth)

Webhook Validation

The WebhookValidator class handles authentication and payload validation:

from elytra_client.webhooks import (
    WebhookValidator,
    WebhookValidationError,
    BasicAuth
)

auth = BasicAuth(username="user", password="pass")
validator = WebhookValidator(auth_type="basic", auth=auth)

try:
    # Validate authentication and CloudEvent format
    event = validator.validate_and_parse(
        payload=incoming_json,
        headers=incoming_headers
    )
    print(f"Valid event received: {event.id}")
except WebhookValidationError as e:
    print(f"Validation failed: {e}")

Event Routing and Dispatching

Use SimpleWebhookEventDispatcher to route events to different handlers:

from elytra_client.webhooks import EventType, Operation
from elytra_client.webhooks.server import SimpleWebhookEventDispatcher

dispatcher = SimpleWebhookEventDispatcher()

# Register handler for specific event type and operation
dispatcher.register_handler(
    callback=handle_product_added,
    event_type=EventType.PRODUCT,
    operation=Operation.ADD
)

# Register handler for all operations of a type
dispatcher.register_handler(
    callback=handle_product_changes,
    event_type=EventType.PRODUCT
)

# Register default fallback handler
dispatcher.register_default_handler(handle_other_events)

# Dispatch events
dispatcher.dispatch(event)

Retry Logic

Configure and use retry policies for webhook delivery:

from elytra_client.webhooks import (
    RetryConfig,
    ExponentialBackoffRetry,
    execute_with_retry
)

# Configure retry policy
retry_config = RetryConfig(
    max_retries=5,
    initial_delay=1000,      # 1 second
    backoff_multiplier=2.0,
    max_delay=300000         # 5 minutes max
)

policy = ExponentialBackoffRetry(retry_config)

# Execute function with retries
try:
    result = execute_with_retry(
        some_function,
        policy,
        arg1="value"
    )
except Exception as e:
    print(f"Failed after retries: {e}")

Retry Delay Calculation: The delay follows exponential backoff: delay = min(initial_delay * (multiplier ^ attempt), max_delay)

Examples:

  • Attempt 0: 1000ms (1 second)
  • Attempt 1: 2000ms (2 seconds)
  • Attempt 2: 4000ms (4 seconds)
  • Attempt 3: 8000ms (8 seconds)
  • etc., capped at max_delay

Framework Integration

Flask Example

from flask import Flask, request, jsonify
from elytra_client.webhooks import BasicAuth, WebhookValidator
from elytra_client.webhooks.server import FlaskWebhookExample

app = Flask(__name__)

# Setup
auth = BasicAuth(username="webhook_user", password="secret")
validator = WebhookValidator(auth_type="basic", auth=auth)
webhook_handler = FlaskWebhookExample(validator)

@app.route("/webhook", methods=["POST"])
def handle_webhook():
    response, status = webhook_handler.handle_request(
        request.json,
        dict(request.headers)
    )
    return jsonify(response), status

if __name__ == "__main__":
    app.run(port=5000)

FastAPI Example

from fastapi import FastAPI, Request
from elytra_client.webhooks import APIKeyAuth, WebhookValidator
from elytra_client.webhooks.server import FastAPIWebhookExample

app = FastAPI()

# Setup
auth = APIKeyAuth(api_key="sk_live_secret", header_name="X-API-Key")
validator = WebhookValidator(auth_type="apikey", auth=auth)
webhook_handler = FastAPIWebhookExample(validator)

@app.post("/webhook")
async def handle_webhook(request: Request):
    payload = await request.json()
    return webhook_handler.handle_request(
        payload,
        dict(request.headers)
    )

Complete Production Example

from flask import Flask, request, jsonify
from elytra_client.webhooks import (
    BasicAuth,
    WebhookValidator,
    EventType,
    Operation
)
from elytra_client.webhooks.server import (
    FlaskWebhookExample,
    SimpleWebhookEventDispatcher
)

app = Flask(__name__)

# Setup authentication
auth = BasicAuth(username="elytra", password="secure_password")
validator = WebhookValidator(auth_type="basic", auth=auth)

# Setup event routing
dispatcher = SimpleWebhookEventDispatcher()

def sync_product_to_db(event):
    product_id = event.get_object_id()
    print(f"Syncing product {product_id} to database...")
    # Your sync logic here

dispatcher.register_handler(
    sync_product_to_db,
    EventType.PRODUCT,
    Operation.ADD
)

# Setup webhook handler
webhook_handler = FlaskWebhookExample(validator, dispatcher)

@app.route("/webhook/elytra", methods=["POST"])
def elytra_webhook():
    response, status = webhook_handler.handle_request(
        request.json,
        dict(request.headers)
    )
    return jsonify(response), status

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=False)

Configuration XML Reference

While this subpackage handles receiving webhooks, you'll configure them in Elytra's web client. Here's what the XML configuration looks like:

<event-consumer>
    <type>webhook</type>
    <events>
        <event>
            <types>
                <type>product</type>
                <type>product_attribute_value</type>
            </types>
            <operations>
                <operation>add</operation>
                <operation>modify</operation>
                <operation>remove</operation>
            </operations>
            <webhook>
                <url>https://your-app.com/webhook/elytra</url>
                <method>post</method>
                <auth-type>basic</auth-type>
                <username>elytra</username>
                <password>secure_password</password>
            </webhook>
        </event>
    </events>
    <retry maxRetries="5" initialDelay="1000" backoffMultiplier="2.0"/>
    <cloudevents source="elytra-pim" typePrefix="com.elytra"/>
</event-consumer>

Error Handling

The main exception to handle is WebhookValidationError:

from elytra_client.webhooks import WebhookValidationError

try:
    event = validator.validate_and_parse(payload, headers)
except WebhookValidationError as e:
    # Handle validation error
    log.error(f"Webhook validation failed: {e}")
    return {"status": "error"}, 400

Best Practices

  1. Always Validate: Use WebhookValidator to validate both authentication and event structure
  2. Use Type Hints: Leverage the strongly-typed models for better IDE support
  3. Handle Errors: Implement proper error handling and logging
  4. Implement Idempotency: Since webhooks may be retried, ensure your handlers are idempotent
  5. Acknowledge Quickly: Return success response immediately; do heavy processing async
  6. Log Events: Log all webhook events for debugging and audit trails
  7. Secure Credentials: Store authentication credentials securely (environment variables, secrets manager)
  8. Use HTTPS: Always use HTTPS URLs for webhook endpoints
  9. Implement Timeouts: Set reasonable timeouts for webhook processing
  10. Monitor: Set up monitoring and alerts for webhook failures

API Reference

CloudEvent

  • specversion: str - CloudEvents spec version
  • id: str - Unique event identifier
  • source: str - Event source
  • type: str - Event type (com.elytra.*)
  • datacontenttype: str - Content type
  • time: datetime | None - Event timestamp
  • eventtype: str | None - Elytra event type
  • operation: str | None - Operation type
  • data: dict | None - Event payload
  • subject: str | None - Event subject
  • dataschema: str | None - Data schema URI

WebhookValidator

  • validate(payload, headers=None) -> bool - Validate without parsing
  • parse(payload) -> CloudEvent - Parse without validating auth
  • validate_and_parse(payload, headers=None) -> CloudEvent - Full validation and parsing

RetryConfig

  • max_retries: int (default: 5) - Maximum retry attempts
  • initial_delay: int (default: 1000) - Initial delay in milliseconds
  • backoff_multiplier: float (default: 2.0) - Exponential backoff multiplier
  • max_delay: int (default: 300000) - Maximum delay in milliseconds

See Also