booklooker_client/examples/webhook_fastapi.py

22 lines
888 B
Python

from __future__ import annotations
import os
from fastapi import FastAPI, HTTPException, Request
from booklooker_client import BooklookerConfig, BooklookerWebhookHelper, SyncBooklookerClient
# ASGI application that exposes the webhook endpoint defined below.
app = FastAPI(
    title="Booklooker webhook receiver",
)

# The webhook secret is read from the environment; `os.environ.get` yields
# None when BOOKLOOKER_WEBHOOK_SECRET is unset, and that None is passed through.
helper = BooklookerWebhookHelper(
    webhook_secret=os.environ.get("BOOKLOOKER_WEBHOOK_SECRET"),
)

# API client handed to the helper when enriching events. The API key falls
# back to the literal placeholder "REPLACE_ME" when BOOKLOOKER_API_KEY is unset.
client = SyncBooklookerClient(
    BooklookerConfig(
        api_key=os.environ.get("BOOKLOOKER_API_KEY", "REPLACE_ME"),
    )
)
@app.post("/webhooks/booklooker")
async def receive_booklooker_webhook(request: Request) -> dict:
    """Receive a Booklooker webhook, validate it, and return the enriched event.

    The raw request body and headers are passed to
    ``helper.validate_request``; a falsy result rejects the request. The body
    is then parsed as JSON and handed to ``helper.enrich_with_client`` together
    with the module-level ``client``.

    Args:
        request: Incoming HTTP request carrying the webhook payload.

    Returns:
        A dict with ``"accepted": True`` and the event serialized via
        ``event.model_dump(mode="json")`` under ``"event"``.

    Raises:
        HTTPException: 401 when validation fails; 400 when the body is not
            decodable JSON.
    """
    # Validation runs on the exact bytes received, before any parsing.
    raw_body = await request.body()
    if not helper.validate_request(raw_body, request.headers):
        raise HTTPException(status_code=401, detail="Invalid webhook signature")

    # A validated request can still carry a malformed body. Both
    # json.JSONDecodeError and UnicodeDecodeError derive from ValueError, so
    # report a client error instead of letting it surface as a 500.
    try:
        payload = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc

    # NOTE(review): `client` is a SyncBooklookerClient; if enrich_with_client
    # performs blocking network I/O it would stall the event loop inside this
    # async handler -- confirm, and consider offloading to a thread pool.
    event = helper.enrich_with_client(payload, client)
    return {"accepted": True, "event": event.model_dump(mode="json")}