Compare commits

...

12 commits

Author SHA1 Message Date
e723356c75 feat: Bump version to 0.7.0 and update media file upload functionality 2026-03-25 10:35:41 +01:00
9eb1f4a641 feat: Add attribute and attribute group management endpoints with Pydantic validation 2026-03-25 10:10:03 +01:00
798c763765 feat: Add tree and media endpoints with Pydantic validation 2026-03-25 10:04:24 +01:00
abd4e2c9ca cleanup 2026-03-25 09:08:58 +01:00
aee3a48687 chore: Bump version to 0.6.0 in __init__.py and pyproject.toml 2026-03-25 09:07:01 +01:00
638b68214b docs: Update README and module docstrings to clarify legacy API usage and CRUD operations in Elytra PIM Web API 2026-03-25 08:20:50 +01:00
c9353cf8ea style: Improve code formatting and readability in update_version.py 2026-03-24 16:28:13 +01:00
5dfb96874e chore: Update version to 0.5.0 in project files and enhance version update script 2026-03-24 16:28:02 +01:00
aa7db1a3ab Add mixins for various Lobster REST API client functionalities
- Implement JobsMixin for job-related operations including job overview, execution, and control.
- Implement MediaMixin for media management, including creation, updating, and file uploads.
- Implement ProductGroupsMixin for handling product groups, including bulk operations and hierarchy retrieval.
- Implement ProductsMixin for product management, including bulk creation and updates.
- Implement ProtocolsMixin for protocol-related operations, including retrieval by job ID and category.
- Implement TextMixin for managing text entries, including bulk operations.
- Implement TreeGroupsMixin for tree group management, including bulk operations and hierarchy retrieval.
2026-03-24 16:12:58 +01:00
310a4fe2f8 Refactor models in the Lobster PIM Legacy REST API
- Organized model imports in __init__.py for better structure and readability.
- Grouped related models into separate files for attributes, attribute groups, products, protocols, and tree groups.
- Removed duplicate imports and ensured all necessary models are included.
- Enhanced code maintainability by separating concerns and improving the overall organization of the models package.
2026-03-24 15:35:46 +01:00
68f0b76feb Add product, product group, tree group, attribute, and text endpoints with CRUD operations 2026-03-24 15:35:20 +01:00
ef53746129 feat: Add media and hierarchy endpoints with corresponding models to support media management 2026-03-24 15:12:29 +01:00
32 changed files with 4137 additions and 563 deletions

View file

@ -1,6 +1,6 @@
"""Elytra PIM Client - A Pythonic client for the Elytra PIM API"""
__version__ = "0.3.0"
__version__ = "0.7.0"
__author__ = "Your Name"
from . import rest_api
@ -8,27 +8,47 @@ from .client import ElytraClient
from .exceptions import ElytraAPIError, ElytraAuthenticationError
from .models import (
ProductAttributeResponse,
SingleMediaResponse,
SingleNewMediaRequestBody,
SingleNewProductGroupRequestBody,
SingleNewProductRequestBody,
SingleNewTextRequestBody,
SingleNewTreeGroupRequestBody,
SingleProductGroupResponse,
SingleProductResponse,
SingleTextResponse,
SingleTreeGroupResponse,
SingleUpdateMediaRequestBody,
SingleUpdateProductGroupRequestBody,
SingleUpdateProductRequestBody,
SingleUpdateTextRequestBody,
SingleUpdateTreeGroupRequestBody,
)
__all__ = [
"ElytraClient",
"ElytraAPIError",
"ElytraAuthenticationError",
# Response models
# Product models
"SingleProductResponse",
"SingleProductGroupResponse",
"ProductAttributeResponse",
# Request models
"SingleNewProductRequestBody",
"SingleUpdateProductRequestBody",
"SingleNewProductGroupRequestBody",
"SingleUpdateProductGroupRequestBody",
# Media models
"SingleMediaResponse",
"SingleNewMediaRequestBody",
"SingleUpdateMediaRequestBody",
# Text models
"SingleTextResponse",
"SingleNewTextRequestBody",
"SingleUpdateTextRequestBody",
# Tree group models
"SingleTreeGroupResponse",
"SingleNewTreeGroupRequestBody",
"SingleUpdateTreeGroupRequestBody",
# Legacy REST API subpackage
"rest_api",
]

File diff suppressed because it is too large Load diff

View file

@ -1,11 +1,19 @@
# Lobster PIM Legacy REST API Client
This subpackage provides a Python client for accessing the legacy REST API of Lobster PIM (Product Information Management system). It offers access to scheduled jobs and protocol logs through a clean, Pydantic-based interface.
This subpackage provides a Python client for accessing the **legacy REST API** of Lobster PIM (now called Elytra).
## ⚠️ Important: Legacy API vs. New Web API
The Lobster REST API is the **legacy API** that provides read-only access to:
- **Scheduled Jobs** (`/rest/job/*`) - Job execution and monitoring
- **Protocol Logs** (`/rest/protocol/*`) - Execution logs and protocol information
**For CRUD operations** on products, product groups, attributes, media, and other resources in the new Elytra PIM Web API, use the [`ElytraClient`](../client.py) with the OpenAPI-based Web API instead.
## Features
- **Job Management**: Access, monitor, and execute scheduled jobs
- **Protocol/Log Access**: Retrieve execution logs and protocol information
- **Job Management**: Get, monitor, and execute scheduled jobs (read-only)
- **Protocol/Log Access**: Retrieve execution logs and protocol information (read-only)
- **Authentication**: Support for both username/password and API token authentication
- **Job Control**: Execute jobs with parameter overrides and queue management
- **Type Safety**: Full Pydantic model validation for all API responses

View file

@ -1,4 +1,11 @@
"""Lobster PIM Legacy REST API client and utilities"""
"""Lobster PIM Legacy REST API client and utilities
This module provides access to the legacy Lobster REST API, which offers
read-only access to Scheduled Jobs and Protocol logs.
For CRUD operations on products, product groups, attributes, media, and other
resources in the new Elytra PIM Web API, use the ElytraClient instead.
"""
from .auth import AuthMethod, RestApiAuth
from .client import LobsterRestApiClient
@ -17,18 +24,23 @@ from .models import (
)
__all__ = [
# Authentication
"RestApiAuth",
"AuthMethod",
# Client
"LobsterRestApiClient",
# Job models
"JobInfo",
"JobDetailInfo",
"JobOverviewResponse",
"JobExecutionResponse",
"JobControlRequest",
"JobControlResponse",
# Protocol models
"ProtocolInfo",
"ProtocolListResponse",
"ProtocolCategoryInfo",
"ProtocolCategoryListResponse",
# Error model
"ErrorResponse",
]

View file

@ -1,438 +0,0 @@
"""Client for the Lobster PIM Legacy REST API"""
from typing import Any, Dict, List, Optional, Type, TypeVar, cast
from urllib.parse import urljoin
import requests
from pydantic import BaseModel, ValidationError
from ..exceptions import (
ElytraAPIError,
ElytraAuthenticationError,
ElytraNotFoundError,
ElytraValidationError,
)
from .auth import AuthMethod, RestApiAuth
from .models import (
JobControlRequest,
JobControlResponse,
JobDetailInfo,
JobExecutionResponse,
JobInfo,
JobOverviewResponse,
ProtocolCategoryInfo,
ProtocolCategoryListResponse,
ProtocolInfo,
ProtocolListResponse,
)
T = TypeVar('T', bound=BaseModel)
class LobsterRestApiClient:
"""
Client for the Lobster PIM Legacy REST API.
Provides access to scheduled jobs and protocol logs via REST endpoints.
Supports both username/password and API token authentication.
Args:
base_url: The base URL of the Lobster PIM server (e.g., http://localhost:8080)
auth: RestApiAuth instance for authentication
timeout: Request timeout in seconds (default: 30)
"""
def __init__(
self,
base_url: str,
auth: RestApiAuth,
timeout: int = 30,
):
"""Initialize the Lobster REST API client"""
self.base_url = base_url.rstrip("/")
self.auth = auth
self.timeout = timeout
self.session = requests.Session()
self._setup_headers()
def _setup_headers(self) -> None:
"""Set up request headers including authentication"""
self.session.headers.update(
{
"Content-Type": "application/json",
"Accept": "application/json",
}
)
self.session.headers.update(self.auth.get_auth_header())
def _handle_response(
self,
response: requests.Response,
expected_model: Type[T],
) -> T:
"""
Handle API response and parse into Pydantic model.
Args:
response: Response from requests
expected_model: Pydantic model to deserialize into
Returns:
Parsed response as Pydantic model
Raises:
ElytraAuthenticationError: If authentication fails
ElytraNotFoundError: If resource not found
ElytraAPIError: For other API errors
ElytraValidationError: If response validation fails
"""
if response.status_code == 401:
raise ElytraAuthenticationError("Authentication failed")
elif response.status_code == 404:
raise ElytraNotFoundError("Resource not found")
elif response.status_code == 429:
raise ElytraAPIError("Too many requests - rate limit exceeded")
elif response.status_code >= 400:
try:
error_data = response.json()
error_msg = error_data.get("error") or error_data.get("message", response.text)
except Exception:
error_msg = response.text
raise ElytraAPIError(f"API error {response.status_code}: {error_msg}")
try:
data = response.json()
except Exception as e:
raise ElytraAPIError(f"Failed to parse response as JSON: {str(e)}")
try:
return expected_model.model_validate(data)
except ValidationError as e:
raise ElytraValidationError(f"Response validation failed: {str(e)}")
def _make_request(
self,
method: str,
endpoint: str,
expected_model: Type[T],
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None,
) -> T:
"""
Make HTTP request to the REST API.
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint path
expected_model: Pydantic model for response
params: Query parameters
json_data: JSON request body
Returns:
Parsed response as Pydantic model
"""
url = urljoin(self.base_url, f"/rest/{endpoint}")
# Add authentication parameters for GET requests
if method.upper() == "GET" and self.auth.auth_method == AuthMethod.USERNAME_PASSWORD:
if params is None:
params = {}
params.update(self.auth.get_url_parameters())
response = self.session.request(
method=method,
url=url,
params=params,
json=json_data,
timeout=self.timeout,
)
return self._handle_response(response, expected_model)
# ============= Job Endpoints =============
def get_job_overview(self) -> JobOverviewResponse:
"""
Get overview of all active jobs.
Returns:
JobOverviewResponse containing list of active jobs
"""
return self._make_request(
"GET",
"job/overview",
JobOverviewResponse,
)
def get_job_html_overview(self) -> str:
"""
Get HTML overview of all active jobs.
Returns:
HTML page content with job overview
"""
url = urljoin(self.base_url, "/rest/job/overview")
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD:
params = self.auth.get_url_parameters()
response = self.session.get(url, params=params, timeout=self.timeout)
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to get job overview: {response.status_code}")
return response.text
def get_all_active_jobs(self) -> JobOverviewResponse:
"""
Get list of all active jobs.
Returns:
JobOverviewResponse containing list of all active jobs
"""
return self._make_request(
"GET",
"job/job_id",
JobOverviewResponse,
)
def get_job_detail(self, job_id: int) -> JobDetailInfo:
"""
Get details of a specific active job.
Args:
job_id: ID of the job
Returns:
JobDetailInfo with job details
"""
return self._make_request(
"GET",
f"job/job_id/{job_id}",
JobDetailInfo,
)
def execute_job(
self,
job_id: int,
) -> JobExecutionResponse:
"""
Execute a job and get details of the started job.
Args:
job_id: ID of the job to execute
Returns:
JobExecutionResponse with execution details
"""
return self._make_request(
"GET",
f"job/execute/{job_id}",
JobExecutionResponse,
)
def get_running_job_instances(self) -> JobOverviewResponse:
"""
Get list of running job instances.
Returns:
JobOverviewResponse containing list of running job instances
"""
return self._make_request(
"GET",
"job/runtime_id",
JobOverviewResponse,
)
def get_running_job_instance(self, runtime_id: str) -> JobOverviewResponse:
"""
Get details of a specific running job instance.
Args:
runtime_id: Runtime ID of the job instance
Returns:
JobOverviewResponse with instance details
"""
return self._make_request(
"GET",
f"job/runtime_id/{runtime_id}",
JobOverviewResponse,
)
def control_job(
self,
job_id: int,
action: str = "start",
additional_reference: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None,
queue_id: Optional[str] = None,
max_job_duration_seconds: Optional[int] = None,
) -> JobControlResponse:
"""
Control a job using the control endpoint (POST).
Supports starting jobs with parameter overrides and queue management.
Args:
job_id: ID of the job to control
action: Action to perform (default: "start")
additional_reference: Optional reference for external tracking
parameters: Optional parameters to override job settings
queue_id: Optional queue ID for serialized execution
max_job_duration_seconds: Max duration in seconds (default 12 hours)
Returns:
JobControlResponse with execution details
"""
request_body = {
"action": action,
"objectId": job_id,
"objectType": "job",
}
# Add authentication credentials for POST
request_body.update(self.auth.get_json_body_params())
if additional_reference:
request_body["additionalReference"] = additional_reference
if parameters:
request_body["parameter"] = parameters
if queue_id:
request_body["queueId"] = queue_id
if max_job_duration_seconds:
request_body["maxJobDurationSeconds"] = max_job_duration_seconds
return self._make_request(
"POST",
"job/control",
JobControlResponse,
json_data=request_body,
)
# ============= Protocol Endpoints =============
def get_protocols(self, limit: int = 50) -> ProtocolListResponse:
"""
Get the last N protocols.
Args:
limit: Number of protocols to retrieve (default: 50)
Returns:
ProtocolListResponse containing list of protocols
"""
response = self.session.get(
urljoin(self.base_url, "/rest/protocol"),
params={"limit": limit, **self.auth.get_url_parameters()},
timeout=self.timeout,
)
return self._handle_response(response, ProtocolListResponse)
def get_protocol(self, protocol_id: str) -> ProtocolInfo:
"""
Get details of a specific protocol.
Args:
protocol_id: ID of the protocol
Returns:
ProtocolInfo with protocol details
"""
return self._make_request(
"GET",
f"protocol/{protocol_id}",
ProtocolInfo,
)
def get_protocol_by_job_id(self, job_id: int) -> ProtocolListResponse:
"""
Get all protocols for a specific job ID.
Args:
job_id: ID of the job
Returns:
ProtocolListResponse containing protocols for the job
"""
return self._make_request(
"GET",
f"protocol/job/{job_id}",
ProtocolListResponse,
)
def get_protocol_by_runtime_id(self, runtime_id: str) -> ProtocolInfo:
"""
Get protocol for a specific job instance runtime ID.
Args:
runtime_id: Runtime ID of the job instance
Returns:
ProtocolInfo with protocol details
"""
return self._make_request(
"GET",
f"protocol/job/{runtime_id}",
ProtocolInfo,
)
def get_protocol_by_additional_reference(self, reference: str) -> ProtocolListResponse:
"""
Get all protocols for a specific additional reference.
Args:
reference: Additional reference value
Returns:
ProtocolListResponse containing protocols for the reference
"""
return self._make_request(
"GET",
f"protocol/job/{reference}",
ProtocolListResponse,
)
def get_all_protocol_categories(self) -> ProtocolCategoryListResponse:
"""
Get all available protocol categories.
Returns:
ProtocolCategoryListResponse containing list of categories
"""
return self._make_request(
"GET",
"protocol/category",
ProtocolCategoryListResponse,
)
def get_protocol_by_category(self, category_id: str, limit: int = 50) -> ProtocolListResponse:
"""
Get the last N protocols from a specific category.
Args:
category_id: ID of the protocol category
limit: Number of protocols to retrieve (default: 50)
Returns:
ProtocolListResponse containing protocols for the category
"""
return self._make_request(
"GET",
f"protocol/category/{category_id}",
ProtocolListResponse,
params={"limit": limit},
)
def close(self) -> None:
"""Close the session and clean up resources."""
self.session.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()

