ebay_client/examples/feed_task_flow.py

95 lines
No EOL
3.1 KiB
Python

from __future__ import annotations
import os
from pathlib import Path
from time import monotonic, sleep
from ebay_client import EbayClient
from ebay_client.core.auth.models import EbayOAuthConfig
from ebay_client.generated.feed.models import CreateTaskRequest
TERMINAL_TASK_STATUSES = {
"COMPLETED",
"COMPLETED_WITH_ERROR",
"FAILED",
"CANCELLED",
}
def require_env(name: str) -> str:
value = os.environ.get(name)
if not value:
raise RuntimeError(f"Environment variable {name} is required")
return value
def build_client() -> EbayClient:
oauth_config = EbayOAuthConfig(
client_id=require_env("EBAY_CLIENT_ID"),
client_secret=require_env("EBAY_CLIENT_SECRET"),
default_scopes=[os.environ.get("EBAY_FEED_SCOPE", "https://api.ebay.com/oauth/api_scope/sell.fulfillment")],
)
return EbayClient(oauth_config)
def wait_for_task(client: EbayClient, task_id: str, *, timeout_seconds: float = 120.0, poll_interval_seconds: float = 3.0):
deadline = monotonic() + timeout_seconds
last_status: str | None = None
while monotonic() < deadline:
task = client.feed.get_task(task_id)
last_status = task.status
print("task_status:", last_status)
if last_status in TERMINAL_TASK_STATUSES:
return task
sleep(poll_interval_seconds)
raise TimeoutError(f"Feed task {task_id} did not complete within {timeout_seconds} seconds; last status was {last_status}")
def main() -> None:
client = build_client()
marketplace_id = os.environ.get("EBAY_MARKETPLACE_ID", "EBAY_US")
feed_type = os.environ.get("EBAY_FEED_TYPE", "LMS_ORDER_REPORT")
schema_version = os.environ.get("EBAY_FEED_SCHEMA_VERSION", "1.0")
upload_path = Path(require_env("EBAY_FEED_UPLOAD_FILE"))
result_output_path = os.environ.get("EBAY_FEED_RESULT_OUTPUT")
created = client.feed.create_task(
CreateTaskRequest(feedType=feed_type, schemaVersion=schema_version),
marketplace_id=marketplace_id,
)
print("task_id:", created.resource_id)
task_id = created.resource_id
if not task_id:
raise RuntimeError("create_task did not return a task id")
client.feed.upload_file(
task_id,
file_name=upload_path.name,
content=upload_path.read_bytes(),
content_type=os.environ.get("EBAY_FEED_UPLOAD_CONTENT_TYPE", "application/xml"),
)
print("uploaded_file:", upload_path.name)
task = wait_for_task(
client,
task_id,
timeout_seconds=float(os.environ.get("EBAY_FEED_TIMEOUT_SECONDS", "120")),
poll_interval_seconds=float(os.environ.get("EBAY_FEED_POLL_INTERVAL_SECONDS", "3")),
)
print("final_task_status:", task.status)
if task.status in {"COMPLETED", "COMPLETED_WITH_ERROR"}:
result_file = client.feed.get_result_file(task_id)
print("result_file_name:", result_file.file_name)
print("result_file_bytes:", len(result_file.content))
if result_output_path:
Path(result_output_path).write_bytes(result_file.content)
print("result_saved_to:", result_output_path)
if __name__ == "__main__":
main()