feat: Enhance Booklooker client with webhook signature verification, idempotency handling, and retry logic

This commit is contained in:
claudi 2026-04-16 14:58:18 +02:00
parent 1d8ee1bba6
commit f2e5774204
6 changed files with 232 additions and 50 deletions

View file

@ -2,18 +2,21 @@ from __future__ import annotations
import os import os
from fastapi import FastAPI, Request from fastapi import FastAPI, HTTPException, Request
from booklooker_client import BooklookerConfig, BooklookerWebhookHelper, SyncBooklookerClient from booklooker_client import BooklookerConfig, BooklookerWebhookHelper, SyncBooklookerClient
app = FastAPI(title="Booklooker webhook receiver") app = FastAPI(title="Booklooker webhook receiver")
helper = BooklookerWebhookHelper() helper = BooklookerWebhookHelper(webhook_secret=os.environ.get("BOOKLOOKER_WEBHOOK_SECRET"))
client = SyncBooklookerClient(BooklookerConfig(api_key=os.environ.get("BOOKLOOKER_API_KEY", "REPLACE_ME"))) client = SyncBooklookerClient(BooklookerConfig(api_key=os.environ.get("BOOKLOOKER_API_KEY", "REPLACE_ME")))
@app.post("/webhooks/booklooker") @app.post("/webhooks/booklooker")
async def receive_booklooker_webhook(request: Request) -> dict: async def receive_booklooker_webhook(request: Request) -> dict:
raw_body = await request.body()
if not helper.validate_request(raw_body, request.headers):
raise HTTPException(status_code=401, detail="Invalid webhook signature")
payload = await request.json() payload = await request.json()
event = helper.enrich_with_client(payload, client) event = helper.enrich_with_client(payload, client)
return {"accepted": True, "event": event.model_dump(mode="json")} return {"accepted": True, "event": event.model_dump(mode="json")}

View file