View file

@ -0,0 +1,43 @@
"""Lobster PIM Legacy REST API Client."""
from .base import LobsterRestApiClientBase
from .jobs import JobsMixin
from .protocols import ProtocolsMixin
class LobsterRestApiClient(
LobsterRestApiClientBase,
JobsMixin,
ProtocolsMixin,
):
"""
Legacy REST API client for the Lobster PIM system.
Provides read-only access to Scheduled Jobs and Protocol logs via the
Lobster REST API endpoints. Only supports GET operations on:
- Scheduled Jobs (/rest/job/*)
- Protocols (/rest/protocol/*)
The new Elytra PIM Web API (for CRUD operations on products, groups,
attributes, media, etc.) should use the ElytraClient instead.
Example:
>>> from elytra_client.rest_api.client import LobsterRestApiClient
>>> from elytra_client.rest_api.auth import RestApiAuth, AuthMethod
>>>
>>> auth = RestApiAuth.from_bearer_token("your-token")
>>> client = LobsterRestApiClient("http://localhost:8080", auth)
>>>
>>> # Access jobs (read-only)
>>> jobs = client.get_all_active_jobs()
>>> job_detail = client.get_job_detail(job_id=172475107)
>>>
>>> # Access protocols (read-only)
>>> protocols = client.get_protocols()
>>> protocol = client.get_protocol(protocol_id="176728573")
"""
pass
__all__ = ["LobsterRestApiClient"]

View file

@ -0,0 +1,36 @@
"""Protocol defining the interface expected by all mixin classes."""
from typing import Any, Dict, Optional, Protocol, Type, TypeVar
import requests
from pydantic import BaseModel
T = TypeVar("T", bound=BaseModel)
class ClientBaseProtocol(Protocol):
"""Protocol that defines the base client interface for mixins."""
base_url: str
auth: Any
timeout: int
session: requests.Session
def _make_request(
self,
method: str,
endpoint: str,
expected_model: Type[T],
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None,
) -> T:
"""Make HTTP request to the REST API."""
...
def _handle_response(
self,
response: requests.Response,
expected_model: Type[T],
) -> T:
"""Handle API response and parse into Pydantic model."""
...

View file

@ -0,0 +1,198 @@
# mypy: disable-error-code="attr-defined, no-any-return"
"""Attribute Groups mixin for the Lobster REST API client."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
from ...exceptions import ElytraAPIError
from ..auth import AuthMethod
from ..models import (
AttributeGroupListResponse,
SingleAttributeGroupResponse,
SingleUpdateAttributeGroupRequestBody,
)
class AttributeGroupsMixin:
"""Mixin for attribute group-related operations.
# type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base
Expects to be mixed with LobsterRestApiClientBase or a compatible base class
that provides: base_url, auth, timeout, session, _make_request(), _handle_response().
"""
# Type hints for mixed-in attributes from base class (docstring only for clarity)
# base_url: str - from LobsterRestApiClientBase
# auth: Any - from LobsterRestApiClientBase
# timeout: int - from LobsterRestApiClientBase
# session: requests.Session - from LobsterRestApiClientBase
def get_all_attribute_groups(
self,
lang: Optional[str] = None,
page: int = 1,
limit: int = 10,
) -> AttributeGroupListResponse:
"""
Get all attribute groups.
Args:
lang: Language code (optional)
page: Page number (default: 1)
limit: Number of groups per page (default: 10)
Returns:
AttributeGroupListResponse with paginated list of attribute groups
"""
params: Dict[str, Any] = {"page": page, "limit": limit}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
"attribute/groups",
AttributeGroupListResponse,
params=params,
)
def create_attribute_group(self, group_data: Dict[str, Any]) -> SingleAttributeGroupResponse:
"""
Create a new attribute group.
Args:
group_data: Attribute group data
Returns:
SingleAttributeGroupResponse with created group
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"attribute/groups",
SingleAttributeGroupResponse,
json_data=group_data,
)
def get_attribute_group_by_id(
self, group_id: int, lang: Optional[str] = None
) -> SingleAttributeGroupResponse:
"""
Get an attribute group by ID.
Args:
group_id: ID of the attribute group
lang: Language code (optional)
Returns:
SingleAttributeGroupResponse with group details
"""
params: Dict[str, Any] = {}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
f"attribute/groups/{group_id}",
SingleAttributeGroupResponse,
params=params if params else None,
)
def delete_attribute_group(self, group_id: int) -> None:
"""
Delete an attribute group by ID.
Args:
group_id: ID of the attribute group to delete
"""
url = urljoin(self.base_url, f"/rest/attribute/groups/{group_id}") # type: ignore[attr-defined]
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to delete attribute group: {response.status_code}")
def get_attribute_group_by_name(
self, group_name: str, lang: Optional[str] = None
) -> SingleAttributeGroupResponse:
"""
Get an attribute group by name.
Args:
group_name: Name of the attribute group
lang: Language code (optional)
Returns:
SingleAttributeGroupResponse with group details
"""
params: Dict[str, Any] = {}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
f"attribute/groups/name/{group_name}",
SingleAttributeGroupResponse,
params=params if params else None,
)
def update_attribute_group_by_name(
self, group_name: str, group_data: Dict[str, Any]
) -> SingleAttributeGroupResponse:
"""
Update an attribute group by name.
Args:
group_name: Name of the attribute group to update
group_data: Updated group data
Returns:
SingleAttributeGroupResponse with updated group
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
f"attribute/groups/name/{group_name}",
SingleAttributeGroupResponse,
json_data=group_data,
)
def delete_attribute_group_by_name(self, group_name: str) -> None:
"""
Delete an attribute group by name.
Args:
group_name: Name of the attribute group to delete
"""
url = urljoin(
self.base_url, f"/rest/attribute/groups/name/{group_name}" # type: ignore[attr-defined]
)
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(
f"Failed to delete attribute group by name: {response.status_code}"
)
def attribute_group_add_operation(
self, operation_data: Dict[str, Any]
) -> AttributeGroupListResponse:
"""
Perform bulk operations on attribute groups.
Args:
operation_data: Operation details
Returns:
AttributeGroupListResponse with affected groups
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"attribute/groups/operation",
AttributeGroupListResponse,
json_data=operation_data,
)

View file

