elytra_client/elytra_client/webhooks/models.py
claudi b8f889f224 feat: Implement webhook retry logic and server examples
- Added retry logic for webhook event delivery in `retry.py` with exponential backoff.
- Created example webhook server implementations in `server.py` for Flask and FastAPI.
- Developed comprehensive examples for using the Elytra webhooks subpackage in `webhook_examples.py`.
- Introduced unit tests for webhook functionality, including event handling, authentication, and retry logic in `test_webhooks.py`.
2026-02-20 10:08:07 +01:00

287 lines
10 KiB
Python

"""Data models for CloudEvents-based webhooks following the CNCF CloudEvents specification.
Reference: https://cloudevents.io/
"""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Literal, Optional, Union
from pydantic import BaseModel, Field, HttpUrl, validator
class Operation(str, Enum):
    """Kinds of operation an event can describe.

    Every member is also a ``str``, so it compares equal to its lowercase
    wire value (for example ``Operation.ADD == "add"``).
    """

    ADD = "add"
    MODIFY = "modify"
    REMOVE = "remove"
    LINK = "link"
    UNLINK = "unlink"
class EventType(str, Enum):
    """Event type names used by the Elytra Event API.

    Every member is also a ``str`` whose value is the lowercase form of the
    member name. Values are wire identifiers and must be kept exactly as
    written, including unusual spellings.
    """

    NONE = "none"
    PROJECT = "project"
    CUSTOMER = "customer"
    TODO = "todo"
    USER = "user"
    CONVERSATION = "conversation"
    USERGROUP = "usergroup"
    CALENDAR = "calendar"
    RIGHTS = "rights"
    LANGUAGE = "language"
    DOCUMENT = "document"
    DOCUMENTFOLDER = "documentfolder"
    PRODUCT_ATTRIBUTES = "product_attributes"
    PRODUCT_ATTRIBUTE_GROUPS = "product_attribute_groups"
    PRODUCT = "product"
    PRODUCT_GROUP = "product_group"
    PRODUCT_FOREST = "product_forest"
    PRODUCT_ATTRIBUTE_VALUE = "product_attribute_value"
    PRODUCT_TEXT = "product_text"
    PRODUCT_COLLECTOR = "product_collector"
    PRODUCT_LANGUAGE_FALLBACK = "product_language_fallback"
    PRODUCT_PICTURE = "product_picture"
    PRODUCT_STRUCTURE = "product_structure"
    PRODUCT_TEXT_STYLE = "product_text_style"
    PRODUCT_LANGUAGE = "product_language"
    CATALOG_CONTENT = "catalog_content"
    PRODUCT_IMPORT = "product_import"
    PRODUCT_SEARCH_FILTER = "product_search_filter"
    PRODUCT_PRODUCT_LINK = "product_product_link"
    ATTRIBUTE_FORMAT = "attribute_format"
    CLASSIFICATION_FEATUREMAPPING = "classification_featuremapping"
    CONFIGURATION_FILES = "configuration_files"
    WORKFLOW = "workflow"
    WORKFLOW_TASK = "workflow_task"
    PAGEPLAN_GRID = "pageplan_grid"
    MEDIA_EXPLORER = "media_explorer"
    ATTRIBUTE_UNIT = "attribute_unit"
    JMS_CONNECTION_TEST = "jms_connection_test"
    SCHEDULER_TREE_UPDATE = "scheduler_tree_update"
    SCHEDULER_ACTIVE_TABLE_UPDATE = "scheduler_active_table_update"
    JMS_PERFORMANCE_TEST = "jms_performance_test"
    CONFIGFILE_HISTORY_REMOVE = "configfile_history_remove"
    MIMETYPE_PLUGIN = "mimetype_plugin"
    PAGEPLAN_PREVIEW = "pageplan_preview"
    PAGEPLAN = "pageplan"
    SHORTCUT = "shortcut"
    PAGEPLAN_GRIDELEMENT = "pageplan_gridelement"
    INTEGRATION_TEST = "integration_test"
    SHORTCUT_DELETEALLFORUSER = "shortcut_deleteallforuser"
    OBJECT_WORKFLOW = "object_workflow"
    RULESET = "ruleset"
    USER_PROPERTIES = "user_properties"
    USER_ATTRIBUTEFILTER = "user_attributefilter"
    ATTRIBUTE_RULE = "attribute_rule"
    # NOTE(review): "attributrules" spelling is presumably the upstream
    # identifier -- confirm against the Event API before "correcting" it.
    SERVER_CACHE_ATTRIBUTRULES_CHANGED = "server_cache_attributrules_changed"
    FORMULA_UPDATE = "formula_update"
    OBJECT_WORKFLOW_TEMPLATE = "object_workflow_template"
    ASSETBROWSER_NODE_EVENT = "assetbrowser_node_event"
    CLIENT_TEXT_MESSAGE = "client_text_message"
    SESSION_CLOSED = "session_closed"
    OBJECT_WORKFLOW_COMMENT = "object_workflow_comment"
    ENUMERATION_ATTRIBUTE_CHAIN = "enumeration_attribute_chain"
    ENUMERATION_VALUE_CHAIN = "enumeration_value_chain"
    USER_FAVORITES = "user_favorites"
    OBJECT_COLLECTION = "object_collection"
    SCHEDULER_JOB = "scheduler_job"
    ATTRIBUTE_UNITTYPE = "attribute_unittype"
    PRODUCT_MASS_MODIFY = "product_mass_modify"
    PRODUCT_SEARCH_SUBSCRIPTION = "product_search_subscription"
    SCHEDULER_JOB_ACTIVE_UPDATE = "scheduler_job_active_update"
    CONFIG = "config"
    SCHEDULER_JOB_EXECUTION_STATUS_CHANGED = "scheduler_job_execution_status_changed"
    INDEXUTILS_CACHE_CHANGED = "indexutils_cache_changed"