@ -1,5 +1,7 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import time
from datetime import datetime, timezone from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation from decimal import Decimal, InvalidOperation
from pathlib import Path from pathlib import Path
@ -49,6 +51,10 @@ def _iter_file(path: Path, chunk_size: int = 65536) -> Iterable[bytes]:
yield chunk yield chunk
def _backoff_delay(base_delay: float, attempt: int) -> float:
return max(0.0, base_delay * attempt)
def _parse_article_list(raw: Any, field: ArticleField) -> ArticleList: def _parse_article_list(raw: Any, field: ArticleField) -> ArticleList:
if raw in (None, ""): if raw in (None, ""):
return ArticleList(items=[], field=field, raw=raw) return ArticleList(items=[], field=field, raw=raw)
@ -153,30 +159,43 @@ class _SyncClientBase:
token = self._get_token() token = self._get_token()
request_params.setdefault("token", token.token) request_params.setdefault("token", token.token)
response = self._http.request(method, path, params=request_params, headers=headers, content=content) last_error: Exception | None = None
response.raise_for_status() for attempt in range(1, self.config.max_retries + 2):
envelope = ApiEnvelope.model_validate(response.json()) try:
response = self._http.request(method, path, params=request_params, headers=headers, content=content)
response.raise_for_status()
envelope = ApiEnvelope.model_validate(response.json())
if envelope.status == "OK": if envelope.status == "OK":
if requires_auth and self._token is not None: if requires_auth and self._token is not None:
self._token.acquired_at = _now_utc() self._token.acquired_at = _now_utc()
return envelope.returnValue return envelope.returnValue
code = str(envelope.returnValue) code = str(envelope.returnValue)
if code == "TOKEN_EXPIRED" and requires_auth and retry_on_expired_token and self.config.auto_refresh_token: if code == "TOKEN_EXPIRED" and requires_auth and retry_on_expired_token and self.config.auto_refresh_token:
self._token = None self._token = None
self.authenticate() self.authenticate()
return self._request( return self._request(
method, method,
path, path,
params=params, params=params,
requires_auth=requires_auth, requires_auth=requires_auth,
retry_on_expired_token=False, retry_on_expired_token=False,
headers=headers, headers=headers,
content=content, content=content,
) )
raise_for_error_code(code) raise_for_error_code(code)
except httpx.RequestError as exc:
last_error = exc
if attempt > self.config.max_retries:
raise
time.sleep(_backoff_delay(self.config.retry_backoff_seconds, attempt))
if last_error is not None:
raise last_error
raise RuntimeError("Request handling failed unexpectedly")
class SyncBooklookerClient(_SyncClientBase): class SyncBooklookerClient(_SyncClientBase):
@ -221,6 +240,9 @@ class SyncBooklookerClient(_SyncClientBase):
encoding: str | None = None, encoding: str | None = None,
) -> UploadReceipt: ) -> UploadReceipt:
path = Path(file_path) path = Path(file_path)
if path.stat().st_size > self.config.max_upload_size_bytes:
raise ValueError("File exceeds configured Booklooker upload size limit")
params: dict[str, Any] = { params: dict[str, Any] = {
"fileType": file_type, "fileType": file_type,
"dataType": data_type, "dataType": data_type,
@ -244,6 +266,23 @@ class SyncBooklookerClient(_SyncClientBase):
raw = self._request("GET", "/file_status", params={"filename": filename, "showErrors": int(show_errors)}) raw = self._request("GET", "/file_status", params={"filename": filename, "showErrors": int(show_errors)})
return _parse_file_status(filename, raw, show_errors) return _parse_file_status(filename, raw, show_errors)
def wait_for_file_processing(
self,
filename: str,
*,
timeout_seconds: float = 300,
poll_interval_seconds: float = 2.0,
include_errors: bool = False,
) -> FileStatusResult:
deadline = time.time() + timeout_seconds
last_status = FileStatusResult(filename=filename, state="UNKNOWN")
while time.time() < deadline:
last_status = self.get_file_status(filename, show_errors=include_errors)
if last_status.state in {"UPLOAD_DONE", "UPLOAD_FAILED"} or (include_errors and last_status.errors):
return last_status
time.sleep(max(0.0, poll_interval_seconds))
return last_status
def get_import_status(self) -> ImportQueueStatus: def get_import_status(self) -> ImportQueueStatus:
raw = self._request("GET", "/import_status") raw = self._request("GET", "/import_status")
try: try:
@ -357,30 +396,43 @@ class AsyncBooklookerClient:
token = await self._get_token() token = await self._get_token()
request_params.setdefault("token", token.token) request_params.setdefault("token", token.token)
response = await self._http.request(method, path, params=request_params, headers=headers, content=content) last_error: Exception | None = None
response.raise_for_status() for attempt in range(1, self.config.max_retries + 2):
envelope = ApiEnvelope.model_validate(response.json()) try:
response = await self._http.request(method, path, params=request_params, headers=headers, content=content)
response.raise_for_status()
envelope = ApiEnvelope.model_validate(response.json())
if envelope.status == "OK": if envelope.status == "OK":
if requires_auth and self._token is not None: if requires_auth and self._token is not None:
self._token.acquired_at = _now_utc() self._token.acquired_at = _now_utc()
return envelope.returnValue return envelope.returnValue
code = str(envelope.returnValue) code = str(envelope.returnValue)
if code == "TOKEN_EXPIRED" and requires_auth and retry_on_expired_token and self.config.auto_refresh_token: if code == "TOKEN_EXPIRED" and requires_auth and retry_on_expired_token and self.config.auto_refresh_token:
self._token = None self._token = None
await self.authenticate() await self.authenticate()
return await self._request( return await self._request(
method, method,
path, path,
params=params, params=params,
requires_auth=requires_auth, requires_auth=requires_auth,
retry_on_expired_token=False, retry_on_expired_token=False,
headers=headers, headers=headers,
content=content, content=content,
) )
raise_for_error_code(code) raise_for_error_code(code)
except httpx.RequestError as exc:
last_error = exc
if attempt > self.config.max_retries:
raise
await asyncio.sleep(_backoff_delay(self.config.retry_backoff_seconds, attempt))
if last_error is not None:
raise last_error
raise RuntimeError("Request handling failed unexpectedly")
async def get_article_list( async def get_article_list(
self, self,
@ -423,6 +475,9 @@ class AsyncBooklookerClient:
encoding: str | None = None, encoding: str | None = None,
) -> UploadReceipt: ) -> UploadReceipt:
path = Path(file_path) path = Path(file_path)
if path.stat().st_size > self.config.max_upload_size_bytes:
raise ValueError("File exceeds configured Booklooker upload size limit")
params: dict[str, Any] = { params: dict[str, Any] = {
"fileType": file_type, "fileType": file_type,
"dataType": data_type, "dataType": data_type,
@ -446,6 +501,23 @@ class AsyncBooklookerClient:
raw = await self._request("GET", "/file_status", params={"filename": filename, "showErrors": int(show_errors)}) raw = await self._request("GET", "/file_status", params={"filename": filename, "showErrors": int(show_errors)})
return _parse_file_status(filename, raw, show_errors) return _parse_file_status(filename, raw, show_errors)
async def wait_for_file_processing(
self,
filename: str,
*,
timeout_seconds: float = 300,
poll_interval_seconds: float = 2.0,
include_errors: bool = False,
) -> FileStatusResult:
deadline = time.time() + timeout_seconds
last_status = FileStatusResult(filename=filename, state="UNKNOWN")
while time.time() < deadline:
last_status = await self.get_file_status(filename, show_errors=include_errors)
if last_status.state in {"UPLOAD_DONE", "UPLOAD_FAILED"} or (include_errors and last_status.errors):
return last_status
await asyncio.sleep(max(0.0, poll_interval_seconds))
return last_status
async def get_import_status(self) -> ImportQueueStatus: async def get_import_status(self) -> ImportQueueStatus:
raw = await self._request("GET", "/import_status") raw = await self._request("GET", "/import_status")
try: try:

View file

@ -16,4 +16,7 @@ class BooklookerConfig(BaseModel):
user_agent: str = Field(default="booklooker-client/0.1.0") user_agent: str = Field(default="booklooker-client/0.1.0")
auto_refresh_token: bool = Field(default=True) auto_refresh_token: bool = Field(default=True)
token_idle_timeout_seconds: int = Field(default=600, ge=60) token_idle_timeout_seconds: int = Field(default=600, ge=60)
max_retries: int = Field(default=2, ge=0, le=10)
retry_backoff_seconds: float = Field(default=0.5, ge=0)
max_upload_size_bytes: int = Field(default=80 * 1024 * 1024, gt=0)
openapi_path: Path = Field(default_factory=lambda: Path(__file__).resolve().parents[2] / "openapi.yaml") openapi_path: Path = Field(default_factory=lambda: Path(__file__).resolve().parents[2] / "openapi.yaml")

View file

@ -1,30 +1,78 @@
from __future__ import annotations from __future__ import annotations
import hashlib
import hmac
import time
from collections.abc import Mapping
from typing import Any from typing import Any
from .models.webhook import MiddlewareEvent, WebhookEvent from .models.webhook import MiddlewareEvent, WebhookEvent
class InMemoryIdempotencyStore: class InMemoryIdempotencyStore:
def __init__(self) -> None: def __init__(self, ttl_seconds: float = 24 * 60 * 60) -> None:
self._seen: set[str] = set() self.ttl_seconds = ttl_seconds
self._seen: dict[str, float] = {}
def _purge_expired(self) -> None:
now = time.time()
expired = [event_id for event_id, expires_at in self._seen.items() if expires_at <= now]
for event_id in expired:
self._seen.pop(event_id, None)
def has_seen(self, event_id: str) -> bool: def has_seen(self, event_id: str) -> bool:
self._purge_expired()
return event_id in self._seen return event_id in self._seen
def mark_seen(self, event_id: str) -> None: def mark_seen(self, event_id: str) -> None:
self._seen.add(event_id) expires_at = time.time() + self.ttl_seconds
self._seen[event_id] = expires_at
class BooklookerWebhookHelper: class BooklookerWebhookHelper:
"""Utility toolbox for parsing and enriching Booklooker push payloads.""" """Utility toolbox for parsing and enriching Booklooker push payloads."""
def __init__(self, idempotency_store: InMemoryIdempotencyStore | None = None) -> None: def __init__(
self,
idempotency_store: InMemoryIdempotencyStore | None = None,
webhook_secret: str | None = None,
signature_header_names: tuple[str, ...] = (
"x-booklooker-signature",
"x-signature",
"x-hub-signature-256",
),
) -> None:
self.idempotency_store = idempotency_store or InMemoryIdempotencyStore() self.idempotency_store = idempotency_store or InMemoryIdempotencyStore()
self.webhook_secret = webhook_secret
self.signature_header_names = tuple(name.lower() for name in signature_header_names)
def parse_event(self, payload: dict[str, Any]) -> WebhookEvent: def parse_event(self, payload: dict[str, Any]) -> WebhookEvent:
return WebhookEvent.model_validate(payload) return WebhookEvent.model_validate(payload)
def get_signature_from_headers(self, headers: Mapping[str, Any] | None) -> str | None:
if not headers:
return None
lowered = {str(key).lower(): str(value) for key, value in headers.items()}
for name in self.signature_header_names:
if name in lowered:
return lowered[name]
return None
def verify_signature(self, payload: bytes, signature: str | None) -> bool:
if self.webhook_secret is None:
return True
if not signature:
return False
normalized = signature.split("=", 1)[-1].strip().lower()
expected = hmac.new(self.webhook_secret.encode("utf-8"), payload, hashlib.sha256).hexdigest().lower()
return hmac.compare_digest(normalized, expected)
def validate_request(self, payload: bytes, headers: Mapping[str, Any] | None = None) -> bool:
signature = self.get_signature_from_headers(headers)
return self.verify_signature(payload, signature)
def is_duplicate(self, event: WebhookEvent) -> bool: def is_duplicate(self, event: WebhookEvent) -> bool:
return self.idempotency_store.has_seen(event.event_id) return self.idempotency_store.has_seen(event.event_id)
@ -98,4 +146,4 @@ class BooklookerWebhookHelper:
return self.to_middleware_event(event, resource_type=resource_type, enriched_data=enriched) return self.to_middleware_event(event, resource_type=resource_type, enriched_data=enriched)
def fastapi_receiver_snippet(self, route: str = "/webhooks/booklooker") -> str: def fastapi_receiver_snippet(self, route: str = "/webhooks/booklooker") -> str:
return f'''from fastapi import FastAPI, Request\nfrom booklooker_client import BooklookerConfig, SyncBooklookerClient, BooklookerWebhookHelper\n\napp = FastAPI()\nhelper = BooklookerWebhookHelper()\nclient = SyncBooklookerClient(BooklookerConfig(api_key="YOUR_API_KEY"))\n\n@app.post("{route}")\nasync def receive_booklooker_webhook(request: Request):\n payload = await request.json()\n event = helper.enrich_with_client(payload, client)\n return {{"accepted": True, "event": event.model_dump(mode="json")}}\n''' return f'''import os\n\nfrom fastapi import FastAPI, HTTPException, Request\nfrom booklooker_client import BooklookerConfig, SyncBooklookerClient, BooklookerWebhookHelper\n\napp = FastAPI()\nhelper = BooklookerWebhookHelper(webhook_secret=os.environ.get("BOOKLOOKER_WEBHOOK_SECRET"))\nclient = SyncBooklookerClient(BooklookerConfig(api_key=os.environ.get("BOOKLOOKER_API_KEY", "YOUR_API_KEY")))\n\n@app.post("{route}")\nasync def receive_booklooker_webhook(request: Request):\n raw_body = await request.body()\n if not helper.validate_request(raw_body, request.headers):\n raise HTTPException(status_code=401, detail="Invalid webhook signature")\n payload = await request.json()\n event = helper.enrich_with_client(payload, client)\n return {{"accepted": True, "event": event.model_dump(mode="json")}}\n'''

50
tests/test_resilience.py Normal file
View file

@ -0,0 +1,50 @@
import hashlib
import hmac
import httpx
import pytest
from booklooker_client import BooklookerConfig, BooklookerWebhookHelper, SyncBooklookerClient
def test_sync_client_retries_transient_timeout() -> None:
attempts = {"count": 0}
def handler(request: httpx.Request) -> httpx.Response:
attempts["count"] += 1
if attempts["count"] == 1:
raise httpx.ReadTimeout("temporary timeout", request=request)
return httpx.Response(200, json={"status": "OK", "returnValue": "REST_API_TOKEN"})
config = BooklookerConfig(api_key="demo", max_retries=1, retry_backoff_seconds=0)
client = SyncBooklookerClient(config)
client._http = httpx.Client(transport=httpx.MockTransport(handler), base_url=config.base_url)
try:
token = client.authenticate()
finally:
client.close()
assert token.token == "REST_API_TOKEN"
assert attempts["count"] == 2
def test_webhook_signature_verification() -> None:
payload = b'{"event_type":"order.created","event_id":"evt-10"}'
secret = "top-secret"
signature = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()
helper = BooklookerWebhookHelper(webhook_secret=secret)
assert helper.verify_signature(payload, signature) is True
def test_import_file_rejects_oversized_input(tmp_path) -> None:
file_path = tmp_path / "payload.bin"
file_path.write_bytes(b"1234")
client = SyncBooklookerClient(BooklookerConfig(api_key="demo", max_upload_size_bytes=1))
try:
with pytest.raises(ValueError):
client.import_file(file_path, data_type=0)
finally:
client.close()

View file

@ -1,4 +1,4 @@
from booklooker_client import BooklookerWebhookHelper from booklooker_client import BooklookerWebhookHelper, InMemoryIdempotencyStore
from booklooker_client.models.order import OrderBatch, OrderRecord from booklooker_client.models.order import OrderBatch, OrderRecord
@ -39,3 +39,9 @@ def test_duplicate_detection() -> None:
assert first.resource_type == "order" assert first.resource_type == "order"
assert second.resource_type == "duplicate" assert second.resource_type == "duplicate"
assert second.enriched_data == {"duplicate": True} assert second.enriched_data == {"duplicate": True}
def test_idempotency_store_ttl_expiry() -> None:
store = InMemoryIdempotencyStore(ttl_seconds=0)
store.mark_seen("evt-expire")
assert store.has_seen("evt-expire") is False