Add media workflow helpers and resource ID extraction; enhance tests for video and document status polling

This commit is contained in:
claudi 2026-04-07 10:21:47 +02:00
parent 1307d5691a
commit ee539e8189
4 changed files with 138 additions and 6 deletions

View file

@ -9,6 +9,14 @@ This workspace contains a Python-first eBay REST client foundation with:
Currently wired API domains include Notification, Inventory, Fulfillment, Account, Feed, and Media.
## Media Helpers
The Media wrapper includes workflow helpers on top of the raw endpoints:
- `extract_resource_id()` to pull a media resource ID from a `Location` header
- `wait_for_video()` to poll until a video reaches `LIVE` or a terminal failure state
- `wait_for_document()` to poll until a document reaches `ACCEPTED` or a terminal failure state
## Generate Low-Level Clients
The project uses a dedicated code generation environment because the main runtime is currently on Python 3.14 while the model generator still targets earlier Python versions.

View file

@ -1,3 +1,3 @@
from ebay_client.media.client import CreatedMediaResource, MediaClient
from ebay_client.media.client import CreatedMediaResource, MediaClient, extract_resource_id
__all__ = ["CreatedMediaResource", "MediaClient"]
__all__ = ["CreatedMediaResource", "MediaClient", "extract_resource_id"]

View file

@ -1,5 +1,8 @@
from __future__ import annotations
from time import monotonic, sleep
from urllib.parse import urlparse
from pydantic import BaseModel
from ebay_client.core.http.transport import ApiTransport
@ -19,6 +22,19 @@ MEDIA_SCOPE = "https://api.ebay.com/oauth/api_scope/sell.inventory"
class CreatedMediaResource(BaseModel):
location: str | None = None
resource_id: str | None = None
def extract_resource_id(location: str | None) -> str | None:
if not location:
return None
path = urlparse(location).path.rstrip("/")
if not path:
return None
_, _, resource_id = path.rpartition("/")
return resource_id or None
class MediaClient:
@ -66,7 +82,8 @@ class MediaClient:
headers={"Content-Type": "application/json"},
json_body=payload.model_dump(by_alias=True, exclude_none=True),
)
return CreatedMediaResource(location=response.headers.get("Location"))
location = response.headers.get("Location")
return CreatedMediaResource(location=location, resource_id=extract_resource_id(location))
def get_video(self, video_id: str) -> Video:
return self.transport.request_model(
@ -140,4 +157,69 @@ class MediaClient:
f"/commerce/media/v1_beta/document/{document_id}/upload",
scopes=[MEDIA_SCOPE],
files={"file": (file_name, content, content_type)},
)
)
def wait_for_video(
self,
video_id: str,
*,
success_statuses: set[str] | None = None,
failure_statuses: set[str] | None = None,
timeout_seconds: float = 30.0,
poll_interval_seconds: float = 1.0,
) -> Video:
desired_statuses = success_statuses or {"LIVE"}
terminal_failures = failure_statuses or {"BLOCKED", "PROCESSING_FAILED"}
return self._wait_for_media_state(
fetch=lambda: self.get_video(video_id),
get_status=lambda payload: payload.status,
desired_statuses=desired_statuses,
terminal_failures=terminal_failures,
timeout_seconds=timeout_seconds,
poll_interval_seconds=poll_interval_seconds,
resource_label=f"video {video_id}",
)
def wait_for_document(
self,
document_id: str,
*,
success_statuses: set[str] | None = None,
failure_statuses: set[str] | None = None,
timeout_seconds: float = 30.0,
poll_interval_seconds: float = 1.0,
) -> DocumentResponse:
desired_statuses = success_statuses or {"ACCEPTED"}
terminal_failures = failure_statuses or {"REJECTED"}
return self._wait_for_media_state(
fetch=lambda: self.get_document(document_id),
get_status=lambda payload: payload.documentStatus,
desired_statuses=desired_statuses,
terminal_failures=terminal_failures,
timeout_seconds=timeout_seconds,
poll_interval_seconds=poll_interval_seconds,
resource_label=f"document {document_id}",
)
def _wait_for_media_state(
self,
*,
fetch,
get_status,
desired_statuses: set[str],
terminal_failures: set[str],
timeout_seconds: float,
poll_interval_seconds: float,
resource_label: str,
):
deadline = monotonic() + timeout_seconds
while True:
payload = fetch()
status = get_status(payload)
if status in desired_statuses:
return payload
if status in terminal_failures:
raise ValueError(f"{resource_label} reached terminal failure status: {status}")
if monotonic() >= deadline:
raise TimeoutError(f"Timed out while waiting for {resource_label}; last status was {status!r}")
sleep(poll_interval_seconds)

View file

@ -35,7 +35,7 @@ from ebay_client.generated.notification.models import (
UpdateSubscriptionRequest,
)
from ebay_client.inventory.client import InventoryClient
from ebay_client.media.client import CreatedMediaResource, MediaClient
from ebay_client.media.client import CreatedMediaResource, MediaClient, extract_resource_id
from ebay_client.notification.client import NotificationClient
@ -430,6 +430,13 @@ def test_media_wrapper_returns_created_resource_location_for_video(httpx_mock: H
assert isinstance(result, CreatedMediaResource)
assert result.location == "https://api.ebay.com/commerce/media/v1_beta/video/VIDEO-1"
assert result.resource_id == "VIDEO-1"
def test_extract_media_resource_id_handles_location_header() -> None:
assert extract_resource_id("https://api.ebay.com/commerce/media/v1_beta/video/VIDEO-1") == "VIDEO-1"
assert extract_resource_id("https://api.ebay.com/commerce/media/v1_beta/document/DOC-1/") == "DOC-1"
assert extract_resource_id(None) is None
def test_media_wrapper_returns_video_model(httpx_mock: HTTPXMock) -> None:
@ -520,4 +527,39 @@ def test_media_wrapper_returns_document_models(httpx_mock: HTTPXMock) -> None:
upload_request = httpx_mock.get_requests()[3]
assert upload_request.headers["Content-Type"].startswith("multipart/form-data;")
assert b"filename=\"guide.pdf\"" in upload_request.content
assert b"%PDF-1.7" in upload_request.content
assert b"%PDF-1.7" in upload_request.content
def test_media_wait_for_video_returns_live_payload(monkeypatch) -> None:
client = MediaClient(build_transport())
states = iter(
[
Video(videoId="VIDEO-1", status="PENDING_UPLOAD"),
Video(videoId="VIDEO-1", status="PROCESSING"),
Video(videoId="VIDEO-1", status="LIVE"),
]
)
monkeypatch.setattr(client, "get_video", lambda _video_id: next(states))
monkeypatch.setattr("ebay_client.media.client.sleep", lambda _seconds: None)
result = client.wait_for_video("VIDEO-1", poll_interval_seconds=0.0)
assert result.status == "LIVE"
def test_media_wait_for_document_raises_on_terminal_failure(monkeypatch) -> None:
client = MediaClient(build_transport())
monkeypatch.setattr(
client,
"get_document",
lambda _document_id: DocumentResponse(documentId="DOC-1", documentStatus="REJECTED"),
)
try:
client.wait_for_document("DOC-1", poll_interval_seconds=0.0)
except ValueError as exc:
assert "REJECTED" in str(exc)
else:
raise AssertionError("Expected wait_for_document to raise on terminal failure status")