"""Tests for the webhook challenge, signature parsing and request handling helpers."""

from __future__ import annotations

import base64
import json

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec

from ebay_client.generated.notification.models import PublicKey
from ebay_client.notification.webhook import (
    WebhookChallengeHandler,
    WebhookPublicKeyResolver,
    WebhookRequestHandler,
    WebhookSignatureParser,
    WebhookSignatureValidator,
)


def _generate_key_pair() -> tuple[ec.EllipticCurvePrivateKey, bytes]:
    """Return a fresh SECP256R1 private key and its public key as DER bytes.

    The public key is serialized in SubjectPublicKeyInfo format.
    """
    private_key = ec.generate_private_key(ec.SECP256R1())
    public_key_der = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return private_key, public_key_der


def _sign(private_key: ec.EllipticCurvePrivateKey, body: bytes) -> bytes:
    """Return the ECDSA/SHA-256 signature of *body* made with *private_key*."""
    return private_key.sign(body, ec.ECDSA(hashes.SHA256()))


def _encode_signature_header(signature: bytes, key_id: str = "public-key-1") -> str:
    """Build the base64-encoded JSON signature header used by these tests.

    The JSON document carries ``alg``, ``kid``, ``signature`` (itself
    base64-encoded) and ``digest``; the whole document is then base64-encoded.
    """
    payload = {
        "alg": "ECDSA",
        "kid": key_id,
        "signature": base64.b64encode(signature).decode("ascii"),
        "digest": "SHA256",
    }
    return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")


def _resolver_for(public_key_der: bytes) -> WebhookPublicKeyResolver:
    """Return a resolver whose lookup always yields *public_key_der* (base64)."""
    return WebhookPublicKeyResolver(
        lambda key_id: PublicKey(
            key=base64.b64encode(public_key_der).decode("ascii"),
            algorithm="ECDSA",
            digest="SHA256",
        )
    )


def test_challenge_handler_builds_sha256_response() -> None:
    """The challenge response is 64 characters long."""
    response = WebhookChallengeHandler.build_challenge_response(
        challenge_code="challenge",
        verification_token="verification",
        endpoint="https://example.test/webhook",
    )

    # 64 is the length of a hex-encoded SHA-256 digest; only the length is
    # checked here, not the digest value itself.
    assert len(response) == 64


def test_signature_parser_extracts_known_fields() -> None:
    """A comma-separated ``key=value`` header is split into its fields."""
    parser = WebhookSignatureParser()

    parsed = parser.parse("kid=public-key-1,alg=ECDSA,digest=SHA256,sig=Zm9v")

    assert parsed.key_id == "public-key-1"
    assert parsed.algorithm == "ECDSA"
    assert parsed.digest == "SHA256"
    # "Zm9v" is the base64 encoding of b"foo".
    assert parsed.signature == b"foo"


def test_signature_parser_decodes_base64_json_header() -> None:
    """A base64-encoded JSON header is decoded into the same parsed fields."""
    parser = WebhookSignatureParser()
    header = _encode_signature_header(b"signed-bytes")

    parsed = parser.parse(header)

    assert parsed.key_id == "public-key-1"
    assert parsed.algorithm == "ECDSA"
    assert parsed.digest == "SHA256"
    assert parsed.signature == b"signed-bytes"


def test_signature_validator_verifies_base64_json_header_and_der_key() -> None:
    """A body signed with the matching private key validates successfully."""
    private_key, public_key_der = _generate_key_pair()
    body = b'{"notificationId":"abc-123"}'
    header = _encode_signature_header(_sign(private_key, body))

    validator = WebhookSignatureValidator(_resolver_for(public_key_der))

    assert validator.validate(header_value=header, body=body) is True


def test_request_handler_returns_json_challenge_response() -> None:
    """``handle_challenge`` answers 200 with a JSON ``challengeResponse`` body."""
    handler = WebhookRequestHandler(
        signature_validator=WebhookSignatureValidator(
            # The resolver only has to exist for construction here; it is
            # given a key with an empty value.
            WebhookPublicKeyResolver(lambda _: PublicKey(key="", algorithm="ECDSA", digest="SHA256"))
        )
    )

    response = handler.handle_challenge(
        challenge_code="challenge",
        verification_token="verification",
        endpoint="https://example.test/webhook",
    )

    assert response.status_code == 200
    assert response.headers["Content-Type"] == "application/json"
    body = json.loads(response.body.decode("utf-8"))
    assert "challengeResponse" in body


def test_request_handler_accepts_verified_notification_and_parses_event() -> None:
    """A correctly signed notification yields a 200 response and a parsed event."""
    private_key, public_key_der = _generate_key_pair()
    body = json.dumps(
        {
            "notificationId": "abc-123",
            "publishDate": "2026-04-07T00:00:00.000Z",
            "topicId": "MARKETPLACE_ACCOUNT_DELETION",
            "data": {"userId": "user-1"},
            "schemaVersion": "1.0",
        }
    ).encode("utf-8")
    header = _encode_signature_header(_sign(private_key, body))
    handler = WebhookRequestHandler(
        signature_validator=WebhookSignatureValidator(_resolver_for(public_key_der))
    )

    result = handler.handle_notification(signature_header=header, body=body)

    assert result.response.status_code == 200
    assert result.event is not None
    assert result.event.notification_id == "abc-123"
    assert result.event.topic_id == "MARKETPLACE_ACCOUNT_DELETION"
    assert result.event.metadata["schemaVersion"] == "1.0"


def test_request_handler_returns_412_for_invalid_signature() -> None:
    """A signature made over a different body is rejected with status 412."""
    private_key, public_key_der = _generate_key_pair()
    body = b'{"notificationId":"abc-123"}'
    wrong_body = b'{"notificationId":"def-456"}'
    # The header signs ``body`` but the handler receives ``wrong_body``.
    header = _encode_signature_header(_sign(private_key, body))
    handler = WebhookRequestHandler(
        signature_validator=WebhookSignatureValidator(_resolver_for(public_key_der))
    )

    result = handler.handle_notification(signature_header=header, body=wrong_body)

    assert result.response.status_code == 412
    assert result.event is None