@ -0,0 +1,220 @@
# mypy: disable-error-code="attr-defined, no-any-return"
"""Attributes mixin for the Lobster REST API client."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
from ...exceptions import ElytraAPIError
from ..auth import AuthMethod
from ..models import (
AttributeBulkCreateResponse,
AttributeBulkUpdateResponse,
AttributeGetByNameResponse,
AttributeGroupHierarchyResponse,
AttributeListResponse,
SimpleAttributeResponse,
)
class AttributesMixin:
"""Mixin for attribute-related operations.
# type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base
Expects to be mixed with LobsterRestApiClientBase or a compatible base class
that provides: base_url, auth, timeout, session, _make_request(), _handle_response().
"""
# Type hints for mixed-in attributes from base class (docstring only for clarity)
# base_url: str - from LobsterRestApiClientBase
# auth: Any - from LobsterRestApiClientBase
# timeout: int - from LobsterRestApiClientBase
# session: requests.Session - from LobsterRestApiClientBase
def get_all_attributes(
self,
lang: Optional[str] = None,
page: int = 1,
limit: int = 10,
) -> AttributeListResponse:
"""
Get all attributes.
Args:
lang: Language code (optional)
page: Page number (default: 1)
limit: Number of attributes per page (default: 10)
Returns:
AttributeListResponse with paginated list of attributes
"""
params: Dict[str, Any] = {"page": page, "limit": limit}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
"attributes",
AttributeListResponse,
params=params,
)
def create_attribute(self, attribute_data: Dict[str, Any]) -> SimpleAttributeResponse:
"""
Create a new attribute.
Args:
attribute_data: Attribute data
Returns:
SimpleAttributeResponse with created attribute
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"attributes",
SimpleAttributeResponse,
json_data=attribute_data,
)
def update_attribute(self, attribute_data: Dict[str, Any]) -> SimpleAttributeResponse:
"""
Update an attribute.
Args:
attribute_data: Updated attribute data (must include 'id')
Returns:
SimpleAttributeResponse with updated attribute
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"attributes",
SimpleAttributeResponse,
json_data=attribute_data,
)
def create_multiple_attributes(
self, attributes_list: List[Dict[str, Any]]
) -> AttributeBulkCreateResponse:
"""
Create multiple attributes in bulk.
Args:
attributes_list: List of attribute data
Returns:
AttributeBulkCreateResponse with created attributes
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"attributes/bulk",
AttributeBulkCreateResponse,
json_data=attributes_list,
)
def update_multiple_attributes(
self, attributes_list: List[Dict[str, Any]]
) -> AttributeBulkUpdateResponse:
"""
Update multiple attributes in bulk.
Args:
attributes_list: List of attribute data to update (each must include 'id')
Returns:
AttributeBulkUpdateResponse with updated attributes
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"attributes/bulk",
AttributeBulkUpdateResponse,
json_data=attributes_list,
)
def get_attribute_by_id(
self, attribute_id: int, lang: Optional[str] = None
) -> SimpleAttributeResponse:
"""
Get an attribute by ID.
Args:
attribute_id: ID of the attribute
lang: Language code (optional)
Returns:
SimpleAttributeResponse with attribute details
"""
params: Dict[str, Any] = {}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
f"attributes/{attribute_id}",
SimpleAttributeResponse,
params=params if params else None,
)
def delete_attribute(self, attribute_id: int) -> None:
"""
Delete an attribute by ID.
Args:
attribute_id: ID of the attribute to delete
"""
url = urljoin(
self.base_url, f"/rest/attributes/{attribute_id}" # type: ignore[attr-defined]
)
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to delete attribute: {response.status_code}")
def get_attribute_by_name(
self, attribute_name: str, lang: Optional[str] = None
) -> AttributeGetByNameResponse:
"""
Get an attribute by name with language-dependent properties.
Args:
attribute_name: Name of the attribute
lang: Language code (optional)
Returns:
AttributeGetByNameResponse with attribute details and languageDependents
"""
params: Dict[str, Any] = {}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
f"attributes/name/{attribute_name}",
AttributeGetByNameResponse,
params=params if params else None,
)
def get_attribute_group_hierarchy(
self, attribute_group_id: int, depth: int = 10
) -> AttributeGroupHierarchyResponse:
"""
Get the hierarchy of an attribute group.
Args:
attribute_group_id: ID of the attribute group
depth: Depth of the hierarchy (default: 10)
Returns:
AttributeGroupHierarchyResponse with hierarchical attribute group structure
"""
params = {"depth": depth}
return self._make_request( # type: ignore[attr-defined]
"GET",
f"attribute/groups/hierarchy/{attribute_group_id}",
AttributeGroupHierarchyResponse,
params=params,
)

View file

@ -0,0 +1,152 @@
"""Base client for the Lobster PIM Legacy REST API."""
from __future__ import annotations
from typing import Any, Dict, Optional, Type, TypeVar
from urllib.parse import urljoin
import requests
from pydantic import BaseModel, ValidationError
from ...exceptions import (
ElytraAPIError,
ElytraAuthenticationError,
ElytraNotFoundError,
ElytraValidationError,
)
from ..auth import AuthMethod, RestApiAuth
T = TypeVar("T", bound=BaseModel)
class LobsterRestApiClientBase:
"""
Base client for the Lobster PIM Legacy REST API.
Provides core infrastructure for authentication, HTTP requests, and response handling.
Subclasses should extend this with domain-specific methods via mixins.
Args:
base_url: The base URL of the Lobster PIM server (e.g., http://localhost:8080)
auth: RestApiAuth instance for authentication
timeout: Request timeout in seconds (default: 30)
"""
def __init__(
self,
base_url: str,
auth: RestApiAuth,
timeout: int = 30,
):
"""Initialize the Lobster REST API client"""
self.base_url = base_url.rstrip("/")
self.auth = auth
self.timeout = timeout
self.session = requests.Session()
self._setup_headers()
def _setup_headers(self) -> None:
"""Set up request headers including authentication"""
self.session.headers.update(
{
"Content-Type": "application/json",
"Accept": "application/json",
}
)
self.session.headers.update(self.auth.get_auth_header())
def _handle_response(
self,
response: requests.Response,
expected_model: Type[T],
) -> T:
"""
Handle API response and parse into Pydantic model.
Args:
response: Response from requests
expected_model: Pydantic model to deserialize into
Returns:
Parsed response as Pydantic model
Raises:
ElytraAuthenticationError: If authentication fails
ElytraNotFoundError: If resource not found
ElytraAPIError: For other API errors
ElytraValidationError: If response validation fails
"""
if response.status_code == 401:
raise ElytraAuthenticationError("Authentication failed")
elif response.status_code == 404:
raise ElytraNotFoundError("Resource not found")
elif response.status_code == 429:
raise ElytraAPIError("Too many requests - rate limit exceeded")
elif response.status_code >= 400:
try:
error_data = response.json()
error_msg = error_data.get("error") or error_data.get("message", response.text)
except Exception:
error_msg = response.text
raise ElytraAPIError(f"API error {response.status_code}: {error_msg}")
try:
data = response.json()
except Exception as e:
raise ElytraAPIError(f"Failed to parse response as JSON: {str(e)}")
try:
return expected_model.model_validate(data)
except ValidationError as e:
raise ElytraValidationError(f"Response validation failed: {str(e)}")
def _make_request(
self,
method: str,
endpoint: str,
expected_model: Type[T],
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None,
) -> T:
"""
Make HTTP request to the REST API.
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint path
expected_model: Pydantic model for response
params: Query parameters
json_data: JSON request body
Returns:
Parsed response as Pydantic model
"""
url = urljoin(self.base_url, f"/rest/{endpoint}")
# Add authentication parameters for GET requests
if method.upper() == "GET" and self.auth.auth_method == AuthMethod.USERNAME_PASSWORD:
if params is None:
params = {}
params.update(self.auth.get_url_parameters())
response = self.session.request(
method=method,
url=url,
params=params,
json=json_data,
timeout=self.timeout,
)
return self._handle_response(response, expected_model)
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()
def close(self):
"""Close the session"""
self.session.close()

View file

@ -0,0 +1,195 @@
# mypy: disable-error-code="attr-defined, no-any-return"
"""Jobs mixin for the Lobster REST API client."""
from __future__ import annotations
from typing import Any, Dict, Optional
from ..models import (
JobControlRequest,
JobControlResponse,
JobDetailInfo,
JobExecutionResponse,
JobInfo,
JobOverviewResponse,
)
class JobsMixin:
"""Mixin for job-related operations.
# type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base
Expects to be mixed with LobsterRestApiClientBase or a compatible base class
that provides: base_url, auth, timeout, session, _make_request(), _handle_response().
"""
# Type hints for mixed-in attributes from base class (docstring only for clarity)
# base_url: str - from LobsterRestApiClientBase
# auth: Any - from LobsterRestApiClientBase
# timeout: int - from LobsterRestApiClientBase
# session: requests.Session - from LobsterRestApiClientBase
def get_job_overview(self) -> JobOverviewResponse:
"""
Get overview of all active jobs.
Returns:
JobOverviewResponse containing list of active jobs
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
"job/overview",
JobOverviewResponse,
)
def get_job_html_overview(self) -> str:
"""
Get HTML overview of all active jobs.
Returns:
HTML page content with job overview
"""
from urllib.parse import urljoin
from ...exceptions import ElytraAPIError
from ..auth import AuthMethod
url = urljoin(self.base_url, "/rest/job/overview") # type: ignore[attr-defined]
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.get(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to get job overview: {response.status_code}")
return response.text
def get_all_active_jobs(self) -> JobOverviewResponse:
"""
Get list of all active jobs.
Returns:
JobOverviewResponse containing list of all active jobs
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
"job/job_id",
JobOverviewResponse,
)
def get_job_detail(self, job_id: int) -> JobDetailInfo:
"""
Get details of a specific active job.
Args:
job_id: ID of the job
Returns:
JobDetailInfo with job details
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
f"job/job_id/{job_id}",
JobDetailInfo,
)
def execute_job(
self,
job_id: int,
) -> JobExecutionResponse:
"""
Execute a job and get details of the started job.
Args:
job_id: ID of the job to execute
Returns:
JobExecutionResponse with execution details
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
f"job/execute/{job_id}",
JobExecutionResponse,
)
def get_running_job_instances(self) -> JobOverviewResponse:
"""
Get list of running job instances.
Returns:
JobOverviewResponse containing list of running job instances
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
"job/runtime_id",
JobOverviewResponse,
)
def get_running_job_instance(self, runtime_id: str) -> JobOverviewResponse:
"""
Get details of a specific running job instance.
Args:
runtime_id: Runtime ID of the job instance
Returns:
JobOverviewResponse with instance details
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
f"job/runtime_id/{runtime_id}",
JobOverviewResponse,
)
def control_job(
self,
job_id: int,
action: str = "start",
additional_reference: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None,
queue_id: Optional[str] = None,
max_job_duration_seconds: Optional[int] = None,
) -> JobControlResponse:
"""
Control a job using the control endpoint (POST).
Supports starting jobs with parameter overrides and queue management.
Args:
job_id: ID of the job to control
action: Action to perform (default: "start")
additional_reference: Optional reference for external tracking
parameters: Optional parameters to override job settings
queue_id: Optional queue ID for serialized execution
max_job_duration_seconds: Max duration in seconds (default 12 hours)
Returns:
JobControlResponse with execution details
"""
request_body = {
"action": action,
"objectId": job_id,
"objectType": "job",
}
# Add authentication credentials for POST
request_body.update(self.auth.get_json_body_params()) # type: ignore[attr-defined]
if additional_reference:
request_body["additionalReference"] = additional_reference
if parameters:
request_body["parameter"] = parameters
if queue_id:
request_body["queueId"] = queue_id
if max_job_duration_seconds:
request_body["maxJobDurationSeconds"] = max_job_duration_seconds
return self._make_request( # type: ignore[attr-defined]
"POST",
"job/control",
JobControlResponse,
json_data=request_body,
)

View file

@ -0,0 +1,240 @@
# mypy: disable-error-code="attr-defined, no-any-return"
"""Media mixin for the Lobster REST API client."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
from ...exceptions import ElytraAPIError
from ..auth import AuthMethod
from ..models import (
MediaBulkCreateResponse,
MediaBulkUpdateResponse,
MediaFileResponse,
MediaListResponse,
SingleMediaResponse,
)
class MediaMixin:
"""Mixin for media-related operations.
# type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base
Expects to be mixed with LobsterRestApiClientBase or a compatible base class
that provides: base_url, auth, timeout, session, _make_request(), _handle_response().
"""
# Type hints for mixed-in attributes from base class (docstring only for clarity)
# base_url: str - from LobsterRestApiClientBase
# auth: Any - from LobsterRestApiClientBase
# timeout: int - from LobsterRestApiClientBase
# session: requests.Session - from LobsterRestApiClientBase
def get_all_media(
self, lang: Optional[str] = None, page: int = 1, limit: int = 10
) -> MediaListResponse:
"""
Get all media descriptors with pagination.
Args:
lang: Language code (optional)
page: Page number (default: 1)
limit: Number of media items per page (default: 10)
Returns:
MediaListResponse containing paginated list of media items
"""
params: Dict[str, Any] = {"page": page, "limit": limit}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
"media",
MediaListResponse,
params=params,
)
def get_media_by_id(self, media_id: int, lang: Optional[str] = None) -> SingleMediaResponse:
"""
Get a media descriptor by ID.
Args:
media_id: ID of the media
lang: Language code (optional)
Returns:
SingleMediaResponse with media details
"""
params: Dict[str, Any] = {}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
f"media/{media_id}",
SingleMediaResponse,
params=params if params else None,
)
def create_media(self, media_data: Dict[str, Any]) -> SingleMediaResponse:
"""
Create a new media descriptor.
Args:
media_data: Media descriptor data
Returns:
SingleMediaResponse with created media
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"media",
SingleMediaResponse,
json_data=media_data,
)
def update_media(self, media_data: Dict[str, Any]) -> SingleMediaResponse:
"""
Update a media descriptor.
Args:
media_data: Updated media descriptor data (must include 'id')
Returns:
SingleMediaResponse with updated media
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"media",
SingleMediaResponse,
json_data=media_data,
)
def create_multiple_media(self, media_list: List[Dict[str, Any]]) -> MediaBulkCreateResponse:
"""
Create multiple media descriptors in bulk.
Args:
media_list: List of media descriptor data
Returns:
MediaBulkCreateResponse with created media items
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"media/bulk",
MediaBulkCreateResponse,
json_data=media_list,
)
def update_multiple_media(self, media_list: List[Dict[str, Any]]) -> MediaBulkUpdateResponse:
"""
Update multiple media descriptors in bulk.
Args:
media_list: List of media descriptor data to update (each must include 'id')
Returns:
MediaBulkUpdateResponse with updated media items
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"media/bulk",
MediaBulkUpdateResponse,
json_data=media_list,
)
def delete_media(self, media_id: int) -> None:
"""
Delete a media descriptor by ID.
Args:
media_id: ID of the media to delete
"""
url = urljoin(self.base_url, f"/rest/media/{media_id}") # type: ignore[attr-defined]
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to delete media: {response.status_code}")
def upload_media_file(
self,
file_path: str,
media_id: int,
mam_system: str,
language_code: str = "independent",
) -> MediaFileResponse:
"""
Upload a media file for a media descriptor.
Args:
file_path: Path to the file to upload
media_id: ID of the media descriptor
mam_system: MAM system code (fs, sixomc, cumulus, etc.)
language_code: Language code for the file (default: independent)
Returns:
MediaFileResponse with uploaded file metadata
"""
url = urljoin(self.base_url, "/rest/media/file") # type: ignore[attr-defined]
with open(file_path, "rb") as file:
files = {"file": file}
data = {
"mediaId": media_id,
"mamSystem": mam_system,
"languageCode": language_code,
}
# Add authentication credentials
data.update(self.auth.get_json_body_params()) # type: ignore[attr-defined]
response = self.session.post( # type: ignore[attr-defined]
url,
files=files,
data=data,
timeout=self.timeout, # type: ignore[attr-defined]
)
return self._handle_response(response, MediaFileResponse) # type: ignore[attr-defined]
def get_media_content(self, media_file_id: int) -> bytes:
"""
Download the binary content of a media file.
Args:
media_file_id: ID of the media file
Returns:
Binary content of the media file
"""
url = urljoin(self.base_url, f"/rest/media/file/{media_file_id}") # type: ignore[attr-defined]
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.get(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to download media: {response.status_code}")
return response.content
def delete_media_file(self, media_file_id: int) -> None:
"""
Delete a media file by ID.
Args:
media_file_id: ID of the media file to delete
"""
url = urljoin(self.base_url, f"/rest/media/file/{media_file_id}") # type: ignore[attr-defined]
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to delete media file: {response.status_code}")

View file

@ -0,0 +1,193 @@
# mypy: disable-error-code="attr-defined, no-any-return"
"""Product Groups mixin for the Lobster REST API client."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
from ...exceptions import ElytraAPIError
from ..auth import AuthMethod
from ..models import (
ProductGroupBulkCreateResponse,
ProductGroupBulkUpdateResponse,
ProductGroupHierarchyResponse,
ProductGroupListResponse,
SingleProductGroupResponse,
)
class ProductGroupsMixin:
"""Mixin for product group-related operations.
# type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base
Expects to be mixed with LobsterRestApiClientBase or a compatible base class
that provides: base_url, auth, timeout, session, _make_request(), _handle_response().
"""
# Type hints for mixed-in attributes from base class (docstring only for clarity)
# base_url: str - from LobsterRestApiClientBase
# auth: Any - from LobsterRestApiClientBase
# timeout: int - from LobsterRestApiClientBase
# session: requests.Session - from LobsterRestApiClientBase
def get_all_product_groups(
self,
lang: Optional[str] = None,
page: int = 1,
limit: int = 10,
) -> ProductGroupListResponse:
"""
Get all product groups.
Args:
lang: Language code (optional)
page: Page number (default: 1)
limit: Number of groups per page (default: 10)
Returns:
ProductGroupListResponse with paginated list of groups
"""
params: Dict[str, Any] = {"page": page, "limit": limit}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
"groups",
ProductGroupListResponse,
params=params,
)
def create_product_group(self, group_data: Dict[str, Any]) -> SingleProductGroupResponse:
"""
Create a new product group.
Args:
group_data: Product group data
Returns:
SingleProductGroupResponse with created group
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"groups",
SingleProductGroupResponse,
json_data=group_data,
)
def update_product_group(self, group_data: Dict[str, Any]) -> ProductGroupListResponse:
"""
Update a product group.
Args:
group_data: Updated group data (must include 'id')
Returns:
ProductGroupListResponse with updated group info
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"groups",
ProductGroupListResponse,
json_data=group_data,
)
def get_product_group_by_id(
self, group_id: int, lang: Optional[str] = None
) -> SingleProductGroupResponse:
"""
Get a product group by ID.
Args:
group_id: ID of the product group
lang: Language code (optional)
Returns:
SingleProductGroupResponse with group details
"""
params: Dict[str, Any] = {}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
f"groups/{group_id}",
SingleProductGroupResponse,
params=params if params else None,
)
def delete_product_group(self, group_id: int) -> None:
"""
Delete a product group by ID.
Args:
group_id: ID of the product group to delete
"""
url = urljoin(self.base_url, f"/rest/groups/{group_id}") # type: ignore[attr-defined]
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to delete product group: {response.status_code}")
def create_multiple_product_groups(
self, groups_list: List[Dict[str, Any]]
) -> ProductGroupBulkCreateResponse:
"""
Create multiple product groups in bulk.
Args:
groups_list: List of product group data
Returns:
ProductGroupBulkCreateResponse with created groups
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"groups/bulk",
ProductGroupBulkCreateResponse,
json_data=groups_list,
)
def update_multiple_product_groups(
self, groups_list: List[Dict[str, Any]]
) -> ProductGroupBulkUpdateResponse:
"""
Update multiple product groups in bulk.
Args:
groups_list: List of product group data to update (each must include 'id')
Returns:
ProductGroupBulkUpdateResponse with updated groups
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"groups/bulk",
ProductGroupBulkUpdateResponse,
json_data=groups_list,
)
def get_product_group_hierarchy(
self, group_id: int, depth: int = 10
) -> ProductGroupHierarchyResponse:
"""
Get the hierarchy of a product group.
Args:
group_id: ID of the product group
depth: Depth of the hierarchy (default: 10)
Returns:
ProductGroupHierarchyResponse with hierarchical group structure
"""
params = {"depth": depth}
return self._make_request( # type: ignore[attr-defined]
"GET",
f"groups/{group_id}/hierarchy",
ProductGroupHierarchyResponse,
params=params,
)

View file

@ -0,0 +1,212 @@
# mypy: disable-error-code="attr-defined, no-any-return"
"""Products mixin for the Lobster REST API client."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
from ...exceptions import ElytraAPIError
from ..auth import AuthMethod
from ..models import (
ProductBulkCreateResponse,
ProductBulkUpdateResponse,
ProductHierarchyResponse,
ProductListResponse,
SingleProductResponse,
)
class ProductsMixin:
"""Mixin for product-related operations.
# type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base
Expects to be mixed with LobsterRestApiClientBase or a compatible base class
that provides: base_url, auth, timeout, session, _make_request(), _handle_response().
"""
# Type hints for mixed-in attributes from base class (docstring only for clarity)
# base_url: str - from LobsterRestApiClientBase
# auth: Any - from LobsterRestApiClientBase
# timeout: int - from LobsterRestApiClientBase
# session: requests.Session - from LobsterRestApiClientBase
def get_all_products(
self,
lang: Optional[str] = None,
page: int = 1,
limit: int = 10,
group_id: Optional[int] = None,
) -> ProductListResponse:
"""
Get all products with optional group filter.
Args:
lang: Language code (optional)
page: Page number (default: 1)
limit: Number of products per page (default: 10)
group_id: Optional product group ID to filter products
Returns:
ProductListResponse with paginated list of products
"""
params: Dict[str, Any] = {"page": page, "limit": limit}
if lang:
params["lang"] = lang
if group_id:
params["groupId"] = group_id
return self._make_request( # type: ignore[attr-defined]
"GET",
"products",
ProductListResponse,
params=params,
)
def create_product(self, product_data: Dict[str, Any]) -> SingleProductResponse:
"""
Create a new product.
Args:
product_data: Product data
Returns:
SingleProductResponse with created product
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"products",
SingleProductResponse,
json_data=product_data,
)
def update_product(self, product_data: Dict[str, Any]) -> ProductListResponse:
"""
Update a product.
Args:
product_data: Updated product data (must include 'id')
Returns:
ProductListResponse with updated product info
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"products",
ProductListResponse,
json_data=product_data,
)
def create_multiple_products(
self, products_list: List[Dict[str, Any]]
) -> ProductBulkCreateResponse:
"""
Create multiple products in bulk.
Args:
products_list: List of product data
Returns:
ProductBulkCreateResponse with created products
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"products/bulk",
ProductBulkCreateResponse,
json_data=products_list,
)
def update_multiple_products(
self, products_list: List[Dict[str, Any]]
) -> ProductBulkUpdateResponse:
"""
Update multiple products in bulk.
Args:
products_list: List of product data to update (each must include 'id')
Returns:
ProductBulkUpdateResponse with updated products
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"products/bulk",
ProductBulkUpdateResponse,
json_data=products_list,
)
def get_product_by_id(
self, product_id: int, lang: Optional[str] = None
) -> SingleProductResponse:
"""
Get a product by ID.
Args:
product_id: ID of the product
lang: Language code (optional)
Returns:
SingleProductResponse with product details
"""
params: Dict[str, Any] = {}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
f"products/{product_id}",
SingleProductResponse,
params=params if params else None,
)
def delete_product(self, product_id: int) -> None:
"""
Delete a product by ID.
Args:
product_id: ID of the product to delete
"""
url = urljoin(self.base_url, f"/rest/products/{product_id}") # type: ignore[attr-defined]
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to delete product: {response.status_code}")
def product_operation(self, operation_data: Dict[str, Any]) -> ProductListResponse:
"""
Perform bulk operations on products (copy, move, link, copy-structure).
Args:
operation_data: Operation details including operation type, source, target, etc.
Returns:
ProductListResponse with affected products
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"products/operation",
ProductListResponse,
json_data=operation_data,
)
def get_product_hierarchy(self, product_id: int, depth: int = 10) -> ProductHierarchyResponse:
"""
Get the hierarchy of a product.
Args:
product_id: ID of the product
depth: Depth of the hierarchy (default: 10)
Returns:
ProductHierarchyResponse with hierarchical product structure
"""
params = {"depth": depth}
return self._make_request( # type: ignore[attr-defined]
"GET",
f"products/{product_id}/hierarchy",
ProductHierarchyResponse,
params=params,
)

View file

@ -0,0 +1,134 @@
# mypy: disable-error-code="attr-defined, no-any-return"
"""Protocols mixin for the Lobster REST API client."""
from typing import Optional
from urllib.parse import urljoin
from ..models import ProtocolCategoryListResponse, ProtocolInfo, ProtocolListResponse
class ProtocolsMixin:
"""Mixin for protocol-related operations.
# type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base
Expects to be mixed with LobsterRestApiClientBase or a compatible base class
that provides: base_url, auth, timeout, session, _make_request(), _handle_response().
"""
# Type hints for mixed-in attributes from base class (docstring only for clarity)
# base_url: str - from LobsterRestApiClientBase
# auth: Any - from LobsterRestApiClientBase
# timeout: int - from LobsterRestApiClientBase
# session: requests.Session - from LobsterRestApiClientBase
def get_protocols(self, limit: int = 50) -> ProtocolListResponse:
"""
Get the last N protocols.
Args:
limit: Number of protocols to retrieve (default: 50)
Returns:
ProtocolListResponse containing list of protocols
"""
response = self.session.get( # type: ignore[attr-defined]
urljoin(self.base_url, "/rest/protocol"), # type: ignore[attr-defined]
params={"limit": limit, **self.auth.get_url_parameters()}, # type: ignore[attr-defined]
timeout=self.timeout, # type: ignore[attr-defined]
)
return self._handle_response(response, ProtocolListResponse) # type: ignore[attr-defined]
def get_protocol(self, protocol_id: str) -> ProtocolInfo:
"""
Get details of a specific protocol.
Args:
protocol_id: ID of the protocol
Returns:
ProtocolInfo with protocol details
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
f"protocol/{protocol_id}",
ProtocolInfo,
)
def get_protocol_by_job_id(self, job_id: int) -> ProtocolListResponse:
"""
Get all protocols for a specific job ID.
Args:
job_id: ID of the job
Returns:
ProtocolListResponse containing protocols for the job
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
f"protocol/job/{job_id}",
ProtocolListResponse,
)
def get_protocol_by_runtime_id(self, runtime_id: str) -> ProtocolInfo:
"""
Get protocol for a specific job instance runtime ID.
Args:
runtime_id: Runtime ID of the job instance
Returns:
ProtocolInfo with protocol details
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
f"protocol/job/{runtime_id}",
ProtocolInfo,
)
def get_protocol_by_additional_reference(self, reference: str) -> ProtocolListResponse:
"""
Get all protocols for a specific additional reference.
Args:
reference: Additional reference value
Returns:
ProtocolListResponse containing protocols for the reference
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
f"protocol/job/{reference}",
ProtocolListResponse,
)
def get_all_protocol_categories(self) -> ProtocolCategoryListResponse:
"""
Get all available protocol categories.
Returns:
ProtocolCategoryListResponse containing list of categories
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
"protocol/category",
ProtocolCategoryListResponse,
)
def get_protocol_by_category(self, category_id: str, limit: int = 50) -> ProtocolListResponse:
"""
Get the last N protocols from a specific category.
Args:
category_id: ID of the protocol category
limit: Number of protocols to retrieve (default: 50)
Returns:
ProtocolListResponse containing protocols for the category
"""
return self._make_request( # type: ignore[attr-defined]
"GET",
f"protocol/category/{category_id}",
ProtocolListResponse,
params={"limit": limit},
)

