Add media content type inference and path-based upload methods for images and documents

This commit is contained in:
claudi 2026-04-07 10:34:43 +02:00
parent 937cd86c8b
commit 6d54c5900c
4 changed files with 202 additions and 16 deletions

View file

@ -14,11 +14,15 @@ Currently wired API domains include Notification, Inventory, Fulfillment, Accoun
The Media wrapper includes workflow helpers on top of the raw endpoints:
- `extract_resource_id()` to pull a media resource ID from a `Location` header
- `guess_media_content_type()` to infer a content type from a file name when possible
- `wait_for_video()` to poll until a video reaches `LIVE` or a terminal failure state
- `wait_for_document()` to poll until a document reaches `ACCEPTED` or a terminal failure state
- `create_upload_and_wait_video()` to stage, upload, and poll a video in one call
- `create_upload_and_wait_video_from_path()` to do the same directly from a local file path
- `create_upload_and_wait_document()` to stage, upload, and poll a document in one call
- `create_upload_and_wait_document_from_path()` to do the same directly from a local file path
- `create_document_from_url_and_wait()` to create a document from a URL and poll until it is accepted
- `create_image_from_path()` and `upload_document_from_path()` for path-based local file uploads
A concrete workflow example is available in `examples/media_workflows.py` for:

View file

