71 lines
No EOL
2.2 KiB
Python
71 lines
No EOL
2.2 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
|
|
from fastapi import FastAPI, Header, Request, Response
|
|
|
|
from ebay_client.core.auth.models import EbayOAuthConfig
|
|
from ebay_client.client import EbayClient
|
|
from ebay_client.notification import WebhookPublicKeyResolver, WebhookRequestHandler, WebhookSignatureValidator
|
|
|
|
|
|
# ASGI application object; the route decorators in this module attach to it.
app = FastAPI()

# OAuth settings for the eBay client. The credentials are read from the
# environment at import time, so a missing variable raises KeyError here.
oauth_config = EbayOAuthConfig(
    client_id=os.environ["EBAY_CLIENT_ID"],
    client_secret=os.environ["EBAY_CLIENT_SECRET"],
    default_scopes=["https://api.ebay.com/oauth/api_scope"],
)
ebay_client = EbayClient(oauth_config)

# Token handed to the challenge handler on GET requests; also required to be
# present in the environment at import time.
verification_token = os.environ["EBAY_NOTIFICATION_VERIFICATION_TOKEN"]

# The resolver wraps the client's notification.get_public_key callable, and
# the signature validator given to the request handler is built on top of it.
public_key_resolver = WebhookPublicKeyResolver(ebay_client.notification.get_public_key)
webhook_handler = WebhookRequestHandler(signature_validator=WebhookSignatureValidator(public_key_resolver))
|
|
|
|
|
|
def process_event(topic_id: str | None, payload: dict | list | None) -> None:
    """Application hook for a received notification event.

    This is currently a placeholder: no work is performed for any topic and
    ``payload`` is never inspected. The ``MARKETPLACE_ACCOUNT_DELETION``
    branch only marks where handling for that topic would be added.

    Args:
        topic_id: Topic identifier of the event, or ``None``.
        payload: Data carried by the event; unused for now.

    Returns:
        Always ``None``.
    """
    is_account_deletion = topic_id == "MARKETPLACE_ACCOUNT_DELETION"
    if is_account_deletion:
        # Nothing is done for this topic yet.
        return None
    return None
|
|
|
|
|
|
@app.get("/webhooks/ebay")
async def ebay_challenge(challenge_code: str | None = None, request: Request | None = None) -> Response:
    """Answer a GET on ``/webhooks/ebay`` via the webhook handler's challenge logic.

    The challenge code, the module-level verification token, and this
    request's URL with its query string blanked out are passed to
    ``webhook_handler.handle_challenge``; the handler's result is then
    mirrored into the HTTP response.
    NOTE(review): presumably this serves eBay's endpoint-validation
    challenge -- confirm against the eBay notification documentation.

    Args:
        challenge_code: Value of the ``challenge_code`` parameter, if any.
        request: The incoming request; a 500 response is returned when absent.

    Returns:
        A response carrying the handler's body, status code and headers.
    """
    if request is None:
        return Response(status_code=500)

    # The handler receives the endpoint URL without any query component.
    url_without_query = request.url.replace(query="")
    challenge_result = webhook_handler.handle_challenge(
        challenge_code=challenge_code,
        verification_token=verification_token,
        endpoint=str(url_without_query),
    )

    return Response(
        content=challenge_result.body,
        status_code=challenge_result.status_code,
        headers=challenge_result.headers,
        media_type=challenge_result.headers.get("Content-Type"),
    )
|
|
|
|
|
|
@app.post("/webhooks/ebay")
async def ebay_notification(
    request: Request,
    x_ebay_signature: str | None = Header(default=None, alias="X-EBAY-SIGNATURE"),
) -> Response:
    """Handle a POST on ``/webhooks/ebay`` by delegating to the webhook handler.

    The raw request body and the ``X-EBAY-SIGNATURE`` header are passed to
    ``webhook_handler.handle_notification``. When the result carries an
    event, it is forwarded to ``process_event``. The handler's response is
    then mirrored back to the caller.

    Args:
        request: The incoming request whose raw body is read.
        x_ebay_signature: Value of the ``X-EBAY-SIGNATURE`` header, or ``None``.

    Returns:
        A response carrying the handler's body, status code and headers.
    """
    raw_body = await request.body()
    outcome = webhook_handler.handle_notification(
        signature_header=x_ebay_signature,
        body=raw_body,
    )

    # Only results that carry an event are dispatched to application code.
    event = outcome.event
    if event is not None:
        process_event(event.topic_id, event.data)

    reply = outcome.response
    return Response(
        content=reply.body,
        status_code=reply.status_code,
        headers=reply.headers,
        media_type=reply.headers.get("Content-Type"),
    )