From aa7db1a3ab02697dec304acb7d77cdb848a5e2e1 Mon Sep 17 00:00:00 2001 From: claudi Date: Tue, 24 Mar 2026 16:12:58 +0100 Subject: [PATCH] Add mixins for various Lobster REST API client functionalities - Implement JobsMixin for job-related operations including job overview, execution, and control. - Implement MediaMixin for media management, including creation, updating, and file uploads. - Implement ProductGroupsMixin for handling product groups, including bulk operations and hierarchy retrieval. - Implement ProductsMixin for product management, including bulk creation and updates. - Implement ProtocolsMixin for protocol-related operations, including retrieval by job ID and category. - Implement TextMixin for managing text entries, including bulk operations. - Implement TreeGroupsMixin for tree group management, including bulk operations and hierarchy retrieval. --- elytra_client/rest_api/client.py | 1594 ----------------- elytra_client/rest_api/client/__init__.py | 49 + .../rest_api/client/_base_protocol.py | 36 + .../rest_api/client/attribute_groups.py | 198 ++ elytra_client/rest_api/client/attributes.py | 220 +++ elytra_client/rest_api/client/base.py | 152 ++ elytra_client/rest_api/client/jobs.py | 195 ++ elytra_client/rest_api/client/media.py | 240 +++ .../rest_api/client/product_groups.py | 193 ++ elytra_client/rest_api/client/products.py | 212 +++ elytra_client/rest_api/client/protocols.py | 134 ++ elytra_client/rest_api/client/text.py | 164 ++ elytra_client/rest_api/client/tree_groups.py | 209 +++ elytra_client/rest_api/models.py | 999 ----------- pyproject.toml | 3 +- tests/test_client.py | 33 +- 16 files changed, 2019 insertions(+), 2612 deletions(-) delete mode 100644 elytra_client/rest_api/client.py create mode 100644 elytra_client/rest_api/client/__init__.py create mode 100644 elytra_client/rest_api/client/_base_protocol.py create mode 100644 elytra_client/rest_api/client/attribute_groups.py create mode 100644 elytra_client/rest_api/client/attributes.py create mode 100644 elytra_client/rest_api/client/base.py create mode 100644 elytra_client/rest_api/client/jobs.py create mode 100644 elytra_client/rest_api/client/media.py create mode 100644 elytra_client/rest_api/client/product_groups.py create mode 100644 elytra_client/rest_api/client/products.py create mode 100644 elytra_client/rest_api/client/protocols.py create mode 100644 elytra_client/rest_api/client/text.py create mode 100644 elytra_client/rest_api/client/tree_groups.py delete mode 100644 elytra_client/rest_api/models.py diff --git a/elytra_client/rest_api/client.py b/elytra_client/rest_api/client.py deleted file mode 100644 index 1103195..0000000 --- a/elytra_client/rest_api/client.py +++ /dev/null @@ -1,1594 +0,0 @@ -"""Client for the Lobster PIM Legacy REST API""" - -from typing import Any, Dict, List, Optional, Type, TypeVar, cast -from urllib.parse import urljoin - -import requests -from pydantic import BaseModel, ValidationError - -from ..exceptions import ( - ElytraAPIError, - ElytraAuthenticationError, - ElytraNotFoundError, - ElytraValidationError, -) -from .auth import AuthMethod, RestApiAuth -from .models import ( - AttributeBulkCreateResponse, - AttributeBulkUpdateResponse, - AttributeGetByNameResponse, - AttributeGroupHierarchyResponse, - AttributeGroupListResponse, - AttributeListResponse, - AttributeResponse, - JobControlRequest, - JobControlResponse, - JobDetailInfo, - JobExecutionResponse, - JobInfo, - JobOverviewResponse, - MediaBulkCreateResponse, - MediaBulkUpdateResponse, - MediaFileResponse, - MediaListResponse, - ProductBulkCreateResponse, - ProductBulkUpdateResponse, - ProductGroupBulkCreateResponse, - ProductGroupBulkUpdateResponse, - ProductGroupHierarchyResponse, - ProductGroupListResponse, - ProductHierarchyResponse, - ProductListResponse, - ProtocolCategoryInfo, - ProtocolCategoryListResponse, - ProtocolInfo, - ProtocolListResponse, - SimpleAttributeResponse, - SingleAttributeGroupResponse, - SingleMediaResponse, - SingleNewAttributeGroupRequestBody, - SingleNewAttributeRequestBody, - SingleNewMediaRequestBody, - SingleNewProductGroupRequestBody, - SingleNewProductRequestBody, - SingleNewTextRequestBody, - SingleNewTreeGroupRequestBody, - SingleProductGroupResponse, - SingleProductResponse, - SingleTextResponse, - SingleTreeGroupResponse, - SingleUpdateAttributeGroupRequestBody, - SingleUpdateAttributeRequestBody, - SingleUpdateMediaRequestBody, - SingleUpdateProductGroupRequestBody, - SingleUpdateProductRequestBody, - SingleUpdateTextRequestBody, - SingleUpdateTreeGroupRequestBody, - TextBulkCreateResponse, - TextBulkUpdateResponse, - TextListResponse, - TreeGroupBulkCreateResponse, - TreeGroupBulkUpdateResponse, - TreeGroupHierarchyResponse, - TreeGroupListResponse, -) - -T = TypeVar("T", bound=BaseModel) - - -class LobsterRestApiClient: - """ - Client for the Lobster PIM Legacy REST API. - - Provides access to scheduled jobs and protocol logs via REST endpoints. - Supports both username/password and API token authentication. - - Args: - base_url: The base URL of the Lobster PIM server (e.g., http://localhost:8080) - auth: RestApiAuth instance for authentication - timeout: Request timeout in seconds (default: 30) - """ - - def __init__( - self, - base_url: str, - auth: RestApiAuth, - timeout: int = 30, - ): - """Initialize the Lobster REST API client""" - self.base_url = base_url.rstrip("/") - self.auth = auth - self.timeout = timeout - self.session = requests.Session() - self._setup_headers() - - def _setup_headers(self) -> None: - """Set up request headers including authentication""" - self.session.headers.update( - { - "Content-Type": "application/json", - "Accept": "application/json", - } - ) - self.session.headers.update(self.auth.get_auth_header()) - - def _handle_response( - self, - response: requests.Response, - expected_model: Type[T], - ) -> T: - """ - Handle API response and parse into Pydantic model. - - Args: - response: Response from requests - expected_model: Pydantic model to deserialize into - - Returns: - Parsed response as Pydantic model - - Raises: - ElytraAuthenticationError: If authentication fails - ElytraNotFoundError: If resource not found - ElytraAPIError: For other API errors - ElytraValidationError: If response validation fails - """ - if response.status_code == 401: - raise ElytraAuthenticationError("Authentication failed") - elif response.status_code == 404: - raise ElytraNotFoundError("Resource not found") - elif response.status_code == 429: - raise ElytraAPIError("Too many requests - rate limit exceeded") - elif response.status_code >= 400: - try: - error_data = response.json() - error_msg = error_data.get("error") or error_data.get("message", response.text) - except Exception: - error_msg = response.text - raise ElytraAPIError(f"API error {response.status_code}: {error_msg}") - - try: - data = response.json() - except Exception as e: - raise ElytraAPIError(f"Failed to parse response as JSON: {str(e)}") - - try: - return expected_model.model_validate(data) - except ValidationError as e: - raise ElytraValidationError(f"Response validation failed: {str(e)}") - - def _make_request( - self, - method: str, - endpoint: str, - expected_model: Type[T], - params: Optional[Dict[str, Any]] = None, - json_data: Optional[Dict[str, Any]] = None, - ) -> T: - """ - Make HTTP request to the REST API. - - Args: - method: HTTP method (GET, POST, etc.) - endpoint: API endpoint path - expected_model: Pydantic model for response - params: Query parameters - json_data: JSON request body - - Returns: - Parsed response as Pydantic model - """ - url = urljoin(self.base_url, f"/rest/{endpoint}") - - # Add authentication parameters for GET requests - if method.upper() == "GET" and self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - if params is None: - params = {} - params.update(self.auth.get_url_parameters()) - - response = self.session.request( - method=method, - url=url, - params=params, - json=json_data, - timeout=self.timeout, - ) - - return self._handle_response(response, expected_model) - - # ============= Job Endpoints ============= - - def get_job_overview(self) -> JobOverviewResponse: - """ - Get overview of all active jobs. - - Returns: - JobOverviewResponse containing list of active jobs - """ - return self._make_request( - "GET", - "job/overview", - JobOverviewResponse, - ) - - def get_job_html_overview(self) -> str: - """ - Get HTML overview of all active jobs. - - Returns: - HTML page content with job overview - """ - url = urljoin(self.base_url, "/rest/job/overview") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.get(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to get job overview: {response.status_code}") - return response.text - - def get_all_active_jobs(self) -> JobOverviewResponse: - """ - Get list of all active jobs. - - Returns: - JobOverviewResponse containing list of all active jobs - """ - return self._make_request( - "GET", - "job/job_id", - JobOverviewResponse, - ) - - def get_job_detail(self, job_id: int) -> JobDetailInfo: - """ - Get details of a specific active job. - - Args: - job_id: ID of the job - - Returns: - JobDetailInfo with job details - """ - return self._make_request( - "GET", - f"job/job_id/{job_id}", - JobDetailInfo, - ) - - def execute_job( - self, - job_id: int, - ) -> JobExecutionResponse: - """ - Execute a job and get details of the started job. - - Args: - job_id: ID of the job to execute - - Returns: - JobExecutionResponse with execution details - """ - return self._make_request( - "GET", - f"job/execute/{job_id}", - JobExecutionResponse, - ) - - def get_running_job_instances(self) -> JobOverviewResponse: - """ - Get list of running job instances. - - Returns: - JobOverviewResponse containing list of running job instances - """ - return self._make_request( - "GET", - "job/runtime_id", - JobOverviewResponse, - ) - - def get_running_job_instance(self, runtime_id: str) -> JobOverviewResponse: - """ - Get details of a specific running job instance. - - Args: - runtime_id: Runtime ID of the job instance - - Returns: - JobOverviewResponse with instance details - """ - return self._make_request( - "GET", - f"job/runtime_id/{runtime_id}", - JobOverviewResponse, - ) - - def control_job( - self, - job_id: int, - action: str = "start", - additional_reference: Optional[str] = None, - parameters: Optional[Dict[str, Any]] = None, - queue_id: Optional[str] = None, - max_job_duration_seconds: Optional[int] = None, - ) -> JobControlResponse: - """ - Control a job using the control endpoint (POST). - - Supports starting jobs with parameter overrides and queue management. - - Args: - job_id: ID of the job to control - action: Action to perform (default: "start") - additional_reference: Optional reference for external tracking - parameters: Optional parameters to override job settings - queue_id: Optional queue ID for serialized execution - max_job_duration_seconds: Max duration in seconds (default 12 hours) - - Returns: - JobControlResponse with execution details - """ - request_body = { - "action": action, - "objectId": job_id, - "objectType": "job", - } - - # Add authentication credentials for POST - request_body.update(self.auth.get_json_body_params()) - - if additional_reference: - request_body["additionalReference"] = additional_reference - - if parameters: - request_body["parameter"] = parameters - - if queue_id: - request_body["queueId"] = queue_id - - if max_job_duration_seconds: - request_body["maxJobDurationSeconds"] = max_job_duration_seconds - - return self._make_request( - "POST", - "job/control", - JobControlResponse, - json_data=request_body, - ) - - # ============= Protocol Endpoints ============= - - def get_protocols(self, limit: int = 50) -> ProtocolListResponse: - """ - Get the last N protocols. - - Args: - limit: Number of protocols to retrieve (default: 50) - - Returns: - ProtocolListResponse containing list of protocols - """ - response = self.session.get( - urljoin(self.base_url, "/rest/protocol"), - params={"limit": limit, **self.auth.get_url_parameters()}, - timeout=self.timeout, - ) - return self._handle_response(response, ProtocolListResponse) - - def get_protocol(self, protocol_id: str) -> ProtocolInfo: - """ - Get details of a specific protocol. - - Args: - protocol_id: ID of the protocol - - Returns: - ProtocolInfo with protocol details - """ - return self._make_request( - "GET", - f"protocol/{protocol_id}", - ProtocolInfo, - ) - - def get_protocol_by_job_id(self, job_id: int) -> ProtocolListResponse: - """ - Get all protocols for a specific job ID. - - Args: - job_id: ID of the job - - Returns: - ProtocolListResponse containing protocols for the job - """ - return self._make_request( - "GET", - f"protocol/job/{job_id}", - ProtocolListResponse, - ) - - def get_protocol_by_runtime_id(self, runtime_id: str) -> ProtocolInfo: - """ - Get protocol for a specific job instance runtime ID. - - Args: - runtime_id: Runtime ID of the job instance - - Returns: - ProtocolInfo with protocol details - """ - return self._make_request( - "GET", - f"protocol/job/{runtime_id}", - ProtocolInfo, - ) - - def get_protocol_by_additional_reference(self, reference: str) -> ProtocolListResponse: - """ - Get all protocols for a specific additional reference. - - Args: - reference: Additional reference value - - Returns: - ProtocolListResponse containing protocols for the reference - """ - return self._make_request( - "GET", - f"protocol/job/{reference}", - ProtocolListResponse, - ) - - def get_all_protocol_categories(self) -> ProtocolCategoryListResponse: - """ - Get all available protocol categories. - - Returns: - ProtocolCategoryListResponse containing list of categories - """ - return self._make_request( - "GET", - "protocol/category", - ProtocolCategoryListResponse, - ) - - def get_protocol_by_category(self, category_id: str, limit: int = 50) -> ProtocolListResponse: - """ - Get the last N protocols from a specific category. - - Args: - category_id: ID of the protocol category - limit: Number of protocols to retrieve (default: 50) - - Returns: - ProtocolListResponse containing protocols for the category - """ - return self._make_request( - "GET", - f"protocol/category/{category_id}", - ProtocolListResponse, - params={"limit": limit}, - ) - - # ============= Media Endpoints ============= - - def get_all_media( - self, lang: Optional[str] = None, page: int = 1, limit: int = 10 - ) -> MediaListResponse: - """ - Get all media descriptors with pagination. - - Args: - lang: Language code (optional) - page: Page number (default: 1) - limit: Number of media items per page (default: 10) - - Returns: - MediaListResponse containing paginated list of media items - """ - params: Dict[str, Any] = {"page": page, "limit": limit} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - "media", - MediaListResponse, - params=params, - ) - - def get_media_by_id(self, media_id: int, lang: Optional[str] = None) -> SingleMediaResponse: - """ - Get a media descriptor by ID. - - Args: - media_id: ID of the media - lang: Language code (optional) - - Returns: - SingleMediaResponse with media details - """ - params: Dict[str, Any] = {} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - f"media/{media_id}", - SingleMediaResponse, - params=params if params else None, - ) - - def create_media(self, media_data: Dict[str, Any]) -> SingleMediaResponse: - """ - Create a new media descriptor. - - Args: - media_data: Media descriptor data - - Returns: - SingleMediaResponse with created media - """ - return self._make_request( - "POST", - "media", - SingleMediaResponse, - json_data=media_data, - ) - - def update_media(self, media_data: Dict[str, Any]) -> SingleMediaResponse: - """ - Update a media descriptor. - - Args: - media_data: Updated media descriptor data (must include 'id') - - Returns: - SingleMediaResponse with updated media - """ - return self._make_request( - "PATCH", - "media", - SingleMediaResponse, - json_data=media_data, - ) - - def create_multiple_media(self, media_list: List[Dict[str, Any]]) -> MediaBulkCreateResponse: - """ - Create multiple media descriptors in bulk. - - Args: - media_list: List of media descriptor data - - Returns: - MediaBulkCreateResponse with created media items - """ - return self._make_request( - "POST", - "media/bulk", - MediaBulkCreateResponse, - json_data=media_list, - ) - - def update_multiple_media(self, media_list: List[Dict[str, Any]]) -> MediaBulkUpdateResponse: - """ - Update multiple media descriptors in bulk. - - Args: - media_list: List of media descriptor data to update (each must include 'id') - - Returns: - MediaBulkUpdateResponse with updated media items - """ - return self._make_request( - "PATCH", - "media/bulk", - MediaBulkUpdateResponse, - json_data=media_list, - ) - - def delete_media(self, media_id: int) -> None: - """ - Delete a media descriptor by ID. - - Args: - media_id: ID of the media to delete - """ - url = urljoin(self.base_url, f"/rest/media/{media_id}") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.delete(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to delete media: {response.status_code}") - - def upload_media_file( - self, - file_path: str, - media_id: int, - mam_system: str, - language_code: str = "independent", - ) -> MediaFileResponse: - """ - Upload a media file for a media descriptor. - - Args: - file_path: Path to the file to upload - media_id: ID of the media descriptor - mam_system: MAM system code (fs, sixomc, cumulus, etc.) - language_code: Language code for the file (default: independent) - - Returns: - MediaFileResponse with uploaded file metadata - """ - url = urljoin(self.base_url, "/rest/media/file") - - with open(file_path, "rb") as file: - files = {"file": file} - data = { - "mediaId": media_id, - "mamSystem": mam_system, - "languageCode": language_code, - } - # Add authentication credentials - data.update(self.auth.get_json_body_params()) - - response = self.session.post( - url, - files=files, - data=data, - timeout=self.timeout, - ) - - return self._handle_response(response, MediaFileResponse) - - def get_media_content(self, media_file_id: int) -> bytes: - """ - Download the binary content of a media file. - - Args: - media_file_id: ID of the media file - - Returns: - Binary content of the media file - """ - url = urljoin(self.base_url, f"/rest/media/file/{media_file_id}") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.get(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to download media: {response.status_code}") - - return response.content - - def delete_media_file(self, media_file_id: int) -> None: - """ - Delete a media file by ID. - - Args: - media_file_id: ID of the media file to delete - """ - url = urljoin(self.base_url, f"/rest/media/file/{media_file_id}") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.delete(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to delete media file: {response.status_code}") - - # ============= Hierarchy Endpoints ============= - - def get_product_hierarchy(self, product_id: int, depth: int = 10) -> ProductHierarchyResponse: - """ - Get the hierarchy of a product. - - Args: - product_id: ID of the product - depth: Depth of the hierarchy (default: 10) - - Returns: - ProductHierarchyResponse with hierarchical product structure - """ - params = {"depth": depth} - return self._make_request( - "GET", - f"products/{product_id}/hierarchy", - ProductHierarchyResponse, - params=params, - ) - - def get_product_group_hierarchy( - self, group_id: int, depth: int = 10 - ) -> ProductGroupHierarchyResponse: - """ - Get the hierarchy of a product group. - - Args: - group_id: ID of the product group - depth: Depth of the hierarchy (default: 10) - - Returns: - ProductGroupHierarchyResponse with hierarchical group structure - """ - params = {"depth": depth} - return self._make_request( - "GET", - f"groups/{group_id}/hierarchy", - ProductGroupHierarchyResponse, - params=params, - ) - - def get_tree_group_hierarchy(self, depth: int = 10) -> List[TreeGroupHierarchyResponse]: - """ - Get the hierarchy of tree groups. - - Args: - depth: Depth of the hierarchy (default: 10) - - Returns: - List of TreeGroupHierarchyResponse with hierarchical tree structure - """ - params = {"depth": depth} - url = urljoin(self.base_url, "/rest/tree/groups/hierarchy") - - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params.update(self.auth.get_url_parameters()) - - response = self.session.request( - method="GET", - url=url, - params=params, - timeout=self.timeout, - ) - - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to get tree hierarchy: {response.status_code}") - - try: - data = response.json() - if isinstance(data, list): - return [TreeGroupHierarchyResponse.model_validate(item) for item in data] - else: - return [TreeGroupHierarchyResponse.model_validate(data)] - except Exception as e: - raise ElytraValidationError(f"Failed to parse tree hierarchy: {str(e)}") - - def get_attribute_group_hierarchy( - self, attribute_group_id: int, depth: int = 10 - ) -> AttributeGroupHierarchyResponse: - """ - Get the hierarchy of an attribute group. - - Args: - attribute_group_id: ID of the attribute group - depth: Depth of the hierarchy (default: 10) - - Returns: - AttributeGroupHierarchyResponse with hierarchical attribute group structure - """ - params = {"depth": depth} - return self._make_request( - "GET", - f"attribute/groups/hierarchy/{attribute_group_id}", - AttributeGroupHierarchyResponse, - params=params, - ) - - # ============= Product Endpoints ============= - - def get_all_products( - self, lang: Optional[str] = None, page: int = 1, limit: int = 10, group_id: Optional[int] = None - ) -> ProductListResponse: - """ - Get all products with optional group filter. - - Args: - lang: Language code (optional) - page: Page number (default: 1) - limit: Number of products per page (default: 10) - group_id: Optional product group ID to filter products - - Returns: - ProductListResponse with paginated list of products - """ - params: Dict[str, Any] = {"page": page, "limit": limit} - if lang: - params["lang"] = lang - if group_id: - params["groupId"] = group_id - - return self._make_request( - "GET", - "products", - ProductListResponse, - params=params, - ) - - def create_product(self, product_data: Dict[str, Any]) -> SingleProductResponse: - """ - Create a new product. - - Args: - product_data: Product data - - Returns: - SingleProductResponse with created product - """ - return self._make_request( - "POST", - "products", - SingleProductResponse, - json_data=product_data, - ) - - def update_product(self, product_data: Dict[str, Any]) -> ProductListResponse: - """ - Update a product. - - Args: - product_data: Updated product data (must include 'id') - - Returns: - ProductListResponse with updated product info - """ - return self._make_request( - "PATCH", - "products", - ProductListResponse, - json_data=product_data, - ) - - def create_multiple_products(self, products_list: List[Dict[str, Any]]) -> ProductBulkCreateResponse: - """ - Create multiple products in bulk. - - Args: - products_list: List of product data - - Returns: - ProductBulkCreateResponse with created products - """ - return self._make_request( - "POST", - "products/bulk", - ProductBulkCreateResponse, - json_data=products_list, - ) - - def update_multiple_products(self, products_list: List[Dict[str, Any]]) -> ProductBulkUpdateResponse: - """ - Update multiple products in bulk. - - Args: - products_list: List of product data to update (each must include 'id') - - Returns: - ProductBulkUpdateResponse with updated products - """ - return self._make_request( - "PATCH", - "products/bulk", - ProductBulkUpdateResponse, - json_data=products_list, - ) - - def get_product_by_id(self, product_id: int, lang: Optional[str] = None) -> SingleProductResponse: - """ - Get a product by ID. - - Args: - product_id: ID of the product - lang: Language code (optional) - - Returns: - SingleProductResponse with product details - """ - params: Dict[str, Any] = {} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - f"products/{product_id}", - SingleProductResponse, - params=params if params else None, - ) - - def delete_product(self, product_id: int) -> None: - """ - Delete a product by ID. - - Args: - product_id: ID of the product to delete - """ - url = urljoin(self.base_url, f"/rest/products/{product_id}") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.delete(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to delete product: {response.status_code}") - - def product_operation(self, operation_data: Dict[str, Any]) -> ProductListResponse: - """ - Perform bulk operations on products (copy, move, link, copy-structure). - - Args: - operation_data: Operation details including operation type, source, target, etc. - - Returns: - ProductListResponse with affected products - """ - return self._make_request( - "POST", - "products/operation", - ProductListResponse, - json_data=operation_data, - ) - - # ============= Product Group Endpoints ============= - - def get_all_product_groups( - self, lang: Optional[str] = None, page: int = 1, limit: int = 10 - ) -> ProductGroupListResponse: - """ - Get all product groups. - - Args: - lang: Language code (optional) - page: Page number (default: 1) - limit: Number of groups per page (default: 10) - - Returns: - ProductGroupListResponse with paginated list of groups - """ - params: Dict[str, Any] = {"page": page, "limit": limit} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - "groups", - ProductGroupListResponse, - params=params, - ) - - def create_product_group(self, group_data: Dict[str, Any]) -> SingleProductGroupResponse: - """ - Create a new product group. - - Args: - group_data: Product group data - - Returns: - SingleProductGroupResponse with created group - """ - return self._make_request( - "POST", - "groups", - SingleProductGroupResponse, - json_data=group_data, - ) - - def update_product_group(self, group_data: Dict[str, Any]) -> ProductGroupListResponse: - """ - Update a product group. - - Args: - group_data: Updated group data (must include 'id') - - Returns: - ProductGroupListResponse with updated group info - """ - return self._make_request( - "PATCH", - "groups", - ProductGroupListResponse, - json_data=group_data, - ) - - def get_product_group_by_id(self, group_id: int, lang: Optional[str] = None) -> SingleProductGroupResponse: - """ - Get a product group by ID. - - Args: - group_id: ID of the product group - lang: Language code (optional) - - Returns: - SingleProductGroupResponse with group details - """ - params: Dict[str, Any] = {} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - f"groups/{group_id}", - SingleProductGroupResponse, - params=params if params else None, - ) - - def delete_product_group(self, group_id: int) -> None: - """ - Delete a product group by ID. - - Args: - group_id: ID of the product group to delete - """ - url = urljoin(self.base_url, f"/rest/groups/{group_id}") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.delete(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to delete product group: {response.status_code}") - - # ============= Tree Group Endpoints ============= - - def get_all_tree_groups( - self, lang: Optional[str] = None, page: int = 1, limit: int = 10 - ) -> TreeGroupListResponse: - """ - Get all tree groups. - - Args: - lang: Language code (optional) - page: Page number (default: 1) - limit: Number of groups per page (default: 10) - - Returns: - TreeGroupListResponse with paginated list of tree groups - """ - params: Dict[str, Any] = {"page": page, "limit": limit} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - "tree/groups", - TreeGroupListResponse, - params=params, - ) - - def create_tree_group(self, group_data: Dict[str, Any]) -> SingleTreeGroupResponse: - """ - Create a new tree group. - - Args: - group_data: Tree group data - - Returns: - SingleTreeGroupResponse with created tree group - """ - return self._make_request( - "POST", - "tree/groups", - SingleTreeGroupResponse, - json_data=group_data, - ) - - def update_tree_group(self, group_data: Dict[str, Any]) -> SingleTreeGroupResponse: - """ - Update a tree group. - - Args: - group_data: Updated tree group data (must include 'id') - - Returns: - SingleTreeGroupResponse with updated tree group - """ - return self._make_request( - "PATCH", - "tree/groups", - SingleTreeGroupResponse, - json_data=group_data, - ) - - def get_tree_group_by_id(self, tree_group_id: int, lang: Optional[str] = None) -> SingleTreeGroupResponse: - """ - Get a tree group by ID. - - Args: - tree_group_id: ID of the tree group - lang: Language code (optional) - - Returns: - SingleTreeGroupResponse with tree group details - """ - params: Dict[str, Any] = {} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - f"tree/groups/{tree_group_id}", - SingleTreeGroupResponse, - params=params if params else None, - ) - - def delete_tree_group(self, tree_group_id: int) -> None: - """ - Delete a tree group by ID. - - Args: - tree_group_id: ID of the tree group to delete - """ - url = urljoin(self.base_url, f"/rest/tree/groups/{tree_group_id}") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.delete(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to delete tree group: {response.status_code}") - - # ============= Attribute Endpoints ============= - - def get_all_attributes( - self, lang: Optional[str] = None, page: int = 1, limit: int = 10 - ) -> AttributeListResponse: - """ - Get all attributes. - - Args: - lang: Language code (optional) - page: Page number (default: 1) - limit: Number of attributes per page (default: 10) - - Returns: - AttributeListResponse with paginated list of attributes - """ - params: Dict[str, Any] = {"page": page, "limit": limit} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - "attributes", - AttributeListResponse, - params=params, - ) - - def create_attribute(self, attribute_data: Dict[str, Any]) -> SimpleAttributeResponse: - """ - Create a new attribute. - - Args: - attribute_data: Attribute data - - Returns: - SimpleAttributeResponse with created attribute - """ - return self._make_request( - "POST", - "attributes", - SimpleAttributeResponse, - json_data=attribute_data, - ) - - def update_attribute(self, attribute_data: Dict[str, Any]) -> SimpleAttributeResponse: - """ - Update an attribute. - - Args: - attribute_data: Updated attribute data (must include 'id') - - Returns: - SimpleAttributeResponse with updated attribute - """ - return self._make_request( - "PATCH", - "attributes", - SimpleAttributeResponse, - json_data=attribute_data, - ) - - def create_multiple_attributes(self, attributes_list: List[Dict[str, Any]]) -> AttributeBulkCreateResponse: - """ - Create multiple attributes in bulk. - - Args: - attributes_list: List of attribute data - - Returns: - AttributeBulkCreateResponse with created attributes - """ - return self._make_request( - "POST", - "attributes/bulk", - AttributeBulkCreateResponse, - json_data=attributes_list, - ) - - def update_multiple_attributes(self, attributes_list: List[Dict[str, Any]]) -> AttributeBulkUpdateResponse: - """ - Update multiple attributes in bulk. - - Args: - attributes_list: List of attribute data to update (each must include 'id') - - Returns: - AttributeBulkUpdateResponse with updated attributes - """ - return self._make_request( - "PATCH", - "attributes/bulk", - AttributeBulkUpdateResponse, - json_data=attributes_list, - ) - - def get_attribute_by_id(self, attribute_id: int, lang: Optional[str] = None) -> SimpleAttributeResponse: - """ - Get an attribute by ID. - - Args: - attribute_id: ID of the attribute - lang: Language code (optional) - - Returns: - SimpleAttributeResponse with attribute details - """ - params: Dict[str, Any] = {} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - f"attributes/{attribute_id}", - SimpleAttributeResponse, - params=params if params else None, - ) - - def delete_attribute(self, attribute_id: int) -> None: - """ - Delete an attribute by ID. - - Args: - attribute_id: ID of the attribute to delete - """ - url = urljoin(self.base_url, f"/rest/attributes/{attribute_id}") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.delete(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to delete attribute: {response.status_code}") - - def get_attribute_by_name(self, attribute_name: str, lang: Optional[str] = None) -> AttributeGetByNameResponse: - """ - Get an attribute by name with language-dependent properties. - - Args: - attribute_name: Name of the attribute - lang: Language code (optional) - - Returns: - AttributeGetByNameResponse with attribute details and languageDependents - """ - params: Dict[str, Any] = {} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - f"attributes/name/{attribute_name}", - AttributeGetByNameResponse, - params=params if params else None, - ) - - # ============= Attribute Group Endpoints ============= - - def get_all_attribute_groups( - self, lang: Optional[str] = None, page: int = 1, limit: int = 10 - ) -> AttributeGroupListResponse: - """ - Get all attribute groups. - - Args: - lang: Language code (optional) - page: Page number (default: 1) - limit: Number of groups per page (default: 10) - - Returns: - AttributeGroupListResponse with paginated list of attribute groups - """ - params: Dict[str, Any] = {"page": page, "limit": limit} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - "attribute/groups", - AttributeGroupListResponse, - params=params, - ) - - def create_attribute_group(self, group_data: Dict[str, Any]) -> SingleAttributeGroupResponse: - """ - Create a new attribute group. - - Args: - group_data: Attribute group data - - Returns: - SingleAttributeGroupResponse with created attribute group - """ - return self._make_request( - "POST", - "attribute/groups", - SingleAttributeGroupResponse, - json_data=group_data, - ) - - def get_attribute_group_by_id(self, attribute_group_id: int, lang: Optional[str] = None) -> SingleAttributeGroupResponse: - """ - Get an attribute group by ID. - - Args: - attribute_group_id: ID of the attribute group - lang: Language code (optional) - - Returns: - SingleAttributeGroupResponse with attribute group details - """ - params: Dict[str, Any] = {} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - f"attribute/groups/{attribute_group_id}", - SingleAttributeGroupResponse, - params=params if params else None, - ) - - def delete_attribute_group(self, attribute_group_id: int) -> None: - """ - Delete an attribute group by ID. - - Args: - attribute_group_id: ID of the attribute group to delete - """ - url = urljoin(self.base_url, f"/rest/attribute/groups/{attribute_group_id}") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.delete(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to delete attribute group: {response.status_code}") - - def get_attribute_group_by_name(self, attribute_group_name: str, lang: Optional[str] = None) -> SingleAttributeGroupResponse: - """ - Get an attribute group by name. - - Args: - attribute_group_name: Name of the attribute group - lang: Language code (optional) - - Returns: - SingleAttributeGroupResponse with attribute group details - """ - params: Dict[str, Any] = {} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - f"attribute/groups/name/{attribute_group_name}", - SingleAttributeGroupResponse, - params=params if params else None, - ) - - def update_attribute_group_by_name(self, attribute_group_name: str, group_data: Dict[str, Any]) -> SingleAttributeGroupResponse: - """ - Update an attribute group by name. - - Args: - attribute_group_name: Name of the attribute group to update - group_data: Updated attribute group data - - Returns: - SingleAttributeGroupResponse with updated attribute group - """ - return self._make_request( - "PATCH", - f"attribute/groups/name/{attribute_group_name}", - SingleAttributeGroupResponse, - json_data=group_data, - ) - - def delete_attribute_group_by_name(self, attribute_group_name: str) -> None: - """ - Delete an attribute group by name. - - Args: - attribute_group_name: Name of the attribute group to delete - """ - url = urljoin(self.base_url, f"/rest/attribute/groups/name/{attribute_group_name}") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.delete(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to delete attribute group: {response.status_code}") - - def attribute_group_add_operation(self, operation_data: Dict[str, Any]) -> SingleAttributeGroupResponse: - """ - Perform an add operation on an attribute group. - - Args: - operation_data: Operation details for adding to attribute group - - Returns: - SingleAttributeGroupResponse with updated attribute group - """ - return self._make_request( - "POST", - "attribute/groups/operations/add", - SingleAttributeGroupResponse, - json_data=operation_data, - ) - - # ============= Text Endpoints ============= - - def get_all_texts( - self, lang: Optional[str] = None, page: int = 1, limit: int = 10, - include_content: bool = False, include_attributes: bool = False - ) -> TextListResponse: - """ - Get all texts with optional content and attributes. - - Args: - lang: Language code (optional) - page: Page number (default: 1) - limit: Number of texts per page (default: 10) - include_content: Include text content (default: False) - include_attributes: Include text attributes (default: False) - - Returns: - TextListResponse with paginated list of texts - """ - params: Dict[str, Any] = {"page": page, "limit": limit} - if lang: - params["lang"] = lang - if include_content: - params["includeContent"] = "true" - if include_attributes: - params["includeAttributes"] = "true" - - return self._make_request( - "GET", - "text", - TextListResponse, - params=params, - ) - - def create_text(self, text_data: Dict[str, Any]) -> SingleTextResponse: - """ - Create a new text. - - Args: - text_data: Text data - - Returns: - SingleTextResponse with created text - """ - return self._make_request( - "POST", - "text", - SingleTextResponse, - json_data=text_data, - ) - - def update_text(self, text_data: Dict[str, Any]) -> SingleTextResponse: - """ - Update a text. - - Args: - text_data: Updated text data (must include 'id') - - Returns: - SingleTextResponse with updated text - """ - return self._make_request( - "PATCH", - "text", - SingleTextResponse, - json_data=text_data, - ) - - def create_multiple_texts(self, texts_list: List[Dict[str, Any]]) -> TextBulkCreateResponse: - """ - Create multiple texts in bulk. - - Args: - texts_list: List of text data - - Returns: - TextBulkCreateResponse with created texts - """ - return self._make_request( - "POST", - "text/bulk", - TextBulkCreateResponse, - json_data=texts_list, - ) - - def update_multiple_texts(self, texts_list: List[Dict[str, Any]]) -> TextBulkUpdateResponse: - """ - Update multiple texts in bulk. - - Args: - texts_list: List of text data to update (each must include 'id') - - Returns: - TextBulkUpdateResponse with updated texts - """ - return self._make_request( - "PATCH", - "text/bulk", - TextBulkUpdateResponse, - json_data=texts_list, - ) - - def get_text_by_id(self, text_id: int, lang: Optional[str] = None) -> SingleTextResponse: - """ - Get a text by ID. - - Args: - text_id: ID of the text - lang: Language code (optional) - - Returns: - SingleTextResponse with text details - """ - params: Dict[str, Any] = {} - if lang: - params["lang"] = lang - - return self._make_request( - "GET", - f"text/{text_id}", - SingleTextResponse, - params=params if params else None, - ) - - def delete_text(self, text_id: int) -> None: - """ - Delete a text by ID. - - Args: - text_id: ID of the text to delete - """ - url = urljoin(self.base_url, f"/rest/text/{text_id}") - params = None - if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: - params = self.auth.get_url_parameters() - - response = self.session.delete(url, params=params, timeout=self.timeout) - if response.status_code >= 400: - raise ElytraAPIError(f"Failed to delete text: {response.status_code}") - - def close(self) -> None: - """Close the session and clean up resources.""" - self.session.close() - - def __enter__(self): - """Context manager entry""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit""" - self.close() diff --git a/elytra_client/rest_api/client/__init__.py b/elytra_client/rest_api/client/__init__.py new file mode 100644 index 0000000..eef7e3d --- /dev/null +++ b/elytra_client/rest_api/client/__init__.py @@ -0,0 +1,49 @@ +"""Lobster PIM Legacy REST API Client.""" + +from .attribute_groups import AttributeGroupsMixin +from .attributes import AttributesMixin +from .base import LobsterRestApiClientBase +from .jobs import JobsMixin +from .media import MediaMixin +from .product_groups import ProductGroupsMixin +from .products import ProductsMixin +from .protocols import ProtocolsMixin +from .text import TextMixin +from .tree_groups import TreeGroupsMixin + + +class LobsterRestApiClient( + LobsterRestApiClientBase, + JobsMixin, + ProtocolsMixin, + ProductsMixin, + ProductGroupsMixin, + TreeGroupsMixin, + AttributesMixin, + AttributeGroupsMixin, + MediaMixin, + TextMixin, +): + """ + Complete client for the Lobster PIM Legacy REST API. + + Combines base infrastructure with domain-specific mixins for organized + access to all API endpoints across jobs, protocols, products, media, etc. + + Example: + >>> from elytra_client.rest_api.client import LobsterRestApiClient + >>> from elytra_client.rest_api.auth import RestApiAuth, AuthMethod + >>> + >>> auth = RestApiAuth.from_bearer_token("your-token") + >>> client = LobsterRestApiClient("http://localhost:8080", auth) + >>> + >>> # Access jobs + >>> jobs = client.get_all_active_jobs() + >>> protocols = client.get_protocols() + >>> products = client.get_all_products() + """ + + pass + + +__all__ = ["LobsterRestApiClient"] diff --git a/elytra_client/rest_api/client/_base_protocol.py b/elytra_client/rest_api/client/_base_protocol.py new file mode 100644 index 0000000..b3efe49 --- /dev/null +++ b/elytra_client/rest_api/client/_base_protocol.py @@ -0,0 +1,36 @@ +"""Protocol defining the interface expected by all mixin classes.""" + +from typing import Any, Dict, Optional, Protocol, Type, TypeVar + +import requests +from pydantic import BaseModel + +T = TypeVar("T", bound=BaseModel) + + +class ClientBaseProtocol(Protocol): + """Protocol that defines the base client interface for mixins.""" + + base_url: str + auth: Any + timeout: int + session: requests.Session + + def _make_request( + self, + method: str, + endpoint: str, + expected_model: Type[T], + params: Optional[Dict[str, Any]] = None, + json_data: Optional[Dict[str, Any]] = None, + ) -> T: + """Make HTTP request to the REST API.""" + ... + + def _handle_response( + self, + response: requests.Response, + expected_model: Type[T], + ) -> T: + """Handle API response and parse into Pydantic model.""" + ... diff --git a/elytra_client/rest_api/client/attribute_groups.py b/elytra_client/rest_api/client/attribute_groups.py new file mode 100644 index 0000000..1f4b8cd --- /dev/null +++ b/elytra_client/rest_api/client/attribute_groups.py @@ -0,0 +1,198 @@ +# mypy: disable-error-code="attr-defined, no-any-return" +"""Attribute Groups mixin for the Lobster REST API client.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from urllib.parse import urljoin + +from ...exceptions import ElytraAPIError +from ..auth import AuthMethod +from ..models import ( + AttributeGroupListResponse, + SingleAttributeGroupResponse, + SingleUpdateAttributeGroupRequestBody, +) + + +class AttributeGroupsMixin: + """Mixin for attribute group-related operations. + # type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base + + Expects to be mixed with LobsterRestApiClientBase or a compatible base class + that provides: base_url, auth, timeout, session, _make_request(), _handle_response(). + """ + + # Type hints for mixed-in attributes from base class (docstring only for clarity) + # base_url: str - from LobsterRestApiClientBase + # auth: Any - from LobsterRestApiClientBase + # timeout: int - from LobsterRestApiClientBase + # session: requests.Session - from LobsterRestApiClientBase + + def get_all_attribute_groups( + self, + lang: Optional[str] = None, + page: int = 1, + limit: int = 10, + ) -> AttributeGroupListResponse: + """ + Get all attribute groups. + + Args: + lang: Language code (optional) + page: Page number (default: 1) + limit: Number of groups per page (default: 10) + + Returns: + AttributeGroupListResponse with paginated list of attribute groups + """ + params: Dict[str, Any] = {"page": page, "limit": limit} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + "attribute/groups", + AttributeGroupListResponse, + params=params, + ) + + def create_attribute_group(self, group_data: Dict[str, Any]) -> SingleAttributeGroupResponse: + """ + Create a new attribute group. + + Args: + group_data: Attribute group data + + Returns: + SingleAttributeGroupResponse with created group + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "attribute/groups", + SingleAttributeGroupResponse, + json_data=group_data, + ) + + def get_attribute_group_by_id( + self, group_id: int, lang: Optional[str] = None + ) -> SingleAttributeGroupResponse: + """ + Get an attribute group by ID. + + Args: + group_id: ID of the attribute group + lang: Language code (optional) + + Returns: + SingleAttributeGroupResponse with group details + """ + params: Dict[str, Any] = {} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + f"attribute/groups/{group_id}", + SingleAttributeGroupResponse, + params=params if params else None, + ) + + def delete_attribute_group(self, group_id: int) -> None: + """ + Delete an attribute group by ID. + + Args: + group_id: ID of the attribute group to delete + """ + url = urljoin(self.base_url, f"/rest/attribute/groups/{group_id}") # type: ignore[attr-defined] + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to delete attribute group: {response.status_code}") + + def get_attribute_group_by_name( + self, group_name: str, lang: Optional[str] = None + ) -> SingleAttributeGroupResponse: + """ + Get an attribute group by name. + + Args: + group_name: Name of the attribute group + lang: Language code (optional) + + Returns: + SingleAttributeGroupResponse with group details + """ + params: Dict[str, Any] = {} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + f"attribute/groups/name/{group_name}", + SingleAttributeGroupResponse, + params=params if params else None, + ) + + def update_attribute_group_by_name( + self, group_name: str, group_data: Dict[str, Any] + ) -> SingleAttributeGroupResponse: + """ + Update an attribute group by name. + + Args: + group_name: Name of the attribute group to update + group_data: Updated group data + + Returns: + SingleAttributeGroupResponse with updated group + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + f"attribute/groups/name/{group_name}", + SingleAttributeGroupResponse, + json_data=group_data, + ) + + def delete_attribute_group_by_name(self, group_name: str) -> None: + """ + Delete an attribute group by name. + + Args: + group_name: Name of the attribute group to delete + """ + url = urljoin( + self.base_url, f"/rest/attribute/groups/name/{group_name}" # type: ignore[attr-defined] + ) + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError( + f"Failed to delete attribute group by name: {response.status_code}" + ) + + def attribute_group_add_operation( + self, operation_data: Dict[str, Any] + ) -> AttributeGroupListResponse: + """ + Perform bulk operations on attribute groups. + + Args: + operation_data: Operation details + + Returns: + AttributeGroupListResponse with affected groups + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "attribute/groups/operation", + AttributeGroupListResponse, + json_data=operation_data, + ) diff --git a/elytra_client/rest_api/client/attributes.py b/elytra_client/rest_api/client/attributes.py new file mode 100644 index 0000000..d8f34af --- /dev/null +++ b/elytra_client/rest_api/client/attributes.py @@ -0,0 +1,220 @@ +# mypy: disable-error-code="attr-defined, no-any-return" +"""Attributes mixin for the Lobster REST API client.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from urllib.parse import urljoin + +from ...exceptions import ElytraAPIError +from ..auth import AuthMethod +from ..models import ( + AttributeBulkCreateResponse, + AttributeBulkUpdateResponse, + AttributeGetByNameResponse, + AttributeGroupHierarchyResponse, + AttributeListResponse, + SimpleAttributeResponse, +) + + +class AttributesMixin: + """Mixin for attribute-related operations. + # type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base + + Expects to be mixed with LobsterRestApiClientBase or a compatible base class + that provides: base_url, auth, timeout, session, _make_request(), _handle_response(). + """ + + # Type hints for mixed-in attributes from base class (docstring only for clarity) + # base_url: str - from LobsterRestApiClientBase + # auth: Any - from LobsterRestApiClientBase + # timeout: int - from LobsterRestApiClientBase + # session: requests.Session - from LobsterRestApiClientBase + + def get_all_attributes( + self, + lang: Optional[str] = None, + page: int = 1, + limit: int = 10, + ) -> AttributeListResponse: + """ + Get all attributes. + + Args: + lang: Language code (optional) + page: Page number (default: 1) + limit: Number of attributes per page (default: 10) + + Returns: + AttributeListResponse with paginated list of attributes + """ + params: Dict[str, Any] = {"page": page, "limit": limit} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + "attributes", + AttributeListResponse, + params=params, + ) + + def create_attribute(self, attribute_data: Dict[str, Any]) -> SimpleAttributeResponse: + """ + Create a new attribute. + + Args: + attribute_data: Attribute data + + Returns: + SimpleAttributeResponse with created attribute + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "attributes", + SimpleAttributeResponse, + json_data=attribute_data, + ) + + def update_attribute(self, attribute_data: Dict[str, Any]) -> SimpleAttributeResponse: + """ + Update an attribute. + + Args: + attribute_data: Updated attribute data (must include 'id') + + Returns: + SimpleAttributeResponse with updated attribute + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "attributes", + SimpleAttributeResponse, + json_data=attribute_data, + ) + + def create_multiple_attributes( + self, attributes_list: List[Dict[str, Any]] + ) -> AttributeBulkCreateResponse: + """ + Create multiple attributes in bulk. + + Args: + attributes_list: List of attribute data + + Returns: + AttributeBulkCreateResponse with created attributes + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "attributes/bulk", + AttributeBulkCreateResponse, + json_data=attributes_list, + ) + + def update_multiple_attributes( + self, attributes_list: List[Dict[str, Any]] + ) -> AttributeBulkUpdateResponse: + """ + Update multiple attributes in bulk. + + Args: + attributes_list: List of attribute data to update (each must include 'id') + + Returns: + AttributeBulkUpdateResponse with updated attributes + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "attributes/bulk", + AttributeBulkUpdateResponse, + json_data=attributes_list, + ) + + def get_attribute_by_id( + self, attribute_id: int, lang: Optional[str] = None + ) -> SimpleAttributeResponse: + """ + Get an attribute by ID. + + Args: + attribute_id: ID of the attribute + lang: Language code (optional) + + Returns: + SimpleAttributeResponse with attribute details + """ + params: Dict[str, Any] = {} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + f"attributes/{attribute_id}", + SimpleAttributeResponse, + params=params if params else None, + ) + + def delete_attribute(self, attribute_id: int) -> None: + """ + Delete an attribute by ID. + + Args: + attribute_id: ID of the attribute to delete + """ + url = urljoin( + self.base_url, f"/rest/attributes/{attribute_id}" # type: ignore[attr-defined] + ) + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to delete attribute: {response.status_code}") + + def get_attribute_by_name( + self, attribute_name: str, lang: Optional[str] = None + ) -> AttributeGetByNameResponse: + """ + Get an attribute by name with language-dependent properties. + + Args: + attribute_name: Name of the attribute + lang: Language code (optional) + + Returns: + AttributeGetByNameResponse with attribute details and languageDependents + """ + params: Dict[str, Any] = {} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + f"attributes/name/{attribute_name}", + AttributeGetByNameResponse, + params=params if params else None, + ) + + def get_attribute_group_hierarchy( + self, attribute_group_id: int, depth: int = 10 + ) -> AttributeGroupHierarchyResponse: + """ + Get the hierarchy of an attribute group. + + Args: + attribute_group_id: ID of the attribute group + depth: Depth of the hierarchy (default: 10) + + Returns: + AttributeGroupHierarchyResponse with hierarchical attribute group structure + """ + params = {"depth": depth} + return self._make_request( # type: ignore[attr-defined] + "GET", + f"attribute/groups/hierarchy/{attribute_group_id}", + AttributeGroupHierarchyResponse, + params=params, + ) diff --git a/elytra_client/rest_api/client/base.py b/elytra_client/rest_api/client/base.py new file mode 100644 index 0000000..e999849 --- /dev/null +++ b/elytra_client/rest_api/client/base.py @@ -0,0 +1,152 @@ +"""Base client for the Lobster PIM Legacy REST API.""" + +from __future__ import annotations + +from typing import Any, Dict, Optional, Type, TypeVar +from urllib.parse import urljoin + +import requests +from pydantic import BaseModel, ValidationError + +from ...exceptions import ( + ElytraAPIError, + ElytraAuthenticationError, + ElytraNotFoundError, + ElytraValidationError, +) +from ..auth import AuthMethod, RestApiAuth + +T = TypeVar("T", bound=BaseModel) + + +class LobsterRestApiClientBase: + """ + Base client for the Lobster PIM Legacy REST API. + + Provides core infrastructure for authentication, HTTP requests, and response handling. + Subclasses should extend this with domain-specific methods via mixins. + + Args: + base_url: The base URL of the Lobster PIM server (e.g., http://localhost:8080) + auth: RestApiAuth instance for authentication + timeout: Request timeout in seconds (default: 30) + """ + + def __init__( + self, + base_url: str, + auth: RestApiAuth, + timeout: int = 30, + ): + """Initialize the Lobster REST API client""" + self.base_url = base_url.rstrip("/") + self.auth = auth + self.timeout = timeout + self.session = requests.Session() + self._setup_headers() + + def _setup_headers(self) -> None: + """Set up request headers including authentication""" + self.session.headers.update( + { + "Content-Type": "application/json", + "Accept": "application/json", + } + ) + self.session.headers.update(self.auth.get_auth_header()) + + def _handle_response( + self, + response: requests.Response, + expected_model: Type[T], + ) -> T: + """ + Handle API response and parse into Pydantic model. + + Args: + response: Response from requests + expected_model: Pydantic model to deserialize into + + Returns: + Parsed response as Pydantic model + + Raises: + ElytraAuthenticationError: If authentication fails + ElytraNotFoundError: If resource not found + ElytraAPIError: For other API errors + ElytraValidationError: If response validation fails + """ + if response.status_code == 401: + raise ElytraAuthenticationError("Authentication failed") + elif response.status_code == 404: + raise ElytraNotFoundError("Resource not found") + elif response.status_code == 429: + raise ElytraAPIError("Too many requests - rate limit exceeded") + elif response.status_code >= 400: + try: + error_data = response.json() + error_msg = error_data.get("error") or error_data.get("message", response.text) + except Exception: + error_msg = response.text + raise ElytraAPIError(f"API error {response.status_code}: {error_msg}") + + try: + data = response.json() + except Exception as e: + raise ElytraAPIError(f"Failed to parse response as JSON: {str(e)}") + + try: + return expected_model.model_validate(data) + except ValidationError as e: + raise ElytraValidationError(f"Response validation failed: {str(e)}") + + def _make_request( + self, + method: str, + endpoint: str, + expected_model: Type[T], + params: Optional[Dict[str, Any]] = None, + json_data: Optional[Dict[str, Any]] = None, + ) -> T: + """ + Make HTTP request to the REST API. + + Args: + method: HTTP method (GET, POST, etc.) + endpoint: API endpoint path + expected_model: Pydantic model for response + params: Query parameters + json_data: JSON request body + + Returns: + Parsed response as Pydantic model + """ + url = urljoin(self.base_url, f"/rest/{endpoint}") + + # Add authentication parameters for GET requests + if method.upper() == "GET" and self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: + if params is None: + params = {} + params.update(self.auth.get_url_parameters()) + + response = self.session.request( + method=method, + url=url, + params=params, + json=json_data, + timeout=self.timeout, + ) + + return self._handle_response(response, expected_model) + + def __enter__(self): + """Context manager entry""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.close() + + def close(self): + """Close the session""" + self.session.close() diff --git a/elytra_client/rest_api/client/jobs.py b/elytra_client/rest_api/client/jobs.py new file mode 100644 index 0000000..1d2ae3a --- /dev/null +++ b/elytra_client/rest_api/client/jobs.py @@ -0,0 +1,195 @@ +# mypy: disable-error-code="attr-defined, no-any-return" +"""Jobs mixin for the Lobster REST API client.""" + +from __future__ import annotations + +from typing import Any, Dict, Optional + +from ..models import ( + JobControlRequest, + JobControlResponse, + JobDetailInfo, + JobExecutionResponse, + JobInfo, + JobOverviewResponse, +) + + +class JobsMixin: + """Mixin for job-related operations. + # type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base + + Expects to be mixed with LobsterRestApiClientBase or a compatible base class + that provides: base_url, auth, timeout, session, _make_request(), _handle_response(). + """ + + # Type hints for mixed-in attributes from base class (docstring only for clarity) + # base_url: str - from LobsterRestApiClientBase + # auth: Any - from LobsterRestApiClientBase + # timeout: int - from LobsterRestApiClientBase + # session: requests.Session - from LobsterRestApiClientBase + + def get_job_overview(self) -> JobOverviewResponse: + """ + Get overview of all active jobs. + + Returns: + JobOverviewResponse containing list of active jobs + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + "job/overview", + JobOverviewResponse, + ) + + def get_job_html_overview(self) -> str: + """ + Get HTML overview of all active jobs. + + Returns: + HTML page content with job overview + """ + from urllib.parse import urljoin + + from ...exceptions import ElytraAPIError + from ..auth import AuthMethod + + url = urljoin(self.base_url, "/rest/job/overview") # type: ignore[attr-defined] + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.get(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to get job overview: {response.status_code}") + return response.text + + def get_all_active_jobs(self) -> JobOverviewResponse: + """ + Get list of all active jobs. + + Returns: + JobOverviewResponse containing list of all active jobs + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + "job/job_id", + JobOverviewResponse, + ) + + def get_job_detail(self, job_id: int) -> JobDetailInfo: + """ + Get details of a specific active job. + + Args: + job_id: ID of the job + + Returns: + JobDetailInfo with job details + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + f"job/job_id/{job_id}", + JobDetailInfo, + ) + + def execute_job( + self, + job_id: int, + ) -> JobExecutionResponse: + """ + Execute a job and get details of the started job. + + Args: + job_id: ID of the job to execute + + Returns: + JobExecutionResponse with execution details + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + f"job/execute/{job_id}", + JobExecutionResponse, + ) + + def get_running_job_instances(self) -> JobOverviewResponse: + """ + Get list of running job instances. + + Returns: + JobOverviewResponse containing list of running job instances + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + "job/runtime_id", + JobOverviewResponse, + ) + + def get_running_job_instance(self, runtime_id: str) -> JobOverviewResponse: + """ + Get details of a specific running job instance. + + Args: + runtime_id: Runtime ID of the job instance + + Returns: + JobOverviewResponse with instance details + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + f"job/runtime_id/{runtime_id}", + JobOverviewResponse, + ) + + def control_job( + self, + job_id: int, + action: str = "start", + additional_reference: Optional[str] = None, + parameters: Optional[Dict[str, Any]] = None, + queue_id: Optional[str] = None, + max_job_duration_seconds: Optional[int] = None, + ) -> JobControlResponse: + """ + Control a job using the control endpoint (POST). + + Supports starting jobs with parameter overrides and queue management. + + Args: + job_id: ID of the job to control + action: Action to perform (default: "start") + additional_reference: Optional reference for external tracking + parameters: Optional parameters to override job settings + queue_id: Optional queue ID for serialized execution + max_job_duration_seconds: Max duration in seconds (default 12 hours) + + Returns: + JobControlResponse with execution details + """ + request_body = { + "action": action, + "objectId": job_id, + "objectType": "job", + } + + # Add authentication credentials for POST + request_body.update(self.auth.get_json_body_params()) # type: ignore[attr-defined] + + if additional_reference: + request_body["additionalReference"] = additional_reference + + if parameters: + request_body["parameter"] = parameters + + if queue_id: + request_body["queueId"] = queue_id + + if max_job_duration_seconds: + request_body["maxJobDurationSeconds"] = max_job_duration_seconds + + return self._make_request( # type: ignore[attr-defined] + "POST", + "job/control", + JobControlResponse, + json_data=request_body, + ) diff --git a/elytra_client/rest_api/client/media.py b/elytra_client/rest_api/client/media.py new file mode 100644 index 0000000..d027964 --- /dev/null +++ b/elytra_client/rest_api/client/media.py @@ -0,0 +1,240 @@ +# mypy: disable-error-code="attr-defined, no-any-return" +"""Media mixin for the Lobster REST API client.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from urllib.parse import urljoin + +from ...exceptions import ElytraAPIError +from ..auth import AuthMethod +from ..models import ( + MediaBulkCreateResponse, + MediaBulkUpdateResponse, + MediaFileResponse, + MediaListResponse, + SingleMediaResponse, +) + + +class MediaMixin: + """Mixin for media-related operations. + # type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base + + Expects to be mixed with LobsterRestApiClientBase or a compatible base class + that provides: base_url, auth, timeout, session, _make_request(), _handle_response(). + """ + + # Type hints for mixed-in attributes from base class (docstring only for clarity) + # base_url: str - from LobsterRestApiClientBase + # auth: Any - from LobsterRestApiClientBase + # timeout: int - from LobsterRestApiClientBase + # session: requests.Session - from LobsterRestApiClientBase + + def get_all_media( + self, lang: Optional[str] = None, page: int = 1, limit: int = 10 + ) -> MediaListResponse: + """ + Get all media descriptors with pagination. + + Args: + lang: Language code (optional) + page: Page number (default: 1) + limit: Number of media items per page (default: 10) + + Returns: + MediaListResponse containing paginated list of media items + """ + params: Dict[str, Any] = {"page": page, "limit": limit} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + "media", + MediaListResponse, + params=params, + ) + + def get_media_by_id(self, media_id: int, lang: Optional[str] = None) -> SingleMediaResponse: + """ + Get a media descriptor by ID. + + Args: + media_id: ID of the media + lang: Language code (optional) + + Returns: + SingleMediaResponse with media details + """ + params: Dict[str, Any] = {} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + f"media/{media_id}", + SingleMediaResponse, + params=params if params else None, + ) + + def create_media(self, media_data: Dict[str, Any]) -> SingleMediaResponse: + """ + Create a new media descriptor. + + Args: + media_data: Media descriptor data + + Returns: + SingleMediaResponse with created media + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "media", + SingleMediaResponse, + json_data=media_data, + ) + + def update_media(self, media_data: Dict[str, Any]) -> SingleMediaResponse: + """ + Update a media descriptor. + + Args: + media_data: Updated media descriptor data (must include 'id') + + Returns: + SingleMediaResponse with updated media + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "media", + SingleMediaResponse, + json_data=media_data, + ) + + def create_multiple_media(self, media_list: List[Dict[str, Any]]) -> MediaBulkCreateResponse: + """ + Create multiple media descriptors in bulk. + + Args: + media_list: List of media descriptor data + + Returns: + MediaBulkCreateResponse with created media items + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "media/bulk", + MediaBulkCreateResponse, + json_data=media_list, + ) + + def update_multiple_media(self, media_list: List[Dict[str, Any]]) -> MediaBulkUpdateResponse: + """ + Update multiple media descriptors in bulk. + + Args: + media_list: List of media descriptor data to update (each must include 'id') + + Returns: + MediaBulkUpdateResponse with updated media items + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "media/bulk", + MediaBulkUpdateResponse, + json_data=media_list, + ) + + def delete_media(self, media_id: int) -> None: + """ + Delete a media descriptor by ID. + + Args: + media_id: ID of the media to delete + """ + url = urljoin(self.base_url, f"/rest/media/{media_id}") # type: ignore[attr-defined] + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to delete media: {response.status_code}") + + def upload_media_file( + self, + file_path: str, + media_id: int, + mam_system: str, + language_code: str = "independent", + ) -> MediaFileResponse: + """ + Upload a media file for a media descriptor. + + Args: + file_path: Path to the file to upload + media_id: ID of the media descriptor + mam_system: MAM system code (fs, sixomc, cumulus, etc.) + language_code: Language code for the file (default: independent) + + Returns: + MediaFileResponse with uploaded file metadata + """ + url = urljoin(self.base_url, "/rest/media/file") # type: ignore[attr-defined] + + with open(file_path, "rb") as file: + files = {"file": file} + data = { + "mediaId": media_id, + "mamSystem": mam_system, + "languageCode": language_code, + } + # Add authentication credentials + data.update(self.auth.get_json_body_params()) # type: ignore[attr-defined] + + response = self.session.post( # type: ignore[attr-defined] + url, + files=files, + data=data, + timeout=self.timeout, # type: ignore[attr-defined] + ) + + return self._handle_response(response, MediaFileResponse) # type: ignore[attr-defined] + + def get_media_content(self, media_file_id: int) -> bytes: + """ + Download the binary content of a media file. + + Args: + media_file_id: ID of the media file + + Returns: + Binary content of the media file + """ + url = urljoin(self.base_url, f"/rest/media/file/{media_file_id}") # type: ignore[attr-defined] + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.get(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to download media: {response.status_code}") + + return response.content + + def delete_media_file(self, media_file_id: int) -> None: + """ + Delete a media file by ID. + + Args: + media_file_id: ID of the media file to delete + """ + url = urljoin(self.base_url, f"/rest/media/file/{media_file_id}") # type: ignore[attr-defined] + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to delete media file: {response.status_code}") diff --git a/elytra_client/rest_api/client/product_groups.py b/elytra_client/rest_api/client/product_groups.py new file mode 100644 index 0000000..81606ea --- /dev/null +++ b/elytra_client/rest_api/client/product_groups.py @@ -0,0 +1,193 @@ +# mypy: disable-error-code="attr-defined, no-any-return" +"""Product Groups mixin for the Lobster REST API client.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from urllib.parse import urljoin + +from ...exceptions import ElytraAPIError +from ..auth import AuthMethod +from ..models import ( + ProductGroupBulkCreateResponse, + ProductGroupBulkUpdateResponse, + ProductGroupHierarchyResponse, + ProductGroupListResponse, + SingleProductGroupResponse, +) + + +class ProductGroupsMixin: + """Mixin for product group-related operations. + # type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base + + Expects to be mixed with LobsterRestApiClientBase or a compatible base class + that provides: base_url, auth, timeout, session, _make_request(), _handle_response(). + """ + + # Type hints for mixed-in attributes from base class (docstring only for clarity) + # base_url: str - from LobsterRestApiClientBase + # auth: Any - from LobsterRestApiClientBase + # timeout: int - from LobsterRestApiClientBase + # session: requests.Session - from LobsterRestApiClientBase + + def get_all_product_groups( + self, + lang: Optional[str] = None, + page: int = 1, + limit: int = 10, + ) -> ProductGroupListResponse: + """ + Get all product groups. + + Args: + lang: Language code (optional) + page: Page number (default: 1) + limit: Number of groups per page (default: 10) + + Returns: + ProductGroupListResponse with paginated list of groups + """ + params: Dict[str, Any] = {"page": page, "limit": limit} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + "groups", + ProductGroupListResponse, + params=params, + ) + + def create_product_group(self, group_data: Dict[str, Any]) -> SingleProductGroupResponse: + """ + Create a new product group. + + Args: + group_data: Product group data + + Returns: + SingleProductGroupResponse with created group + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "groups", + SingleProductGroupResponse, + json_data=group_data, + ) + + def update_product_group(self, group_data: Dict[str, Any]) -> ProductGroupListResponse: + """ + Update a product group. + + Args: + group_data: Updated group data (must include 'id') + + Returns: + ProductGroupListResponse with updated group info + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "groups", + ProductGroupListResponse, + json_data=group_data, + ) + + def get_product_group_by_id( + self, group_id: int, lang: Optional[str] = None + ) -> SingleProductGroupResponse: + """ + Get a product group by ID. + + Args: + group_id: ID of the product group + lang: Language code (optional) + + Returns: + SingleProductGroupResponse with group details + """ + params: Dict[str, Any] = {} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + f"groups/{group_id}", + SingleProductGroupResponse, + params=params if params else None, + ) + + def delete_product_group(self, group_id: int) -> None: + """ + Delete a product group by ID. + + Args: + group_id: ID of the product group to delete + """ + url = urljoin(self.base_url, f"/rest/groups/{group_id}") # type: ignore[attr-defined] + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to delete product group: {response.status_code}") + + def create_multiple_product_groups( + self, groups_list: List[Dict[str, Any]] + ) -> ProductGroupBulkCreateResponse: + """ + Create multiple product groups in bulk. + + Args: + groups_list: List of product group data + + Returns: + ProductGroupBulkCreateResponse with created groups + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "groups/bulk", + ProductGroupBulkCreateResponse, + json_data=groups_list, + ) + + def update_multiple_product_groups( + self, groups_list: List[Dict[str, Any]] + ) -> ProductGroupBulkUpdateResponse: + """ + Update multiple product groups in bulk. + + Args: + groups_list: List of product group data to update (each must include 'id') + + Returns: + ProductGroupBulkUpdateResponse with updated groups + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "groups/bulk", + ProductGroupBulkUpdateResponse, + json_data=groups_list, + ) + + def get_product_group_hierarchy( + self, group_id: int, depth: int = 10 + ) -> ProductGroupHierarchyResponse: + """ + Get the hierarchy of a product group. + + Args: + group_id: ID of the product group + depth: Depth of the hierarchy (default: 10) + + Returns: + ProductGroupHierarchyResponse with hierarchical group structure + """ + params = {"depth": depth} + return self._make_request( # type: ignore[attr-defined] + "GET", + f"groups/{group_id}/hierarchy", + ProductGroupHierarchyResponse, + params=params, + ) diff --git a/elytra_client/rest_api/client/products.py b/elytra_client/rest_api/client/products.py new file mode 100644 index 0000000..a148b65 --- /dev/null +++ b/elytra_client/rest_api/client/products.py @@ -0,0 +1,212 @@ +# mypy: disable-error-code="attr-defined, no-any-return" +"""Products mixin for the Lobster REST API client.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from urllib.parse import urljoin + +from ...exceptions import ElytraAPIError +from ..auth import AuthMethod +from ..models import ( + ProductBulkCreateResponse, + ProductBulkUpdateResponse, + ProductHierarchyResponse, + ProductListResponse, + SingleProductResponse, +) + + +class ProductsMixin: + """Mixin for product-related operations. + # type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base + + Expects to be mixed with LobsterRestApiClientBase or a compatible base class + that provides: base_url, auth, timeout, session, _make_request(), _handle_response(). + """ + + # Type hints for mixed-in attributes from base class (docstring only for clarity) + # base_url: str - from LobsterRestApiClientBase + # auth: Any - from LobsterRestApiClientBase + # timeout: int - from LobsterRestApiClientBase + # session: requests.Session - from LobsterRestApiClientBase + + def get_all_products( + self, + lang: Optional[str] = None, + page: int = 1, + limit: int = 10, + group_id: Optional[int] = None, + ) -> ProductListResponse: + """ + Get all products with optional group filter. + + Args: + lang: Language code (optional) + page: Page number (default: 1) + limit: Number of products per page (default: 10) + group_id: Optional product group ID to filter products + + Returns: + ProductListResponse with paginated list of products + """ + params: Dict[str, Any] = {"page": page, "limit": limit} + if lang: + params["lang"] = lang + if group_id: + params["groupId"] = group_id + + return self._make_request( # type: ignore[attr-defined] + "GET", + "products", + ProductListResponse, + params=params, + ) + + def create_product(self, product_data: Dict[str, Any]) -> SingleProductResponse: + """ + Create a new product. + + Args: + product_data: Product data + + Returns: + SingleProductResponse with created product + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "products", + SingleProductResponse, + json_data=product_data, + ) + + def update_product(self, product_data: Dict[str, Any]) -> ProductListResponse: + """ + Update a product. + + Args: + product_data: Updated product data (must include 'id') + + Returns: + ProductListResponse with updated product info + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "products", + ProductListResponse, + json_data=product_data, + ) + + def create_multiple_products( + self, products_list: List[Dict[str, Any]] + ) -> ProductBulkCreateResponse: + """ + Create multiple products in bulk. + + Args: + products_list: List of product data + + Returns: + ProductBulkCreateResponse with created products + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "products/bulk", + ProductBulkCreateResponse, + json_data=products_list, + ) + + def update_multiple_products( + self, products_list: List[Dict[str, Any]] + ) -> ProductBulkUpdateResponse: + """ + Update multiple products in bulk. + + Args: + products_list: List of product data to update (each must include 'id') + + Returns: + ProductBulkUpdateResponse with updated products + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "products/bulk", + ProductBulkUpdateResponse, + json_data=products_list, + ) + + def get_product_by_id( + self, product_id: int, lang: Optional[str] = None + ) -> SingleProductResponse: + """ + Get a product by ID. + + Args: + product_id: ID of the product + lang: Language code (optional) + + Returns: + SingleProductResponse with product details + """ + params: Dict[str, Any] = {} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + f"products/{product_id}", + SingleProductResponse, + params=params if params else None, + ) + + def delete_product(self, product_id: int) -> None: + """ + Delete a product by ID. + + Args: + product_id: ID of the product to delete + """ + url = urljoin(self.base_url, f"/rest/products/{product_id}") # type: ignore[attr-defined] + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to delete product: {response.status_code}") + + def product_operation(self, operation_data: Dict[str, Any]) -> ProductListResponse: + """ + Perform bulk operations on products (copy, move, link, copy-structure). + + Args: + operation_data: Operation details including operation type, source, target, etc. + + Returns: + ProductListResponse with affected products + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "products/operation", + ProductListResponse, + json_data=operation_data, + ) + + def get_product_hierarchy(self, product_id: int, depth: int = 10) -> ProductHierarchyResponse: + """ + Get the hierarchy of a product. + + Args: + product_id: ID of the product + depth: Depth of the hierarchy (default: 10) + + Returns: + ProductHierarchyResponse with hierarchical product structure + """ + params = {"depth": depth} + return self._make_request( # type: ignore[attr-defined] + "GET", + f"products/{product_id}/hierarchy", + ProductHierarchyResponse, + params=params, + ) diff --git a/elytra_client/rest_api/client/protocols.py b/elytra_client/rest_api/client/protocols.py new file mode 100644 index 0000000..99708cf --- /dev/null +++ b/elytra_client/rest_api/client/protocols.py @@ -0,0 +1,134 @@ +# mypy: disable-error-code="attr-defined, no-any-return" +"""Protocols mixin for the Lobster REST API client.""" + +from typing import Optional +from urllib.parse import urljoin + +from ..models import ProtocolCategoryListResponse, ProtocolInfo, ProtocolListResponse + + +class ProtocolsMixin: + """Mixin for protocol-related operations. + # type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base + + Expects to be mixed with LobsterRestApiClientBase or a compatible base class + that provides: base_url, auth, timeout, session, _make_request(), _handle_response(). + """ + + # Type hints for mixed-in attributes from base class (docstring only for clarity) + # base_url: str - from LobsterRestApiClientBase + # auth: Any - from LobsterRestApiClientBase + # timeout: int - from LobsterRestApiClientBase + # session: requests.Session - from LobsterRestApiClientBase + + def get_protocols(self, limit: int = 50) -> ProtocolListResponse: + """ + Get the last N protocols. + + Args: + limit: Number of protocols to retrieve (default: 50) + + Returns: + ProtocolListResponse containing list of protocols + """ + response = self.session.get( # type: ignore[attr-defined] + urljoin(self.base_url, "/rest/protocol"), # type: ignore[attr-defined] + params={"limit": limit, **self.auth.get_url_parameters()}, # type: ignore[attr-defined] + timeout=self.timeout, # type: ignore[attr-defined] + ) + return self._handle_response(response, ProtocolListResponse) # type: ignore[attr-defined] + + def get_protocol(self, protocol_id: str) -> ProtocolInfo: + """ + Get details of a specific protocol. + + Args: + protocol_id: ID of the protocol + + Returns: + ProtocolInfo with protocol details + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + f"protocol/{protocol_id}", + ProtocolInfo, + ) + + def get_protocol_by_job_id(self, job_id: int) -> ProtocolListResponse: + """ + Get all protocols for a specific job ID. + + Args: + job_id: ID of the job + + Returns: + ProtocolListResponse containing protocols for the job + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + f"protocol/job/{job_id}", + ProtocolListResponse, + ) + + def get_protocol_by_runtime_id(self, runtime_id: str) -> ProtocolInfo: + """ + Get protocol for a specific job instance runtime ID. + + Args: + runtime_id: Runtime ID of the job instance + + Returns: + ProtocolInfo with protocol details + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + f"protocol/job/{runtime_id}", + ProtocolInfo, + ) + + def get_protocol_by_additional_reference(self, reference: str) -> ProtocolListResponse: + """ + Get all protocols for a specific additional reference. + + Args: + reference: Additional reference value + + Returns: + ProtocolListResponse containing protocols for the reference + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + f"protocol/job/{reference}", + ProtocolListResponse, + ) + + def get_all_protocol_categories(self) -> ProtocolCategoryListResponse: + """ + Get all available protocol categories. + + Returns: + ProtocolCategoryListResponse containing list of categories + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + "protocol/category", + ProtocolCategoryListResponse, + ) + + def get_protocol_by_category(self, category_id: str, limit: int = 50) -> ProtocolListResponse: + """ + Get the last N protocols from a specific category. + + Args: + category_id: ID of the protocol category + limit: Number of protocols to retrieve (default: 50) + + Returns: + ProtocolListResponse containing protocols for the category + """ + return self._make_request( # type: ignore[attr-defined] + "GET", + f"protocol/category/{category_id}", + ProtocolListResponse, + params={"limit": limit}, + ) diff --git a/elytra_client/rest_api/client/text.py b/elytra_client/rest_api/client/text.py new file mode 100644 index 0000000..f7e0780 --- /dev/null +++ b/elytra_client/rest_api/client/text.py @@ -0,0 +1,164 @@ +# mypy: disable-error-code="attr-defined, no-any-return" +"""Text mixin for the Lobster REST API client.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +from ..models import ( + SingleTextResponse, + TextBulkCreateResponse, + TextBulkUpdateResponse, + TextListResponse, +) + + +class TextMixin: + """Mixin for text-related operations. + # type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base + + Expects to be mixed with LobsterRestApiClientBase or a compatible base class + that provides: base_url, auth, timeout, session, _make_request(), _handle_response(). + """ + + # Type hints for mixed-in attributes from base class (docstring only for clarity) + # base_url: str - from LobsterRestApiClientBase + # auth: Any - from LobsterRestApiClientBase + # timeout: int - from LobsterRestApiClientBase + # session: requests.Session - from LobsterRestApiClientBase + + def get_all_texts( + self, lang: Optional[str] = None, page: int = 1, limit: int = 10 + ) -> TextListResponse: + """ + Get all text entries with pagination. + + Args: + lang: Language code (optional) + page: Page number (default: 1) + limit: Number of text items per page (default: 10) + + Returns: + TextListResponse containing paginated list of text items + """ + params: Dict[str, Any] = {"page": page, "limit": limit} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + "text", + TextListResponse, + params=params, + ) + + def get_text_by_id(self, text_id: int, lang: Optional[str] = None) -> SingleTextResponse: + """ + Get a text entry by ID. + + Args: + text_id: ID of the text entry + lang: Language code (optional) + + Returns: + SingleTextResponse with text details + """ + params: Dict[str, Any] = {} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + f"text/{text_id}", + SingleTextResponse, + params=params if params else None, + ) + + def create_text(self, text_data: Dict[str, Any]) -> SingleTextResponse: + """ + Create a new text entry. + + Args: + text_data: Text entry data + + Returns: + SingleTextResponse with created text + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "text", + SingleTextResponse, + json_data=text_data, + ) + + def update_text(self, text_data: Dict[str, Any]) -> SingleTextResponse: + """ + Update a text entry. + + Args: + text_data: Updated text entry data (must include 'id') + + Returns: + SingleTextResponse with updated text + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "text", + SingleTextResponse, + json_data=text_data, + ) + + def create_multiple_texts(self, texts_list: List[Dict[str, Any]]) -> TextBulkCreateResponse: + """ + Create multiple text entries in bulk. + + Args: + texts_list: List of text entry data + + Returns: + TextBulkCreateResponse with created text entries + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "text/bulk", + TextBulkCreateResponse, + json_data=texts_list, + ) + + def update_multiple_texts(self, texts_list: List[Dict[str, Any]]) -> TextBulkUpdateResponse: + """ + Update multiple text entries in bulk. + + Args: + texts_list: List of text entry data to update (each must include 'id') + + Returns: + TextBulkUpdateResponse with updated text entries + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "text/bulk", + TextBulkUpdateResponse, + json_data=texts_list, + ) + + def delete_text(self, text_id: int) -> None: + """ + Delete a text entry by ID. + + Args: + text_id: ID of the text entry to delete + """ + from urllib.parse import urljoin + + from ...exceptions import ElytraAPIError + from ..auth import AuthMethod + + url = urljoin(self.base_url, f"/rest/text/{text_id}") # type: ignore[attr-defined] + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to delete text: {response.status_code}") diff --git a/elytra_client/rest_api/client/tree_groups.py b/elytra_client/rest_api/client/tree_groups.py new file mode 100644 index 0000000..f65d4d9 --- /dev/null +++ b/elytra_client/rest_api/client/tree_groups.py @@ -0,0 +1,209 @@ +# mypy: disable-error-code="attr-defined, no-any-return" +"""Tree Groups mixin for the Lobster REST API client.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from urllib.parse import urljoin + +from ...exceptions import ElytraAPIError, ElytraValidationError +from ..auth import AuthMethod +from ..models import ( + SingleTreeGroupResponse, + TreeGroupBulkCreateResponse, + TreeGroupBulkUpdateResponse, + TreeGroupHierarchyResponse, + TreeGroupListResponse, +) + + +class TreeGroupsMixin: + """Mixin for tree group-related operations. + # type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base + + Expects to be mixed with LobsterRestApiClientBase or a compatible base class + that provides: base_url, auth, timeout, session, _make_request(), _handle_response(). + """ + + # Type hints for mixed-in attributes from base class (docstring only for clarity) + # base_url: str - from LobsterRestApiClientBase + # auth: Any - from LobsterRestApiClientBase + # timeout: int - from LobsterRestApiClientBase + # session: requests.Session - from LobsterRestApiClientBase + + def get_all_tree_groups( + self, + lang: Optional[str] = None, + page: int = 1, + limit: int = 10, + ) -> TreeGroupListResponse: + """ + Get all tree groups. + + Args: + lang: Language code (optional) + page: Page number (default: 1) + limit: Number of groups per page (default: 10) + + Returns: + TreeGroupListResponse with paginated list of tree groups + """ + params: Dict[str, Any] = {"page": page, "limit": limit} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + "tree/groups", + TreeGroupListResponse, + params=params, + ) + + def create_tree_group(self, group_data: Dict[str, Any]) -> SingleTreeGroupResponse: + """ + Create a new tree group. + + Args: + group_data: Tree group data + + Returns: + SingleTreeGroupResponse with created tree group + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "tree/groups", + SingleTreeGroupResponse, + json_data=group_data, + ) + + def update_tree_group(self, group_data: Dict[str, Any]) -> SingleTreeGroupResponse: + """ + Update a tree group. + + Args: + group_data: Updated tree group data (must include 'id') + + Returns: + SingleTreeGroupResponse with updated tree group + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "tree/groups", + SingleTreeGroupResponse, + json_data=group_data, + ) + + def get_tree_group_by_id( + self, tree_group_id: int, lang: Optional[str] = None + ) -> SingleTreeGroupResponse: + """ + Get a tree group by ID. + + Args: + tree_group_id: ID of the tree group + lang: Language code (optional) + + Returns: + SingleTreeGroupResponse with tree group details + """ + params: Dict[str, Any] = {} + if lang: + params["lang"] = lang + + return self._make_request( # type: ignore[attr-defined] + "GET", + f"tree/groups/{tree_group_id}", + SingleTreeGroupResponse, + params=params if params else None, + ) + + def delete_tree_group(self, tree_group_id: int) -> None: + """ + Delete a tree group by ID. + + Args: + tree_group_id: ID of the tree group to delete + """ + url = urljoin( + self.base_url, f"/rest/tree/groups/{tree_group_id}" # type: ignore[attr-defined] + ) + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params = self.auth.get_url_parameters() # type: ignore[attr-defined] + + response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined] + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to delete tree group: {response.status_code}") + + def create_multiple_tree_groups( + self, groups_list: List[Dict[str, Any]] + ) -> TreeGroupBulkCreateResponse: + """ + Create multiple tree groups in bulk. + + Args: + groups_list: List of tree group data + + Returns: + TreeGroupBulkCreateResponse with created tree groups + """ + return self._make_request( # type: ignore[attr-defined] + "POST", + "tree/groups/bulk", + TreeGroupBulkCreateResponse, + json_data=groups_list, + ) + + def update_multiple_tree_groups( + self, groups_list: List[Dict[str, Any]] + ) -> TreeGroupBulkUpdateResponse: + """ + Update multiple tree groups in bulk. + + Args: + groups_list: List of tree group data to update (each must include 'id') + + Returns: + TreeGroupBulkUpdateResponse with updated tree groups + """ + return self._make_request( # type: ignore[attr-defined] + "PATCH", + "tree/groups/bulk", + TreeGroupBulkUpdateResponse, + json_data=groups_list, + ) + + def get_tree_group_hierarchy(self, depth: int = 10) -> List[TreeGroupHierarchyResponse]: + """ + Get the hierarchy of tree groups. + + Args: + depth: Depth of the hierarchy (default: 10) + + Returns: + List of TreeGroupHierarchyResponse with hierarchical tree structure + """ + params = {"depth": depth} + url = urljoin(self.base_url, "/rest/tree/groups/hierarchy") # type: ignore[attr-defined] + + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined] + params.update(self.auth.get_url_parameters()) # type: ignore[attr-defined] + + response = self.session.request( # type: ignore[attr-defined] + method="GET", + url=url, + params=params, + timeout=self.timeout, # type: ignore[attr-defined] + ) + + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to get tree hierarchy: {response.status_code}") + + try: + data = response.json() + if isinstance(data, list): + return [TreeGroupHierarchyResponse.model_validate(item) for item in data] + else: + return [TreeGroupHierarchyResponse.model_validate(data)] + except Exception as e: + raise ElytraValidationError(f"Failed to parse tree hierarchy: {str(e)}") diff --git a/elytra_client/rest_api/models.py b/elytra_client/rest_api/models.py deleted file mode 100644 index de4e2ac..0000000 --- a/elytra_client/rest_api/models.py +++ /dev/null @@ -1,999 +0,0 @@ -"""Models for the Lobster PIM Legacy REST API""" - -from typing import Any, Dict, List, Optional - -from pydantic import BaseModel, Field - - -class JobInfo(BaseModel): - """Base job information model""" - - id: int = Field(..., description="The ID of the job") - name: str = Field(..., description="The name of the job") - jobIdentifier: str = Field(..., description="The unique job identifier") - jobDescription: Optional[str] = Field(None, description="Description of the job") - status: str = Field(..., description="Current status of the job") - nextExecutionDate: str = Field(..., description="Next scheduled execution date") - previousExecutionDate: Optional[str] = Field(None, description="Previous execution date") - protocolId: Optional[str] = Field(None, description="ID of the associated protocol") - errors: List[str] = Field(default_factory=list, description="List of errors") - messages: List[str] = Field(default_factory=list, description="List of messages") - warnings: List[str] = Field(default_factory=list, description="List of warnings") - - -class JobDetailInfo(JobInfo): - """Detailed job information including error level and runtime ID""" - - errorLevel: Optional[str] = Field(None, description="Error level (e.g., 'Erfolgreich')") - runtimeId: Optional[str] = Field(None, description="Runtime ID for active job execution") - - -class JobOverviewResponse(BaseModel): - """Response containing multiple job information items""" - - jobInfoObjects: List[JobDetailInfo] = Field(..., description="List of job information objects") - errors: List[str] = Field(default_factory=list, description="List of errors") - warnings: List[str] = Field(default_factory=list, description="List of warnings") - - -class JobExecutionResponse(BaseModel): - """Response from executing a job""" - - id: int = Field(..., description="The ID of the job") - name: str = Field(..., description="The name of the job") - jobIdentifier: str = Field(..., description="The unique job identifier") - jobDescription: Optional[str] = Field(None, description="Description of the job") - status: str = Field(..., description="Status after execution") - nextExecutionDate: str = Field(..., description="Next execution date") - protocolId: str = Field(..., description="ID of the protocol for this execution") - runtimeId: str = Field(..., description="Runtime ID for tracking execution") - errors: List[str] = Field(default_factory=list, description="List of errors") - messages: List[str] = Field( - default_factory=list, description="List of messages (e.g., JOB_START_OK)" - ) - warnings: List[str] = Field(default_factory=list, description="List of warnings") - - -class JobControlRequest(BaseModel): - """Request body for job control endpoint""" - - action: str = Field(..., description="Action to perform (e.g., 'start')") - objectId: int = Field(..., description="The ID of the job to control") - objectType: str = Field(default="job", description="Type of object") - username: str = Field(..., description="Username for authentication") - password: str = Field(..., description="Password for authentication") - additionalReference: Optional[str] = Field( - None, description="Custom reference for external processing tracking" - ) - parameter: Optional[Dict[str, Any]] = Field( - None, description="Parameters to override job settings" - ) - queueId: Optional[str] = Field(None, description="Queue ID for serialized job execution") - maxJobDurationSeconds: Optional[int] = Field( - default=43200, description="Max duration in seconds (default 12 hours)" - ) - - -class JobControlResponse(BaseModel): - """Response from job control endpoint""" - - jobIdentifier: str = Field(..., description="The job identifier") - runtimeId: str = Field(..., description="Runtime ID for tracking") - errors: List[str] = Field(default_factory=list, description="List of errors") - messages: List[str] = Field(default_factory=list, description="List of messages") - warnings: List[str] = Field(default_factory=list, description="List of warnings") - - -class ProtocolEntry(BaseModel): - """A single entry in a protocol log""" - - timestamp: Optional[str] = Field(None, description="Timestamp of the entry") - level: Optional[str] = Field(None, description="Log level (ERROR, WARNING, INFO, etc.)") - message: Optional[str] = Field(None, description="Message content") - - -class ProtocolInfo(BaseModel): - """Protocol/Log information""" - - id: Optional[int] = Field(None, description="Protocol ID") - protocolId: Optional[str] = Field(None, description="Protocol ID as string") - jobId: Optional[int] = Field(None, description="Associated job ID") - runtimeId: Optional[str] = Field(None, description="Runtime ID of the job execution") - jobIdentifier: Optional[str] = Field(None, description="Job identifier") - status: Optional[str] = Field(None, description="Status of the job") - startTime: Optional[str] = Field(None, description="Start time of execution") - endTime: Optional[str] = Field(None, description="End time of execution") - errors: List[str] = Field(default_factory=list, description="List of errors") - messages: List[str] = Field(default_factory=list, description="List of messages") - entries: Optional[List[ProtocolEntry]] = Field(None, description="Protocol entries") - - -class ProtocolListResponse(BaseModel): - """Response containing list of protocols""" - - protocols: Optional[List[ProtocolInfo]] = Field(None, description="List of protocols") - errors: List[str] = Field(default_factory=list, description="List of errors") - warnings: List[str] = Field(default_factory=list, description="List of warnings") - - -class ProtocolCategoryInfo(BaseModel): - """Protocol category information""" - - id: str = Field(..., description="Category ID") - name: str = Field(..., description="Category name") - description: Optional[str] = Field(None, description="Category description") - - -class ProtocolCategoryListResponse(BaseModel): - """Response containing list of protocol categories""" - - categories: List[ProtocolCategoryInfo] = Field(..., description="List of protocol categories") - errors: List[str] = Field(default_factory=list, description="List of errors") - - -class ErrorResponse(BaseModel): - """Error response from the REST API""" - - error: str = Field(..., description="Error message") - errorCode: Optional[str] = Field(None, description="Error code") - details: Optional[str] = Field(None, description="Error details") - - -# ============= Media and Hierarchy Models ============= - - -class PaginationLinks(BaseModel): - """Pagination links for list responses""" - - self: str = Field(..., description="Link to current page") - next: Optional[str] = Field(None, description="Link to next page") - previous: Optional[str] = Field(None, description="Link to previous page") - first: str = Field(..., description="Link to first page") - last: str = Field(..., description="Link to last page") - - -class AttributeResponse(BaseModel): - """Attribute value associated with an object""" - - id: int = Field(..., description="The ID of the attribute value") - attributeId: int = Field(..., description="The ID of the attribute definition") - attributeName: str = Field(..., description="The independent name of the attribute") - attributeType: str = Field( - ..., description="The category type of the attribute (normal, meta, internal)" - ) - type: str = Field(..., description="The type of the attribute") - value: str = Field(..., description="The value of the attribute") - parentId: Optional[int] = Field(None, description="The ID of the parent object") - autoSync: str = Field(..., description="The auto sync mode") - languageCode: Optional[str] = Field(None, description="The language code of the attribute") - modifiedAt: Optional[str] = Field( - None, description="The date and time the attribute was modified" - ) - modifiedBy: Optional[int] = Field( - None, description="The ID of the user who modified the attribute" - ) - inherited: bool = Field(False, description="Whether the attribute is inherited") - - -class MediaFileResponse(BaseModel): - """Media file metadata""" - - id: int = Field(..., description="The ID of the media file") - clientId: int = Field(..., description="The ID of the client") - mediaId: int = Field(..., description="The ID of the media descriptor") - languageCode: str = Field(..., description="The language code for this media file") - mimeType: str = Field(..., description="The MIME type of the media file") - sourceMimeType: str = Field(..., description="The original MIME type before any conversion") - mamSystem: str = Field(..., description="The Media Asset Management system name") - mamId1: Optional[str] = Field(None, description="MAM system identifier 1") - mamId2: Optional[str] = Field(None, description="MAM system identifier 2") - mamId3: Optional[str] = Field(None, description="MAM system identifier 3") - mamId4: Optional[str] = Field(None, description="MAM system identifier 4") - contentLength: int = Field(..., description="The size of the media file in bytes") - updateCount: int = Field( - ..., description="The number of times this media file has been updated" - ) - changedAt: Optional[str] = Field( - None, description="The date and time the media file was last changed" - ) - changedBy: Optional[int] = Field( - None, description="The ID of the user who last changed the media file" - ) - - -class SingleMediaResponse(BaseModel): - """Complete media descriptor""" - - id: int = Field(..., description="The ID of the media content") - name: str = Field(..., description="The name of the media") - treeId: int = Field(..., description="The ID of the tree this media belongs to") - clientId: int = Field(..., description="The ID of the client") - attributeGroupId: int = Field(..., description="The ID of the media default attribute group") - pictureTypeId: Optional[int] = Field(None, description="The ID of the picture type") - originalId: Optional[int] = Field( - None, description="The ID of the original media if this is a copy" - ) - objectStatus: Optional[str] = Field( - None, description="The status of the object (original, copy)" - ) - userObjectStatus: Optional[int] = Field(None, description="User-defined object status ID") - createdAt: Optional[str] = Field(None, description="The date and time the media was created") - createdBy: Optional[int] = Field(None, description="The ID of the user who created the media") - modifiedAt: Optional[str] = Field( - None, description="The date and time the media was last modified" - ) - modifiedBy: Optional[int] = Field( - None, description="The ID of the user who last modified the media" - ) - files: List[MediaFileResponse] = Field( - default_factory=list, description="The files associated with this media" - ) - attributes: List[AttributeResponse] = Field( - default_factory=list, description="The attribute values of the media" - ) - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the media name by language code" - ) - - -class SingleNewMediaRequestBody(BaseModel): - """Request body for creating a new media descriptor""" - - name: str = Field(..., description="Name of the media item") - attributeGroupId: Optional[int] = Field( - None, description="The ID of the media default attribute group" - ) - pictureTypeId: Optional[int] = Field(None, description="The ID of the picture type") - treeId: Optional[int] = Field(None, description="The ID of the tree this media belongs to") - originalId: Optional[int] = Field( - None, description="If this is a copy, the ID of the original media" - ) - objectStatus: Optional[str] = Field(None, description="The status of the object") - userObjectStatus: Optional[int] = Field(None, description="Custom user object status ID") - attributes: Optional[List[Dict[str, Any]]] = Field(None, description="List of media attributes") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the media name" - ) - - -class SingleUpdateMediaRequestBody(BaseModel): - """Request body for updating a media descriptor""" - - id: int = Field(..., description="The ID of the media descriptor to update") - name: Optional[str] = Field(None, description="Name of the media item") - attributeGroupId: Optional[int] = Field( - None, description="The ID of the media default attribute group" - ) - pictureTypeId: Optional[int] = Field(None, description="The ID of the picture type") - treeId: Optional[int] = Field(None, description="The ID of the tree this media belongs to") - originalId: Optional[int] = Field( - None, description="If this is a copy, the ID of the original media" - ) - objectStatus: Optional[str] = Field(None, description="The status of the object") - userObjectStatus: Optional[int] = Field(None, description="Custom user object status ID") - attributes: Optional[List[Dict[str, Any]]] = Field(None, description="List of media attributes") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the media name" - ) - - -# ============================================================================ -# PRODUCT MODELS -# ============================================================================ - - -class ProductOperationRequestBody(BaseModel): - """Request body for product operations (copy, move, link, copy-structure)""" - - operation: str = Field(..., description="Operation: copy, move, link, or copy-structure") - productId: int = Field(..., description="The ID of the product to perform operation on") - parentId: int = Field(..., description="The ID of the destination parent") - - -class ProductAttributeResponse(BaseModel): - """An attribute value associated with a product""" - - id: int = Field(..., description="The ID of the attribute") - attributeId: int = Field(..., description="The ID of the attribute definition") - attributeName: str = Field(..., description="The independent name of the attribute") - attributeType: str = Field(..., description="The type of attribute (normal, meta, internal)") - type: str = Field(..., description="The attribute type") - value: Optional[str] = Field(None, description="The value of the attribute") - autoSync: Optional[str] = Field(None, description="The auto sync mode of the attribute") - languageCode: Optional[str] = Field(None, description="The language code of the attribute") - modified: Optional[str] = Field( - None, description="The date and time the attribute was modified" - ) - modifierByUserId: Optional[int] = Field( - None, description="The ID of user who modified the attribute" - ) - inherited: bool = Field(False, description="Whether the attribute is inherited") - - -class SingleProductResponse(BaseModel): - """Complete product descriptor""" - - id: int = Field(..., description="The ID of the product") - clientId: int = Field(..., description="The ID of the client") - productName: str = Field(..., description="The name of the product") - treeId: int = Field(..., description="The ID of the tree") - created: Optional[str] = Field(None, description="The date and time the product was created") - modified: Optional[str] = Field(None, description="The date and time the product was modified") - creatorUserId: Optional[int] = Field(None, description="The ID of user who created the product") - modifierUserId: Optional[int] = Field( - None, description="The ID of user who modified the product" - ) - objectStatus: Optional[str] = Field(None, description="The status of the object") - originalId: Optional[int] = Field(None, description="The ID of the original product") - attributes: List[ProductAttributeResponse] = Field( - default_factory=list, description="The attributes of the product" - ) - - -class SingleNewProductRequestBody(BaseModel): - """Request body for creating a new product""" - - productName: str = Field(..., description="The name of the product") - parentId: int = Field(..., description="The ID of the parent group or product") - attributeGroupId: int = Field(..., description="The ID of the attribute group") - attributes: Optional[List[Dict[str, Any]]] = Field( - None, description="The attributes of the product" - ) - - -class SingleUpdateProductRequestBody(BaseModel): - """Request body for updating a product""" - - id: int = Field(..., description="The ID of the product") - productName: Optional[str] = Field(None, description="The name of the product") - parentId: Optional[int] = Field(None, description="The ID of the parent group or product") - attributeGroupId: Optional[int] = Field(None, description="The ID of the attribute group") - attributes: Optional[List[Dict[str, Any]]] = Field( - None, description="The attributes of the product" - ) - - -class ProductListResponse(BaseModel): - """Paginated response containing multiple products""" - - items: List[SingleProductResponse] = Field(..., description="List of products") - total: int = Field(..., description="The total number of products") - page: int = Field(..., description="The current page number") - limit: int = Field(..., description="The number of products per page") - links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links") - - -class ProductBulkCreateResponse(BaseModel): - """Response from bulk product creation""" - - items: List[SingleProductResponse] = Field(..., description="The created products") - totalItemsCreated: int = Field(..., description="The total number of products created") - - -class ProductBulkUpdateResponse(BaseModel): - """Response from bulk product update""" - - items: List[SingleProductResponse] = Field(..., description="The updated products") - totalItemsUpdated: int = Field(..., description="The total number of products updated") - - -class ProductHierarchyNode(BaseModel): - """A node in the product hierarchy""" - - id: int = Field(..., description="The ID of the node") - name: str = Field(..., description="The name of the node") - type: str = Field(..., description="The type of node (product, variant, text, media)") - children: List["ProductHierarchyNode"] = Field( - default_factory=list, description="The immediate children of the node" - ) - - -ProductHierarchyNode.model_rebuild() - - -class ProductHierarchyResponse(ProductHierarchyNode): - """Product hierarchy response""" - - pass - - -# ============================================================================ -# PRODUCT GROUP MODELS -# ============================================================================ - - -class SingleProductGroupResponse(BaseModel): - """Complete product group descriptor""" - - id: int = Field(..., description="The ID of the product group") - name: str = Field(..., description="The independent name of the product group") - type: Optional[str] = Field(None, description="The type of product group") - parentId: Optional[int] = Field( - None, description="The ID of the parent product group or tree group" - ) - objectStatus: Optional[str] = Field(None, description="The status of the object") - attributeGroupId: Optional[int] = Field(None, description="The ID of the attribute group") - clientId: int = Field(..., description="The ID of the client") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the product group name" - ) - attributes: List[ProductAttributeResponse] = Field( - default_factory=list, description="The attributes of the product group" - ) - - -class SingleNewProductGroupRequestBody(BaseModel): - """Request body for creating a new product group""" - - name: str = Field(..., description="The independent name of the product group") - type: Optional[str] = Field(None, description="The type of product group") - parentId: Optional[int] = Field( - None, description="The ID of the parent product group or tree group" - ) - attributeGroupId: Optional[int] = Field(None, description="The ID of the attribute group") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the product group name" - ) - attributes: Optional[List[Dict[str, Any]]] = Field( - None, description="The attributes of the product group" - ) - - -class SingleUpdateProductGroupRequestBody(BaseModel): - """Request body for updating a product group""" - - id: int = Field(..., description="The ID of the product group") - name: Optional[str] = Field(None, description="The independent name of the product group") - parentId: Optional[int] = Field(None, description="The ID of the parent product group") - attributeGroupId: Optional[int] = Field(None, description="The ID of the attribute group") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the product group name" - ) - attributes: Optional[List[Dict[str, Any]]] = Field( - None, description="The attributes of the product group" - ) - - -class ProductGroupListResponse(BaseModel): - """Paginated response containing multiple product groups""" - - items: List[SingleProductGroupResponse] = Field(..., description="List of product groups") - total: int = Field(..., description="The total number of product groups") - page: int = Field(..., description="The current page number") - limit: int = Field(..., description="The number of product groups per page") - links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links") - - -class ProductGroupBulkCreateResponse(BaseModel): - """Response from bulk product group creation""" - - items: List[SingleProductGroupResponse] = Field(..., description="The created product groups") - totalItemsCreated: int = Field(..., description="The total number of product groups created") - - -class ProductGroupBulkUpdateResponse(BaseModel): - """Response from bulk product group update""" - - items: List[SingleProductGroupResponse] = Field(..., description="The updated product groups") - totalItemsUpdated: int = Field(..., description="The total number of product groups updated") - - -class ProductGroupHierarchyNode(BaseModel): - """A node in the product group hierarchy""" - - id: int = Field(..., description="The ID of the node") - name: str = Field(..., description="The name of the node") - type: str = Field( - ..., description="The type of node (product-group, product, variant, text, media)" - ) - children: List["ProductGroupHierarchyNode"] = Field( - default_factory=list, description="The immediate children of the node" - ) - - -ProductGroupHierarchyNode.model_rebuild() - - -class ProductGroupHierarchyResponse(ProductGroupHierarchyNode): - """Product group hierarchy response""" - - pass - - -# ============================================================================ -# TREE GROUP MODELS -# ============================================================================ - - -class SingleTreeGroupResponse(BaseModel): - """Complete tree group descriptor""" - - id: int = Field(..., description="The ID of the tree group") - name: str = Field(..., description="The independent name of the tree group") - parentId: Optional[int] = Field(None, description="The ID of the parent tree group") - clientId: int = Field(..., description="The ID of the client") - status: Optional[str] = Field(None, description="The status of the group (normal, internal)") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the tree group name" - ) - - -class SingleNewTreeGroupRequestBody(BaseModel): - """Request body for creating a new tree group""" - - name: str = Field(..., description="The name of the tree group") - parentId: int = Field(..., description="The ID of the parent tree group") - status: Optional[str] = Field( - "normal", description="The status of the tree group (normal, internal)" - ) - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the tree group name" - ) - - -class SingleUpdateTreeGroupRequestBody(BaseModel): - """Request body for updating a tree group""" - - id: int = Field(..., description="The ID of the tree group") - name: Optional[str] = Field(None, description="The name of the tree group") - parentId: Optional[int] = Field(None, description="The ID of the parent tree group") - status: Optional[str] = Field(None, description="The status of the tree group") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the tree group name" - ) - - -class TreeGroupListResponse(BaseModel): - """Paginated response containing multiple tree groups""" - - items: List[SingleTreeGroupResponse] = Field(..., description="List of tree groups") - total: int = Field(..., description="The total number of tree groups") - page: int = Field(..., description="The current page number") - limit: int = Field(..., description="The number of tree groups per page") - links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links") - - -class TreeGroupBulkCreateResponse(BaseModel): - """Response from bulk tree group creation""" - - items: List[SingleTreeGroupResponse] = Field(..., description="The created tree groups") - totalItemsCreated: int = Field(..., description="The total number of tree groups created") - - -class TreeGroupBulkUpdateResponse(BaseModel): - """Response from bulk tree group update""" - - items: List[SingleTreeGroupResponse] = Field(..., description="The updated tree groups") - totalItemsUpdated: int = Field(..., description="The total number of tree groups updated") - - -class TreeGroupHierarchyNode(BaseModel): - """A node in the tree group hierarchy""" - - id: int = Field(..., description="The ID of the node") - name: str = Field(..., description="The name of the node") - type: str = Field(..., description="The type of node (tree-group, product-group)") - children: List["TreeGroupHierarchyNode"] = Field( - default_factory=list, description="The immediate children of the node" - ) - - -TreeGroupHierarchyNode.model_rebuild() - - -class TreeGroupHierarchyResponse(TreeGroupHierarchyNode): - """Tree group hierarchy response""" - - pass - - -# ============================================================================ -# ATTRIBUTE MODELS -# ============================================================================ - - -class SimpleAttributeResponse(BaseModel): - """Simplified attribute definition""" - - id: int = Field(..., description="The ID of the attribute") - name: str = Field(..., description="The independent name of the attribute") - description: Optional[str] = Field(None, description="The description of the attribute") - attributeType: Optional[str] = Field( - None, description="The type of attribute (normal, meta, internal)" - ) - type: Optional[str] = Field(None, description="The type of the attribute") - autoSync: Optional[str] = Field(None, description="The auto sync mode of the attribute") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the attribute name" - ) - - -class AttributeGetByNameResponse(SimpleAttributeResponse): - """Attribute response when fetching by name""" - - languageDependents: Optional[Dict[str, Any]] = Field( - None, description="Language-dependent properties" - ) - - -class SingleNewAttributeRequestBody(BaseModel): - """Request body for creating a new attribute""" - - name: str = Field(..., description="The independent name of the attribute") - type: str = Field(..., description="The type of the attribute") - description: Optional[str] = Field(None, description="The description of the attribute") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the attribute name" - ) - - -class SingleUpdateAttributeRequestBody(BaseModel): - """Request body for updating an attribute""" - - id: int = Field(..., description="The ID of the attribute") - name: Optional[str] = Field(None, description="The independent name of the attribute") - description: Optional[str] = Field(None, description="The description of the attribute") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the attribute name" - ) - - -class AttributeListResponse(BaseModel): - """Paginated response containing multiple attributes""" - - items: List[SimpleAttributeResponse] = Field(..., description="List of attributes") - total: int = Field(..., description="The total number of attributes") - page: int = Field(..., description="The current page number") - limit: int = Field(..., description="The number of attributes per page") - links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links") - - -class AttributeBulkCreateResponse(BaseModel): - """Response from bulk attribute creation""" - - items: List[SimpleAttributeResponse] = Field(..., description="The created attributes") - totalItemsCreated: int = Field(..., description="The total number of attributes created") - - -class AttributeBulkUpdateResponse(BaseModel): - """Response from bulk attribute update""" - - items: List[SimpleAttributeResponse] = Field(..., description="The updated attributes") - totalItemsUpdated: int = Field(..., description="The total number of attributes updated") - - -# ============================================================================ -# ATTRIBUTE GROUP MODELS -# ============================================================================ - - -class AttributeGroupValidFor(BaseModel): - """Valid object types for an attribute group""" - - product: Optional[bool] = Field(None, description="Valid for products") - productGroup: Optional[bool] = Field(None, description="Valid for product groups") - media: Optional[bool] = Field(None, description="Valid for media") - text: Optional[bool] = Field(None, description="Valid for texts") - - -class SingleAttributeGroupResponse(BaseModel): - """Complete attribute group descriptor""" - - id: int = Field(..., description="The ID of the attribute group") - name: str = Field(..., description="The independent name of the attribute group") - parentId: Optional[int] = Field(None, description="The ID of the parent attribute group") - validForObjectTypes: Optional[AttributeGroupValidFor] = Field( - None, description="Valid object types for this attribute group" - ) - isTemplate: bool = Field(False, description="Whether the attribute group is a template") - templateId: Optional[int] = Field(None, description="The ID of the template attribute group") - clientId: int = Field(..., description="The ID of the client") - createdAt: Optional[str] = Field( - None, description="The date and time the attribute group was created" - ) - createdByUserId: Optional[int] = Field( - None, description="The ID of user who created the attribute group" - ) - modifiedAt: Optional[str] = Field( - None, description="The date and time the attribute group was modified" - ) - modifiedByUserId: Optional[int] = Field( - None, description="The ID of user who modified the attribute group" - ) - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the attribute group name" - ) - - -class SingleNewAttributeGroupRequestBody(BaseModel): - """Request body for creating a new attribute group""" - - name: str = Field(..., description="The independent name of the attribute group") - parentId: Optional[int] = Field(None, description="The ID of the parent attribute group") - validForObjectTypes: Optional[AttributeGroupValidFor] = Field( - None, description="Valid object types for this attribute group" - ) - isTemplate: Optional[bool] = Field( - False, description="Whether the attribute group is a template" - ) - templateId: Optional[int] = Field(None, description="The ID of the template attribute group") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the attribute group name" - ) - - -class SingleUpdateAttributeGroupRequestBody(BaseModel): - """Request body for updating an attribute group""" - - id: Optional[int] = Field(None, description="The ID of the attribute group") - name: Optional[str] = Field(None, description="The independent name of the attribute group") - newName: Optional[str] = Field( - None, description="The new independent name of the attribute group" - ) - parentId: Optional[int] = Field(None, description="The ID of the parent attribute group") - validForObjectTypes: Optional[AttributeGroupValidFor] = Field( - None, description="Valid object types for this attribute group" - ) - isTemplate: Optional[bool] = Field( - None, description="Whether the attribute group is a template" - ) - templateId: Optional[int] = Field(None, description="The ID of the template attribute group") - translations: Optional[Dict[str, str]] = Field( - None, description="Translations of the attribute group name" - ) - - -class AttributeGroupListResponse(BaseModel): - """Paginated response containing multiple attribute groups""" - - items: List[SingleAttributeGroupResponse] = Field(..., description="List of attribute groups") - total: int = Field(..., description="The total number of attribute groups") - page: int = Field(..., description="The current page number") - limit: int = Field(..., description="The number of attribute groups per page") - links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links") - - -class AttributeGroupBulkCreateResponse(BaseModel): - """Response from bulk attribute group creation""" - - items: List[SingleAttributeGroupResponse] = Field( - ..., description="The created attribute groups" - ) - totalItemsCreated: int = Field(..., description="The total number of attribute groups created") - - -class AttributeGroupHierarchyNode(BaseModel): - """A node in the attribute group hierarchy""" - - id: int = Field(..., description="The ID of the node") - name: str = Field(..., description="The name of the node") - type: str = Field( - ..., - description="The type of node (attribute-group, attribute-group-template, attribute-group-derived-template, attribute)", - ) - children: List["AttributeGroupHierarchyNode"] = Field( - default_factory=list, description="The immediate children of the node" - ) - - -AttributeGroupHierarchyNode.model_rebuild() - - -class AttributeGroupHierarchyResponse(AttributeGroupHierarchyNode): - """Attribute group hierarchy response""" - - pass - - -# ============================================================================ -# TEXT MODELS -# ============================================================================ - - -class TextContentResponse(BaseModel): - """Text content with metadata""" - - id: int = Field(..., description="The ID of the text content") - clientId: int = Field(..., description="The ID of the client") - languageCode: str = Field(..., description="The language code for this text content") - mimeType: str = Field(..., description="The MIME type of the text content") - content: str = Field(..., description="The text content") - contentLength: int = Field(..., description="The size of the text content in bytes") - workflowStatus: Optional[str] = Field( - None, description="The workflow status of the text content" - ) - workflowComment: Optional[str] = Field(None, description="Comments related to the workflow") - changedAt: Optional[str] = Field( - None, description="The date and time the text content was last changed" - ) - changedBy: Optional[int] = Field( - None, description="The ID of the user who last changed the text content" - ) - - -class TextContentRequestBody(BaseModel): - """Request body for text content""" - - mimeType: str = Field(..., description="The MIME type of the text content") - content: str = Field(..., description="The text content") - languageCode: str = Field(..., description="The language code for this text content") - workflowStatus: Optional[str] = Field( - None, description="The workflow status of the text content" - ) - workflowComment: Optional[str] = Field(None, description="Comments related to the workflow") - - -class SingleTextResponse(BaseModel): - """Complete text descriptor""" - - id: int = Field(..., description="The ID of the text descriptor") - name: str = Field(..., description="The name of the text") - treeId: Optional[int] = Field(None, description="The ID of the tree this text belongs to") - clientId: int = Field(..., description="The ID of the client") - originalId: Optional[int] = Field( - None, description="The ID of the original text if this is a copy" - ) - objectStatus: Optional[str] = Field(None, description="The status of the object") - userObjectStatus: Optional[int] = Field(None, description="User-defined object status") - createdAt: Optional[str] = Field(None, description="The date and time the text was created") - createdBy: Optional[int] = Field(None, description="The ID of the user who created the text") - modifiedAt: Optional[str] = Field( - None, description="The date and time the text was last modified" - ) - modifiedBy: Optional[int] = Field( - None, description="The ID of the user who last modified the text" - ) - contents: List[TextContentResponse] = Field( - default_factory=list, description="The text contents for different languages" - ) - attributes: List[AttributeResponse] = Field( - default_factory=list, description="The attribute values of the text" - ) - - -class SingleNewTextRequestBody(BaseModel): - """Request body for creating a new text""" - - name: str = Field(..., description="The name of the text descriptor") - parentId: int = Field(..., description="The ID of the parent product or group") - languageCode: str = Field(..., description="The language code for the response") - textTypeId: int = Field(..., description="The ID of the text type") - contents: Optional[List[TextContentRequestBody]] = Field( - None, description="List of text contents for different languages" - ) - treeId: Optional[int] = Field(None, description="The ID of the tree this text belongs to") - attributeGroupId: Optional[int] = Field( - None, description="The ID of the attribute group to assign to this text" - ) - attributes: Optional[List[Dict[str, Any]]] = Field( - None, description="Optional attributes to set" - ) - - -class SingleUpdateTextRequestBody(BaseModel): - """Request body for updating a text""" - - id: int = Field(..., description="The ID of the text descriptor to update") - name: Optional[str] = Field(None, description="The name of the text descriptor") - userObjectStatus: Optional[int] = Field(None, description="User-defined object status") - textTypeId: Optional[int] = Field(None, description="The ID of the text type") - treeId: Optional[int] = Field(None, description="The ID of the tree this text belongs to") - contents: Optional[List[TextContentRequestBody]] = Field( - None, description="List of text contents to update" - ) - attributes: Optional[List[Dict[str, Any]]] = Field(None, description="Attributes to update") - - -class TextListResponse(BaseModel): - """Paginated response containing multiple texts""" - - items: List[SingleTextResponse] = Field(..., description="List of text items") - total: int = Field(..., description="The total number of text items") - page: int = Field(..., description="The current page number") - limit: int = Field(..., description="The number of text items per page") - links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links") - - -class TextBulkCreateResponse(BaseModel): - """Response from bulk text creation""" - - items: List[SingleTextResponse] = Field(..., description="The created text items") - totalItemsCreated: int = Field(..., description="The total number of text items created") - - -class TextBulkUpdateResponse(BaseModel): - """Response from bulk text update""" - - items: List[SingleTextResponse] = Field(..., description="The updated text items") - totalItemsUpdated: int = Field(..., description="The total number of text items updated") - - -class MediaListResponse(BaseModel): - """Paginated list of media descriptors""" - - items: List[SingleMediaResponse] = Field(..., description="List of media items") - total: int = Field(..., description="The total number of media items") - page: int = Field(..., description="The current page number") - limit: int = Field(..., description="The number of media items per page") - links: PaginationLinks = Field(..., description="Pagination links") - - -class MediaBulkCreateResponse(BaseModel): - """Response from bulk media creation""" - - items: List[SingleMediaResponse] = Field(..., description="List of created media items") - totalItemsCreated: int = Field(..., description="The total number of media items created") - - -class MediaBulkUpdateResponse(BaseModel): - """Response from bulk media update""" - - items: List[SingleMediaResponse] = Field(..., description="List of updated media items") - totalItemsUpdated: int = Field(..., description="The total number of media items updated") - - -# ============= Hierarchy Models ============= - - -class HierarchyNode(BaseModel): - """A node in a hierarchy tree""" - - id: int = Field(..., description="The ID of the node") - name: str = Field(..., description="The name of the node") - type: str = Field(..., description="The type of node (product, variant, media, text, etc.)") - children: List["HierarchyNode"] = Field( - default_factory=list, description="The immediate children of the node" - ) - - -HierarchyNode.model_rebuild() - - -class ProductHierarchyResponse(HierarchyNode): - """Hierarchical product structure""" - - pass - - -class ProductGroupHierarchyResponse(BaseModel): - """Hierarchical product group structure""" - - id: int = Field(..., description="The ID of the node") - name: str = Field(..., description="The name of the node") - type: str = Field( - ..., description="The type of node (product-group, product, variant, media, text)" - ) - children: List["ProductGroupHierarchyResponse"] = Field( - default_factory=list, description="The immediate children" - ) - - -ProductGroupHierarchyResponse.model_rebuild() - - -class TreeGroupHierarchyResponse(BaseModel): - """Hierarchical tree group structure""" - - id: int = Field(..., description="The ID of the node") - name: str = Field(..., description="The name of the node") - type: str = Field(..., description="The type of node (tree-group, product-group)") - children: List["TreeGroupHierarchyResponse"] = Field( - default_factory=list, description="The immediate children" - ) - - -TreeGroupHierarchyResponse.model_rebuild() - - -class AttributeGroupHierarchyResponse(BaseModel): - """Hierarchical attribute group structure""" - - id: int = Field(..., description="The ID of the node") - name: str = Field(..., description="The name of the node") - type: str = Field(..., description="The type of node") - children: List["AttributeGroupHierarchyResponse"] = Field( - default_factory=list, description="The immediate children" - ) - - -AttributeGroupHierarchyResponse.model_rebuild() diff --git a/pyproject.toml b/pyproject.toml index 22759dc..d021b2e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,8 @@ profile = "black" line_length = 100 [tool.mypy] -python_version = "0.3.0" +python_version = "3.10" warn_return_any = true warn_unused_configs = true disallow_untyped_defs = false +ignore_missing_imports = true diff --git a/tests/test_client.py b/tests/test_client.py index 20e64ce..bc55293 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,14 +1,11 @@ """Tests for the Elytra PIM Client with Pydantic validation""" -import pytest from unittest.mock import Mock, patch + +import pytest from pydantic import ValidationError -from elytra_client import ( - ElytraClient, - SingleProductResponse, - SingleNewProductRequestBody, -) +from elytra_client import ElytraClient, SingleNewProductRequestBody, SingleProductResponse from elytra_client.exceptions import ElytraAuthenticationError, ElytraNotFoundError @@ -118,11 +115,11 @@ def test_create_product_with_pydantic(mock_request, client): mock_request.return_value = mock_response # Create with Pydantic model - new_product = SingleNewProductRequestBody( + new_product = SingleNewProductRequestBody( # type: ignore[arg-type] productName="NEW-PRODUCT-001", parentId=1, attributeGroupId=10, - ) # type: ignore - validation happens automatically, so type checker should recognize this as valid + ) result = client.create_product(new_product) @@ -134,20 +131,20 @@ def test_create_product_with_pydantic(mock_request, client): def test_pydantic_validation_on_creation(): """Test Pydantic validation on model creation""" # Valid model - valid_product = SingleNewProductRequestBody( + valid_product = SingleNewProductRequestBody( # type: ignore[arg-type] productName="VALID-PRODUCT", parentId=1, attributeGroupId=10, - ) # type: ignore - validation happens automatically, so type checker should recognize this as valid + ) assert valid_product.productName == "VALID-PRODUCT" # Invalid model - missing required field with pytest.raises(ValidationError): - SingleNewProductRequestBody( + SingleNewProductRequestBody( # type: ignore[arg-type] productName="INVALID-PRODUCT", # Missing parentId - required attributeGroupId=10, - ) # type: ignore - this will raise a ValidationError, so type checker should recognize this as invalid + ) @patch("elytra_client.client.requests.Session.request") @@ -156,9 +153,9 @@ def test_authentication_error(mock_request, client): mock_response = Mock() mock_response.status_code = 401 mock_response.text = "Unauthorized" - mock_request.return_value.raise_for_status.side_effect = ( - __import__("requests").exceptions.HTTPError(response=mock_response) - ) + mock_request.return_value.raise_for_status.side_effect = __import__( + "requests" + ).exceptions.HTTPError(response=mock_response) with pytest.raises(ElytraAuthenticationError): client.get_products() @@ -170,9 +167,9 @@ def test_not_found_error(mock_request, client): mock_response = Mock() mock_response.status_code = 404 mock_response.text = "Not Found" - mock_request.return_value.raise_for_status.side_effect = ( - __import__("requests").exceptions.HTTPError(response=mock_response) - ) + mock_request.return_value.raise_for_status.side_effect = __import__( + "requests" + ).exceptions.HTTPError(response=mock_response) with pytest.raises(ElytraNotFoundError): client.get_product(product_id=999)