class CloudEvent(BaseModel):
    """One event in the CNCF CloudEvents v1.0 envelope format.

    Models the events delivered by the Elytra PIM Event API.

    Attributes:
        specversion: CloudEvents specification version; defaults to "1.0".
        id: Unique event identifier (required).
        source: Identifier of the context in which the event happened
            (required).
        type: Event type in the format ``com.elytra.<event_type>`` (required).
        datacontenttype: Content type of ``data``; defaults to
            "application/json".
        time: Timestamp of when the event occurred, if supplied.
        eventtype: Elytra-specific event type name, if supplied.
        operation: Operation name (add, modify, remove, link, unlink), if
            supplied.
        data: Event payload dictionary, if supplied.
        subject: Optional subject of the event.
        dataschema: Optional data schema URI.
    """

    specversion: str = Field(default="1.0", description="CloudEvents spec version")
    id: str = Field(..., description="Unique event identifier")
    source: str = Field(..., description="Event source identifier")
    type: str = Field(..., description="Event type in format com.elytra.<type>")
    datacontenttype: str = Field(
        default="application/json",
        description="Content type of the data",
    )
    time: Optional[datetime] = Field(
        default=None,
        description="Timestamp when the event occurred",
    )
    eventtype: Optional[str] = Field(
        default=None,
        description="Elytra-specific event type",
    )
    operation: Optional[str] = Field(
        default=None,
        description="Operation type (add, modify, remove, link, unlink)",
    )
    data: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Event payload data",
    )
    subject: Optional[str] = Field(default=None, description="Event subject")
    dataschema: Optional[str] = Field(default=None, description="Data schema URI")

    class Config:
        """Pydantic config carrying an example payload for the JSON schema."""

        json_schema_extra = {
            "example": {
                "specversion": "1.0",
                "id": "385d27ef-ac6d-4d93-904b-ea9801fc1bde",
                "source": "elytra-pim",
                "type": "com.elytra.product.attribute.value",
                "datacontenttype": "application/json",
                "time": "2025-12-31T04:19:30.860984129+01:00",
                "eventtype": "product_attribute_value",
                "operation": "modify",
            }
        }

    def get_event_type(self) -> Optional[EventType]:
        """Return ``eventtype`` as an ``EventType`` member.

        Returns:
            The matching member, or ``None`` when ``eventtype`` is unset,
            empty, or not a known ``EventType`` value.
        """
        raw_name = self.eventtype
        if not raw_name:
            return None
        try:
            member = EventType(raw_name)
        except ValueError:
            # Unknown names are reported as "no type" rather than an error.
            return None
        else:
            return member

    def get_operation(self) -> Optional[Operation]:
        """Return ``operation`` as an ``Operation`` member.

        Returns:
            The matching member, or ``None`` when ``operation`` is unset,
            empty, or not a known ``Operation`` value.
        """
        raw_name = self.operation
        if not raw_name:
            return None
        try:
            member = Operation(raw_name)
        except ValueError:
            # Unknown names are reported as "no operation" rather than an error.
            return None
        else:
            return member

    def get_object_data(self) -> Optional[Dict[str, Any]]:
        """Return the payload's ``"objectData"`` entry.

        Returns:
            ``None`` when ``data`` is unset or empty, or the key is absent.
        """
        return self.data.get("objectData") if self.data else None

    def get_object_id(self) -> Optional[Union[str, int]]:
        """Return the payload's ``"objectId"`` entry.

        Returns:
            ``None`` when ``data`` is unset or empty, or the key is absent.
        """
        return self.data.get("objectId") if self.data else None

    def get_object_type(self) -> Optional[str]:
        """Return the payload's ``"objectType"`` entry.

        Returns:
            ``None`` when ``data`` is unset or empty, or the key is absent.
        """
        return self.data.get("objectType") if self.data else None