View file

@ -0,0 +1,164 @@
# mypy: disable-error-code="attr-defined, no-any-return"
"""Text mixin for the Lobster REST API client."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from ..models import (
SingleTextResponse,
TextBulkCreateResponse,
TextBulkUpdateResponse,
TextListResponse,
)
class TextMixin:
"""Mixin for text-related operations.
# type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base
Expects to be mixed with LobsterRestApiClientBase or a compatible base class
that provides: base_url, auth, timeout, session, _make_request(), _handle_response().
"""
# Type hints for mixed-in attributes from base class (docstring only for clarity)
# base_url: str - from LobsterRestApiClientBase
# auth: Any - from LobsterRestApiClientBase
# timeout: int - from LobsterRestApiClientBase
# session: requests.Session - from LobsterRestApiClientBase
def get_all_texts(
self, lang: Optional[str] = None, page: int = 1, limit: int = 10
) -> TextListResponse:
"""
Get all text entries with pagination.
Args:
lang: Language code (optional)
page: Page number (default: 1)
limit: Number of text items per page (default: 10)
Returns:
TextListResponse containing paginated list of text items
"""
params: Dict[str, Any] = {"page": page, "limit": limit}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
"text",
TextListResponse,
params=params,
)
def get_text_by_id(self, text_id: int, lang: Optional[str] = None) -> SingleTextResponse:
"""
Get a text entry by ID.
Args:
text_id: ID of the text entry
lang: Language code (optional)
Returns:
SingleTextResponse with text details
"""
params: Dict[str, Any] = {}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
f"text/{text_id}",
SingleTextResponse,
params=params if params else None,
)
def create_text(self, text_data: Dict[str, Any]) -> SingleTextResponse:
"""
Create a new text entry.
Args:
text_data: Text entry data
Returns:
SingleTextResponse with created text
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"text",
SingleTextResponse,
json_data=text_data,
)
def update_text(self, text_data: Dict[str, Any]) -> SingleTextResponse:
"""
Update a text entry.
Args:
text_data: Updated text entry data (must include 'id')
Returns:
SingleTextResponse with updated text
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"text",
SingleTextResponse,
json_data=text_data,
)
def create_multiple_texts(self, texts_list: List[Dict[str, Any]]) -> TextBulkCreateResponse:
"""
Create multiple text entries in bulk.
Args:
texts_list: List of text entry data
Returns:
TextBulkCreateResponse with created text entries
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"text/bulk",
TextBulkCreateResponse,
json_data=texts_list,
)
def update_multiple_texts(self, texts_list: List[Dict[str, Any]]) -> TextBulkUpdateResponse:
"""
Update multiple text entries in bulk.
Args:
texts_list: List of text entry data to update (each must include 'id')
Returns:
TextBulkUpdateResponse with updated text entries
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"text/bulk",
TextBulkUpdateResponse,
json_data=texts_list,
)
def delete_text(self, text_id: int) -> None:
"""
Delete a text entry by ID.
Args:
text_id: ID of the text entry to delete
"""
from urllib.parse import urljoin
from ...exceptions import ElytraAPIError
from ..auth import AuthMethod
url = urljoin(self.base_url, f"/rest/text/{text_id}") # type: ignore[attr-defined]
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to delete text: {response.status_code}")

View file

@ -0,0 +1,209 @@
# mypy: disable-error-code="attr-defined, no-any-return"
"""Tree Groups mixin for the Lobster REST API client."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
from ...exceptions import ElytraAPIError, ElytraValidationError
from ..auth import AuthMethod
from ..models import (
SingleTreeGroupResponse,
TreeGroupBulkCreateResponse,
TreeGroupBulkUpdateResponse,
TreeGroupHierarchyResponse,
TreeGroupListResponse,
)
class TreeGroupsMixin:
"""Mixin for tree group-related operations.
# type: ignore[attr-defined] # Mixin classes inherit _make_request/_handle_response from base
Expects to be mixed with LobsterRestApiClientBase or a compatible base class
that provides: base_url, auth, timeout, session, _make_request(), _handle_response().
"""
# Type hints for mixed-in attributes from base class (docstring only for clarity)
# base_url: str - from LobsterRestApiClientBase
# auth: Any - from LobsterRestApiClientBase
# timeout: int - from LobsterRestApiClientBase
# session: requests.Session - from LobsterRestApiClientBase
def get_all_tree_groups(
self,
lang: Optional[str] = None,
page: int = 1,
limit: int = 10,
) -> TreeGroupListResponse:
"""
Get all tree groups.
Args:
lang: Language code (optional)
page: Page number (default: 1)
limit: Number of groups per page (default: 10)
Returns:
TreeGroupListResponse with paginated list of tree groups
"""
params: Dict[str, Any] = {"page": page, "limit": limit}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
"tree/groups",
TreeGroupListResponse,
params=params,
)
def create_tree_group(self, group_data: Dict[str, Any]) -> SingleTreeGroupResponse:
"""
Create a new tree group.
Args:
group_data: Tree group data
Returns:
SingleTreeGroupResponse with created tree group
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"tree/groups",
SingleTreeGroupResponse,
json_data=group_data,
)
def update_tree_group(self, group_data: Dict[str, Any]) -> SingleTreeGroupResponse:
"""
Update a tree group.
Args:
group_data: Updated tree group data (must include 'id')
Returns:
SingleTreeGroupResponse with updated tree group
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"tree/groups",
SingleTreeGroupResponse,
json_data=group_data,
)
def get_tree_group_by_id(
self, tree_group_id: int, lang: Optional[str] = None
) -> SingleTreeGroupResponse:
"""
Get a tree group by ID.
Args:
tree_group_id: ID of the tree group
lang: Language code (optional)
Returns:
SingleTreeGroupResponse with tree group details
"""
params: Dict[str, Any] = {}
if lang:
params["lang"] = lang
return self._make_request( # type: ignore[attr-defined]
"GET",
f"tree/groups/{tree_group_id}",
SingleTreeGroupResponse,
params=params if params else None,
)
def delete_tree_group(self, tree_group_id: int) -> None:
"""
Delete a tree group by ID.
Args:
tree_group_id: ID of the tree group to delete
"""
url = urljoin(
self.base_url, f"/rest/tree/groups/{tree_group_id}" # type: ignore[attr-defined]
)
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params = self.auth.get_url_parameters() # type: ignore[attr-defined]
response = self.session.delete(url, params=params, timeout=self.timeout) # type: ignore[attr-defined]
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to delete tree group: {response.status_code}")
def create_multiple_tree_groups(
self, groups_list: List[Dict[str, Any]]
) -> TreeGroupBulkCreateResponse:
"""
Create multiple tree groups in bulk.
Args:
groups_list: List of tree group data
Returns:
TreeGroupBulkCreateResponse with created tree groups
"""
return self._make_request( # type: ignore[attr-defined]
"POST",
"tree/groups/bulk",
TreeGroupBulkCreateResponse,
json_data=groups_list,
)
def update_multiple_tree_groups(
self, groups_list: List[Dict[str, Any]]
) -> TreeGroupBulkUpdateResponse:
"""
Update multiple tree groups in bulk.
Args:
groups_list: List of tree group data to update (each must include 'id')
Returns:
TreeGroupBulkUpdateResponse with updated tree groups
"""
return self._make_request( # type: ignore[attr-defined]
"PATCH",
"tree/groups/bulk",
TreeGroupBulkUpdateResponse,
json_data=groups_list,
)
def get_tree_group_hierarchy(self, depth: int = 10) -> List[TreeGroupHierarchyResponse]:
"""
Get the hierarchy of tree groups.
Args:
depth: Depth of the hierarchy (default: 10)
Returns:
List of TreeGroupHierarchyResponse with hierarchical tree structure
"""
params = {"depth": depth}
url = urljoin(self.base_url, "/rest/tree/groups/hierarchy") # type: ignore[attr-defined]
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: # type: ignore[attr-defined]
params.update(self.auth.get_url_parameters()) # type: ignore[attr-defined]
response = self.session.request( # type: ignore[attr-defined]
method="GET",
url=url,
params=params,
timeout=self.timeout, # type: ignore[attr-defined]
)
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to get tree hierarchy: {response.status_code}")
try:
data = response.json()
if isinstance(data, list):
return [TreeGroupHierarchyResponse.model_validate(item) for item in data]
else:
return [TreeGroupHierarchyResponse.model_validate(data)]
except Exception as e:
raise ElytraValidationError(f"Failed to parse tree hierarchy: {str(e)}")

