Compare commits

..

4 commits

7 changed files with 877 additions and 38 deletions

View file

@ -13,12 +13,17 @@ Currently wired API domains include Notification, Inventory, Fulfillment, Accoun
The Media wrapper includes workflow helpers on top of the raw endpoints:
- `VideoWorkflowResult` and `DocumentWorkflowResult` to return structured data from the higher-level workflows
- `extract_resource_id()` to pull a media resource ID from a `Location` header
- `guess_media_content_type()` to infer a content type from a file name when possible
- `wait_for_video()` to poll until a video reaches `LIVE` or a terminal failure state
- `wait_for_document()` to poll until a document reaches `ACCEPTED` or a terminal failure state
- `create_upload_and_wait_video()` to stage, upload, and poll a video in one call
- `create_upload_and_wait_video_from_path()` to do the same directly from a local file path
- `create_upload_and_wait_document()` to stage, upload, and poll a document in one call
- `create_upload_and_wait_document_from_path()` to do the same directly from a local file path
- `create_document_from_url_and_wait()` to create a document from a URL and poll until it is accepted
- `create_image_from_path()` and `upload_document_from_path()` for path-based local file uploads
A concrete workflow example is available in `examples/media_workflows.py` for:

View file

@ -2,10 +2,19 @@ from __future__ import annotations
from ebay_client.core.http.transport import ApiTransport
from ebay_client.generated.account.models import (
FulfillmentPolicy,
FulfillmentPolicyRequest,
FulfillmentPolicyResponse,
PaymentPolicy,
PaymentPolicyRequest,
PaymentPolicyResponse,
Programs,
ReturnPolicy,
ReturnPolicyRequest,
ReturnPolicyResponse,
SetFulfillmentPolicyResponse,
SetPaymentPolicyResponse,
SetReturnPolicyResponse,
SellingPrivileges,
)
@ -27,6 +36,54 @@ class AccountClient:
params={"marketplace_id": marketplace_id},
)
def create_fulfillment_policy(self, payload: FulfillmentPolicyRequest) -> SetFulfillmentPolicyResponse:
return self.transport.request_model(
SetFulfillmentPolicyResponse,
"POST",
"/sell/account/v1/fulfillment_policy/",
scopes=[ACCOUNT_SCOPE],
headers={"Content-Type": "application/json"},
json_body=payload.model_dump(by_alias=True, exclude_none=True),
)
def get_fulfillment_policy(self, fulfillment_policy_id: str) -> FulfillmentPolicy:
return self.transport.request_model(
FulfillmentPolicy,
"GET",
f"/sell/account/v1/fulfillment_policy/{fulfillment_policy_id}",
scope_options=ACCOUNT_READ_SCOPE_OPTIONS,
)
def update_fulfillment_policy(
self,
fulfillment_policy_id: str,
payload: FulfillmentPolicyRequest,
) -> SetFulfillmentPolicyResponse:
return self.transport.request_model(
SetFulfillmentPolicyResponse,
"PUT",
f"/sell/account/v1/fulfillment_policy/{fulfillment_policy_id}",
scopes=[ACCOUNT_SCOPE],
headers={"Content-Type": "application/json"},
json_body=payload.model_dump(by_alias=True, exclude_none=True),
)
def delete_fulfillment_policy(self, fulfillment_policy_id: str) -> None:
self.transport.request_json(
"DELETE",
f"/sell/account/v1/fulfillment_policy/{fulfillment_policy_id}",
scopes=[ACCOUNT_SCOPE],
)
def get_fulfillment_policy_by_name(self, *, marketplace_id: str, name: str) -> FulfillmentPolicy:
return self.transport.request_model(
FulfillmentPolicy,
"GET",
"/sell/account/v1/fulfillment_policy/get_by_policy_name",
scope_options=ACCOUNT_READ_SCOPE_OPTIONS,
params={"marketplace_id": marketplace_id, "name": name},
)
def get_payment_policies(self, *, marketplace_id: str) -> PaymentPolicyResponse:
return self.transport.request_model(
PaymentPolicyResponse,
@ -36,6 +93,50 @@ class AccountClient:
params={"marketplace_id": marketplace_id},
)
def create_payment_policy(self, payload: PaymentPolicyRequest) -> SetPaymentPolicyResponse:
return self.transport.request_model(
SetPaymentPolicyResponse,
"POST",
"/sell/account/v1/payment_policy",
scopes=[ACCOUNT_SCOPE],
headers={"Content-Type": "application/json"},
json_body=payload.model_dump(by_alias=True, exclude_none=True),
)
def get_payment_policy(self, payment_policy_id: str) -> PaymentPolicy:
return self.transport.request_model(
PaymentPolicy,
"GET",
f"/sell/account/v1/payment_policy/{payment_policy_id}",
scope_options=ACCOUNT_READ_SCOPE_OPTIONS,
)
def update_payment_policy(self, payment_policy_id: str, payload: PaymentPolicyRequest) -> SetPaymentPolicyResponse:
return self.transport.request_model(
SetPaymentPolicyResponse,
"PUT",
f"/sell/account/v1/payment_policy/{payment_policy_id}",
scopes=[ACCOUNT_SCOPE],
headers={"Content-Type": "application/json"},
json_body=payload.model_dump(by_alias=True, exclude_none=True),
)
def delete_payment_policy(self, payment_policy_id: str) -> None:
self.transport.request_json(
"DELETE",
f"/sell/account/v1/payment_policy/{payment_policy_id}",
scopes=[ACCOUNT_SCOPE],
)
def get_payment_policy_by_name(self, *, marketplace_id: str, name: str) -> PaymentPolicy:
return self.transport.request_model(
PaymentPolicy,
"GET",
"/sell/account/v1/payment_policy/get_by_policy_name",
scope_options=ACCOUNT_READ_SCOPE_OPTIONS,
params={"marketplace_id": marketplace_id, "name": name},
)
def get_return_policies(self, *, marketplace_id: str) -> ReturnPolicyResponse:
return self.transport.request_model(
ReturnPolicyResponse,
@ -45,6 +146,50 @@ class AccountClient:
params={"marketplace_id": marketplace_id},
)
def create_return_policy(self, payload: ReturnPolicyRequest) -> SetReturnPolicyResponse:
return self.transport.request_model(
SetReturnPolicyResponse,
"POST",
"/sell/account/v1/return_policy",
scopes=[ACCOUNT_SCOPE],
headers={"Content-Type": "application/json"},
json_body=payload.model_dump(by_alias=True, exclude_none=True),
)
def get_return_policy(self, return_policy_id: str) -> ReturnPolicy:
return self.transport.request_model(
ReturnPolicy,
"GET",
f"/sell/account/v1/return_policy/{return_policy_id}",
scope_options=ACCOUNT_READ_SCOPE_OPTIONS,
)
def update_return_policy(self, return_policy_id: str, payload: ReturnPolicyRequest) -> SetReturnPolicyResponse:
return self.transport.request_model(
SetReturnPolicyResponse,
"PUT",
f"/sell/account/v1/return_policy/{return_policy_id}",
scopes=[ACCOUNT_SCOPE],
headers={"Content-Type": "application/json"},
json_body=payload.model_dump(by_alias=True, exclude_none=True),
)
def delete_return_policy(self, return_policy_id: str) -> None:
self.transport.request_json(
"DELETE",
f"/sell/account/v1/return_policy/{return_policy_id}",
scopes=[ACCOUNT_SCOPE],
)
def get_return_policy_by_name(self, *, marketplace_id: str, name: str) -> ReturnPolicy:
return self.transport.request_model(
ReturnPolicy,
"GET",
"/sell/account/v1/return_policy/get_by_policy_name",
scope_options=ACCOUNT_READ_SCOPE_OPTIONS,
params={"marketplace_id": marketplace_id, "name": name},
)
def get_privileges(self) -> SellingPrivileges:
return self.transport.request_model(
SellingPrivileges,

View file

@ -1,11 +1,20 @@
from __future__ import annotations
from urllib.parse import urlparse
from pydantic import BaseModel
from ebay_client.core.http.transport import ApiTransport
from ebay_client.generated.feed.models import (
CreateTaskRequest,
CreateUserScheduleRequest,
ScheduleTemplateResponse,
ScheduleTemplateCollection,
Task,
TaskCollection,
UserScheduleResponse,
UserScheduleCollection,
UpdateUserScheduleRequest,
)
FEED_INVENTORY_SCOPE = "https://api.ebay.com/oauth/api_scope/sell.inventory"
@ -22,19 +31,96 @@ FEED_READ_SCOPE_OPTIONS = [
]
class CreatedFeedResource(BaseModel):
location: str | None = None
resource_id: str | None = None
class FeedFileDownload(BaseModel):
content: bytes
content_disposition: str | None = None
file_name: str | None = None
def extract_feed_resource_id(location: str | None) -> str | None:
if not location:
return None
path = urlparse(location).path.rstrip("/")
if not path:
return None
_, _, resource_id = path.rpartition("/")
return resource_id or None
def _extract_file_name(content_disposition: str | None) -> str | None:
if not content_disposition:
return None
marker = "filename="
if marker not in content_disposition:
return None
value = content_disposition.split(marker, 1)[1].strip()
if value.startswith('"') and value.endswith('"'):
return value[1:-1]
return value
class FeedClient:
def __init__(self, transport: ApiTransport) -> None:
self.transport = transport
def get_tasks(self, *, feed_type: str | None = None) -> TaskCollection:
def get_tasks(
self,
*,
feed_type: str | None = None,
schedule_id: str | None = None,
date_range: str | None = None,
look_back_days: int | None = None,
limit: int | None = None,
offset: int | None = None,
) -> TaskCollection:
return self.transport.request_model(
TaskCollection,
"GET",
"/sell/feed/v1/task",
scope_options=FEED_READ_SCOPE_OPTIONS,
params={"feed_type": feed_type},
params={
"feed_type": feed_type,
"schedule_id": schedule_id,
"date_range": date_range,
"look_back_days": look_back_days,
"limit": limit,
"offset": offset,
},
)
def create_task(
self,
payload: CreateTaskRequest,
*,
marketplace_id: str,
accept_language: str | None = None,
) -> CreatedFeedResource:
headers = {
"Content-Type": "application/json",
"X-EBAY-C-MARKETPLACE-ID": marketplace_id,
}
if accept_language is not None:
headers["Accept-Language"] = accept_language
response = self.transport.request(
"POST",
"/sell/feed/v1/task",
scope_options=FEED_READ_SCOPE_OPTIONS,
headers=headers,
json_body=payload.model_dump(by_alias=True, exclude_none=True),
)
location = response.headers.get("Location")
return CreatedFeedResource(location=location, resource_id=extract_feed_resource_id(location))
def get_task(self, task_id: str) -> Task:
return self.transport.request_model(
Task,
@ -43,18 +129,108 @@ class FeedClient:
scope_options=FEED_READ_SCOPE_OPTIONS,
)
def get_schedule_templates(self) -> ScheduleTemplateCollection:
def get_input_file(self, task_id: str) -> FeedFileDownload:
return self._download_file(f"/sell/feed/v1/task/{task_id}/download_input_file")
def get_result_file(self, task_id: str) -> FeedFileDownload:
return self._download_file(f"/sell/feed/v1/task/{task_id}/download_result_file")
def upload_file(
self,
task_id: str,
*,
file_name: str,
content: bytes,
content_type: str = "application/octet-stream",
) -> None:
self.transport.request_json(
"POST",
f"/sell/feed/v1/task/{task_id}/upload_file",
scope_options=FEED_READ_SCOPE_OPTIONS,
files={"file": (file_name, content, content_type)},
)
def get_schedule_templates(
self,
*,
feed_type: str,
limit: int | None = None,
offset: int | None = None,
) -> ScheduleTemplateCollection:
return self.transport.request_model(
ScheduleTemplateCollection,
"GET",
"/sell/feed/v1/schedule_template",
scope_options=FEED_READ_SCOPE_OPTIONS,
params={"feed_type": feed_type, "limit": limit, "offset": offset},
)
def get_schedules(self) -> UserScheduleCollection:
def get_schedule_template(self, schedule_template_id: str) -> ScheduleTemplateResponse:
return self.transport.request_model(
ScheduleTemplateResponse,
"GET",
f"/sell/feed/v1/schedule_template/{schedule_template_id}",
scope_options=FEED_READ_SCOPE_OPTIONS,
)
def get_schedules(
self,
*,
feed_type: str,
limit: int | None = None,
offset: int | None = None,
) -> UserScheduleCollection:
return self.transport.request_model(
UserScheduleCollection,
"GET",
"/sell/feed/v1/schedule",
scope_options=FEED_READ_SCOPE_OPTIONS,
params={"feed_type": feed_type, "limit": limit, "offset": offset},
)
def create_schedule(self, payload: CreateUserScheduleRequest) -> CreatedFeedResource:
response = self.transport.request(
"POST",
"/sell/feed/v1/schedule",
scope_options=FEED_READ_SCOPE_OPTIONS,
headers={"Content-Type": "application/json"},
json_body=payload.model_dump(by_alias=True, exclude_none=True),
)
location = response.headers.get("Location")
return CreatedFeedResource(location=location, resource_id=extract_feed_resource_id(location))
def get_schedule(self, schedule_id: str) -> UserScheduleResponse:
return self.transport.request_model(
UserScheduleResponse,
"GET",
f"/sell/feed/v1/schedule/{schedule_id}",
scope_options=FEED_READ_SCOPE_OPTIONS,
)
def update_schedule(self, schedule_id: str, payload: UpdateUserScheduleRequest) -> None:
self.transport.request_json(
"PUT",
f"/sell/feed/v1/schedule/{schedule_id}",
scope_options=FEED_READ_SCOPE_OPTIONS,
headers={"Content-Type": "application/json"},
json_body=payload.model_dump(by_alias=True, exclude_none=True),
)
def delete_schedule(self, schedule_id: str) -> None:
self.transport.request_json(
"DELETE",
f"/sell/feed/v1/schedule/{schedule_id}",
scope_options=FEED_READ_SCOPE_OPTIONS,
)
def get_latest_result_file(self, schedule_id: str) -> FeedFileDownload:
return self._download_file(f"/sell/feed/v1/schedule/{schedule_id}/download_result_file")
def _download_file(self, path: str) -> FeedFileDownload:
response = self.transport.request("GET", path, scope_options=FEED_READ_SCOPE_OPTIONS)
content_disposition = response.headers.get("content-disposition")
return FeedFileDownload(
content=response.content,
content_disposition=content_disposition,
file_name=_extract_file_name(content_disposition),
)

View file

@ -1,3 +1,17 @@
from ebay_client.media.client import CreatedMediaResource, MediaClient, extract_resource_id
from ebay_client.media.client import (
CreatedMediaResource,
DocumentWorkflowResult,
MediaClient,
VideoWorkflowResult,
extract_resource_id,
guess_media_content_type,
)
__all__ = ["CreatedMediaResource", "MediaClient", "extract_resource_id"]
__all__ = [
"CreatedMediaResource",
"DocumentWorkflowResult",
"MediaClient",
"VideoWorkflowResult",
"extract_resource_id",
"guess_media_content_type",
]

View file

@ -1,5 +1,7 @@
from __future__ import annotations
import mimetypes
from pathlib import Path
from time import monotonic, sleep
from urllib.parse import urlparse
@ -25,6 +27,19 @@ class CreatedMediaResource(BaseModel):
resource_id: str | None = None
class VideoWorkflowResult(BaseModel):
created: CreatedMediaResource
video: Video
video_id: str | None = None
class DocumentWorkflowResult(BaseModel):
created: CreateDocumentResponse
uploaded: DocumentResponse | None = None
document: DocumentResponse
document_id: str | None = None
def extract_resource_id(location: str | None) -> str | None:
if not location:
return None
@ -37,6 +52,11 @@ def extract_resource_id(location: str | None) -> str | None:
return resource_id or None
def guess_media_content_type(file_name: str, *, default: str = "application/octet-stream") -> str:
guessed_type, _ = mimetypes.guess_type(file_name)
return guessed_type or default
class MediaClient:
def __init__(self, transport: ApiTransport) -> None:
self.transport = transport
@ -56,6 +76,14 @@ class MediaClient:
files={"image": (file_name, content, content_type)},
)
def create_image_from_path(self, image_path: str | Path) -> ImageResponse:
path = Path(image_path)
return self.create_image_from_file(
file_name=path.name,
content=path.read_bytes(),
content_type=guess_media_content_type(path.name),
)
def create_image_from_url(self, payload: CreateImageFromUrlRequest) -> ImageResponse:
return self.transport.request_model(
ImageResponse,
@ -94,7 +122,7 @@ class MediaClient:
content_range: str | None = None,
timeout_seconds: float = 30.0,
poll_interval_seconds: float = 1.0,
) -> Video:
) -> VideoWorkflowResult:
created = self.create_video(payload)
video_id = self._require_resource_id(created.resource_id, "video resource ID")
self.upload_video(
@ -103,11 +131,37 @@ class MediaClient:
content_length=content_length if content_length is not None else len(content),
content_range=content_range,
)
return self.wait_for_video(
video = self.wait_for_video(
video_id,
timeout_seconds=timeout_seconds,
poll_interval_seconds=poll_interval_seconds,
)
return VideoWorkflowResult(created=created, video=video, video_id=video_id)
def create_upload_and_wait_video_from_path(
self,
video_path: str | Path,
*,
title: str | None = None,
classification: list[str] | None = None,
description: str | None = None,
timeout_seconds: float = 30.0,
poll_interval_seconds: float = 1.0,
) -> VideoWorkflowResult:
path = Path(video_path)
content = path.read_bytes()
payload = CreateVideoRequest(
title=title or path.stem,
size=len(content),
classification=classification or ["ITEM"],
description=description,
)
return self.create_upload_and_wait_video(
payload,
content=content,
timeout_seconds=timeout_seconds,
poll_interval_seconds=poll_interval_seconds,
)
def get_video(self, video_id: str) -> Video:
return self.transport.request_model(
@ -158,20 +212,26 @@ class MediaClient:
content_type: str = "application/octet-stream",
timeout_seconds: float = 30.0,
poll_interval_seconds: float = 1.0,
) -> DocumentResponse:
) -> DocumentWorkflowResult:
created = self.create_document(payload)
document_id = self._require_resource_id(created.documentId, "documentId")
self.upload_document(
uploaded = self.upload_document(
document_id,
file_name=file_name,
content=content,
content_type=content_type,
)
return self.wait_for_document(
document = self.wait_for_document(
document_id,
timeout_seconds=timeout_seconds,
poll_interval_seconds=poll_interval_seconds,
)
return DocumentWorkflowResult(
created=created,
uploaded=uploaded,
document=document,
document_id=document_id,
)
def create_document_from_url(self, payload: CreateDocumentFromUrlRequest) -> CreateDocumentResponse:
return self.transport.request_model(
@ -189,14 +249,20 @@ class MediaClient:
*,
timeout_seconds: float = 30.0,
poll_interval_seconds: float = 1.0,
) -> DocumentResponse:
) -> DocumentWorkflowResult:
created = self.create_document_from_url(payload)
document_id = self._require_resource_id(created.documentId, "documentId")
return self.wait_for_document(
document = self.wait_for_document(
document_id,
timeout_seconds=timeout_seconds,
poll_interval_seconds=poll_interval_seconds,
)
return DocumentWorkflowResult(
created=created,
uploaded=None,
document=document,
document_id=document_id,
)
def get_document(self, document_id: str) -> DocumentResponse:
return self.transport.request_model(
@ -222,6 +288,33 @@ class MediaClient:
files={"file": (file_name, content, content_type)},
)
def upload_document_from_path(self, document_id: str, document_path: str | Path) -> DocumentResponse:
path = Path(document_path)
return self.upload_document(
document_id,
file_name=path.name,
content=path.read_bytes(),
content_type=guess_media_content_type(path.name),
)
def create_upload_and_wait_document_from_path(
self,
payload: CreateDocumentRequest,
document_path: str | Path,
*,
timeout_seconds: float = 30.0,
poll_interval_seconds: float = 1.0,
) -> DocumentWorkflowResult:
path = Path(document_path)
return self.create_upload_and_wait_document(
payload,
file_name=path.name,
content=path.read_bytes(),
content_type=guess_media_content_type(path.name),
timeout_seconds=timeout_seconds,
poll_interval_seconds=poll_interval_seconds,
)
def wait_for_video(
self,
video_id: str,

View file

@ -22,11 +22,7 @@ def build_client() -> EbayClient:
def upload_image_from_file(client: EbayClient, image_path: Path) -> None:
image = client.media.create_image_from_file(
file_name=image_path.name,
content=image_path.read_bytes(),
content_type="image/jpeg",
)
image = client.media.create_image_from_path(image_path)
print("image_url:", image.imageUrl)
@ -36,7 +32,7 @@ def upload_image_from_url(client: EbayClient, image_url: str) -> None:
def upload_document_and_wait(client: EbayClient, document_path: Path) -> None:
accepted = client.media.create_upload_and_wait_document(
result = client.media.create_upload_and_wait_document(
CreateDocumentRequest(
documentType="USER_GUIDE_OR_MANUAL",
languages=["en-US"],
@ -46,22 +42,31 @@ def upload_document_and_wait(client: EbayClient, document_path: Path) -> None:
content_type="application/pdf",
timeout_seconds=60.0,
)
print("document_final_status:", accepted.documentStatus)
print("document_id:", result.document_id)
print("document_final_status:", result.document.documentStatus)
def upload_document_and_wait_from_path(client: EbayClient, document_path: Path) -> None:
result = client.media.create_upload_and_wait_document_from_path(
CreateDocumentRequest(
documentType="USER_GUIDE_OR_MANUAL",
languages=["en-US"],
),
document_path,
timeout_seconds=60.0,
)
print("document_id:", result.document_id)
print("document_final_status:", result.document.documentStatus)
def upload_video_and_wait(client: EbayClient, video_path: Path) -> None:
live_video = client.media.create_upload_and_wait_video(
CreateVideoRequest(
title=video_path.stem,
size=video_path.stat().st_size,
classification=["ITEM"],
description="Example upload from the ebay-rest-client workspace.",
),
content=video_path.read_bytes(),
result = client.media.create_upload_and_wait_video_from_path(
video_path,
description="Example upload from the ebay-rest-client workspace.",
timeout_seconds=120.0,
)
print("video_status:", live_video.status)
print("video_id:", live_video.videoId)
print("video_id:", result.video_id)
print("video_status:", result.video.status)
def main() -> None:
@ -77,7 +82,7 @@ def main() -> None:
if image_url:
upload_image_from_url(client, image_url)
if document_file:
upload_document_and_wait(client, Path(document_file))
upload_document_and_wait_from_path(client, Path(document_file))
if video_file:
upload_video_and_wait(client, Path(video_file))

View file

@ -7,10 +7,25 @@ from pytest_httpx import HTTPXMock
from ebay_client.account.client import AccountClient
from ebay_client.core.auth.models import OAuthToken
from ebay_client.core.http.transport import ApiTransport
from ebay_client.feed.client import FeedClient
from ebay_client.feed.client import CreatedFeedResource, FeedClient, FeedFileDownload
from ebay_client.fulfillment.client import FulfillmentClient
from ebay_client.generated.account.models import Programs
from ebay_client.generated.account.models import (
FulfillmentPolicy,
FulfillmentPolicyRequest,
PaymentPolicy,
PaymentPolicyRequest,
Programs,
ReturnPolicy,
ReturnPolicyRequest,
)
from ebay_client.generated.feed.models import TaskCollection
from ebay_client.generated.feed.models import (
CreateTaskRequest,
CreateUserScheduleRequest,
ScheduleTemplateResponse,
UpdateUserScheduleRequest,
UserScheduleResponse,
)
from ebay_client.generated.fulfillment.models import Order
from ebay_client.generated.inventory.models import InventoryItemWithSkuLocaleGroupid
from ebay_client.generated.media.models import (
@ -36,7 +51,14 @@ from ebay_client.generated.notification.models import (
UpdateSubscriptionRequest,
)
from ebay_client.inventory.client import InventoryClient
from ebay_client.media.client import CreatedMediaResource, MediaClient, extract_resource_id
from ebay_client.media.client import (
CreatedMediaResource,
DocumentWorkflowResult,
MediaClient,
VideoWorkflowResult,
extract_resource_id,
guess_media_content_type,
)
from ebay_client.notification.client import NotificationClient
@ -283,6 +305,99 @@ def test_account_wrapper_returns_programs_model(httpx_mock: HTTPXMock) -> None:
assert result.programs and result.programs[0].programType == "OUT_OF_STOCK_CONTROL"
def test_account_wrapper_returns_policy_models_by_id_and_name(httpx_mock: HTTPXMock) -> None:
httpx_mock.add_response(
method="GET",
url="https://api.ebay.com/sell/account/v1/fulfillment_policy/FULFILL-1",
json={"fulfillmentPolicyId": "FULFILL-1", "name": "Fast shipping"},
)
httpx_mock.add_response(
method="GET",
url="https://api.ebay.com/sell/account/v1/payment_policy/get_by_policy_name?marketplace_id=EBAY_US&name=Default%20payment",
json={"paymentPolicyId": "PAY-1", "name": "Default payment"},
)
httpx_mock.add_response(
method="GET",
url="https://api.ebay.com/sell/account/v1/return_policy/get_by_policy_name?marketplace_id=EBAY_US&name=30%20day%20returns",
json={"returnPolicyId": "RET-1", "name": "30 day returns"},
)
client = AccountClient(build_transport())
fulfillment = client.get_fulfillment_policy("FULFILL-1")
payment = client.get_payment_policy_by_name(marketplace_id="EBAY_US", name="Default payment")
returns = client.get_return_policy_by_name(marketplace_id="EBAY_US", name="30 day returns")
assert isinstance(fulfillment, FulfillmentPolicy)
assert fulfillment.fulfillmentPolicyId == "FULFILL-1"
assert isinstance(payment, PaymentPolicy)
assert payment.paymentPolicyId == "PAY-1"
assert isinstance(returns, ReturnPolicy)
assert returns.returnPolicyId == "RET-1"
def test_account_wrapper_serializes_policy_requests_and_delete(httpx_mock: HTTPXMock) -> None:
httpx_mock.add_response(
method="POST",
url="https://api.ebay.com/sell/account/v1/payment_policy",
json={"paymentPolicyId": "PAY-NEW"},
status_code=201,
)
httpx_mock.add_response(
method="PUT",
url="https://api.ebay.com/sell/account/v1/fulfillment_policy/FULFILL-1",
json={"fulfillmentPolicyId": "FULFILL-1"},
status_code=200,
)
httpx_mock.add_response(
method="DELETE",
url="https://api.ebay.com/sell/account/v1/return_policy/RET-1",
status_code=204,
)
client = AccountClient(build_transport())
payment_payload = PaymentPolicyRequest(name="Default payment")
fulfillment_payload = FulfillmentPolicyRequest(name="Fast shipping")
created_payment = client.create_payment_policy(payment_payload)
updated_fulfillment = client.update_fulfillment_policy("FULFILL-1", fulfillment_payload)
client.delete_return_policy("RET-1")
assert created_payment.paymentPolicyId == "PAY-NEW"
assert updated_fulfillment.fulfillmentPolicyId == "FULFILL-1"
create_request = httpx_mock.get_requests()[0]
create_body = json.loads(create_request.content.decode("utf-8"))
assert create_body["name"] == "Default payment"
update_request = httpx_mock.get_requests()[1]
update_body = json.loads(update_request.content.decode("utf-8"))
assert update_body["name"] == "Fast shipping"
def test_account_wrapper_supports_return_policy_create_and_update(httpx_mock: HTTPXMock) -> None:
httpx_mock.add_response(
method="POST",
url="https://api.ebay.com/sell/account/v1/return_policy",
json={"returnPolicyId": "RET-NEW"},
status_code=201,
)
httpx_mock.add_response(
method="PUT",
url="https://api.ebay.com/sell/account/v1/return_policy/RET-NEW",
json={"returnPolicyId": "RET-NEW"},
status_code=200,
)
client = AccountClient(build_transport())
payload = ReturnPolicyRequest(name="30 day returns")
created = client.create_return_policy(payload)
updated = client.update_return_policy("RET-NEW", payload)
assert created.returnPolicyId == "RET-NEW"
assert updated.returnPolicyId == "RET-NEW"
def test_feed_wrapper_returns_task_collection_model(httpx_mock: HTTPXMock) -> None:
httpx_mock.add_response(
method="GET",
@ -298,6 +413,158 @@ def test_feed_wrapper_returns_task_collection_model(httpx_mock: HTTPXMock) -> No
assert result.tasks and result.tasks[0].taskId == "TASK-1"
def test_feed_wrapper_supports_schedule_crud_and_template_lookup(httpx_mock: HTTPXMock) -> None:
httpx_mock.add_response(
method="POST",
url="https://api.ebay.com/sell/feed/v1/schedule",
status_code=201,
headers={"Location": "https://api.ebay.com/sell/feed/v1/schedule/SCHEDULE-1"},
)
httpx_mock.add_response(
method="GET",
url="https://api.ebay.com/sell/feed/v1/schedule/SCHEDULE-1",
json={"scheduleId": "SCHEDULE-1", "feedType": "LMS_ORDER_REPORT"},
)
httpx_mock.add_response(
method="PUT",
url="https://api.ebay.com/sell/feed/v1/schedule/SCHEDULE-1",
status_code=204,
)
httpx_mock.add_response(
method="DELETE",
url="https://api.ebay.com/sell/feed/v1/schedule/SCHEDULE-1",
status_code=204,
)
httpx_mock.add_response(
method="GET",
url="https://api.ebay.com/sell/feed/v1/schedule_template/TEMPLATE-1",
json={"scheduleTemplateId": "TEMPLATE-1", "feedType": "LMS_ORDER_REPORT"},
)
client = FeedClient(build_transport())
created = client.create_schedule(
CreateUserScheduleRequest(feedType="LMS_ORDER_REPORT", scheduleTemplateId="TEMPLATE-1")
)
schedule = client.get_schedule("SCHEDULE-1")
client.update_schedule("SCHEDULE-1", UpdateUserScheduleRequest(scheduleName="nightly"))
client.delete_schedule("SCHEDULE-1")
template = client.get_schedule_template("TEMPLATE-1")
assert isinstance(created, CreatedFeedResource)
assert created.resource_id == "SCHEDULE-1"
assert isinstance(schedule, UserScheduleResponse)
assert schedule.scheduleId == "SCHEDULE-1"
assert isinstance(template, ScheduleTemplateResponse)
assert template.scheduleTemplateId == "TEMPLATE-1"
create_body = json.loads(httpx_mock.get_requests()[0].content.decode("utf-8"))
assert create_body["feedType"] == "LMS_ORDER_REPORT"
update_body = json.loads(httpx_mock.get_requests()[2].content.decode("utf-8"))
assert update_body["scheduleName"] == "nightly"
def test_feed_wrapper_supports_task_create_upload_and_file_downloads(httpx_mock: HTTPXMock) -> None:
httpx_mock.add_response(
method="POST",
url="https://api.ebay.com/sell/feed/v1/task",
status_code=202,
headers={"Location": "https://api.ebay.com/sell/feed/v1/task/TASK-2"},
)
httpx_mock.add_response(
method="POST",
url="https://api.ebay.com/sell/feed/v1/task/TASK-2/upload_file",
status_code=200,
json={},
)
httpx_mock.add_response(
method="GET",
url="https://api.ebay.com/sell/feed/v1/task/TASK-2/download_input_file",
status_code=200,
headers={"content-disposition": 'attachment; filename="input.xml"'},
content=b"<input />",
)
httpx_mock.add_response(
method="GET",
url="https://api.ebay.com/sell/feed/v1/task/TASK-2/download_result_file",
status_code=200,
headers={"content-disposition": 'attachment; filename="result.csv"'},
content=b"id,name\n1,demo\n",
)
httpx_mock.add_response(
method="GET",
url="https://api.ebay.com/sell/feed/v1/schedule/SCHEDULE-9/download_result_file",
status_code=200,
headers={"content-disposition": 'attachment; filename="latest.csv"'},
content=b"id,name\n2,latest\n",
)
client = FeedClient(build_transport())
created = client.create_task(
CreateTaskRequest(feedType="LMS_ORDER_REPORT", schemaVersion="1.0"),
marketplace_id="EBAY_US",
)
client.upload_file("TASK-2", file_name="input.xml", content=b"<input />", content_type="application/xml")
input_file = client.get_input_file("TASK-2")
result_file = client.get_result_file("TASK-2")
latest_file = client.get_latest_result_file("SCHEDULE-9")
assert isinstance(created, CreatedFeedResource)
assert created.resource_id == "TASK-2"
assert isinstance(input_file, FeedFileDownload)
assert input_file.file_name == "input.xml"
assert input_file.content == b"<input />"
assert result_file.file_name == "result.csv"
assert result_file.content.startswith(b"id,name")
assert latest_file.file_name == "latest.csv"
create_request = httpx_mock.get_requests()[0]
assert create_request.headers["X-EBAY-C-MARKETPLACE-ID"] == "EBAY_US"
create_body = json.loads(create_request.content.decode("utf-8"))
assert create_body["feedType"] == "LMS_ORDER_REPORT"
upload_request = httpx_mock.get_requests()[1]
assert upload_request.headers["Content-Type"].startswith("multipart/form-data;")
assert b"filename=\"input.xml\"" in upload_request.content
def test_feed_wrapper_passes_task_and_schedule_filters(httpx_mock: HTTPXMock) -> None:
httpx_mock.add_response(
method="GET",
url=(
"https://api.ebay.com/sell/feed/v1/task?feed_type=LMS_ORDER_REPORT"
"&schedule_id=SCHEDULE-1&date_range=2026-01-01T00:00:00.000Z..2026-01-02T00:00:00.000Z"
"&look_back_days=7&limit=50&offset=0"
),
json={"tasks": [], "total": 0},
)
httpx_mock.add_response(
method="GET",
url="https://api.ebay.com/sell/feed/v1/schedule?feed_type=LMS_ORDER_REPORT&limit=25&offset=0",
json={"schedules": [], "total": 0},
)
httpx_mock.add_response(
method="GET",
url="https://api.ebay.com/sell/feed/v1/schedule_template?feed_type=LMS_ORDER_REPORT&limit=25&offset=0",
json={"scheduleTemplates": [], "total": 0},
)
client = FeedClient(build_transport())
tasks = client.get_tasks(
feed_type="LMS_ORDER_REPORT",
schedule_id="SCHEDULE-1",
date_range="2026-01-01T00:00:00.000Z..2026-01-02T00:00:00.000Z",
look_back_days=7,
limit=50,
offset=0,
)
schedules = client.get_schedules(feed_type="LMS_ORDER_REPORT", limit=25, offset=0)
templates = client.get_schedule_templates(feed_type="LMS_ORDER_REPORT", limit=25, offset=0)
assert tasks.total == 0
assert schedules.total == 0
assert templates.total == 0
def test_inventory_wrapper_accepts_readonly_or_full_scope_options(httpx_mock: HTTPXMock) -> None:
oauth_client = RecordingOAuthClient()
transport = ApiTransport(base_url="https://api.ebay.com", oauth_client=oauth_client)
@ -593,7 +860,10 @@ def test_media_create_upload_and_wait_video_orchestrates_flow(monkeypatch) -> No
poll_interval_seconds=0.0,
)
assert result.videoId == "VIDEO-9"
assert isinstance(result, VideoWorkflowResult)
assert result.video_id == "VIDEO-9"
assert result.video.videoId == "VIDEO-9"
assert result.created.resource_id == "VIDEO-9"
assert calls[0][0] == "create_video"
assert calls[1] == (
"upload_video",
@ -636,7 +906,11 @@ def test_media_create_upload_and_wait_document_orchestrates_flow(monkeypatch) ->
poll_interval_seconds=0.0,
)
assert result.documentStatus == "ACCEPTED"
assert isinstance(result, DocumentWorkflowResult)
assert result.document_id == "DOC-9"
assert result.created.documentId == "DOC-9"
assert result.uploaded is not None and result.uploaded.documentStatus == "SUBMITTED"
assert result.document.documentStatus == "ACCEPTED"
assert calls[0][0] == "create_document"
assert calls[1] == (
"upload_document",
@ -679,7 +953,11 @@ def test_media_create_document_from_url_and_wait_orchestrates_flow(monkeypatch)
poll_interval_seconds=0.0,
)
assert result.documentStatus == "ACCEPTED"
assert isinstance(result, DocumentWorkflowResult)
assert result.document_id == "DOC-10"
assert result.created.documentId == "DOC-10"
assert result.uploaded is None
assert result.document.documentStatus == "ACCEPTED"
assert calls[0][0] == "create_document_from_url"
assert calls[1] == (
"wait_for_document",
@ -721,3 +999,126 @@ def test_media_convenience_methods_raise_when_required_ids_are_missing(monkeypat
pass
else:
raise AssertionError("Expected convenience method to raise when eBay omits the required identifier")
def test_guess_media_content_type_uses_filename_extension() -> None:
assert guess_media_content_type("photo.jpg") == "image/jpeg"
assert guess_media_content_type("guide.pdf") == "application/pdf"
assert guess_media_content_type("unknown.custom") == "application/octet-stream"
def test_media_create_image_from_path_reads_file_and_infers_content_type(tmp_path, monkeypatch) -> None:
client = MediaClient(build_transport())
image_path = tmp_path / "photo.png"
image_path.write_bytes(b"png-data")
captured: dict[str, object] = {}
monkeypatch.setattr(
client,
"create_image_from_file",
lambda **kwargs: captured.update(kwargs) or ImageResponse(imageUrl="https://example.test/image"),
)
result = client.create_image_from_path(image_path)
assert result.imageUrl == "https://example.test/image"
assert captured == {
"file_name": "photo.png",
"content": b"png-data",
"content_type": "image/png",
}
def test_media_upload_document_from_path_reads_file_and_infers_content_type(tmp_path, monkeypatch) -> None:
client = MediaClient(build_transport())
document_path = tmp_path / "guide.pdf"
document_path.write_bytes(b"%PDF-1.7")
captured: dict[str, object] = {}
monkeypatch.setattr(
client,
"upload_document",
lambda document_id, **kwargs: captured.update({"document_id": document_id, **kwargs})
or DocumentResponse(documentId=document_id, documentStatus="SUBMITTED"),
)
result = client.upload_document_from_path("DOC-42", document_path)
assert result.documentId == "DOC-42"
assert captured == {
"document_id": "DOC-42",
"file_name": "guide.pdf",
"content": b"%PDF-1.7",
"content_type": "application/pdf",
}
def test_media_create_upload_and_wait_document_from_path_reads_file_and_delegates(tmp_path, monkeypatch) -> None:
client = MediaClient(build_transport())
document_path = tmp_path / "guide.pdf"
document_path.write_bytes(b"%PDF-1.7")
captured: dict[str, object] = {}
monkeypatch.setattr(
client,
"create_upload_and_wait_document",
lambda payload, **kwargs: captured.update({"payload": payload, **kwargs})
or DocumentWorkflowResult(
created=CreateDocumentResponse(documentId="DOC-77", documentStatus="PENDING_UPLOAD"),
uploaded=DocumentResponse(documentId="DOC-77", documentStatus="SUBMITTED"),
document=DocumentResponse(documentId="DOC-77", documentStatus="ACCEPTED"),
document_id="DOC-77",
),
)
result = client.create_upload_and_wait_document_from_path(
CreateDocumentRequest(documentType="USER_GUIDE_OR_MANUAL", languages=["en-US"]),
document_path,
poll_interval_seconds=0.0,
)
assert isinstance(result, DocumentWorkflowResult)
assert result.document.documentStatus == "ACCEPTED"
assert captured["file_name"] == "guide.pdf"
assert captured["content"] == b"%PDF-1.7"
assert captured["content_type"] == "application/pdf"
assert captured["poll_interval_seconds"] == 0.0
def test_media_create_upload_and_wait_video_from_path_builds_payload_and_delegates(tmp_path, monkeypatch) -> None:
client = MediaClient(build_transport())
video_path = tmp_path / "demo.mp4"
video_path.write_bytes(b"video-data")
captured: dict[str, object] = {}
monkeypatch.setattr(
client,
"create_upload_and_wait_video",
lambda payload, **kwargs: captured.update({"payload": payload, **kwargs})
or VideoWorkflowResult(
created=CreatedMediaResource(resource_id="VIDEO-88"),
video=Video(videoId="VIDEO-88", status="LIVE"),
video_id="VIDEO-88",
),
)
result = client.create_upload_and_wait_video_from_path(
video_path,
description="demo video",
poll_interval_seconds=0.0,
)
assert isinstance(result, VideoWorkflowResult)
assert result.video.videoId == "VIDEO-88"
payload = captured["payload"]
assert isinstance(payload, CreateVideoRequest)
assert payload.title == "demo"
assert payload.size == len(b"video-data")
assert payload.classification == ["ITEM"]
assert payload.description == "demo video"
assert captured["content"] == b"video-data"
assert captured["poll_interval_seconds"] == 0.0