class BasicAuth(BaseModel):
    """Username/password pair for HTTP Basic authentication.

    Attributes:
        username: Username for authentication (required).
        password: Password for authentication (required).
    """

    username: str = Field(
        ...,
        description="Username for authentication",
    )
    password: str = Field(
        ...,
        description="Password for authentication",
    )
class BearerAuth(BaseModel):
    """Token for HTTP Bearer authentication.

    Attributes:
        token: Bearer token for authentication (required).
    """

    token: str = Field(
        ...,
        description="Bearer token for authentication",
    )
class APIKeyAuth(BaseModel):
    """Credentials for API key authentication.

    Attributes:
        api_key: API key value (required).
        header_name: Header name for the API key; defaults to "api-key".
    """

    api_key: str = Field(..., description="API key value")
    header_name: str = Field(default="api-key", description="Header name for the API key")
# Any one of the three credential models; used as the type of
# ``WebhookConfig.auth``.
WebhookAuth = Union[BasicAuth, BearerAuth, APIKeyAuth]
class RetryConfig(BaseModel):
    """Retry settings for webhook delivery.

    All delays are expressed in milliseconds.

    Attributes:
        max_retries: Maximum retry attempts; default 5, must be >= 0.
        initial_delay: Initial delay; default 1000, must be >= 100.
        backoff_multiplier: Exponential backoff multiplier; default 2.0,
            must be >= 1.0.
        max_delay: Max delay; default 300000 (5 minutes), must be >= 1000.
    """

    max_retries: int = Field(
        default=5,
        ge=0,
        description="Maximum retry attempts",
    )
    initial_delay: int = Field(
        default=1000,
        ge=100,
        description="Initial delay in milliseconds",
    )
    backoff_multiplier: float = Field(
        default=2.0,
        ge=1.0,
        description="Exponential backoff multiplier",
    )
    max_delay: int = Field(
        default=300000,
        ge=1000,
        description="Max delay in milliseconds",
    )
class WebhookConfig(BaseModel):
    """Webhook configuration for receiving Elytra events.

    Attributes:
        url: The webhook URL where events will be posted (required).
        method: HTTP method: "post" (default), "patch" or "put".
        auth_type: Authentication method: "none" (default), "basic",
            "bearer" or "apikey".
        auth: Authentication credentials. Required when ``auth_type`` is not
            "none"; must be left unset when it is "none".
        event_types: Event types to subscribe to (empty = all).
        operations: Operations to subscribe to (empty = all).
        retry: Retry configuration; defaults to ``RetryConfig()``.
        active: Whether the webhook is active; defaults to ``True``.
    """

    url: HttpUrl = Field(..., description="Webhook URL")
    method: Literal["post", "patch", "put"] = Field(
        default="post", description="HTTP method"
    )
    auth_type: Literal["none", "basic", "bearer", "apikey"] = Field(
        default="none", description="Authentication type"
    )
    auth: Optional[WebhookAuth] = Field(
        default=None, description="Authentication credentials"
    )
    event_types: list[EventType] = Field(
        default_factory=list, description="Event types to subscribe to"
    )
    operations: list[Operation] = Field(
        default_factory=list, description="Operations to subscribe to"
    )
    retry: RetryConfig = Field(
        default_factory=RetryConfig, description="Retry configuration"
    )
    active: bool = Field(default=True, description="Whether the webhook is active")

    @validator("auth", always=True)
    def validate_auth(
        cls, v: Optional[WebhookAuth], values: Dict[str, Any]
    ) -> Optional[WebhookAuth]:
        """Check that ``auth`` is present exactly when ``auth_type`` needs it.

        Runs even when ``auth`` is omitted (``always=True``) so that a missing
        credential is detected.

        Args:
            v: The already-parsed ``auth`` value, or ``None``.
            values: Previously validated fields; ``auth_type`` is declared
                before ``auth`` and is therefore normally present.

        Returns:
            ``v`` unchanged.

        Raises:
            ValueError: If ``auth_type`` is not "none" but ``auth`` is
                missing, or ``auth_type`` is "none" but ``auth`` is given.
        """
        # pydantic omits a field from ``values`` when that field failed its
        # own validation. In that case the auth_type error is already being
        # reported; skip the cross-field check instead of adding a misleading
        # "auth is required when auth_type is 'None'" error on top of it.
        if "auth_type" not in values:
            return v

        auth_type = values["auth_type"]
        if auth_type != "none" and v is None:
            raise ValueError(
                f"auth is required when auth_type is '{auth_type}'"
            )
        if auth_type == "none" and v is not None:
            raise ValueError("auth should be None when auth_type is 'none'")
        # NOTE(review): the credential model is not checked against auth_type
        # (e.g. a BearerAuth with auth_type="basic" is accepted) -- confirm
        # whether that is intended.
        return v