View file

@ -0,0 +1,194 @@
"""Models package for Lobster PIM Legacy REST API"""
# Shared models
# Attribute group models
from .attribute_groups import (
AttributeGroupBulkCreateResponse,
AttributeGroupHierarchyNode,
AttributeGroupHierarchyResponse,
AttributeGroupListResponse,
AttributeGroupValidFor,
SingleAttributeGroupResponse,
SingleNewAttributeGroupRequestBody,
SingleUpdateAttributeGroupRequestBody,
)
# Attribute models
from .attributes import (
AttributeBulkCreateResponse,
AttributeBulkUpdateResponse,
AttributeGetByNameResponse,
AttributeListResponse,
SimpleAttributeResponse,
SingleNewAttributeRequestBody,
SingleUpdateAttributeRequestBody,
)
# Hierarchy models
from .hierarchy import HierarchyNode
# Job models
from .jobs import (
JobControlRequest,
JobControlResponse,
JobDetailInfo,
JobExecutionResponse,
JobInfo,
JobOverviewResponse,
)
# Media models
from .media import (
MediaBulkCreateResponse,
MediaBulkUpdateResponse,
MediaFileResponse,
MediaListResponse,
SingleMediaResponse,
SingleNewMediaRequestBody,
SingleUpdateMediaRequestBody,
)
# Product group models
from .product_groups import (
ProductGroupBulkCreateResponse,
ProductGroupBulkUpdateResponse,
ProductGroupHierarchyNode,
ProductGroupHierarchyResponse,
ProductGroupListResponse,
SingleNewProductGroupRequestBody,
SingleProductGroupResponse,
SingleUpdateProductGroupRequestBody,
)
# Product models
from .products import (
ProductAttributeResponse,
ProductBulkCreateResponse,
ProductBulkUpdateResponse,
ProductHierarchyNode,
ProductHierarchyResponse,
ProductListResponse,
ProductOperationRequestBody,
SingleNewProductRequestBody,
SingleProductResponse,
SingleUpdateProductRequestBody,
)
# Protocol models
from .protocols import (
ProtocolCategoryInfo,
ProtocolCategoryListResponse,
ProtocolEntry,
ProtocolInfo,
ProtocolListResponse,
)
from .shared import AttributeResponse, ErrorResponse, PaginationLinks
# Text models
from .text import (
SingleNewTextRequestBody,
SingleTextResponse,
SingleUpdateTextRequestBody,
TextBulkCreateResponse,
TextBulkUpdateResponse,
TextContentRequestBody,
TextContentResponse,
TextListResponse,
)
# Tree group models
from .tree_groups import (
SingleNewTreeGroupRequestBody,
SingleTreeGroupResponse,
SingleUpdateTreeGroupRequestBody,
TreeGroupBulkCreateResponse,
TreeGroupBulkUpdateResponse,
TreeGroupHierarchyNode,
TreeGroupHierarchyResponse,
TreeGroupListResponse,
)
__all__ = [
# Shared
"AttributeResponse",
"ErrorResponse",
"PaginationLinks",
# Hierarchy
"HierarchyNode",
# Jobs
"JobControlRequest",
"JobControlResponse",
"JobDetailInfo",
"JobExecutionResponse",
"JobInfo",
"JobOverviewResponse",
# Protocols
"ProtocolCategoryInfo",
"ProtocolCategoryListResponse",
"ProtocolEntry",
"ProtocolInfo",
"ProtocolListResponse",
# Media
"MediaBulkCreateResponse",
"MediaBulkUpdateResponse",
"MediaFileResponse",
"MediaListResponse",
"SingleMediaResponse",
"SingleNewMediaRequestBody",
"SingleUpdateMediaRequestBody",
# Products
"ProductAttributeResponse",
"ProductBulkCreateResponse",
"ProductBulkUpdateResponse",
"ProductHierarchyNode",
"ProductHierarchyResponse",
"ProductListResponse",
"ProductOperationRequestBody",
"SingleNewProductRequestBody",
"SingleProductResponse",
"SingleUpdateProductRequestBody",
# Product groups
"ProductGroupBulkCreateResponse",
"ProductGroupBulkUpdateResponse",
"ProductGroupHierarchyNode",
"ProductGroupHierarchyResponse",
"ProductGroupListResponse",
"SingleNewProductGroupRequestBody",
"SingleProductGroupResponse",
"SingleUpdateProductGroupRequestBody",
# Tree groups
"SingleNewTreeGroupRequestBody",
"SingleTreeGroupResponse",
"SingleUpdateTreeGroupRequestBody",
"TreeGroupBulkCreateResponse",
"TreeGroupBulkUpdateResponse",
"TreeGroupHierarchyNode",
"TreeGroupHierarchyResponse",
"TreeGroupListResponse",
# Attributes
"AttributeBulkCreateResponse",
"AttributeBulkUpdateResponse",
"AttributeGetByNameResponse",
"AttributeListResponse",
"SimpleAttributeResponse",
"SingleNewAttributeRequestBody",
"SingleUpdateAttributeRequestBody",
# Attribute groups
"AttributeGroupBulkCreateResponse",
"AttributeGroupHierarchyNode",
"AttributeGroupHierarchyResponse",
"AttributeGroupListResponse",
"AttributeGroupValidFor",
"SingleAttributeGroupResponse",
"SingleNewAttributeGroupRequestBody",
"SingleUpdateAttributeGroupRequestBody",
# Text
"SingleNewTextRequestBody",
"SingleTextResponse",
"SingleUpdateTextRequestBody",
"TextBulkCreateResponse",
"TextBulkUpdateResponse",
"TextContentRequestBody",
"TextContentResponse",
"TextListResponse",
]

View file