@ -1,5 +1,7 @@
from __future__ import annotations
import mimetypes
from pathlib import Path
from time import monotonic, sleep
from urllib.parse import urlparse
@ -37,6 +39,11 @@ def extract_resource_id(location: str | None) -> str | None:
return resource_id or None
def guess_media_content_type(file_name: str, *, default: str = "application/octet-stream") -> str:
guessed_type, _ = mimetypes.guess_type(file_name)
return guessed_type or default
class MediaClient:
def __init__(self, transport: ApiTransport) -> None:
self.transport = transport
@ -56,6 +63,14 @@ class MediaClient:
files={"image": (file_name, content, content_type)},
)
def create_image_from_path(self, image_path: str | Path) -> ImageResponse:
path = Path(image_path)
return self.create_image_from_file(
file_name=path.name,
content=path.read_bytes(),
content_type=guess_media_content_type(path.name),
)
def create_image_from_url(self, payload: CreateImageFromUrlRequest) -> ImageResponse:
return self.transport.request_model(
ImageResponse,
@ -109,6 +124,31 @@ class MediaClient:
poll_interval_seconds=poll_interval_seconds,
)
def create_upload_and_wait_video_from_path(
self,
video_path: str | Path,
*,
title: str | None = None,
classification: list[str] | None = None,
description: str | None = None,
timeout_seconds: float = 30.0,
poll_interval_seconds: float = 1.0,
) -> Video:
path = Path(video_path)
content = path.read_bytes()
payload = CreateVideoRequest(
title=title or path.stem,
size=len(content),
classification=classification or ["ITEM"],
description=description,
)
return self.create_upload_and_wait_video(
payload,
content=content,
timeout_seconds=timeout_seconds,
poll_interval_seconds=poll_interval_seconds,
)
def get_video(self, video_id: str) -> Video:
return self.transport.request_model(
Video,
@ -222,6 +262,33 @@ class MediaClient:
files={"file": (file_name, content, content_type)},
)
def upload_document_from_path(self, document_id: str, document_path: str | Path) -> DocumentResponse:
path = Path(document_path)
return self.upload_document(
document_id,
file_name=path.name,
content=path.read_bytes(),
content_type=guess_media_content_type(path.name),
)
def create_upload_and_wait_document_from_path(
self,
payload: CreateDocumentRequest,
document_path: str | Path,
*,
timeout_seconds: float = 30.0,
poll_interval_seconds: float = 1.0,
) -> DocumentResponse:
path = Path(document_path)
return self.create_upload_and_wait_document(
payload,
file_name=path.name,
content=path.read_bytes(),
content_type=guess_media_content_type(path.name),
timeout_seconds=timeout_seconds,
poll_interval_seconds=poll_interval_seconds,
)
def wait_for_video(
self,
video_id: str,

View file

@ -22,11 +22,7 @@ def build_client() -> EbayClient:
def upload_image_from_file(client: EbayClient, image_path: Path) -> None:
image = client.media.create_image_from_file(
file_name=image_path.name,
content=image_path.read_bytes(),
content_type="image/jpeg",
)
image = client.media.create_image_from_path(image_path)
print("image_url:", image.imageUrl)
@ -49,15 +45,22 @@ def upload_document_and_wait(client: EbayClient, document_path: Path) -> None:
print("document_final_status:", accepted.documentStatus)
def upload_video_and_wait(client: EbayClient, video_path: Path) -> None:
live_video = client.media.create_upload_and_wait_video(
CreateVideoRequest(
title=video_path.stem,
size=video_path.stat().st_size,
classification=["ITEM"],
description="Example upload from the ebay-rest-client workspace.",
def upload_document_and_wait_from_path(client: EbayClient, document_path: Path) -> None:
accepted = client.media.create_upload_and_wait_document_from_path(
CreateDocumentRequest(
documentType="USER_GUIDE_OR_MANUAL",
languages=["en-US"],
),
content=video_path.read_bytes(),
document_path,
timeout_seconds=60.0,
)
print("document_final_status:", accepted.documentStatus)
def upload_video_and_wait(client: EbayClient, video_path: Path) -> None:
live_video = client.media.create_upload_and_wait_video_from_path(
video_path,
description="Example upload from the ebay-rest-client workspace.",
timeout_seconds=120.0,
)
print("video_status:", live_video.status)
@ -77,7 +80,7 @@ def main() -> None:
if image_url:
upload_image_from_url(client, image_url)
if document_file:
upload_document_and_wait(client, Path(document_file))
upload_document_and_wait_from_path(client, Path(document_file))
if video_file:
upload_video_and_wait(client, Path(video_file))

View file

@ -36,7 +36,7 @@ from ebay_client.generated.notification.models import (
UpdateSubscriptionRequest,
)
from ebay_client.inventory.client import InventoryClient
from ebay_client.media.client import CreatedMediaResource, MediaClient, extract_resource_id
from ebay_client.media.client import CreatedMediaResource, MediaClient, extract_resource_id, guess_media_content_type
from ebay_client.notification.client import NotificationClient
@ -720,4 +720,116 @@ def test_media_convenience_methods_raise_when_required_ids_are_missing(monkeypat
except RuntimeError:
pass
else:
raise AssertionError("Expected convenience method to raise when eBay omits the required identifier")
raise AssertionError("Expected convenience method to raise when eBay omits the required identifier")
def test_guess_media_content_type_uses_filename_extension() -> None:
assert guess_media_content_type("photo.jpg") == "image/jpeg"
assert guess_media_content_type("guide.pdf") == "application/pdf"
assert guess_media_content_type("unknown.custom") == "application/octet-stream"
def test_media_create_image_from_path_reads_file_and_infers_content_type(tmp_path, monkeypatch) -> None:
client = MediaClient(build_transport())
image_path = tmp_path / "photo.png"
image_path.write_bytes(b"png-data")
captured: dict[str, object] = {}
monkeypatch.setattr(
client,
"create_image_from_file",
lambda **kwargs: captured.update(kwargs) or ImageResponse(imageUrl="https://example.test/image"),
)
result = client.create_image_from_path(image_path)
assert result.imageUrl == "https://example.test/image"
assert captured == {
"file_name": "photo.png",
"content": b"png-data",
"content_type": "image/png",
}
def test_media_upload_document_from_path_reads_file_and_infers_content_type(tmp_path, monkeypatch) -> None:
client = MediaClient(build_transport())
document_path = tmp_path / "guide.pdf"
document_path.write_bytes(b"%PDF-1.7")
captured: dict[str, object] = {}
monkeypatch.setattr(
client,
"upload_document",
lambda document_id, **kwargs: captured.update({"document_id": document_id, **kwargs})
or DocumentResponse(documentId=document_id, documentStatus="SUBMITTED"),
)
result = client.upload_document_from_path("DOC-42", document_path)
assert result.documentId == "DOC-42"
assert captured == {
"document_id": "DOC-42",
"file_name": "guide.pdf",
"content": b"%PDF-1.7",
"content_type": "application/pdf",
}
def test_media_create_upload_and_wait_document_from_path_reads_file_and_delegates(tmp_path, monkeypatch) -> None:
client = MediaClient(build_transport())
document_path = tmp_path / "guide.pdf"
document_path.write_bytes(b"%PDF-1.7")
captured: dict[str, object] = {}
monkeypatch.setattr(
client,
"create_upload_and_wait_document",
lambda payload, **kwargs: captured.update({"payload": payload, **kwargs})
or DocumentResponse(documentId="DOC-77", documentStatus="ACCEPTED"),
)
result = client.create_upload_and_wait_document_from_path(
CreateDocumentRequest(documentType="USER_GUIDE_OR_MANUAL", languages=["en-US"]),
document_path,
poll_interval_seconds=0.0,
)
assert result.documentStatus == "ACCEPTED"
assert captured["file_name"] == "guide.pdf"
assert captured["content"] == b"%PDF-1.7"
assert captured["content_type"] == "application/pdf"
assert captured["poll_interval_seconds"] == 0.0
def test_media_create_upload_and_wait_video_from_path_builds_payload_and_delegates(tmp_path, monkeypatch) -> None:
client = MediaClient(build_transport())
video_path = tmp_path / "demo.mp4"
video_path.write_bytes(b"video-data")
captured: dict[str, object] = {}
monkeypatch.setattr(
client,
"create_upload_and_wait_video",
lambda payload, **kwargs: captured.update({"payload": payload, **kwargs})
or Video(videoId="VIDEO-88", status="LIVE"),
)
result = client.create_upload_and_wait_video_from_path(
video_path,
description="demo video",
poll_interval_seconds=0.0,
)
assert result.videoId == "VIDEO-88"
payload = captured["payload"]
assert isinstance(payload, CreateVideoRequest)
assert payload.title == "demo"
assert payload.size == len(b"video-data")
assert payload.classification == ["ITEM"]
assert payload.description == "demo video"
assert captured["content"] == b"video-data"
assert captured["poll_interval_seconds"] == 0.0