ebay_client/tests/test_notification_webhook.py

190 lines
No EOL
6.3 KiB
Python

from __future__ import annotations
import base64
import json
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from ebay_client.generated.notification.models import PublicKey
from ebay_client.notification.webhook import WebhookChallengeHandler, WebhookSignatureParser
from ebay_client.notification.webhook import (
WebhookPublicKeyResolver,
WebhookRequestHandler,
WebhookSignatureValidator,
)
def test_challenge_handler_builds_sha256_response() -> None:
response = WebhookChallengeHandler.build_challenge_response(
challenge_code="challenge",
verification_token="verification",
endpoint="https://example.test/webhook",
)
assert len(response) == 64
def test_signature_parser_extracts_known_fields() -> None:
parser = WebhookSignatureParser()
parsed = parser.parse("kid=public-key-1,alg=ECDSA,digest=SHA256,sig=Zm9v")
assert parsed.key_id == "public-key-1"
assert parsed.algorithm == "ECDSA"
assert parsed.digest == "SHA256"
assert parsed.signature == b"foo"
def test_signature_parser_decodes_base64_json_header() -> None:
parser = WebhookSignatureParser()
header = base64.b64encode(
json.dumps(
{
"alg": "ECDSA",
"kid": "public-key-1",
"signature": base64.b64encode(b"signed-bytes").decode("ascii"),
"digest": "SHA256",
}
).encode("utf-8")
).decode("ascii")
parsed = parser.parse(header)
assert parsed.key_id == "public-key-1"
assert parsed.algorithm == "ECDSA"
assert parsed.digest == "SHA256"
assert parsed.signature == b"signed-bytes"
def test_signature_validator_verifies_base64_json_header_and_der_key() -> None:
private_key = ec.generate_private_key(ec.SECP256R1())
public_key = private_key.public_key()
public_key_der = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
body = b'{"notificationId":"abc-123"}'
signature = private_key.sign(body, ec.ECDSA(hashes.SHA256()))
header = base64.b64encode(
json.dumps(
{
"alg": "ECDSA",
"kid": "public-key-1",
"signature": base64.b64encode(signature).decode("ascii"),
"digest": "SHA256",
}
).encode("utf-8")
).decode("ascii")
resolver = WebhookPublicKeyResolver(
lambda key_id: PublicKey(
key=base64.b64encode(public_key_der).decode("ascii"),
algorithm="ECDSA",
digest="SHA256",
)
)
validator = WebhookSignatureValidator(resolver)
assert validator.validate(header_value=header, body=body) is True
def test_request_handler_returns_json_challenge_response() -> None:
handler = WebhookRequestHandler(
signature_validator=WebhookSignatureValidator(
WebhookPublicKeyResolver(lambda _: PublicKey(key="", algorithm="ECDSA", digest="SHA256"))
)
)
response = handler.handle_challenge(
challenge_code="challenge",
verification_token="verification",
endpoint="https://example.test/webhook",
)
assert response.status_code == 200
assert response.headers["Content-Type"] == "application/json"
body = json.loads(response.body.decode("utf-8"))
assert "challengeResponse" in body
def test_request_handler_accepts_verified_notification_and_parses_event() -> None:
private_key = ec.generate_private_key(ec.SECP256R1())
public_key = private_key.public_key()
public_key_der = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
body = json.dumps(
{
"notificationId": "abc-123",
"publishDate": "2026-04-07T00:00:00.000Z",
"topicId": "MARKETPLACE_ACCOUNT_DELETION",
"data": {"userId": "user-1"},
"schemaVersion": "1.0",
}
).encode("utf-8")
signature = private_key.sign(body, ec.ECDSA(hashes.SHA256()))
header = base64.b64encode(
json.dumps(
{
"alg": "ECDSA",
"kid": "public-key-1",
"signature": base64.b64encode(signature).decode("ascii"),
"digest": "SHA256",
}
).encode("utf-8")
).decode("ascii")
resolver = WebhookPublicKeyResolver(
lambda key_id: PublicKey(
key=base64.b64encode(public_key_der).decode("ascii"),
algorithm="ECDSA",
digest="SHA256",
)
)
handler = WebhookRequestHandler(signature_validator=WebhookSignatureValidator(resolver))
result = handler.handle_notification(signature_header=header, body=body)
assert result.response.status_code == 200
assert result.event is not None
assert result.event.notification_id == "abc-123"
assert result.event.topic_id == "MARKETPLACE_ACCOUNT_DELETION"
assert result.event.metadata["schemaVersion"] == "1.0"
def test_request_handler_returns_412_for_invalid_signature() -> None:
private_key = ec.generate_private_key(ec.SECP256R1())
public_key = private_key.public_key()
public_key_der = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
body = b'{"notificationId":"abc-123"}'
wrong_body = b'{"notificationId":"def-456"}'
signature = private_key.sign(body, ec.ECDSA(hashes.SHA256()))
header = base64.b64encode(
json.dumps(
{
"alg": "ECDSA",
"kid": "public-key-1",
"signature": base64.b64encode(signature).decode("ascii"),
"digest": "SHA256",
}
).encode("utf-8")
).decode("ascii")
resolver = WebhookPublicKeyResolver(
lambda key_id: PublicKey(
key=base64.b64encode(public_key_der).decode("ascii"),
algorithm="ECDSA",
digest="SHA256",
)
)
handler = WebhookRequestHandler(signature_validator=WebhookSignatureValidator(resolver))
result = handler.handle_notification(signature_header=header, body=wrong_body)
assert result.response.status_code == 412
assert result.event is None