ebay_client/ebay_client/core/auth/oauth.py
claudi 389d72a136 Add tests for OAuth client and webhook notification handling
- Implement tests for `EbayOAuthClient` to verify authorization URL generation with configured scopes and token reuse logic.
- Add tests for `WebhookChallengeHandler` to ensure correct SHA256 response generation and for `WebhookSignatureParser` to validate extraction of known fields from signature strings.
2026-04-07 08:40:50 +02:00

118 lines
4.5 KiB
Python

from __future__ import annotations
import base64
from typing import Iterable
from urllib.parse import urlencode
import httpx
from ebay_client.core.auth.models import EbayOAuthConfig, OAuthToken
from ebay_client.core.auth.store import InMemoryTokenStore, TokenStore
from ebay_client.core.errors import OAuthError
class EbayOAuthClient:
def __init__(
self,
config: EbayOAuthConfig,
*,
token_store: TokenStore | None = None,
timeout_seconds: float = 30.0,
) -> None:
self.config = config
self.token_store = token_store or InMemoryTokenStore()
self.timeout_seconds = timeout_seconds
def build_authorization_url(
self,
*,
scopes: Iterable[str] | None = None,
state: str | None = None,
prompt: str | None = None,
) -> str:
if not self.config.redirect_uri:
raise OAuthError("redirect_uri is required for authorization_code flow")
query = {
"client_id": self.config.client_id,
"redirect_uri": self.config.redirect_uri,
"response_type": "code",
"scope": " ".join(scopes or self.config.default_scopes),
}
if state:
query["state"] = state
if prompt:
query["prompt"] = prompt
return f"{self.config.auth_base_url}?{urlencode(query)}"
def get_valid_token(self, *, scopes: Iterable[str] | None = None) -> OAuthToken:
token = self.token_store.get_token()
if token is None or token.is_expired() or not self._has_required_scopes(token, scopes):
token = self.fetch_client_credentials_token(scopes=scopes)
return token
def fetch_client_credentials_token(self, *, scopes: Iterable[str] | None = None) -> OAuthToken:
requested_scopes = list(scopes or self.config.default_scopes)
payload = {
"grant_type": "client_credentials",
"scope": " ".join(requested_scopes),
}
token = self._request_token(payload)
self.token_store.set_token(token)
return token
def exchange_code(self, code: str, *, scopes: Iterable[str] | None = None) -> OAuthToken:
if not self.config.redirect_uri:
raise OAuthError("redirect_uri is required for authorization_code flow")
payload = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": self.config.redirect_uri,
}
requested_scopes = list(scopes or self.config.default_scopes)
if requested_scopes:
payload["scope"] = " ".join(requested_scopes)
token = self._request_token(payload)
self.token_store.set_token(token)
return token
def refresh_access_token(self, refresh_token: str, *, scopes: Iterable[str] | None = None) -> OAuthToken:
payload = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}
requested_scopes = list(scopes or self.config.default_scopes)
if requested_scopes:
payload["scope"] = " ".join(requested_scopes)
token = self._request_token(payload)
self.token_store.set_token(token)
return token
def _request_token(self, form_data: dict[str, str]) -> OAuthToken:
headers = {
"Authorization": f"Basic {self._build_basic_token()}",
"Content-Type": "application/x-www-form-urlencoded",
}
try:
with httpx.Client(timeout=self.timeout_seconds) as client:
response = client.post(self.config.token_url, data=form_data, headers=headers)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
raise OAuthError(f"OAuth token request failed with HTTP {exc.response.status_code}") from exc
except httpx.HTTPError as exc:
raise OAuthError("OAuth token request failed") from exc
try:
return OAuthToken.model_validate(response.json())
except Exception as exc:
raise OAuthError("OAuth token response could not be parsed") from exc
def _build_basic_token(self) -> str:
raw = f"{self.config.client_id}:{self.config.client_secret}".encode("utf-8")
return base64.b64encode(raw).decode("ascii")
@staticmethod
def _has_required_scopes(token: OAuthToken, scopes: Iterable[str] | None) -> bool:
requested = {scope for scope in (scopes or []) if scope}
if not requested:
return True
return requested.issubset(token.scopes())