@ -0,0 +1,123 @@
"""Attribute group models"""
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
class AttributeGroupValidFor(BaseModel):
"""Valid object types for an attribute group"""
product: Optional[bool] = Field(None, description="Valid for products")
productGroup: Optional[bool] = Field(None, description="Valid for product groups")
media: Optional[bool] = Field(None, description="Valid for media")
text: Optional[bool] = Field(None, description="Valid for texts")
class SingleAttributeGroupResponse(BaseModel):
"""Complete attribute group descriptor"""
id: int = Field(..., description="The ID of the attribute group")
name: str = Field(..., description="The independent name of the attribute group")
parentId: Optional[int] = Field(None, description="The ID of the parent attribute group")
validForObjectTypes: Optional[AttributeGroupValidFor] = Field(
None, description="Valid object types for this attribute group"
)
isTemplate: bool = Field(False, description="Whether the attribute group is a template")
templateId: Optional[int] = Field(None, description="The ID of the template attribute group")
clientId: int = Field(..., description="The ID of the client")
createdAt: Optional[str] = Field(
None, description="The date and time the attribute group was created"
)
createdByUserId: Optional[int] = Field(
None, description="The ID of user who created the attribute group"
)
modifiedAt: Optional[str] = Field(
None, description="The date and time the attribute group was modified"
)
modifiedByUserId: Optional[int] = Field(
None, description="The ID of user who modified the attribute group"
)
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the attribute group name"
)
class SingleNewAttributeGroupRequestBody(BaseModel):
"""Request body for creating a new attribute group"""
name: str = Field(..., description="The independent name of the attribute group")
parentId: Optional[int] = Field(None, description="The ID of the parent attribute group")
validForObjectTypes: Optional[AttributeGroupValidFor] = Field(
None, description="Valid object types for this attribute group"
)
isTemplate: Optional[bool] = Field(
False, description="Whether the attribute group is a template"
)
templateId: Optional[int] = Field(None, description="The ID of the template attribute group")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the attribute group name"
)
class SingleUpdateAttributeGroupRequestBody(BaseModel):
"""Request body for updating an attribute group"""
id: Optional[int] = Field(None, description="The ID of the attribute group")
name: Optional[str] = Field(None, description="The independent name of the attribute group")
newName: Optional[str] = Field(
None, description="The new independent name of the attribute group"
)
parentId: Optional[int] = Field(None, description="The ID of the parent attribute group")
validForObjectTypes: Optional[AttributeGroupValidFor] = Field(
None, description="Valid object types for this attribute group"
)
isTemplate: Optional[bool] = Field(
None, description="Whether the attribute group is a template"
)
templateId: Optional[int] = Field(None, description="The ID of the template attribute group")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the attribute group name"
)
class AttributeGroupListResponse(BaseModel):
"""Paginated response containing multiple attribute groups"""
items: List[SingleAttributeGroupResponse] = Field(..., description="List of attribute groups")
total: int = Field(..., description="The total number of attribute groups")
page: int = Field(..., description="The current page number")
limit: int = Field(..., description="The number of attribute groups per page")
links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links")
class AttributeGroupBulkCreateResponse(BaseModel):
"""Response from bulk attribute group creation"""
items: List[SingleAttributeGroupResponse] = Field(
..., description="The created attribute groups"
)
totalItemsCreated: int = Field(..., description="The total number of attribute groups created")
class AttributeGroupHierarchyNode(BaseModel):
"""A node in the attribute group hierarchy"""
id: int = Field(..., description="The ID of the node")
name: str = Field(..., description="The name of the node")
type: str = Field(
...,
description="The type of node (attribute-group, attribute-group-template, attribute-group-derived-template, attribute)",
)
children: List["AttributeGroupHierarchyNode"] = Field(
default_factory=list, description="The immediate children of the node"
)
AttributeGroupHierarchyNode.model_rebuild()
class AttributeGroupHierarchyResponse(AttributeGroupHierarchyNode):
"""Attribute group hierarchy response"""
pass

View file

@ -0,0 +1,75 @@
"""Attribute models"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class SimpleAttributeResponse(BaseModel):
"""Simplified attribute definition"""
id: int = Field(..., description="The ID of the attribute")
name: str = Field(..., description="The independent name of the attribute")
description: Optional[str] = Field(None, description="The description of the attribute")
attributeType: Optional[str] = Field(
None, description="The type of attribute (normal, meta, internal)"
)
type: Optional[str] = Field(None, description="The type of the attribute")
autoSync: Optional[str] = Field(None, description="The auto sync mode of the attribute")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the attribute name"
)
class AttributeGetByNameResponse(SimpleAttributeResponse):
"""Attribute response when fetching by name"""
languageDependents: Optional[Dict[str, Any]] = Field(
None, description="Language-dependent properties"
)
class SingleNewAttributeRequestBody(BaseModel):
"""Request body for creating a new attribute"""
name: str = Field(..., description="The independent name of the attribute")
type: str = Field(..., description="The type of the attribute")
description: Optional[str] = Field(None, description="The description of the attribute")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the attribute name"
)
class SingleUpdateAttributeRequestBody(BaseModel):
"""Request body for updating an attribute"""
id: int = Field(..., description="The ID of the attribute")
name: Optional[str] = Field(None, description="The independent name of the attribute")
description: Optional[str] = Field(None, description="The description of the attribute")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the attribute name"
)
class AttributeListResponse(BaseModel):
"""Paginated response containing multiple attributes"""
items: List[SimpleAttributeResponse] = Field(..., description="List of attributes")
total: int = Field(..., description="The total number of attributes")
page: int = Field(..., description="The current page number")
limit: int = Field(..., description="The number of attributes per page")
links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links")
class AttributeBulkCreateResponse(BaseModel):
"""Response from bulk attribute creation"""
items: List[SimpleAttributeResponse] = Field(..., description="The created attributes")
totalItemsCreated: int = Field(..., description="The total number of attributes created")
class AttributeBulkUpdateResponse(BaseModel):
"""Response from bulk attribute update"""
items: List[SimpleAttributeResponse] = Field(..., description="The updated attributes")
totalItemsUpdated: int = Field(..., description="The total number of attributes updated")

View file

@ -0,0 +1,19 @@
"""Hierarchy models for tree structures"""
from typing import List
from pydantic import BaseModel, Field
class HierarchyNode(BaseModel):
"""A node in a hierarchy tree"""
id: int = Field(..., description="The ID of the node")
name: str = Field(..., description="The name of the node")
type: str = Field(..., description="The type of node (product, variant, media, text, etc.)")
children: List["HierarchyNode"] = Field(
default_factory=list, description="The immediate children of the node"
)
HierarchyNode.model_rebuild()

View file

@ -1,4 +1,4 @@
"""Models for the Lobster PIM Legacy REST API"""
"""Job management models"""
from typing import Any, Dict, List, Optional
@ -7,6 +7,7 @@ from pydantic import BaseModel, Field
class JobInfo(BaseModel):
"""Base job information model"""
id: int = Field(..., description="The ID of the job")
name: str = Field(..., description="The name of the job")
jobIdentifier: str = Field(..., description="The unique job identifier")
@ -22,12 +23,14 @@ class JobInfo(BaseModel):
class JobDetailInfo(JobInfo):
"""Detailed job information including error level and runtime ID"""
errorLevel: Optional[str] = Field(None, description="Error level (e.g., 'Erfolgreich')")
runtimeId: Optional[str] = Field(None, description="Runtime ID for active job execution")
class JobOverviewResponse(BaseModel):
"""Response containing multiple job information items"""
jobInfoObjects: List[JobDetailInfo] = Field(..., description="List of job information objects")
errors: List[str] = Field(default_factory=list, description="List of errors")
warnings: List[str] = Field(default_factory=list, description="List of warnings")
@ -35,6 +38,7 @@ class JobOverviewResponse(BaseModel):
class JobExecutionResponse(BaseModel):
"""Response from executing a job"""
id: int = Field(..., description="The ID of the job")
name: str = Field(..., description="The name of the job")
jobIdentifier: str = Field(..., description="The unique job identifier")
@ -44,12 +48,15 @@ class JobExecutionResponse(BaseModel):
protocolId: str = Field(..., description="ID of the protocol for this execution")
runtimeId: str = Field(..., description="Runtime ID for tracking execution")
errors: List[str] = Field(default_factory=list, description="List of errors")
messages: List[str] = Field(default_factory=list, description="List of messages (e.g., JOB_START_OK)")
messages: List[str] = Field(
default_factory=list, description="List of messages (e.g., JOB_START_OK)"
)
warnings: List[str] = Field(default_factory=list, description="List of warnings")
class JobControlRequest(BaseModel):
"""Request body for job control endpoint"""
action: str = Field(..., description="Action to perform (e.g., 'start')")
objectId: int = Field(..., description="The ID of the job to control")
objectType: str = Field(default="job", description="Type of object")
@ -61,9 +68,7 @@ class JobControlRequest(BaseModel):
parameter: Optional[Dict[str, Any]] = Field(
None, description="Parameters to override job settings"
)
queueId: Optional[str] = Field(
None, description="Queue ID for serialized job execution"
)
queueId: Optional[str] = Field(None, description="Queue ID for serialized job execution")
maxJobDurationSeconds: Optional[int] = Field(
default=43200, description="Max duration in seconds (default 12 hours)"
)
@ -71,59 +76,9 @@ class JobControlRequest(BaseModel):
class JobControlResponse(BaseModel):
"""Response from job control endpoint"""
jobIdentifier: str = Field(..., description="The job identifier")
runtimeId: str = Field(..., description="Runtime ID for tracking")
errors: List[str] = Field(default_factory=list, description="List of errors")
messages: List[str] = Field(default_factory=list, description="List of messages")
warnings: List[str] = Field(default_factory=list, description="List of warnings")
class ProtocolEntry(BaseModel):
"""A single entry in a protocol log"""
timestamp: Optional[str] = Field(None, description="Timestamp of the entry")
level: Optional[str] = Field(None, description="Log level (ERROR, WARNING, INFO, etc.)")
message: Optional[str] = Field(None, description="Message content")
class ProtocolInfo(BaseModel):
"""Protocol/Log information"""
id: Optional[int] = Field(None, description="Protocol ID")
protocolId: Optional[str] = Field(None, description="Protocol ID as string")
jobId: Optional[int] = Field(None, description="Associated job ID")
runtimeId: Optional[str] = Field(None, description="Runtime ID of the job execution")
jobIdentifier: Optional[str] = Field(None, description="Job identifier")
status: Optional[str] = Field(None, description="Status of the job")
startTime: Optional[str] = Field(None, description="Start time of execution")
endTime: Optional[str] = Field(None, description="End time of execution")
errors: List[str] = Field(default_factory=list, description="List of errors")
messages: List[str] = Field(default_factory=list, description="List of messages")
entries: Optional[List[ProtocolEntry]] = Field(
None, description="Protocol entries"
)
class ProtocolListResponse(BaseModel):
"""Response containing list of protocols"""
protocols: Optional[List[ProtocolInfo]] = Field(None, description="List of protocols")
errors: List[str] = Field(default_factory=list, description="List of errors")
warnings: List[str] = Field(default_factory=list, description="List of warnings")
class ProtocolCategoryInfo(BaseModel):
"""Protocol category information"""
id: str = Field(..., description="Category ID")
name: str = Field(..., description="Category name")
description: Optional[str] = Field(None, description="Category description")
class ProtocolCategoryListResponse(BaseModel):
"""Response containing list of protocol categories"""
categories: List[ProtocolCategoryInfo] = Field(..., description="List of protocol categories")
errors: List[str] = Field(default_factory=list, description="List of errors")
class ErrorResponse(BaseModel):
"""Error response from the REST API"""
error: str = Field(..., description="Error message")
errorCode: Optional[str] = Field(None, description="Error code")
details: Optional[str] = Field(None, description="Error details")

View file

@ -0,0 +1,133 @@
"""Media models"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from .shared import AttributeResponse, PaginationLinks
class MediaFileResponse(BaseModel):
"""Media file metadata"""
id: int = Field(..., description="The ID of the media file")
clientId: int = Field(..., description="The ID of the client")
mediaId: int = Field(..., description="The ID of the media descriptor")
languageCode: str = Field(..., description="The language code for this media file")
mimeType: str = Field(..., description="The MIME type of the media file")
sourceMimeType: str = Field(..., description="The original MIME type before any conversion")
mamSystem: str = Field(..., description="The Media Asset Management system name")
mamId1: Optional[str] = Field(None, description="MAM system identifier 1")
mamId2: Optional[str] = Field(None, description="MAM system identifier 2")
mamId3: Optional[str] = Field(None, description="MAM system identifier 3")
mamId4: Optional[str] = Field(None, description="MAM system identifier 4")
contentLength: int = Field(..., description="The size of the media file in bytes")
updateCount: int = Field(
..., description="The number of times this media file has been updated"
)
changedAt: Optional[str] = Field(
None, description="The date and time the media file was last changed"
)
changedBy: Optional[int] = Field(
None, description="The ID of the user who last changed the media file"
)
class SingleMediaResponse(BaseModel):
"""Complete media descriptor"""
id: int = Field(..., description="The ID of the media content")
name: str = Field(..., description="The name of the media")
treeId: int = Field(..., description="The ID of the tree this media belongs to")
clientId: int = Field(..., description="The ID of the client")
attributeGroupId: int = Field(..., description="The ID of the media default attribute group")
pictureTypeId: Optional[int] = Field(None, description="The ID of the picture type")
originalId: Optional[int] = Field(
None, description="The ID of the original media if this is a copy"
)
objectStatus: Optional[str] = Field(
None, description="The status of the object (original, copy)"
)
userObjectStatus: Optional[int] = Field(None, description="User-defined object status ID")
createdAt: Optional[str] = Field(None, description="The date and time the media was created")
createdBy: Optional[int] = Field(None, description="The ID of the user who created the media")
modifiedAt: Optional[str] = Field(
None, description="The date and time the media was last modified"
)
modifiedBy: Optional[int] = Field(
None, description="The ID of the user who last modified the media"
)
files: List[MediaFileResponse] = Field(
default_factory=list, description="The files associated with this media"
)
attributes: List[AttributeResponse] = Field(
default_factory=list, description="The attribute values of the media"
)
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the media name by language code"
)
class SingleNewMediaRequestBody(BaseModel):
"""Request body for creating a new media descriptor"""
name: str = Field(..., description="Name of the media item")
attributeGroupId: Optional[int] = Field(
None, description="The ID of the media default attribute group"
)
pictureTypeId: Optional[int] = Field(None, description="The ID of the picture type")
treeId: Optional[int] = Field(None, description="The ID of the tree this media belongs to")
originalId: Optional[int] = Field(
None, description="If this is a copy, the ID of the original media"
)
objectStatus: Optional[str] = Field(None, description="The status of the object")
userObjectStatus: Optional[int] = Field(None, description="Custom user object status ID")
attributes: Optional[List[Dict[str, Any]]] = Field(None, description="List of media attributes")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the media name"
)
class SingleUpdateMediaRequestBody(BaseModel):
"""Request body for updating a media descriptor"""
id: int = Field(..., description="The ID of the media descriptor to update")
name: Optional[str] = Field(None, description="Name of the media item")
attributeGroupId: Optional[int] = Field(
None, description="The ID of the media default attribute group"
)
pictureTypeId: Optional[int] = Field(None, description="The ID of the picture type")
treeId: Optional[int] = Field(None, description="The ID of the tree this media belongs to")
originalId: Optional[int] = Field(
None, description="If this is a copy, the ID of the original media"
)
objectStatus: Optional[str] = Field(None, description="The status of the object")
userObjectStatus: Optional[int] = Field(None, description="Custom user object status ID")
attributes: Optional[List[Dict[str, Any]]] = Field(None, description="List of media attributes")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the media name"
)
class MediaListResponse(BaseModel):
"""Paginated list of media descriptors"""
items: List[SingleMediaResponse] = Field(..., description="List of media items")
total: int = Field(..., description="The total number of media items")
page: int = Field(..., description="The current page number")
limit: int = Field(..., description="The number of media items per page")
links: PaginationLinks = Field(..., description="Pagination links")
class MediaBulkCreateResponse(BaseModel):
"""Response from bulk media creation"""
items: List[SingleMediaResponse] = Field(..., description="List of created media items")
totalItemsCreated: int = Field(..., description="The total number of media items created")
class MediaBulkUpdateResponse(BaseModel):
"""Response from bulk media update"""
items: List[SingleMediaResponse] = Field(..., description="List of updated media items")
totalItemsUpdated: int = Field(..., description="The total number of media items updated")

