diff --git a/ebay_client/notification/webhook.py b/ebay_client/notification/webhook.py index e185c8e..531b599 100644 --- a/ebay_client/notification/webhook.py +++ b/ebay_client/notification/webhook.py @@ -2,6 +2,7 @@ from __future__ import annotations import base64 import hashlib +import json from collections.abc import Callable from dataclasses import dataclass from datetime import UTC, datetime, timedelta @@ -10,6 +11,7 @@ from typing import Any from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.serialization import load_der_public_key from pydantic import BaseModel, Field from ebay_client.generated.notification.models import PublicKey @@ -39,16 +41,10 @@ class WebhookSignatureParser: DIGEST_ALIASES = ("digest",) def parse(self, header_value: str) -> ParsedSignatureHeader: - separators = header_value.replace(";", ",").split(",") - parts: dict[str, str] = {} - for item in separators: - if "=" not in item: - continue - key, value = item.split("=", 1) - parts[key.strip().lower()] = value.strip().strip('"') + parts = self._parse_parts(header_value) signature_value = self._first_match(parts, self.SIGNATURE_ALIASES) - signature = base64.b64decode(signature_value) if signature_value else None + signature = self._decode_base64(signature_value) if signature_value else None return ParsedSignatureHeader( key_id=self._first_match(parts, self.KEY_ALIASES), signature=signature, @@ -57,6 +53,49 @@ class WebhookSignatureParser: raw_parts=parts, ) + def _parse_parts(self, header_value: str) -> dict[str, str]: + decoded = self._decode_base64_to_text(header_value) + if decoded: + try: + payload = json.loads(decoded) + except json.JSONDecodeError: + payload = None + if isinstance(payload, dict): + return {str(key).strip().lower(): str(value).strip() for key, value in payload.items() if value is not None} + + separators = header_value.replace(";", ",").split(",") + parts: dict[str, str] = {} + for item in separators: + if "=" not in item: + continue + key, value = item.split("=", 1) + parts[key.strip().lower()] = value.strip().strip('"') + return parts + + @staticmethod + def _decode_base64_to_text(value: str) -> str | None: + decoded = WebhookSignatureParser._decode_base64(value) + if decoded is None: + return None + try: + return decoded.decode("utf-8") + except UnicodeDecodeError: + return None + + @staticmethod + def _decode_base64(value: str) -> bytes | None: + normalized = value.strip() + if not normalized: + return None + padding = (-len(normalized)) % 4 + normalized = normalized + ("=" * padding) + for decoder in (base64.b64decode, base64.urlsafe_b64decode): + try: + return decoder(normalized) + except Exception: + continue + return None + @staticmethod def _first_match(parts: dict[str, str], aliases: tuple[str, ...]) -> str | None: for alias in aliases: @@ -107,10 +146,12 @@ class WebhookSignatureValidator: if not parsed.key_id or not parsed.signature: return False key_payload = self.resolver.resolve(parsed.key_id) - pem = key_payload.key - if not pem: + key_value = key_payload.key + if not key_value: + return False + public_key = self._load_public_key(key_value) + if public_key is None: return False - public_key = serialization.load_pem_public_key(pem.encode("utf-8")) digest_name = (parsed.digest or key_payload.digest or "SHA256").upper() digest = self._hash_algorithm(digest_name) message = message_builder(body) if message_builder else body @@ -130,3 +171,16 @@ class WebhookSignatureValidator: if normalized == "SHA512": return hashes.SHA512() return hashes.SHA256() + + @staticmethod + def _load_public_key(key_value: str): + normalized = key_value.strip() + if not normalized: + return None + if "BEGIN PUBLIC KEY" in normalized: + return serialization.load_pem_public_key(normalized.encode("utf-8")) + + decoded = WebhookSignatureParser._decode_base64(normalized) + if decoded is None: + return None + return load_der_public_key(decoded) diff --git a/tests/test_notification_webhook.py b/tests/test_notification_webhook.py index 7098247..7e2653d 100644 --- a/tests/test_notification_webhook.py +++ b/tests/test_notification_webhook.py @@ -1,6 +1,14 @@ from __future__ import annotations +import base64 +import json + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec + +from ebay_client.generated.notification.models import PublicKey from ebay_client.notification.webhook import WebhookChallengeHandler, WebhookSignatureParser +from ebay_client.notification.webhook import WebhookPublicKeyResolver, WebhookSignatureValidator def test_challenge_handler_builds_sha256_response() -> None: @@ -21,4 +29,57 @@ def test_signature_parser_extracts_known_fields() -> None: assert parsed.key_id == "public-key-1" assert parsed.algorithm == "ECDSA" assert parsed.digest == "SHA256" - assert parsed.signature == b"foo" \ No newline at end of file + assert parsed.signature == b"foo" + + +def test_signature_parser_decodes_base64_json_header() -> None: + parser = WebhookSignatureParser() + header = base64.b64encode( + json.dumps( + { + "alg": "ECDSA", + "kid": "public-key-1", + "signature": base64.b64encode(b"signed-bytes").decode("ascii"), + "digest": "SHA256", + } + ).encode("utf-8") + ).decode("ascii") + + parsed = parser.parse(header) + + assert parsed.key_id == "public-key-1" + assert parsed.algorithm == "ECDSA" + assert parsed.digest == "SHA256" + assert parsed.signature == b"signed-bytes" + + +def test_signature_validator_verifies_base64_json_header_and_der_key() -> None: + private_key = ec.generate_private_key(ec.SECP256R1()) + public_key = private_key.public_key() + public_key_der = public_key.public_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + body = b'{"notificationId":"abc-123"}' + signature = private_key.sign(body, ec.ECDSA(hashes.SHA256())) + header = base64.b64encode( + json.dumps( + { + "alg": "ECDSA", + "kid": "public-key-1", + "signature": base64.b64encode(signature).decode("ascii"), + "digest": "SHA256", + } + ).encode("utf-8") + ).decode("ascii") + + resolver = WebhookPublicKeyResolver( + lambda key_id: PublicKey( + key=base64.b64encode(public_key_der).decode("ascii"), + algorithm="ECDSA", + digest="SHA256", + ) + ) + validator = WebhookSignatureValidator(resolver) + + assert validator.validate(header_value=header, body=body) is True \ No newline at end of file