47 lines
1.7 KiB
Python
47 lines
1.7 KiB
Python
from booklooker_client import BooklookerWebhookHelper, InMemoryIdempotencyStore
|
|
from booklooker_client.models.order import OrderBatch, OrderRecord
|
|
|
|
|
|
class FakeClient:
|
|
def get_orders(self, *, order_id=None, date=None, date_from=None, date_to=None):
|
|
return OrderBatch(orders=[OrderRecord(orderId=order_id, buyerUsername="alice")])
|
|
|
|
def get_article_status(self, order_no: str):
|
|
class _Result:
|
|
def model_dump(self, mode="json"):
|
|
return {"order_no": order_no, "status": "ACTIVE"}
|
|
|
|
return _Result()
|
|
|
|
|
|
def test_webhook_helper_enriches_order_payload() -> None:
|
|
helper = BooklookerWebhookHelper()
|
|
event = helper.enrich_with_client(
|
|
{
|
|
"event_type": "order.created",
|
|
"event_id": "evt-1",
|
|
"orderId": 1234,
|
|
},
|
|
FakeClient(),
|
|
)
|
|
|
|
assert event.resource_type == "order"
|
|
assert event.resource_id == "1234"
|
|
assert event.enriched_data is not None
|
|
assert event.enriched_data["orders"][0]["buyerUsername"] == "alice"
|
|
|
|
|
|
def test_duplicate_detection() -> None:
|
|
helper = BooklookerWebhookHelper()
|
|
first = helper.enrich_with_client({"event_type": "order.created", "event_id": "evt-2", "orderId": 99}, FakeClient())
|
|
second = helper.enrich_with_client({"event_type": "order.created", "event_id": "evt-2", "orderId": 99}, FakeClient())
|
|
|
|
assert first.resource_type == "order"
|
|
assert second.resource_type == "duplicate"
|
|
assert second.enriched_data == {"duplicate": True}
|
|
|
|
|
|
def test_idempotency_store_ttl_expiry() -> None:
|
|
store = InMemoryIdempotencyStore(ttl_seconds=0)
|
|
store.mark_seen("evt-expire")
|
|
assert store.has_seen("evt-expire") is False
|