View file

@ -0,0 +1,105 @@
"""Product group models"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from .products import ProductAttributeResponse
class SingleProductGroupResponse(BaseModel):
"""Complete product group descriptor"""
id: int = Field(..., description="The ID of the product group")
name: str = Field(..., description="The independent name of the product group")
type: Optional[str] = Field(None, description="The type of product group")
parentId: Optional[int] = Field(
None, description="The ID of the parent product group or tree group"
)
objectStatus: Optional[str] = Field(None, description="The status of the object")
attributeGroupId: Optional[int] = Field(None, description="The ID of the attribute group")
clientId: int = Field(..., description="The ID of the client")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the product group name"
)
attributes: List[ProductAttributeResponse] = Field(
default_factory=list, description="The attributes of the product group"
)
class SingleNewProductGroupRequestBody(BaseModel):
"""Request body for creating a new product group"""
name: str = Field(..., description="The independent name of the product group")
type: Optional[str] = Field(None, description="The type of product group")
parentId: Optional[int] = Field(
None, description="The ID of the parent product group or tree group"
)
attributeGroupId: Optional[int] = Field(None, description="The ID of the attribute group")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the product group name"
)
attributes: Optional[List[Dict[str, Any]]] = Field(
None, description="The attributes of the product group"
)
class SingleUpdateProductGroupRequestBody(BaseModel):
"""Request body for updating a product group"""
id: int = Field(..., description="The ID of the product group")
name: Optional[str] = Field(None, description="The independent name of the product group")
parentId: Optional[int] = Field(None, description="The ID of the parent product group")
attributeGroupId: Optional[int] = Field(None, description="The ID of the attribute group")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the product group name"
)
attributes: Optional[List[Dict[str, Any]]] = Field(
None, description="The attributes of the product group"
)
class ProductGroupListResponse(BaseModel):
"""Paginated response containing multiple product groups"""
items: List[SingleProductGroupResponse] = Field(..., description="List of product groups")
total: int = Field(..., description="The total number of product groups")
page: int = Field(..., description="The current page number")
limit: int = Field(..., description="The number of product groups per page")
links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links")
class ProductGroupBulkCreateResponse(BaseModel):
"""Response from bulk product group creation"""
items: List[SingleProductGroupResponse] = Field(..., description="The created product groups")
totalItemsCreated: int = Field(..., description="The total number of product groups created")
class ProductGroupBulkUpdateResponse(BaseModel):
"""Response from bulk product group update"""
items: List[SingleProductGroupResponse] = Field(..., description="The updated product groups")
totalItemsUpdated: int = Field(..., description="The total number of product groups updated")
class ProductGroupHierarchyNode(BaseModel):
"""A node in the product group hierarchy"""
id: int = Field(..., description="The ID of the node")
name: str = Field(..., description="The name of the node")
type: str = Field(
..., description="The type of node (product-group, product, variant, text, media)"
)
children: List["ProductGroupHierarchyNode"] = Field(
default_factory=list, description="The immediate children of the node"
)
ProductGroupHierarchyNode.model_rebuild()
class ProductGroupHierarchyResponse(ProductGroupHierarchyNode):
"""Product group hierarchy response"""
pass

View file

@ -0,0 +1,120 @@
"""Product models"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class ProductOperationRequestBody(BaseModel):
"""Request body for product operations (copy, move, link, copy-structure)"""
operation: str = Field(..., description="Operation: copy, move, link, or copy-structure")
productId: int = Field(..., description="The ID of the product to perform operation on")
parentId: int = Field(..., description="The ID of the destination parent")
class ProductAttributeResponse(BaseModel):
"""An attribute value associated with a product"""
id: int = Field(..., description="The ID of the attribute")
attributeId: int = Field(..., description="The ID of the attribute definition")
attributeName: str = Field(..., description="The independent name of the attribute")
attributeType: str = Field(..., description="The type of attribute (normal, meta, internal)")
type: str = Field(..., description="The attribute type")
value: Optional[str] = Field(None, description="The value of the attribute")
autoSync: Optional[str] = Field(None, description="The auto sync mode of the attribute")
languageCode: Optional[str] = Field(None, description="The language code of the attribute")
modified: Optional[str] = Field(
None, description="The date and time the attribute was modified"
)
modifierByUserId: Optional[int] = Field(
None, description="The ID of user who modified the attribute"
)
inherited: bool = Field(False, description="Whether the attribute is inherited")
class SingleProductResponse(BaseModel):
"""Complete product descriptor"""
id: int = Field(..., description="The ID of the product")
clientId: int = Field(..., description="The ID of the client")
productName: str = Field(..., description="The name of the product")
treeId: int = Field(..., description="The ID of the tree")
created: Optional[str] = Field(None, description="The date and time the product was created")
modified: Optional[str] = Field(None, description="The date and time the product was modified")
creatorUserId: Optional[int] = Field(None, description="The ID of user who created the product")
modifierUserId: Optional[int] = Field(
None, description="The ID of user who modified the product"
)
objectStatus: Optional[str] = Field(None, description="The status of the object")
originalId: Optional[int] = Field(None, description="The ID of the original product")
attributes: List[ProductAttributeResponse] = Field(
default_factory=list, description="The attributes of the product"
)
class SingleNewProductRequestBody(BaseModel):
"""Request body for creating a new product"""
productName: str = Field(..., description="The name of the product")
parentId: int = Field(..., description="The ID of the parent group or product")
attributeGroupId: int = Field(..., description="The ID of the attribute group")
attributes: Optional[List[Dict[str, Any]]] = Field(
None, description="The attributes of the product"
)
class SingleUpdateProductRequestBody(BaseModel):
"""Request body for updating a product"""
id: int = Field(..., description="The ID of the product")
productName: Optional[str] = Field(None, description="The name of the product")
parentId: Optional[int] = Field(None, description="The ID of the parent group or product")
attributeGroupId: Optional[int] = Field(None, description="The ID of the attribute group")
attributes: Optional[List[Dict[str, Any]]] = Field(
None, description="The attributes of the product"
)
class ProductListResponse(BaseModel):
"""Paginated response containing multiple products"""
items: List[SingleProductResponse] = Field(..., description="List of products")
total: int = Field(..., description="The total number of products")
page: int = Field(..., description="The current page number")
limit: int = Field(..., description="The number of products per page")
links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links")
class ProductBulkCreateResponse(BaseModel):
"""Response from bulk product creation"""
items: List[SingleProductResponse] = Field(..., description="The created products")
totalItemsCreated: int = Field(..., description="The total number of products created")
class ProductBulkUpdateResponse(BaseModel):
"""Response from bulk product update"""
items: List[SingleProductResponse] = Field(..., description="The updated products")
totalItemsUpdated: int = Field(..., description="The total number of products updated")
class ProductHierarchyNode(BaseModel):
"""A node in the product hierarchy"""
id: int = Field(..., description="The ID of the node")
name: str = Field(..., description="The name of the node")
type: str = Field(..., description="The type of node (product, variant, text, media)")
children: List["ProductHierarchyNode"] = Field(
default_factory=list, description="The immediate children of the node"
)
ProductHierarchyNode.model_rebuild()
class ProductHierarchyResponse(ProductHierarchyNode):
"""Product hierarchy response"""
pass

View file

@ -0,0 +1,52 @@
"""Protocol/Log models"""
from typing import List, Optional
from pydantic import BaseModel, Field
class ProtocolEntry(BaseModel):
"""A single entry in a protocol log"""
timestamp: Optional[str] = Field(None, description="Timestamp of the entry")
level: Optional[str] = Field(None, description="Log level (ERROR, WARNING, INFO, etc.)")
message: Optional[str] = Field(None, description="Message content")
class ProtocolInfo(BaseModel):
"""Protocol/Log information"""
id: Optional[int] = Field(None, description="Protocol ID")
protocolId: Optional[str] = Field(None, description="Protocol ID as string")
jobId: Optional[int] = Field(None, description="Associated job ID")
runtimeId: Optional[str] = Field(None, description="Runtime ID of the job execution")
jobIdentifier: Optional[str] = Field(None, description="Job identifier")
status: Optional[str] = Field(None, description="Status of the job")
startTime: Optional[str] = Field(None, description="Start time of execution")
endTime: Optional[str] = Field(None, description="End time of execution")
errors: List[str] = Field(default_factory=list, description="List of errors")
messages: List[str] = Field(default_factory=list, description="List of messages")
entries: Optional[List[ProtocolEntry]] = Field(None, description="Protocol entries")
class ProtocolListResponse(BaseModel):
"""Response containing list of protocols"""
protocols: Optional[List[ProtocolInfo]] = Field(None, description="List of protocols")
errors: List[str] = Field(default_factory=list, description="List of errors")
warnings: List[str] = Field(default_factory=list, description="List of warnings")
class ProtocolCategoryInfo(BaseModel):
"""Protocol category information"""
id: str = Field(..., description="Category ID")
name: str = Field(..., description="Category name")
description: Optional[str] = Field(None, description="Category description")
class ProtocolCategoryListResponse(BaseModel):
"""Response containing list of protocol categories"""
categories: List[ProtocolCategoryInfo] = Field(..., description="List of protocol categories")
errors: List[str] = Field(default_factory=list, description="List of errors")

View file

