Add webhook utilities and FastAPI integration for eBay notifications; enhance tests for request handling
This commit is contained in:
parent
10008a0edc
commit
30b62dedab
5 changed files with 293 additions and 2 deletions
71
examples/fastapi_notification_webhook.py
Normal file
71
examples/fastapi_notification_webhook.py
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
from fastapi import FastAPI, Header, Request, Response
|
||||
|
||||
from ebay_client.core.auth.models import EbayOAuthConfig
|
||||
from ebay_client.client import EbayClient
|
||||
from ebay_client.notification import WebhookPublicKeyResolver, WebhookRequestHandler, WebhookSignatureValidator
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
oauth_config = EbayOAuthConfig(
|
||||
client_id=os.environ["EBAY_CLIENT_ID"],
|
||||
client_secret=os.environ["EBAY_CLIENT_SECRET"],
|
||||
default_scopes=["https://api.ebay.com/oauth/api_scope"],
|
||||
)
|
||||
ebay_client = EbayClient(oauth_config)
|
||||
|
||||
verification_token = os.environ["EBAY_NOTIFICATION_VERIFICATION_TOKEN"]
|
||||
public_key_resolver = WebhookPublicKeyResolver(ebay_client.notification.get_public_key)
|
||||
webhook_handler = WebhookRequestHandler(
|
||||
signature_validator=WebhookSignatureValidator(public_key_resolver)
|
||||
)
|
||||
|
||||
|
||||
def process_event(topic_id: str | None, payload: dict | list | None) -> None:
|
||||
if topic_id == "MARKETPLACE_ACCOUNT_DELETION":
|
||||
return
|
||||
|
||||
|
||||
@app.get("/webhooks/ebay")
|
||||
async def ebay_challenge(challenge_code: str | None = None, request: Request | None = None) -> Response:
|
||||
if request is None:
|
||||
return Response(status_code=500)
|
||||
|
||||
endpoint = str(request.url.replace(query=""))
|
||||
result = webhook_handler.handle_challenge(
|
||||
challenge_code=challenge_code,
|
||||
verification_token=verification_token,
|
||||
endpoint=endpoint,
|
||||
)
|
||||
return Response(
|
||||
content=result.body,
|
||||
status_code=result.status_code,
|
||||
headers=result.headers,
|
||||
media_type=result.headers.get("Content-Type"),
|
||||
)
|
||||
|
||||
|
||||
@app.post("/webhooks/ebay")
|
||||
async def ebay_notification(
|
||||
request: Request,
|
||||
x_ebay_signature: str | None = Header(default=None, alias="X-EBAY-SIGNATURE"),
|
||||
) -> Response:
|
||||
body = await request.body()
|
||||
result = webhook_handler.handle_notification(
|
||||
signature_header=x_ebay_signature,
|
||||
body=body,
|
||||
)
|
||||
|
||||
if result.event is not None:
|
||||
process_event(result.event.topic_id, result.event.data)
|
||||
|
||||
return Response(
|
||||
content=result.response.body,
|
||||
status_code=result.response.status_code,
|
||||
headers=result.response.headers,
|
||||
media_type=result.response.headers.get("Content-Type"),
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue