Add JSON parsing and base64 decoding to WebhookSignatureParser; enhance tests for signature validation

This commit is contained in:
claudi 2026-04-07 09:41:39 +02:00
parent bebf99d826
commit 10008a0edc
2 changed files with 127 additions and 12 deletions

View file

@ -2,6 +2,7 @@ from __future__ import annotations
import base64 import base64
import hashlib import hashlib
import json
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
from datetime import UTC, datetime, timedelta from datetime import UTC, datetime, timedelta
@ -10,6 +11,7 @@ from typing import Any
from cryptography.exceptions import InvalidSignature from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import load_der_public_key
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from ebay_client.generated.notification.models import PublicKey from ebay_client.generated.notification.models import PublicKey
@ -39,16 +41,10 @@ class WebhookSignatureParser:
DIGEST_ALIASES = ("digest",) DIGEST_ALIASES = ("digest",)
def parse(self, header_value: str) -> ParsedSignatureHeader: def parse(self, header_value: str) -> ParsedSignatureHeader:
separators = header_value.replace(";", ",").split(",") parts = self._parse_parts(header_value)
parts: dict[str, str] = {}
for item in separators:
if "=" not in item:
continue
key, value = item.split("=", 1)
parts[key.strip().lower()] = value.strip().strip('"')
signature_value = self._first_match(parts, self.SIGNATURE_ALIASES) signature_value = self._first_match(parts, self.SIGNATURE_ALIASES)
signature = base64.b64decode(signature_value) if signature_value else None signature = self._decode_base64(signature_value) if signature_value else None
return ParsedSignatureHeader( return ParsedSignatureHeader(
key_id=self._first_match(parts, self.KEY_ALIASES), key_id=self._first_match(parts, self.KEY_ALIASES),
signature=signature, signature=signature,
@ -57,6 +53,49 @@ class WebhookSignatureParser:
raw_parts=parts, raw_parts=parts,
) )
def _parse_parts(self, header_value: str) -> dict[str, str]:
decoded = self._decode_base64_to_text(header_value)
if decoded:
try:
payload = json.loads(decoded)
except json.JSONDecodeError:
payload = None
if isinstance(payload, dict):
return {str(key).strip().lower(): str(value).strip() for key, value in payload.items() if value is not None}
separators = header_value.replace(";", ",").split(",")
parts: dict[str, str] = {}
for item in separators:
if "=" not in item:
continue
key, value = item.split("=", 1)
parts[key.strip().lower()] = value.strip().strip('"')
return parts
@staticmethod
def _decode_base64_to_text(value: str) -> str | None:
decoded = WebhookSignatureParser._decode_base64(value)
if decoded is None:
return None
try:
return decoded.decode("utf-8")
except UnicodeDecodeError:
return None
@staticmethod
def _decode_base64(value: str) -> bytes | None:
normalized = value.strip()
if not normalized:
return None
padding = (-len(normalized)) % 4
normalized = normalized + ("=" * padding)
for decoder in (base64.b64decode, base64.urlsafe_b64decode):
try:
return decoder(normalized)
except Exception:
continue
return None
@staticmethod @staticmethod
def _first_match(parts: dict[str, str], aliases: tuple[str, ...]) -> str | None: def _first_match(parts: dict[str, str], aliases: tuple[str, ...]) -> str | None:
for alias in aliases: for alias in aliases:
@ -107,10 +146,12 @@ class WebhookSignatureValidator:
if not parsed.key_id or not parsed.signature: if not parsed.key_id or not parsed.signature:
return False return False
key_payload = self.resolver.resolve(parsed.key_id) key_payload = self.resolver.resolve(parsed.key_id)
pem = key_payload.key key_value = key_payload.key
if not pem: if not key_value:
return False
public_key = self._load_public_key(key_value)
if public_key is None:
return False return False
public_key = serialization.load_pem_public_key(pem.encode("utf-8"))
digest_name = (parsed.digest or key_payload.digest or "SHA256").upper() digest_name = (parsed.digest or key_payload.digest or "SHA256").upper()
digest = self._hash_algorithm(digest_name) digest = self._hash_algorithm(digest_name)
message = message_builder(body) if message_builder else body message = message_builder(body) if message_builder else body
@ -130,3 +171,16 @@ class WebhookSignatureValidator:
if normalized == "SHA512": if normalized == "SHA512":
return hashes.SHA512() return hashes.SHA512()
return hashes.SHA256() return hashes.SHA256()
@staticmethod
def _load_public_key(key_value: str):
normalized = key_value.strip()
if not normalized:
return None
if "BEGIN PUBLIC KEY" in normalized:
return serialization.load_pem_public_key(normalized.encode("utf-8"))
decoded = WebhookSignatureParser._decode_base64(normalized)
if decoded is None:
return None
return load_der_public_key(decoded)

View file

@ -1,6 +1,14 @@
from __future__ import annotations from __future__ import annotations
import base64
import json
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from ebay_client.generated.notification.models import PublicKey
from ebay_client.notification.webhook import WebhookChallengeHandler, WebhookSignatureParser from ebay_client.notification.webhook import WebhookChallengeHandler, WebhookSignatureParser
from ebay_client.notification.webhook import WebhookPublicKeyResolver, WebhookSignatureValidator
def test_challenge_handler_builds_sha256_response() -> None: def test_challenge_handler_builds_sha256_response() -> None:
@ -21,4 +29,57 @@ def test_signature_parser_extracts_known_fields() -> None:
assert parsed.key_id == "public-key-1" assert parsed.key_id == "public-key-1"
assert parsed.algorithm == "ECDSA" assert parsed.algorithm == "ECDSA"
assert parsed.digest == "SHA256" assert parsed.digest == "SHA256"
assert parsed.signature == b"foo" assert parsed.signature == b"foo"
def test_signature_parser_decodes_base64_json_header() -> None:
parser = WebhookSignatureParser()
header = base64.b64encode(
json.dumps(
{
"alg": "ECDSA",
"kid": "public-key-1",
"signature": base64.b64encode(b"signed-bytes").decode("ascii"),
"digest": "SHA256",
}
).encode("utf-8")
).decode("ascii")
parsed = parser.parse(header)
assert parsed.key_id == "public-key-1"
assert parsed.algorithm == "ECDSA"
assert parsed.digest == "SHA256"
assert parsed.signature == b"signed-bytes"
def test_signature_validator_verifies_base64_json_header_and_der_key() -> None:
private_key = ec.generate_private_key(ec.SECP256R1())
public_key = private_key.public_key()
public_key_der = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
body = b'{"notificationId":"abc-123"}'
signature = private_key.sign(body, ec.ECDSA(hashes.SHA256()))
header = base64.b64encode(
json.dumps(
{
"alg": "ECDSA",
"kid": "public-key-1",
"signature": base64.b64encode(signature).decode("ascii"),
"digest": "SHA256",
}
).encode("utf-8")
).decode("ascii")
resolver = WebhookPublicKeyResolver(
lambda key_id: PublicKey(
key=base64.b64encode(public_key_der).decode("ascii"),
algorithm="ECDSA",
digest="SHA256",
)
)
validator = WebhookSignatureValidator(resolver)
assert validator.validate(header_value=header, body=body) is True