Add examples for feed task management and inventory publishing workflows

This commit is contained in:
claudi 2026-04-07 11:31:21 +02:00
parent 2c6bd35ebb
commit 904f4e487e
3 changed files with 363 additions and 6 deletions

View file

@ -0,0 +1,95 @@
from __future__ import annotations
import os
from pathlib import Path
from time import monotonic, sleep
from ebay_client import EbayClient
from ebay_client.core.auth.models import EbayOAuthConfig
from ebay_client.generated.feed.models import CreateTaskRequest
TERMINAL_TASK_STATUSES = {
"COMPLETED",
"COMPLETED_WITH_ERROR",
"FAILED",
"CANCELLED",
}
def require_env(name: str) -> str:
value = os.environ.get(name)
if not value:
raise RuntimeError(f"Environment variable {name} is required")
return value
def build_client() -> EbayClient:
oauth_config = EbayOAuthConfig(
client_id=require_env("EBAY_CLIENT_ID"),
client_secret=require_env("EBAY_CLIENT_SECRET"),
default_scopes=[os.environ.get("EBAY_FEED_SCOPE", "https://api.ebay.com/oauth/api_scope/sell.fulfillment")],
)
return EbayClient(oauth_config)
def wait_for_task(client: EbayClient, task_id: str, *, timeout_seconds: float = 120.0, poll_interval_seconds: float = 3.0):
deadline = monotonic() + timeout_seconds
last_status: str | None = None
while monotonic() < deadline:
task = client.feed.get_task(task_id)
last_status = task.status
print("task_status:", last_status)
if last_status in TERMINAL_TASK_STATUSES:
return task
sleep(poll_interval_seconds)
raise TimeoutError(f"Feed task {task_id} did not complete within {timeout_seconds} seconds; last status was {last_status}")
def main() -> None:
client = build_client()
marketplace_id = os.environ.get("EBAY_MARKETPLACE_ID", "EBAY_US")
feed_type = os.environ.get("EBAY_FEED_TYPE", "LMS_ORDER_REPORT")
schema_version = os.environ.get("EBAY_FEED_SCHEMA_VERSION", "1.0")
upload_path = Path(require_env("EBAY_FEED_UPLOAD_FILE"))
result_output_path = os.environ.get("EBAY_FEED_RESULT_OUTPUT")
created = client.feed.create_task(
CreateTaskRequest(feedType=feed_type, schemaVersion=schema_version),
marketplace_id=marketplace_id,
)
print("task_id:", created.resource_id)
task_id = created.resource_id
if not task_id:
raise RuntimeError("create_task did not return a task id")
client.feed.upload_file(
task_id,
file_name=upload_path.name,
content=upload_path.read_bytes(),
content_type=os.environ.get("EBAY_FEED_UPLOAD_CONTENT_TYPE", "application/xml"),
)
print("uploaded_file:", upload_path.name)
task = wait_for_task(
client,
task_id,
timeout_seconds=float(os.environ.get("EBAY_FEED_TIMEOUT_SECONDS", "120")),
poll_interval_seconds=float(os.environ.get("EBAY_FEED_POLL_INTERVAL_SECONDS", "3")),
)
print("final_task_status:", task.status)
if task.status in {"COMPLETED", "COMPLETED_WITH_ERROR"}:
result_file = client.feed.get_result_file(task_id)
print("result_file_name:", result_file.file_name)
print("result_file_bytes:", len(result_file.content))
if result_output_path:
Path(result_output_path).write_bytes(result_file.content)
print("result_saved_to:", result_output_path)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,100 @@
from __future__ import annotations
import os
from ebay_client import EbayClient
from ebay_client.core.auth.models import EbayOAuthConfig
from ebay_client.generated.inventory.models import (
Amount,
Availability,
EbayOfferDetailsWithKeys,
InventoryItem,
ListingPolicies,
PricingSummary,
Product,
ShipToLocationAvailability,
)
def require_env(name: str) -> str:
value = os.environ.get(name)
if not value:
raise RuntimeError(f"Environment variable {name} is required")
return value
def build_client() -> EbayClient:
oauth_config = EbayOAuthConfig(
client_id=require_env("EBAY_CLIENT_ID"),
client_secret=require_env("EBAY_CLIENT_SECRET"),
default_scopes=["https://api.ebay.com/oauth/api_scope/sell.inventory"],
)
return EbayClient(oauth_config)
def build_inventory_item() -> InventoryItem:
return InventoryItem(
condition="NEW",
availability=Availability(
shipToLocationAvailability=ShipToLocationAvailability(quantity=int(os.environ.get("EBAY_AVAILABLE_QUANTITY", "1")))
),
product=Product(
title=require_env("EBAY_ITEM_TITLE"),
description=require_env("EBAY_ITEM_DESCRIPTION"),
imageUrls=[require_env("EBAY_ITEM_IMAGE_URL")],
aspects='{"Brand":["Demo Brand"],"Type":["Demo Item"]}',
),
)
def build_offer_payload(sku: str) -> EbayOfferDetailsWithKeys:
return EbayOfferDetailsWithKeys(
sku=sku,
marketplaceId=os.environ.get("EBAY_MARKETPLACE_ID", "EBAY_US"),
format="FIXED_PRICE",
availableQuantity=int(os.environ.get("EBAY_AVAILABLE_QUANTITY", "1")),
categoryId=require_env("EBAY_CATEGORY_ID"),
merchantLocationKey=require_env("EBAY_MERCHANT_LOCATION_KEY"),
listingDescription=os.environ.get("EBAY_LISTING_DESCRIPTION") or require_env("EBAY_ITEM_DESCRIPTION"),
listingDuration="GTC",
listingPolicies=ListingPolicies(
fulfillmentPolicyId=require_env("EBAY_FULFILLMENT_POLICY_ID"),
paymentPolicyId=require_env("EBAY_PAYMENT_POLICY_ID"),
returnPolicyId=require_env("EBAY_RETURN_POLICY_ID"),
),
pricingSummary=PricingSummary(
price=Amount(
currency=os.environ.get("EBAY_CURRENCY", "USD"),
value=require_env("EBAY_PRICE"),
)
),
)
def main() -> None:
client = build_client()
sku = require_env("EBAY_SKU")
content_language = os.environ.get("EBAY_CONTENT_LANGUAGE", "en-US")
item_result = client.inventory.create_or_replace_inventory_item(
sku,
build_inventory_item(),
content_language=content_language,
)
print("inventory_item_warnings:", item_result.warnings)
offer = client.inventory.create_offer(
build_offer_payload(sku),
content_language=content_language,
)
print("offer_id:", offer.offerId)
if not offer.offerId:
raise RuntimeError("create_offer did not return an offerId")
published = client.inventory.publish_offer(offer.offerId)
print("listing_id:", published.listingId)
if __name__ == "__main__":
main()