@ -0,0 +1,46 @@
"""Shared models and common types for the Lobster PIM Legacy REST API"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class ErrorResponse(BaseModel):
"""Error response from the REST API"""
error: str = Field(..., description="Error message")
errorCode: Optional[str] = Field(None, description="Error code")
details: Optional[str] = Field(None, description="Error details")
class PaginationLinks(BaseModel):
"""Pagination links for list responses"""
self: str = Field(..., description="Link to current page")
next: Optional[str] = Field(None, description="Link to next page")
previous: Optional[str] = Field(None, description="Link to previous page")
first: str = Field(..., description="Link to first page")
last: str = Field(..., description="Link to last page")
class AttributeResponse(BaseModel):
"""Attribute value associated with an object"""
id: int = Field(..., description="The ID of the attribute value")
attributeId: int = Field(..., description="The ID of the attribute definition")
attributeName: str = Field(..., description="The independent name of the attribute")
attributeType: str = Field(
..., description="The category type of the attribute (normal, meta, internal)"
)
type: str = Field(..., description="The type of the attribute")
value: str = Field(..., description="The value of the attribute")
parentId: Optional[int] = Field(None, description="The ID of the parent object")
autoSync: str = Field(..., description="The auto sync mode")
languageCode: Optional[str] = Field(None, description="The language code of the attribute")
modifiedAt: Optional[str] = Field(
None, description="The date and time the attribute was modified"
)
modifiedBy: Optional[int] = Field(
None, description="The ID of the user who modified the attribute"
)
inherited: bool = Field(False, description="Whether the attribute is inherited")

View file

@ -0,0 +1,125 @@
"""Text models"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from .shared import AttributeResponse
class TextContentResponse(BaseModel):
"""Text content with metadata"""
id: int = Field(..., description="The ID of the text content")
clientId: int = Field(..., description="The ID of the client")
languageCode: str = Field(..., description="The language code for this text content")
mimeType: str = Field(..., description="The MIME type of the text content")
content: str = Field(..., description="The text content")
contentLength: int = Field(..., description="The size of the text content in bytes")
workflowStatus: Optional[str] = Field(
None, description="The workflow status of the text content"
)
workflowComment: Optional[str] = Field(None, description="Comments related to the workflow")
changedAt: Optional[str] = Field(
None, description="The date and time the text content was last changed"
)
changedBy: Optional[int] = Field(
None, description="The ID of the user who last changed the text content"
)
class TextContentRequestBody(BaseModel):
"""Request body for text content"""
mimeType: str = Field(..., description="The MIME type of the text content")
content: str = Field(..., description="The text content")
languageCode: str = Field(..., description="The language code for this text content")
workflowStatus: Optional[str] = Field(
None, description="The workflow status of the text content"
)
workflowComment: Optional[str] = Field(None, description="Comments related to the workflow")
class SingleTextResponse(BaseModel):
"""Complete text descriptor"""
id: int = Field(..., description="The ID of the text descriptor")
name: str = Field(..., description="The name of the text")
treeId: Optional[int] = Field(None, description="The ID of the tree this text belongs to")
clientId: int = Field(..., description="The ID of the client")
originalId: Optional[int] = Field(
None, description="The ID of the original text if this is a copy"
)
objectStatus: Optional[str] = Field(None, description="The status of the object")
userObjectStatus: Optional[int] = Field(None, description="User-defined object status")
createdAt: Optional[str] = Field(None, description="The date and time the text was created")
createdBy: Optional[int] = Field(None, description="The ID of the user who created the text")
modifiedAt: Optional[str] = Field(
None, description="The date and time the text was last modified"
)
modifiedBy: Optional[int] = Field(
None, description="The ID of the user who last modified the text"
)
contents: List[TextContentResponse] = Field(
default_factory=list, description="The text contents for different languages"
)
attributes: List[AttributeResponse] = Field(
default_factory=list, description="The attribute values of the text"
)
class SingleNewTextRequestBody(BaseModel):
"""Request body for creating a new text"""
name: str = Field(..., description="The name of the text descriptor")
parentId: int = Field(..., description="The ID of the parent product or group")
languageCode: str = Field(..., description="The language code for the response")
textTypeId: int = Field(..., description="The ID of the text type")
contents: Optional[List[TextContentRequestBody]] = Field(
None, description="List of text contents for different languages"
)
treeId: Optional[int] = Field(None, description="The ID of the tree this text belongs to")
attributeGroupId: Optional[int] = Field(
None, description="The ID of the attribute group to assign to this text"
)
attributes: Optional[List[Dict[str, Any]]] = Field(
None, description="Optional attributes to set"
)
class SingleUpdateTextRequestBody(BaseModel):
"""Request body for updating a text"""
id: int = Field(..., description="The ID of the text descriptor to update")
name: Optional[str] = Field(None, description="The name of the text descriptor")
userObjectStatus: Optional[int] = Field(None, description="User-defined object status")
textTypeId: Optional[int] = Field(None, description="The ID of the text type")
treeId: Optional[int] = Field(None, description="The ID of the tree this text belongs to")
contents: Optional[List[TextContentRequestBody]] = Field(
None, description="List of text contents to update"
)
attributes: Optional[List[Dict[str, Any]]] = Field(None, description="Attributes to update")
class TextListResponse(BaseModel):
"""Paginated response containing multiple texts"""
items: List[SingleTextResponse] = Field(..., description="List of text items")
total: int = Field(..., description="The total number of text items")
page: int = Field(..., description="The current page number")
limit: int = Field(..., description="The number of text items per page")
links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links")
class TextBulkCreateResponse(BaseModel):
"""Response from bulk text creation"""
items: List[SingleTextResponse] = Field(..., description="The created text items")
totalItemsCreated: int = Field(..., description="The total number of text items created")
class TextBulkUpdateResponse(BaseModel):
"""Response from bulk text update"""
items: List[SingleTextResponse] = Field(..., description="The updated text items")
totalItemsUpdated: int = Field(..., description="The total number of text items updated")

View file

@ -0,0 +1,87 @@
"""Tree group models"""
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
class SingleTreeGroupResponse(BaseModel):
"""Complete tree group descriptor"""
id: int = Field(..., description="The ID of the tree group")
name: str = Field(..., description="The independent name of the tree group")
parentId: Optional[int] = Field(None, description="The ID of the parent tree group")
clientId: int = Field(..., description="The ID of the client")
status: Optional[str] = Field(None, description="The status of the group (normal, internal)")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the tree group name"
)
class SingleNewTreeGroupRequestBody(BaseModel):
"""Request body for creating a new tree group"""
name: str = Field(..., description="The name of the tree group")
parentId: int = Field(..., description="The ID of the parent tree group")
status: Optional[str] = Field(
"normal", description="The status of the tree group (normal, internal)"
)
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the tree group name"
)
class SingleUpdateTreeGroupRequestBody(BaseModel):
"""Request body for updating a tree group"""
id: int = Field(..., description="The ID of the tree group")
name: Optional[str] = Field(None, description="The name of the tree group")
parentId: Optional[int] = Field(None, description="The ID of the parent tree group")
status: Optional[str] = Field(None, description="The status of the tree group")
translations: Optional[Dict[str, str]] = Field(
None, description="Translations of the tree group name"
)
class TreeGroupListResponse(BaseModel):
"""Paginated response containing multiple tree groups"""
items: List[SingleTreeGroupResponse] = Field(..., description="List of tree groups")
total: int = Field(..., description="The total number of tree groups")
page: int = Field(..., description="The current page number")
limit: int = Field(..., description="The number of tree groups per page")
links: Optional[Dict[str, Optional[str]]] = Field(None, description="Pagination links")
class TreeGroupBulkCreateResponse(BaseModel):
"""Response from bulk tree group creation"""
items: List[SingleTreeGroupResponse] = Field(..., description="The created tree groups")
totalItemsCreated: int = Field(..., description="The total number of tree groups created")
class TreeGroupBulkUpdateResponse(BaseModel):
"""Response from bulk tree group update"""
items: List[SingleTreeGroupResponse] = Field(..., description="The updated tree groups")
totalItemsUpdated: int = Field(..., description="The total number of tree groups updated")
class TreeGroupHierarchyNode(BaseModel):
"""A node in the tree group hierarchy"""
id: int = Field(..., description="The ID of the node")
name: str = Field(..., description="The name of the node")
type: str = Field(..., description="The type of node (tree-group, product-group)")
children: List["TreeGroupHierarchyNode"] = Field(
default_factory=list, description="The immediate children of the node"
)
TreeGroupHierarchyNode.model_rebuild()
class TreeGroupHierarchyResponse(TreeGroupHierarchyNode):
"""Tree group hierarchy response"""
pass

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "elytra-pim-client"
version = "0.3.0"
version = "0.7.0"
description = "A Pythonic client for the Elytra PIM API"
readme = "README.md"
requires-python = ">=3.9"
@ -44,8 +44,8 @@ Repository = "https://git.him-tools.de/HIM-public/elytra_client.git"
Documentation = "https://www.elytra.ch/"
Issues = "https://git.him-tools.de/HIM-public/elytra_client/issues"
[tool.setuptools]
packages = ["elytra_client"]
[tool.setuptools.packages.find]
include = ["elytra_client*"]
[tool.black]
line-length = 100
@ -56,7 +56,8 @@ profile = "black"
line_length = 100
[tool.mypy]
python_version = "0.3.0"
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = false
ignore_missing_imports = true

View file

@ -1,14 +1,11 @@
"""Tests for the Elytra PIM Client with Pydantic validation"""
import pytest
from unittest.mock import Mock, patch
import pytest
from pydantic import ValidationError
from elytra_client import (
ElytraClient,
SingleProductResponse,
SingleNewProductRequestBody,
)
from elytra_client import ElytraClient, SingleNewProductRequestBody, SingleProductResponse
from elytra_client.exceptions import ElytraAuthenticationError, ElytraNotFoundError
@ -118,11 +115,11 @@ def test_create_product_with_pydantic(mock_request, client):
mock_request.return_value = mock_response
# Create with Pydantic model
new_product = SingleNewProductRequestBody(
new_product = SingleNewProductRequestBody( # type: ignore[arg-type]
productName="NEW-PRODUCT-001",
parentId=1,
attributeGroupId=10,
) # type: ignore - validation happens automatically, so type checker should recognize this as valid
)
result = client.create_product(new_product)
@ -134,20 +131,20 @@ def test_create_product_with_pydantic(mock_request, client):
def test_pydantic_validation_on_creation():
"""Test Pydantic validation on model creation"""
# Valid model
valid_product = SingleNewProductRequestBody(
valid_product = SingleNewProductRequestBody( # type: ignore[arg-type]
productName="VALID-PRODUCT",
parentId=1,
attributeGroupId=10,
) # type: ignore - validation happens automatically, so type checker should recognize this as valid
)
assert valid_product.productName == "VALID-PRODUCT"
# Invalid model - missing required field
with pytest.raises(ValidationError):
SingleNewProductRequestBody(
SingleNewProductRequestBody( # type: ignore[arg-type]
productName="INVALID-PRODUCT",
# Missing parentId - required
attributeGroupId=10,
) # type: ignore - this will raise a ValidationError, so type checker should recognize this as invalid
)
@patch("elytra_client.client.requests.Session.request")
@ -156,9 +153,9 @@ def test_authentication_error(mock_request, client):
mock_response = Mock()
mock_response.status_code = 401
mock_response.text = "Unauthorized"
mock_request.return_value.raise_for_status.side_effect = (
__import__("requests").exceptions.HTTPError(response=mock_response)
)
mock_request.return_value.raise_for_status.side_effect = __import__(
"requests"
).exceptions.HTTPError(response=mock_response)
with pytest.raises(ElytraAuthenticationError):
client.get_products()
@ -170,9 +167,9 @@ def test_not_found_error(mock_request, client):
mock_response = Mock()
mock_response.status_code = 404
mock_response.text = "Not Found"
mock_request.return_value.raise_for_status.side_effect = (
__import__("requests").exceptions.HTTPError(response=mock_response)
)
mock_request.return_value.raise_for_status.side_effect = __import__(
"requests"
).exceptions.HTTPError(response=mock_response)
with pytest.raises(ElytraNotFoundError):
client.get_product(product_id=999)

View file

@ -27,24 +27,28 @@ def validate_version(version: str) -> bool:
Returns:
True if valid, False otherwise
"""
pattern = r'^\d+\.\d+\.\d+(?:-[a-zA-Z0-9]+)?$'
pattern = r"^\d+\.\d+\.\d+(?:-[a-zA-Z0-9]+)?$"
return bool(re.match(pattern, version))
def update_file(file_path: Path, old_pattern: str, new_version: str) -> bool:
def update_file(
file_path: Path, old_pattern: str, new_version: str, multiline: bool = False
) -> bool:
"""Update version in a file.
Args:
file_path: Path to file to update
old_pattern: Regex pattern to find version
new_version: New version string
multiline: Whether to use multiline mode for regex
Returns:
True if successful, False otherwise
"""
try:
content = file_path.read_text()
updated_content = re.sub(old_pattern, new_version, content)
flags = re.MULTILINE if multiline else 0
updated_content = re.sub(old_pattern, new_version, content, flags=flags)
if content == updated_content:
print(f"{file_path.name} already up-to-date")
@ -89,11 +93,10 @@ def main() -> int:
# Update pyproject.toml
if pyproject_path.exists():
pattern = r'version = "[^"]+"'
# Use word boundary to match 'version' but not 'python_version'
pattern = r'^version = "[^"]+"'
success &= update_file(
pyproject_path,
pattern,
f'version = "{new_version}"'
pyproject_path, pattern, f'version = "{new_version}"', multiline=True
)
else:
print(f"{pyproject_path} not found")
@ -102,11 +105,7 @@ def main() -> int:
# Update __init__.py
if init_path.exists():
pattern = r'__version__ = "[^"]+"'
success &= update_file(
init_path,
pattern,
f'__version__ = "{new_version}"'
)
success &= update_file(init_path, pattern, f'__version__ = "{new_version}"')
else:
print(f"{init_path} not found")
success = False