"""Example script: submit an eBay feed task, upload a file, and fetch the result.

Configuration is read from environment variables:

* ``EBAY_CLIENT_ID`` / ``EBAY_CLIENT_SECRET`` (required) - OAuth credentials.
* ``EBAY_FEED_UPLOAD_FILE`` (required) - path of the file to upload.
* ``EBAY_FEED_SCOPE``, ``EBAY_MARKETPLACE_ID``, ``EBAY_FEED_TYPE``,
  ``EBAY_FEED_SCHEMA_VERSION``, ``EBAY_FEED_UPLOAD_CONTENT_TYPE``,
  ``EBAY_FEED_TIMEOUT_SECONDS``, ``EBAY_FEED_POLL_INTERVAL_SECONDS`` (optional) -
  override the defaults used below.
* ``EBAY_FEED_RESULT_OUTPUT`` (optional) - when set, the result file is written
  to this path.
"""

from __future__ import annotations

import os
from pathlib import Path
from time import monotonic, sleep

from ebay_client import EbayClient
from ebay_client.core.auth.models import EbayOAuthConfig
from ebay_client.generated.feed.models import CreateTaskRequest

# Task statuses at which polling stops.
TERMINAL_TASK_STATUSES = {
    "COMPLETED",
    "COMPLETED_WITH_ERROR",
    "FAILED",
    "CANCELLED",
}


def require_env(name: str) -> str:
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is required")
    return value


def build_client() -> EbayClient:
    """Build an ``EbayClient`` from OAuth settings found in the environment.

    ``EBAY_CLIENT_ID`` and ``EBAY_CLIENT_SECRET`` are required; the default
    scope can be overridden with ``EBAY_FEED_SCOPE``.

    Raises:
        RuntimeError: if a required credential variable is missing.
    """
    oauth_config = EbayOAuthConfig(
        client_id=require_env("EBAY_CLIENT_ID"),
        client_secret=require_env("EBAY_CLIENT_SECRET"),
        default_scopes=[
            os.environ.get(
                "EBAY_FEED_SCOPE",
                "https://api.ebay.com/oauth/api_scope/sell.fulfillment",
            )
        ],
    )
    return EbayClient(oauth_config)


def wait_for_task(
    client: EbayClient,
    task_id: str,
    *,
    timeout_seconds: float = 120.0,
    poll_interval_seconds: float = 3.0,
):
    """Poll a feed task until it reaches a terminal status or the timeout expires.

    The task is always fetched at least once, even when ``timeout_seconds`` is
    zero or negative, and once more when the deadline is reached, so a task
    that finished during the last sleep is still reported.

    Args:
        client: client whose ``feed.get_task`` is used for polling.
        task_id: identifier of the task to poll.
        timeout_seconds: maximum time to keep polling.
        poll_interval_seconds: pause between two polls.

    Returns:
        The task object returned by ``client.feed.get_task`` once its status is
        one of ``TERMINAL_TASK_STATUSES``.

    Raises:
        TimeoutError: if no terminal status was observed before the deadline.
    """
    deadline = monotonic() + timeout_seconds
    last_status: str | None = None

    while True:
        task = client.feed.get_task(task_id)
        last_status = task.status
        print("task_status:", last_status)
        if last_status in TERMINAL_TASK_STATUSES:
            return task

        remaining = deadline - monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline; the next iteration performs the final poll.
        sleep(min(poll_interval_seconds, remaining))

    raise TimeoutError(
        f"Feed task {task_id} did not complete within {timeout_seconds} seconds; "
        f"last status was {last_status}"
    )


def main() -> None:
    """Create a feed task, upload the input file, wait, and fetch the result file.

    Raises:
        RuntimeError: if a required environment variable is missing or the
            created task has no id.
        TimeoutError: if the task does not reach a terminal status in time.
    """
    client = build_client()

    marketplace_id = os.environ.get("EBAY_MARKETPLACE_ID", "EBAY_US")
    feed_type = os.environ.get("EBAY_FEED_TYPE", "LMS_ORDER_REPORT")
    schema_version = os.environ.get("EBAY_FEED_SCHEMA_VERSION", "1.0")
    upload_path = Path(require_env("EBAY_FEED_UPLOAD_FILE"))
    result_output_path = os.environ.get("EBAY_FEED_RESULT_OUTPUT")
    upload_content_type = os.environ.get(
        "EBAY_FEED_UPLOAD_CONTENT_TYPE", "application/xml"
    )

    # Validate every local input before touching the feed API, so a malformed
    # number or unreadable file does not leave a freshly created task behind.
    timeout_seconds = float(os.environ.get("EBAY_FEED_TIMEOUT_SECONDS", "120"))
    poll_interval_seconds = float(
        os.environ.get("EBAY_FEED_POLL_INTERVAL_SECONDS", "3")
    )
    upload_content = upload_path.read_bytes()

    created = client.feed.create_task(
        CreateTaskRequest(feedType=feed_type, schemaVersion=schema_version),
        marketplace_id=marketplace_id,
    )
    print("task_id:", created.resource_id)

    task_id = created.resource_id
    if not task_id:
        raise RuntimeError("create_task did not return a task id")

    client.feed.upload_file(
        task_id,
        file_name=upload_path.name,
        content=upload_content,
        content_type=upload_content_type,
    )
    print("uploaded_file:", upload_path.name)

    task = wait_for_task(
        client,
        task_id,
        timeout_seconds=timeout_seconds,
        poll_interval_seconds=poll_interval_seconds,
    )
    print("final_task_status:", task.status)

    # A result file is only requested for the two "completed" statuses.
    if task.status in {"COMPLETED", "COMPLETED_WITH_ERROR"}:
        result_file = client.feed.get_result_file(task_id)
        print("result_file_name:", result_file.file_name)
        print("result_file_bytes:", len(result_file.content))
        if result_output_path:
            Path(result_output_path).write_bytes(result_file.content)
            print("result_saved_to:", result_output_path)


if __name__ == "__main__":
    main()