feat: Add Booklooker client configuration and exception handling

- Introduced BooklookerConfig class for runtime configuration management.
- Created custom exceptions for API errors, including authentication and validation errors.
- Generated API contracts from OpenAPI specification, including endpoints and security schemes.
- Implemented models for articles, orders, and webhooks to facilitate data handling.
- Developed a webhook helper for processing and enriching webhook events.
- Added tests for configuration defaults, token expiration, and webhook enrichment.
This commit is contained in:
claudi 2026-04-16 14:42:19 +02:00
commit 1d8ee1bba6
21 changed files with 3009 additions and 0 deletions

9
.gitignore vendored Normal file
View file

@ -0,0 +1,9 @@
__pycache__/
.pytest_cache/
*.pyc
*.pyo
*.pyd
.venv/
dist/
build/
*.egg-info/

36
README.md Normal file
View file

@ -0,0 +1,36 @@
# booklooker-client
A Python client for the Booklooker REST API designed for middleware and connector-hub scenarios.
## Highlights
- Generated API contract derived from [openapi.yaml](openapi.yaml)
- Sync and async client entrypoints
- Pydantic models for normalized responses
- Token auto-refresh handling
- Webhook helper toolbox for Push-API style events
- FastAPI example receiver and enrichment workflow
## Installation
```bash
pip install -e .[dev,webhooks]
```
## Quick start
```python
from booklooker_client import BooklookerConfig, SyncBooklookerClient
config = BooklookerConfig(api_key="YOUR_API_KEY")
client = SyncBooklookerClient(config)
token = client.authenticate()
articles = client.get_article_list()
print(token.token)
print(articles.items)
```
## Notes
Booklooker wraps nearly all responses into a generic envelope with `status` and `returnValue`. This package normalizes those responses into typed Pydantic models so downstream middleware can work with stable structures.

14
examples/sync_usage.py Normal file
View file

@ -0,0 +1,14 @@
from __future__ import annotations
import os
from booklooker_client import BooklookerConfig, SyncBooklookerClient
api_key = os.environ.get("BOOKLOOKER_API_KEY", "REPLACE_ME")
config = BooklookerConfig(api_key=api_key)
with SyncBooklookerClient(config) as client:
token = client.authenticate()
print("Token acquired:", token.token)
print("Available endpoints:", client.available_endpoints)

View file

@ -0,0 +1,19 @@
from __future__ import annotations
import os
from fastapi import FastAPI, Request
from booklooker_client import BooklookerConfig, BooklookerWebhookHelper, SyncBooklookerClient
app = FastAPI(title="Booklooker webhook receiver")
helper = BooklookerWebhookHelper()
client = SyncBooklookerClient(BooklookerConfig(api_key=os.environ.get("BOOKLOOKER_API_KEY", "REPLACE_ME")))
@app.post("/webhooks/booklooker")
async def receive_booklooker_webhook(request: Request) -> dict:
payload = await request.json()
event = helper.enrich_with_client(payload, client)
return {"accepted": True, "event": event.model_dump(mode="json")}

1620
openapi.yaml Normal file

File diff suppressed because it is too large Load diff

38
pyproject.toml Normal file
View file

