131 lines
No EOL
4.5 KiB
Python
131 lines
No EOL
4.5 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
from pytest_httpx import HTTPXMock
|
|
|
|
from ebay_client.account.client import AccountClient
|
|
from ebay_client.core.auth.models import OAuthToken
|
|
from ebay_client.core.http.transport import ApiTransport
|
|
from ebay_client.feed.client import FeedClient
|
|
from ebay_client.fulfillment.client import FulfillmentClient
|
|
from ebay_client.generated.account.models import Programs
|
|
from ebay_client.generated.feed.models import TaskCollection
|
|
from ebay_client.generated.fulfillment.models import Order
|
|
from ebay_client.generated.inventory.models import InventoryItemWithSkuLocaleGroupid
|
|
from ebay_client.generated.notification.models import (
|
|
DeliveryConfig,
|
|
DestinationRequest,
|
|
TopicSearchResponse,
|
|
)
|
|
from ebay_client.inventory.client import InventoryClient
|
|
from ebay_client.notification.client import NotificationClient
|
|
|
|
|
|
class DummyOAuthClient:
|
|
def get_valid_token(self, *, scopes: list[str] | None = None) -> OAuthToken:
|
|
return OAuthToken(access_token="test-token", scope=" ".join(scopes or []))
|
|
|
|
|
|
def build_transport() -> ApiTransport:
|
|
return ApiTransport(base_url="https://api.ebay.com", oauth_client=DummyOAuthClient())
|
|
|
|
|
|
def test_notification_wrapper_returns_pydantic_model(httpx_mock: HTTPXMock) -> None:
|
|
httpx_mock.add_response(
|
|
method="GET",
|
|
url="https://api.ebay.com/commerce/notification/v1/topic?limit=10",
|
|
json={"topics": [{"topicId": "MARKETPLACE_ACCOUNT_DELETION", "description": "topic"}], "total": 1},
|
|
)
|
|
|
|
client = NotificationClient(build_transport())
|
|
result = client.get_topics(limit=10)
|
|
|
|
assert isinstance(result, TopicSearchResponse)
|
|
assert result.total == 1
|
|
assert result.topics and result.topics[0].topicId == "MARKETPLACE_ACCOUNT_DELETION"
|
|
request = httpx_mock.get_requests()[0]
|
|
assert request.headers["Authorization"] == "Bearer test-token"
|
|
|
|
|
|
def test_notification_wrapper_serializes_typed_request_model(httpx_mock: HTTPXMock) -> None:
|
|
httpx_mock.add_response(
|
|
method="POST",
|
|
url="https://api.ebay.com/commerce/notification/v1/destination",
|
|
json={},
|
|
status_code=201,
|
|
)
|
|
|
|
client = NotificationClient(build_transport())
|
|
payload = DestinationRequest(
|
|
name="main-destination",
|
|
status="ENABLED",
|
|
deliveryConfig=DeliveryConfig(
|
|
endpoint="https://example.test/webhook",
|
|
verificationToken="verification_token_1234567890123456",
|
|
),
|
|
)
|
|
|
|
client.create_destination(payload)
|
|
|
|
request = httpx_mock.get_requests()[0]
|
|
body = json.loads(request.content.decode("utf-8"))
|
|
assert body["name"] == "main-destination"
|
|
assert body["deliveryConfig"]["endpoint"] == "https://example.test/webhook"
|
|
|
|
|
|
def test_inventory_wrapper_returns_inventory_item_model(httpx_mock: HTTPXMock) -> None:
|
|
httpx_mock.add_response(
|
|
method="GET",
|
|
url="https://api.ebay.com/sell/inventory/v1/inventory_item/SKU-1",
|
|
json={"sku": "SKU-1"},
|
|
)
|
|
|
|
client = InventoryClient(build_transport())
|
|
result = client.get_inventory_item("SKU-1")
|
|
|
|
assert isinstance(result, InventoryItemWithSkuLocaleGroupid)
|
|
assert result.sku == "SKU-1"
|
|
|
|
|
|
def test_fulfillment_wrapper_returns_order_model(httpx_mock: HTTPXMock) -> None:
|
|
httpx_mock.add_response(
|
|
method="GET",
|
|
url="https://api.ebay.com/sell/fulfillment/v1/order/ORDER-1",
|
|
json={"orderId": "ORDER-1"},
|
|
)
|
|
|
|
client = FulfillmentClient(build_transport())
|
|
result = client.get_order("ORDER-1")
|
|
|
|
assert isinstance(result, Order)
|
|
assert result.orderId == "ORDER-1"
|
|
|
|
|
|
def test_account_wrapper_returns_programs_model(httpx_mock: HTTPXMock) -> None:
|
|
httpx_mock.add_response(
|
|
method="GET",
|
|
url="https://api.ebay.com/sell/account/v1/program/get_opted_in_programs",
|
|
json={"programs": [{"programType": "OUT_OF_STOCK_CONTROL"}]},
|
|
)
|
|
|
|
client = AccountClient(build_transport())
|
|
result = client.get_opted_in_programs()
|
|
|
|
assert isinstance(result, Programs)
|
|
assert result.programs and result.programs[0].programType == "OUT_OF_STOCK_CONTROL"
|
|
|
|
|
|
def test_feed_wrapper_returns_task_collection_model(httpx_mock: HTTPXMock) -> None:
|
|
httpx_mock.add_response(
|
|
method="GET",
|
|
url="https://api.ebay.com/sell/feed/v1/task?feed_type=LMS_ORDER_REPORT",
|
|
json={"tasks": [{"taskId": "TASK-1"}], "total": 1},
|
|
)
|
|
|
|
client = FeedClient(build_transport())
|
|
result = client.get_tasks(feed_type="LMS_ORDER_REPORT")
|
|
|
|
assert isinstance(result, TaskCollection)
|
|
assert result.total == 1
|
|
assert result.tasks and result.tasks[0].taskId == "TASK-1" |