@ -0,0 +1,38 @@
[build-system]
requires = ["setuptools>=69", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "booklooker-client"
version = "0.1.0"
description = "Python client for the Booklooker REST API with Pydantic models and webhook helpers"
readme = "README.md"
requires-python = ">=3.11"
authors = [
{ name = "GitHub Copilot" }
]
dependencies = [
"httpx>=0.27.0",
"pydantic>=2.8.2",
"PyYAML>=6.0.2"
]
[project.optional-dependencies]
webhooks = [
"fastapi>=0.115.0",
"uvicorn>=0.30.6"
]
dev = [
"pytest>=8.3.3",
"pytest-asyncio>=0.24.0"
]
[tool.setuptools.package-dir]
"" = "src"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-q"

View file

@ -0,0 +1,11 @@
from .client import AsyncBooklookerClient, SyncBooklookerClient
from .config import BooklookerConfig
from .webhooks import BooklookerWebhookHelper, InMemoryIdempotencyStore
__all__ = [
"AsyncBooklookerClient",
"SyncBooklookerClient",
"BooklookerConfig",
"BooklookerWebhookHelper",
"InMemoryIdempotencyStore",
]

View file

@ -0,0 +1,510 @@
from __future__ import annotations
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Any, Iterable
import httpx
from .config import BooklookerConfig
from .exceptions import raise_for_error_code
from .generated.contracts import ENDPOINTS
from .models.article import (
ArticleDeleteResult,
ArticleField,
ArticleList,
ArticleListItem,
ArticleStatus,
ArticleStatusResult,
MediaType,
)
from .models.common import (
ApiEnvelope,
AuthToken,
FileImportErrorRecord,
FileStatusResult,
GenericResult,
ImportQueueStatus,
SearchResult,
UploadReceipt,
)
from .models.order import MessageType, OrderBatch, OrderRecord, OrderStatus
def _now_utc() -> datetime:
return datetime.now(timezone.utc)
def _resolve_enum_value(value: Any) -> Any:
return getattr(value, "value", value)
def _iter_file(path: Path, chunk_size: int = 65536) -> Iterable[bytes]:
with path.open("rb") as handle:
while True:
chunk = handle.read(chunk_size)
if not chunk:
break
yield chunk
def _parse_article_list(raw: Any, field: ArticleField) -> ArticleList:
if raw in (None, ""):
return ArticleList(items=[], field=field, raw=raw)
items: list[ArticleListItem] = []
for line in str(raw).splitlines():
if not line.strip():
continue
parts = [part.strip() for part in line.split("\t")]
item = ArticleListItem(value=parts[0])
if len(parts) > 1:
try:
item.price = Decimal(parts[1].replace(",", "."))
except (InvalidOperation, ValueError):
pass
if len(parts) > 2:
try:
item.stock = int(parts[2])
except ValueError:
pass
items.append(item)
return ArticleList(items=items, field=field, raw=raw)
def _parse_file_status(filename: str, raw: Any, show_errors: bool) -> FileStatusResult:
if show_errors and isinstance(raw, list):
return FileStatusResult(
filename=filename,
state="UPLOAD_DONE",
errors=[FileImportErrorRecord.model_validate(item) for item in raw],
raw=raw,
)
return FileStatusResult(filename=filename, state=None if raw is None else str(raw), raw=raw)
def _parse_orders(raw: Any) -> OrderBatch:
if raw in (None, ""):
return OrderBatch(orders=[], raw=raw)
if isinstance(raw, dict):
if isinstance(raw.get("orders"), list):
records = raw["orders"]
else:
records = [raw]
elif isinstance(raw, list):
records = raw
else:
records = [{"raw": raw}]
parsed = [OrderRecord.model_validate(item) if isinstance(item, dict) else OrderRecord() for item in records]
return OrderBatch(orders=parsed, raw=raw)
class _SyncClientBase:
def __init__(self, config: BooklookerConfig) -> None:
self.config = config
self._token: AuthToken | None = None
self._http = httpx.Client(
base_url=self.config.base_url,
timeout=self.config.timeout,
headers={"User-Agent": self.config.user_agent},
)
def close(self) -> None:
self._http.close()
def __enter__(self) -> "_SyncClientBase":
return self
def __exit__(self, *_: Any) -> None:
self.close()
@property
def available_endpoints(self) -> list[str]:
return list(ENDPOINTS.keys())
def authenticate(self) -> AuthToken:
raw = self._request("POST", "/authenticate", params={"apiKey": self.config.api_key}, requires_auth=False)
self._token = AuthToken(token=str(raw), expires_after_seconds=self.config.token_idle_timeout_seconds)
return self._token
def _get_token(self) -> AuthToken:
if self._token is None or self._token.expired:
return self.authenticate()
return self._token
def _request(
self,
method: str,
path: str,
*,
params: dict[str, Any] | None = None,
requires_auth: bool = True,
retry_on_expired_token: bool = True,
headers: dict[str, str] | None = None,
content: Any = None,
) -> Any:
request_params = dict(params or {})
if requires_auth:
token = self._get_token()
request_params.setdefault("token", token.token)
response = self._http.request(method, path, params=request_params, headers=headers, content=content)
response.raise_for_status()
envelope = ApiEnvelope.model_validate(response.json())
if envelope.status == "OK":
if requires_auth and self._token is not None:
self._token.acquired_at = _now_utc()
return envelope.returnValue
code = str(envelope.returnValue)
if code == "TOKEN_EXPIRED" and requires_auth and retry_on_expired_token and self.config.auto_refresh_token:
self._token = None
self.authenticate()
return self._request(
method,
path,
params=params,
requires_auth=requires_auth,
retry_on_expired_token=False,
headers=headers,
content=content,
)
raise_for_error_code(code)
class SyncBooklookerClient(_SyncClientBase):
def get_article_list(
self,
*,
field: ArticleField = ArticleField.ORDER_NO,
show_price: bool = False,
show_stock: bool = False,
media_type: MediaType | int | None = None,
) -> ArticleList:
params: dict[str, Any] = {
"field": field.value,
"showPrice": int(show_price),
"showStock": int(show_stock),
}
if media_type is not None:
params["mediaType"] = _resolve_enum_value(media_type)
raw = self._request("GET", "/article_list", params=params)
return _parse_article_list(raw, field)
def delete_article(self, order_no: str) -> ArticleDeleteResult:
raw = self._request("DELETE", "/article", params={"orderNo": order_no})
return ArticleDeleteResult(order_no=order_no, result=str(raw))
def get_article_status(self, order_no: str) -> ArticleStatusResult:
raw = self._request("GET", "/article_status", params={"orderNo": order_no})
return ArticleStatusResult(order_no=order_no, status=ArticleStatus(str(raw)))
def search(self, **params: Any) -> SearchResult:
raw = self._request("GET", "/search", params=params or None)
return SearchResult(raw=raw)
def import_file(
self,
file_path: str | Path,
*,
data_type: int,
file_type: str = "article",
media_type: MediaType | int = MediaType.BOOKS,
format_id: int | None = None,
encoding: str | None = None,
) -> UploadReceipt:
path = Path(file_path)
params: dict[str, Any] = {
"fileType": file_type,
"dataType": data_type,
"mediaType": _resolve_enum_value(media_type),
}
if format_id is not None:
params["formatID"] = format_id
if encoding is not None:
params["encoding"] = encoding
raw = self._request(
"POST",
"/file_import",
params=params,
headers={"Content-Type": "application/octet-stream"},
content=_iter_file(path),
)
return UploadReceipt(filename=path.name, result=str(raw), raw=raw)
def get_file_status(self, filename: str, *, show_errors: bool = False) -> FileStatusResult:
raw = self._request("GET", "/file_status", params={"filename": filename, "showErrors": int(show_errors)})
return _parse_file_status(filename, raw, show_errors)
def get_import_status(self) -> ImportQueueStatus:
raw = self._request("GET", "/import_status")
try:
pending = int(raw)
except (TypeError, ValueError):
pending = 0
return ImportQueueStatus(pending_files=pending, raw=raw)
def delete_image(self, order_no: str, position: int | None = None) -> GenericResult:
params: dict[str, Any] = {"orderNo": order_no}
if position is not None:
params["position"] = position
raw = self._request("DELETE", "/image", params=params)
return GenericResult(success=str(raw) == "SUCCESS", raw=raw)
def get_orders(
self,
*,
order_id: int | None = None,
date: str | None = None,
date_from: str | None = None,
date_to: str | None = None,
) -> OrderBatch:
params: dict[str, Any] = {}
if order_id is not None:
params["orderId"] = order_id
if date:
params["date"] = date
if date_from:
params["dateFrom"] = date_from
if date_to:
params["dateTo"] = date_to
raw = self._request("GET", "/order", params=params)
return _parse_orders(raw)
def cancel_order(self, order_id: int) -> GenericResult:
raw = self._request("PUT", "/order_cancel", params={"orderId": order_id})
return GenericResult(success=str(raw) == "SUCCESS", raw=raw)
def cancel_order_item(self, order_item_id: int, media_type: MediaType | int) -> GenericResult:
raw = self._request(
"PUT",
"/order_item_cancel",
params={"orderItemId": order_item_id, "mediaType": _resolve_enum_value(media_type)},
)
return GenericResult(success=str(raw) == "SUCCESS", raw=raw)
def send_order_message(
self,
order_id: int,
message_type: MessageType | str,
additional_text: str | None = None,
) -> GenericResult:
params: dict[str, Any] = {"orderId": order_id, "messageType": _resolve_enum_value(message_type)}
if additional_text:
params["additionalText"] = additional_text
raw = self._request("PUT", "/order_message", params=params)
return GenericResult(success=True, raw=raw)
def update_order_status(self, order_id: int, status: OrderStatus | str) -> GenericResult:
raw = self._request("PUT", "/order_status", params={"orderId": order_id, "status": _resolve_enum_value(status)})
return GenericResult(success=str(raw) == "SUCCESS", raw=raw)
class AsyncBooklookerClient:
def __init__(self, config: BooklookerConfig) -> None:
self.config = config
self._token: AuthToken | None = None
self._http = httpx.AsyncClient(
base_url=self.config.base_url,
timeout=self.config.timeout,
headers={"User-Agent": self.config.user_agent},
)
async def aclose(self) -> None:
await self._http.aclose()
async def __aenter__(self) -> "AsyncBooklookerClient":
return self
async def __aexit__(self, *_: Any) -> None:
await self.aclose()
@property
def available_endpoints(self) -> list[str]:
return list(ENDPOINTS.keys())
async def authenticate(self) -> AuthToken:
raw = await self._request("POST", "/authenticate", params={"apiKey": self.config.api_key}, requires_auth=False)
self._token = AuthToken(token=str(raw), expires_after_seconds=self.config.token_idle_timeout_seconds)
return self._token
async def _get_token(self) -> AuthToken:
if self._token is None or self._token.expired:
return await self.authenticate()
return self._token
async def _request(
self,
method: str,
path: str,
*,
params: dict[str, Any] | None = None,
requires_auth: bool = True,
retry_on_expired_token: bool = True,
headers: dict[str, str] | None = None,
content: Any = None,
) -> Any:
request_params = dict(params or {})
if requires_auth:
token = await self._get_token()
request_params.setdefault("token", token.token)
response = await self._http.request(method, path, params=request_params, headers=headers, content=content)
response.raise_for_status()
envelope = ApiEnvelope.model_validate(response.json())
if envelope.status == "OK":
if requires_auth and self._token is not None:
self._token.acquired_at = _now_utc()
return envelope.returnValue
code = str(envelope.returnValue)
if code == "TOKEN_EXPIRED" and requires_auth and retry_on_expired_token and self.config.auto_refresh_token:
self._token = None
await self.authenticate()
return await self._request(
method,
path,
params=params,
requires_auth=requires_auth,
retry_on_expired_token=False,
headers=headers,
content=content,
)
raise_for_error_code(code)
async def get_article_list(
self,
*,
field: ArticleField = ArticleField.ORDER_NO,
show_price: bool = False,
show_stock: bool = False,
media_type: MediaType | int | None = None,
) -> ArticleList:
params: dict[str, Any] = {
"field": field.value,
"showPrice": int(show_price),
"showStock": int(show_stock),
}
if media_type is not None:
params["mediaType"] = _resolve_enum_value(media_type)
raw = await self._request("GET", "/article_list", params=params)
return _parse_article_list(raw, field)
async def delete_article(self, order_no: str) -> ArticleDeleteResult:
raw = await self._request("DELETE", "/article", params={"orderNo": order_no})
return ArticleDeleteResult(order_no=order_no, result=str(raw))
async def get_article_status(self, order_no: str) -> ArticleStatusResult:
raw = await self._request("GET", "/article_status", params={"orderNo": order_no})
return ArticleStatusResult(order_no=order_no, status=ArticleStatus(str(raw)))
async def search(self, **params: Any) -> SearchResult:
raw = await self._request("GET", "/search", params=params or None)
return SearchResult(raw=raw)
async def import_file(
self,
file_path: str | Path,
*,
data_type: int,
file_type: str = "article",
media_type: MediaType | int = MediaType.BOOKS,
format_id: int | None = None,
encoding: str | None = None,
) -> UploadReceipt:
path = Path(file_path)
params: dict[str, Any] = {
"fileType": file_type,
"dataType": data_type,
"mediaType": _resolve_enum_value(media_type),
}
if format_id is not None:
params["formatID"] = format_id
if encoding is not None:
params["encoding"] = encoding
raw = await self._request(
"POST",
"/file_import",
params=params,
headers={"Content-Type": "application/octet-stream"},
content=_iter_file(path),
)
return UploadReceipt(filename=path.name, result=str(raw), raw=raw)
async def get_file_status(self, filename: str, *, show_errors: bool = False) -> FileStatusResult:
raw = await self._request("GET", "/file_status", params={"filename": filename, "showErrors": int(show_errors)})
return _parse_file_status(filename, raw, show_errors)
async def get_import_status(self) -> ImportQueueStatus:
raw = await self._request("GET", "/import_status")
try:
pending = int(raw)
except (TypeError, ValueError):
pending = 0
return ImportQueueStatus(pending_files=pending, raw=raw)
async def delete_image(self, order_no: str, position: int | None = None) -> GenericResult:
params: dict[str, Any] = {"orderNo": order_no}
if position is not None:
params["position"] = position
raw = await self._request("DELETE", "/image", params=params)
return GenericResult(success=str(raw) == "SUCCESS", raw=raw)
async def get_orders(
self,
*,
order_id: int | None = None,
date: str | None = None,
date_from: str | None = None,
date_to: str | None = None,
) -> OrderBatch:
params: dict[str, Any] = {}
if order_id is not None:
params["orderId"] = order_id
if date:
params["date"] = date
if date_from:
params["dateFrom"] = date_from
if date_to:
params["dateTo"] = date_to
raw = await self._request("GET", "/order", params=params)
return _parse_orders(raw)
async def cancel_order(self, order_id: int) -> GenericResult:
raw = await self._request("PUT", "/order_cancel", params={"orderId": order_id})
return GenericResult(success=str(raw) == "SUCCESS", raw=raw)
async def cancel_order_item(self, order_item_id: int, media_type: MediaType | int) -> GenericResult:
raw = await self._request(
"PUT",
"/order_item_cancel",
params={"orderItemId": order_item_id, "mediaType": _resolve_enum_value(media_type)},
)
return GenericResult(success=str(raw) == "SUCCESS", raw=raw)
async def send_order_message(
self,
order_id: int,
message_type: MessageType | str,
additional_text: str | None = None,
) -> GenericResult:
params: dict[str, Any] = {"orderId": order_id, "messageType": _resolve_enum_value(message_type)}
if additional_text:
params["additionalText"] = additional_text
raw = await self._request("PUT", "/order_message", params=params)
return GenericResult(success=True, raw=raw)
async def update_order_status(self, order_id: int, status: OrderStatus | str) -> GenericResult:
raw = await self._request("PUT", "/order_status", params={"orderId": order_id, "status": _resolve_enum_value(status)})
return GenericResult(success=str(raw) == "SUCCESS", raw=raw)

View file

@ -0,0 +1,19 @@
from __future__ import annotations
from pathlib import Path
from pydantic import BaseModel, ConfigDict, Field
class BooklookerConfig(BaseModel):
"""Runtime configuration for Booklooker clients."""
model_config = ConfigDict(extra="ignore")
api_key: str = Field(..., min_length=1)
base_url: str = Field(default="https://api.booklooker.de/2.0")
timeout: float = Field(default=30.0, gt=0)
user_agent: str = Field(default="booklooker-client/0.1.0")
auto_refresh_token: bool = Field(default=True)
token_idle_timeout_seconds: int = Field(default=600, ge=60)
openapi_path: Path = Field(default_factory=lambda: Path(__file__).resolve().parents[2] / "openapi.yaml")

View file

@ -0,0 +1,68 @@
from __future__ import annotations
class BooklookerError(Exception):
"""Base exception for the package."""
class ApiEnvelopeError(BooklookerError):
def __init__(self, code: str, detail: str | None = None) -> None:
self.code = code
self.detail = detail or code
super().__init__(self.detail)
class AuthenticationError(ApiEnvelopeError):
pass
class TokenExpiredError(AuthenticationError):
pass
class RateLimitError(ApiEnvelopeError):
pass
class NotFoundError(ApiEnvelopeError):
pass
class ValidationApiError(ApiEnvelopeError):
pass
class UploadError(ApiEnvelopeError):
pass
class ServerDownError(ApiEnvelopeError):
pass
_ERROR_MAP: dict[str, type[ApiEnvelopeError]] = {
"AUTHENTICATION_FAILED": AuthenticationError,
"API_KEY_MISSING": AuthenticationError,
"TOKEN_MISSING": AuthenticationError,
"TOKEN_UNKNOWN": AuthenticationError,
"TOKEN_EXPIRED": TokenExpiredError,
"QUOTA_EXCEEDED": RateLimitError,
"NOT_FOUND": NotFoundError,
"INVALID_PARAMETERS": ValidationApiError,
"INVALID_ORDERID": ValidationApiError,
"INVALID_DATE": ValidationApiError,
"INVALID_DATE_FROM": ValidationApiError,
"INVALID_DATE_TO": ValidationApiError,
"STATUS_NOT_VALID": ValidationApiError,
"INVALID_MESSAGE_TYPE": ValidationApiError,
"FILE_MISSING": UploadError,
"INVALID_FILE_TYPE": UploadError,
"UPLOAD_FAILED": UploadError,
"QUEUE_FULL": UploadError,
"SERVER_DOWN": ServerDownError,
}
def raise_for_error_code(code: str) -> None:
error_cls = _ERROR_MAP.get(code, ApiEnvelopeError)
raise error_cls(code=code)

View file

@ -0,0 +1,3 @@
from .contracts import API_INFO, ENDPOINTS, SECURITY_SCHEMES, TAGS
__all__ = ["API_INFO", "ENDPOINTS", "SECURITY_SCHEMES", "TAGS"]

View file

@ -0,0 +1,136 @@
"""Generated from openapi.yaml. Do not edit manually."""
API_INFO = {
"title": "booklooker REST API",
"version": "2.0",
"server_url": "https://api.booklooker.de/2.0"
}
TAGS = [
"authentication",
"article",
"upload",
"image",
"order"
]
SECURITY_SCHEMES = {
"tokenAuth": {
"type": "apiKey",
"in": "query",
"name": "token"
}
}
ENDPOINTS = {
"POST /authenticate": {
"summary": "Authentifizierung via API Key",
"tag": "authentication",
"parameters": [
"apiKey"
]
},
"DELETE /article": {
"summary": "Einzelnen Artikel zum Löschen vormerken",
"tag": "article",
"parameters": [
"orderNo"
]
},
"GET /article_list": {
"summary": "Liste aller eigenen aktiven Artikelnummern",
"tag": "article",
"parameters": [
"field",
"showPrice",
"showStock",
"mediaType"
]
},
"GET /article_status": {
"summary": "Abfragen des Status eines Artikels",
"tag": "article",
"parameters": [
"orderNo"
]
},
"GET /search": {
"summary": "Suche in der booklooker-Datenbank",
"tag": "article",
"parameters": []
},
"POST /file_import": {
"summary": "Upload von Angebots- oder Bilddateien",
"tag": "upload",
"parameters": [
"fileType",
"dataType",
"mediaType",
"formatID",
"encoding"
]
},
"GET /file_status": {
"summary": "Abfragen des Status einer hochgeladenen Angebotsdatei",
"tag": "upload",
"parameters": [
"filename",
"showErrors"
]
},
"DELETE /image": {
"summary": "Einzelne oder alle Bilder eines Artikels löschen",
"tag": "image",
"parameters": [
"orderNo",
"position"
]
},
"GET /import_status": {
"summary": "Abfragen der Anzahl unverarbeiteter hochgeladener Angebotsdateien",
"tag": "upload",
"parameters": []
},
"GET /order": {
"summary": "Download aller Bestellungen eines bestimmten Tages",
"tag": "order",
"parameters": [
"orderId",
"date",
"dateFrom",
"dateTo"
]
},
"PUT /order_cancel": {
"summary": "Stornieren einer kompletten Bestellung",
"tag": "order",
"parameters": [
"orderId"
]
},
"PUT /order_item_cancel": {
"summary": "Stornieren der Bestellung eines Einzelartikels",
"tag": "order",
"parameters": [
"orderItemId",
"mediaType"
]
},
"PUT /order_message": {
"summary": "Versand einer Nachricht an den Kunden",
"tag": "order",
"parameters": [
"orderId",
"messageType",
"additionalText"
]
},
"PUT /order_status": {
"summary": "Setzen des Status einer Bestellung",
"tag": "order",
"parameters": [
"orderId",
"status"
]
}
}

View file

@ -0,0 +1,85 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import yaml
def _resolve_parameter_names(parameters: list[dict[str, Any]], components: dict[str, Any]) -> list[str]:
resolved_names: list[str] = []
registry = components.get("parameters", {}) if isinstance(components, dict) else {}
for parameter in parameters or []:
candidate = parameter
if isinstance(parameter, dict) and "$ref" in parameter:
ref = str(parameter["$ref"])
if ref.startswith("#/components/parameters/"):
candidate = registry.get(ref.rsplit("/", 1)[-1], {})
if isinstance(candidate, dict) and candidate.get("name"):
resolved_names.append(str(candidate["name"]))
return resolved_names
def build_contract(openapi_path: Path) -> dict[str, Any]:
spec = yaml.safe_load(openapi_path.read_text(encoding="utf-8"))
server_url = ""
servers = spec.get("servers") or []
if servers:
server_url = servers[0].get("url", "")
components = spec.get("components", {})
endpoints: dict[str, Any] = {}
for route, operations in (spec.get("paths") or {}).items():
for method, operation in operations.items():
endpoints[f"{method.upper()} {route}"] = {
"summary": operation.get("summary", ""),
"tag": (operation.get("tags") or [None])[0],
"parameters": _resolve_parameter_names(operation.get("parameters", []), components),
}
return {
"api_info": {
"title": spec.get("info", {}).get("title", "booklooker REST API"),
"version": spec.get("info", {}).get("version", ""),
"server_url": server_url,
},
"tags": [tag.get("name") for tag in spec.get("tags", []) if isinstance(tag, dict)],
"security_schemes": spec.get("components", {}).get("securitySchemes", {}),
"endpoints": endpoints,
}
def render_contract(contract: dict[str, Any]) -> str:
body = [
'"""Generated from openapi.yaml. Do not edit manually."""',
"",
f"API_INFO = {json.dumps(contract['api_info'], indent=4, ensure_ascii=False)}",
"",
f"TAGS = {json.dumps(contract['tags'], indent=4, ensure_ascii=False)}",
"",
f"SECURITY_SCHEMES = {json.dumps(contract['security_schemes'], indent=4, ensure_ascii=False)}",
"",
f"ENDPOINTS = {json.dumps(contract['endpoints'], indent=4, ensure_ascii=False)}",
"",
]
return "\n".join(body)
def main() -> Path:
root = Path(__file__).resolve().parents[2]
openapi_path = root / "openapi.yaml"
output_path = root / "src" / "booklooker_client" / "generated" / "contracts.py"
output_path.parent.mkdir(parents=True, exist_ok=True)
contract = build_contract(openapi_path)
output_path.write_text(render_contract(contract), encoding="utf-8")
return output_path
if __name__ == "__main__":
path = main()
print(path)

View file

@ -0,0 +1,31 @@
from .article import ArticleDeleteResult, ArticleField, ArticleList, ArticleListItem, ArticleStatus, ArticleStatusResult, MediaType
from .common import ApiEnvelope, AuthToken, FileImportErrorRecord, FileStatusResult, GenericResult, ImportQueueStatus, SearchResult, UploadReceipt
from .order import Address, MessageType, OrderBatch, OrderItem, OrderRecord, OrderStatus, PaymentMethod
from .webhook import MiddlewareEvent, WebhookEvent
__all__ = [
"Address",
"ApiEnvelope",
"ArticleDeleteResult",
"ArticleField",
"ArticleList",
"ArticleListItem",
"ArticleStatus",
"ArticleStatusResult",
"AuthToken",
"FileImportErrorRecord",
"FileStatusResult",
"GenericResult",
"ImportQueueStatus",
"MediaType",
"MessageType",
"MiddlewareEvent",
"OrderBatch",
"OrderItem",
"OrderRecord",
"OrderStatus",
"PaymentMethod",
"SearchResult",
"UploadReceipt",
"WebhookEvent",
]

View file

@ -0,0 +1,48 @@
from __future__ import annotations
from decimal import Decimal
from enum import Enum, IntEnum
from pydantic import BaseModel, Field
class MediaType(IntEnum):
BOOKS = 0
MOVIES = 1
MUSIC = 2
AUDIOBOOKS = 3
GAMES = 4
class ArticleField(str, Enum):
ORDER_NO = "orderNo"
ISBN = "isbn"
EAN = "ean"
class ArticleStatus(str, Enum):
ACTIVE = "ACTIVE"
SOLD = "SOLD"
DELETED = "DELETED"
class ArticleListItem(BaseModel):
value: str
price: Decimal | None = None
stock: int | None = None
class ArticleList(BaseModel):
items: list[ArticleListItem] = Field(default_factory=list)
field: ArticleField = ArticleField.ORDER_NO
raw: str | list[str] | None = None
class ArticleDeleteResult(BaseModel):
order_no: str
result: str = "SUCCESS"
class ArticleStatusResult(BaseModel):
order_no: str
status: ArticleStatus

View file

@ -0,0 +1,61 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, Field
class ApiEnvelope(BaseModel):
model_config = ConfigDict(extra="allow")
status: Literal["OK", "NOK"]
returnValue: Any = None
class AuthToken(BaseModel):
token: str
acquired_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
expires_after_seconds: int = 600
@property
def expired(self) -> bool:
age = datetime.now(timezone.utc) - self.acquired_at
return age.total_seconds() >= self.expires_after_seconds
class GenericResult(BaseModel):
success: bool = True
raw: Any = None
class SearchResult(BaseModel):
raw: Any = None
class ImportQueueStatus(BaseModel):
pending_files: int
raw: Any = None
class FileImportErrorRecord(BaseModel):
model_config = ConfigDict(extra="allow")
errorID: int | None = None
record: int | None = None
orderNo: str | None = None
title: str | None = None
details: str | None = None
class FileStatusResult(BaseModel):
filename: str | None = None
state: str | None = None
errors: list[FileImportErrorRecord] = Field(default_factory=list)
raw: Any = None
class UploadReceipt(BaseModel):
filename: str
result: str
raw: Any = None

View file

@ -0,0 +1,90 @@
from __future__ import annotations
from enum import Enum, IntEnum
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
class MessageType(str, Enum):
PAYMENT_INFORMATION = "PAYMENT_INFORMATION"
PAYMENT_REMINDER = "PAYMENT_REMINDER"
SHIPPING_NOTICE = "SHIPPING_NOTICE"
class OrderStatus(str, Enum):
BUYER_NO_REACTION = "BUYER_NO_REACTION"
CANCELED = "CANCELED"
PAID_WAITING_FOR_SHIPMENT = "PAID_WAITING_FOR_SHIPMENT"
READY_FOR_SHIPMENT = "READY_FOR_SHIPMENT"
SHIPPED_AND_PAID = "SHIPPED_AND_PAID"
SHIPPED_WAITING_FOR_PAYMENT = "SHIPPED_WAITING_FOR_PAYMENT"
TO_BE_PAID = "TO_BE_PAID"
VENDOR_NO_REACTION = "VENDOR_NO_REACTION"
WAITING_FOR_PAYMENT = "WAITING_FOR_PAYMENT"
WAITING_FOR_SHIPMENT = "WAITING_FOR_SHIPMENT"
WAITING_FOR_PAYMENT_INFO = "WAITING_FOR_PAYMENT_INFO"
RECEIVED_AND_PAID = "RECEIVED_AND_PAID"
class PaymentMethod(IntEnum):
BANK_TRANSFER = 1
OPEN_INVOICE = 2
OPEN_INVOICE_PREPAY_RESERVED = 11
DIRECT_DEBIT = 3
CREDIT_CARD = 4
CASH_ON_DELIVERY = 5
PAYPAL = 6
SKRILL = 8
CASH_PICKUP = 9
SOFORT = 10
class Address(BaseModel):
model_config = ConfigDict(extra="allow")
title: str | None = None
name: str | None = None
firstName: str | None = None
company: str | None = None
addressSupplement: str | None = None
street: str | None = None
zip: str | None = None
city: str | None = None
country: str | None = None
class OrderItem(BaseModel):
model_config = ConfigDict(extra="allow")
amount: int | None = None
author: str | None = None
mediaType: int | None = None
note: str | None = None
orderItemId: int | None = None
orderNo: str | None = None
orderTitle: str | None = None
singlePrice: float | None = None
status: str | None = None
totalPriceRebated: float | None = None
class OrderRecord(BaseModel):
model_config = ConfigDict(extra="allow")
orderId: int | None = None
orderDate: str | None = None
orderTime: str | None = None
buyerUsername: str | None = None
email: str | None = None
status: str | None = None
paymentId: int | None = None
comments: str | None = None
invoiceAddress: Address | None = None
deliveryAddress: Address | None = None
orderItems: list[OrderItem] = Field(default_factory=list)
class OrderBatch(BaseModel):
orders: list[OrderRecord] = Field(default_factory=list)
raw: Any = None

View file

@ -0,0 +1,42 @@
from __future__ import annotations
from datetime import datetime
from typing import Any
from uuid import uuid4
from pydantic import AliasChoices, BaseModel, ConfigDict, Field, model_validator
class WebhookEvent(BaseModel):
model_config = ConfigDict(extra="allow", populate_by_name=True)
event_type: str = Field(validation_alias=AliasChoices("event_type", "type", "event", "name"))
event_id: str = Field(default_factory=lambda: uuid4().hex, validation_alias=AliasChoices("event_id", "id"))
timestamp: datetime | None = Field(default=None, validation_alias=AliasChoices("timestamp", "createdAt", "created_at"))
order_id: int | None = Field(default=None, validation_alias=AliasChoices("orderId", "order_id"))
order_no: str | None = Field(default=None, validation_alias=AliasChoices("orderNo", "order_no"))
payload: dict[str, Any] = Field(default_factory=dict)
@model_validator(mode="before")
@classmethod
def _capture_payload(cls, value: Any) -> Any:
if isinstance(value, dict) and "payload" not in value:
copied = dict(value)
copied["payload"] = dict(value)
return copied
return value
@property
def resource_id(self) -> str | None:
if self.order_id is not None:
return str(self.order_id)
return self.order_no
class MiddlewareEvent(BaseModel):
event_id: str
event_type: str
resource_id: str | None = None
resource_type: str | None = None
raw_payload: dict[str, Any] = Field(default_factory=dict)
enriched_data: dict[str, Any] | None = None

View file

@ -0,0 +1,101 @@
from __future__ import annotations
from typing import Any
from .models.webhook import MiddlewareEvent, WebhookEvent
class InMemoryIdempotencyStore:
def __init__(self) -> None:
self._seen: set[str] = set()
def has_seen(self, event_id: str) -> bool:
return event_id in self._seen
def mark_seen(self, event_id: str) -> None:
self._seen.add(event_id)
class BooklookerWebhookHelper:
"""Utility toolbox for parsing and enriching Booklooker push payloads."""
def __init__(self, idempotency_store: InMemoryIdempotencyStore | None = None) -> None:
self.idempotency_store = idempotency_store or InMemoryIdempotencyStore()
def parse_event(self, payload: dict[str, Any]) -> WebhookEvent:
return WebhookEvent.model_validate(payload)
def is_duplicate(self, event: WebhookEvent) -> bool:
return self.idempotency_store.has_seen(event.event_id)
def mark_processed(self, event: WebhookEvent) -> None:
self.idempotency_store.mark_seen(event.event_id)
def to_middleware_event(
self,
event: WebhookEvent,
*,
resource_type: str | None = None,
enriched_data: dict[str, Any] | None = None,
) -> MiddlewareEvent:
return MiddlewareEvent(
event_id=event.event_id,
event_type=event.event_type,
resource_id=event.resource_id,
resource_type=resource_type,
raw_payload=event.payload,
enriched_data=enriched_data,
)
def enrich_with_client(self, payload: dict[str, Any] | WebhookEvent, client: Any) -> MiddlewareEvent:
event = payload if isinstance(payload, WebhookEvent) else self.parse_event(payload)
if self.is_duplicate(event):
return self.to_middleware_event(
event,
resource_type="duplicate",
enriched_data={"duplicate": True},
)
enriched: dict[str, Any] | None = None
resource_type: str | None = None
if event.order_id is not None and hasattr(client, "get_orders"):
resource_type = "order"
date_hint = event.timestamp.date().isoformat() if event.timestamp else None
enriched = client.get_orders(order_id=event.order_id, date=date_hint).model_dump(mode="json")
elif event.order_no and hasattr(client, "get_article_status"):
resource_type = "article"
enriched = client.get_article_status(event.order_no).model_dump(mode="json")
self.mark_processed(event)
return self.to_middleware_event(event, resource_type=resource_type, enriched_data=enriched)
async def enrich_with_async_client(self, payload: dict[str, Any] | WebhookEvent, client: Any) -> MiddlewareEvent:
event = payload if isinstance(payload, WebhookEvent) else self.parse_event(payload)
if self.is_duplicate(event):
return self.to_middleware_event(
event,
resource_type="duplicate",
enriched_data={"duplicate": True},
)
enriched: dict[str, Any] | None = None
resource_type: str | None = None
if event.order_id is not None and hasattr(client, "get_orders"):
resource_type = "order"
date_hint = event.timestamp.date().isoformat() if event.timestamp else None
order_batch = await client.get_orders(order_id=event.order_id, date=date_hint)
enriched = order_batch.model_dump(mode="json")
elif event.order_no and hasattr(client, "get_article_status"):
resource_type = "article"
article_state = await client.get_article_status(event.order_no)
enriched = article_state.model_dump(mode="json")
self.mark_processed(event)
return self.to_middleware_event(event, resource_type=resource_type, enriched_data=enriched)
def fastapi_receiver_snippet(self, route: str = "/webhooks/booklooker") -> str:
return f'''from fastapi import FastAPI, Request\nfrom booklooker_client import BooklookerConfig, SyncBooklookerClient, BooklookerWebhookHelper\n\napp = FastAPI()\nhelper = BooklookerWebhookHelper()\nclient = SyncBooklookerClient(BooklookerConfig(api_key="YOUR_API_KEY"))\n\n@app.post("{route}")\nasync def receive_booklooker_webhook(request: Request):\n payload = await request.json()\n event = helper.enrich_with_client(payload, client)\n return {{"accepted": True, "event": event.model_dump(mode="json")}}\n'''

27
tests/test_models.py Normal file
View file

@ -0,0 +1,27 @@
from booklooker_client import BooklookerConfig, SyncBooklookerClient
from booklooker_client.models import ArticleField, ArticleList, ArticleListItem, AuthToken
def test_config_defaults() -> None:
config = BooklookerConfig(api_key="demo")
assert config.base_url == "https://api.booklooker.de/2.0"
assert config.timeout > 0
def test_auth_token_not_immediately_expired() -> None:
token = AuthToken(token="abc", expires_after_seconds=600)
assert token.expired is False
def test_article_list_model() -> None:
article_list = ArticleList(items=[ArticleListItem(value="ABC-1")], field=ArticleField.ORDER_NO)
assert article_list.items[0].value == "ABC-1"
def test_client_exposes_generated_endpoints() -> None:
client = SyncBooklookerClient(BooklookerConfig(api_key="demo"))
try:
assert "POST /authenticate" in client.available_endpoints
assert "GET /order" in client.available_endpoints
finally:
client.close()

41
tests/test_webhooks.py Normal file
View file

@ -0,0 +1,41 @@
from booklooker_client import BooklookerWebhookHelper
from booklooker_client.models.order import OrderBatch, OrderRecord
class FakeClient:
def get_orders(self, *, order_id=None, date=None, date_from=None, date_to=None):
return OrderBatch(orders=[OrderRecord(orderId=order_id, buyerUsername="alice")])
def get_article_status(self, order_no: str):
class _Result:
def model_dump(self, mode="json"):
return {"order_no": order_no, "status": "ACTIVE"}
return _Result()
def test_webhook_helper_enriches_order_payload() -> None:
helper = BooklookerWebhookHelper()
event = helper.enrich_with_client(
{
"event_type": "order.created",
"event_id": "evt-1",
"orderId": 1234,
},
FakeClient(),
)
assert event.resource_type == "order"
assert event.resource_id == "1234"
assert event.enriched_data is not None
assert event.enriched_data["orders"][0]["buyerUsername"] == "alice"
def test_duplicate_detection() -> None:
helper = BooklookerWebhookHelper()
first = helper.enrich_with_client({"event_type": "order.created", "event_id": "evt-2", "orderId": 99}, FakeClient())
second = helper.enrich_with_client({"event_type": "order.created", "event_id": "evt-2", "orderId": 99}, FakeClient())
assert first.resource_type == "order"
assert second.resource_type == "duplicate"
assert second.enriched_data == {"duplicate": True}