Implement Lobster PIM Legacy REST API client with authentication, job control, and protocol access

- Added `rest_api` module with client and authentication classes.
- Implemented `LobsterRestApiClient` for interacting with the REST API.
- Created models for job and protocol responses using Pydantic.
- Developed examples demonstrating basic usage, API token authentication, job execution, and error handling.
- Added unit tests for authentication and client methods, including error handling scenarios.
This commit is contained in:
claudi 2026-02-20 10:35:34 +01:00
parent 1324be4084
commit 8d4fcc7d11
9 changed files with 1766 additions and 0 deletions

View file

@ -109,6 +109,62 @@ config = ElytraConfig.from_env()
client = ElytraClient(base_url=config.base_url, api_key=config.api_key)
```
## Legacy REST API (Lobster PIM)
The package also includes a subpackage for the older Lobster PIM REST API, which provides access to scheduled jobs and protocol logs that are not yet available in the newer API.
### Quick Start
```python
from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth
# Create authentication
auth = RestApiAuth.from_username_password("username", "password")
# Create client for the REST API
client = LobsterRestApiClient("http://lobster-server:8080", auth=auth)
# Get all active jobs
jobs = client.get_all_active_jobs()
for job in jobs.jobInfoObjects:
print(f"Job: {job.name} - Status: {job.status}")
# Execute a job
result = client.execute_job(job_id=123)
print(f"Execution started with runtime ID: {result.runtimeId}")
# Get protocol/logs
protocols = client.get_protocols()
```
### Features
- 📋 **Job Management**: Access, monitor, and execute scheduled jobs
- 📜 **Protocol/Logs**: Retrieve execution logs and protocol information
- 🔐 **Flexible Authentication**: Username/password or API token authentication
- ⏳ **Job Control**: Execute jobs with parameter overrides and queue management
- 🎯 **Type Safety**: Full Pydantic validation for all responses
### Authentication Methods
```python
# Username/Password
auth = RestApiAuth.from_username_password("admin", "password")
# API Token (domain-specific)
auth = RestApiAuth.from_api_token("admin", "token-id", domain="Jobs")
```
### Documentation
See [elytra_client/rest_api/README.md](elytra_client/rest_api/README.md) for comprehensive REST API documentation, including:
- Complete API reference
- Authentication details
- Job management examples
- Protocol/log access
- Error handling
- Common usage scenarios
## API Methods
All methods return Pydantic models with full type validation and IDE autocompletion support.

View file

@ -3,6 +3,7 @@
__version__ = "0.2.0"
__author__ = "Your Name"
from . import rest_api
from .client import ElytraClient
from .exceptions import ElytraAPIError, ElytraAuthenticationError
from .models import (
@ -28,4 +29,6 @@ __all__ = [
"SingleUpdateProductRequestBody",
"SingleNewProductGroupRequestBody",
"SingleUpdateProductGroupRequestBody",
# Legacy REST API subpackage
"rest_api",
]

View file

@ -0,0 +1,325 @@
# Lobster PIM Legacy REST API Client
This subpackage provides a Python client for accessing the legacy REST API of Lobster PIM (Product Information Management system). It offers access to scheduled jobs and protocol logs through a clean, Pydantic-based interface.
## Features
- **Job Management**: Access, monitor, and execute scheduled jobs
- **Protocol/Log Access**: Retrieve execution logs and protocol information
- **Authentication**: Support for both username/password and API token authentication
- **Job Control**: Execute jobs with parameter overrides and queue management
- **Type Safety**: Full Pydantic model validation for all API responses
- **Error Handling**: Comprehensive exception handling for API errors
## Installation
The REST API client is included as a subpackage of `elytra-pim-client`. To use it:
```python
from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth
```
## Quick Start
### Username/Password Authentication
```python
from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth
# Create authentication
auth = RestApiAuth.from_username_password("username", "password")
# Create client
client = LobsterRestApiClient("http://lobster-server:8080", auth=auth)
# Get list of active jobs
jobs = client.get_all_active_jobs()
for job in jobs.jobInfoObjects:
print(f"Job: {job.name} (ID: {job.id}) - Status: {job.status}")
# Close when done
client.close()
```
### API Token Authentication
```python
from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth
# Create authentication with API token
auth = RestApiAuth.from_api_token("username", "your-api-token", domain="Jobs")
# Create client
client = LobsterRestApiClient("http://lobster-server:8080", auth=auth)
```
### Using as Context Manager
```python
from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth
auth = RestApiAuth.from_username_password("username", "password")
with LobsterRestApiClient("http://lobster-server:8080", auth=auth) as client:
jobs = client.get_all_active_jobs()
# Client is automatically closed when exiting the context
```
## Job Management
### Get Job Overview
```python
# Get overview of all active jobs (HTML response)
html_overview = client.get_job_html_overview()
# Get all active jobs as structured data
jobs = client.get_all_active_jobs()
for job in jobs.jobInfoObjects:
print(f"{job.name}: {job.status}")
```
### Get Specific Job Details
```python
# Get details of a specific job
job = client.get_job_detail(job_id=123)
print(f"Job: {job.name}")
print(f"Description: {job.jobDescription}")
print(f"Status: {job.status}")
print(f"Error Level: {job.errorLevel}")
```
### Execute a Job
```python
# Execute a job
result = client.execute_job(job_id=123)
print(f"Runtime ID: {result.runtimeId}")
print(f"Status: {result.status}")
print(f"Protocol ID: {result.protocolId}")
```
### Advanced Job Control
Execute jobs with parameter overrides and queue management:
```python
# Execute job with language parameter override
result = client.control_job(
job_id=123,
action="start",
parameters={"defaultlanguage": "en"},
additional_reference="my-import-batch-001",
)
print(f"Job launched with runtime ID: {result.runtimeId}")
```
#### Queue Management (v2024 R1+)
For serialized job execution using queues:
```python
# Execute multiple jobs in sequence using a queue
for i in range(5):
result = client.control_job(
job_id=123,
action="start",
queueId="import-queue-1",
maxJobDurationSeconds=3600, # 1 hour timeout
parameters={"item_id": f"item_{i}"},
)
```
## Running Job Instances
### Get All Running Instances
```python
# Get all currently running job instances
running = client.get_running_job_instances()
for instance in running.jobInfoObjects:
print(f"{instance.name} (Runtime: {instance.runtimeId}) - {instance.status}")
```
### Get Specific Running Instance
```python
# Get details of a specific running instance
instance = client.get_running_job_instance(runtime_id="1698914697288")
print(f"Job: {instance.jobInfoObjects[0].name}")
print(f"Status: {instance.jobInfoObjects[0].status}")
```
## Protocol/Log Management
### Get Recent Protocols
```python
# Get the last 50 protocols
protocols = client.get_protocols(limit=50)
```
### Get Protocol by ID
```python
# Get details of a specific protocol
protocol = client.get_protocol(protocol_id="456")
```
### Get Protocols by Job ID
```python
# Get all protocols for a specific job
protocols = client.get_protocol_by_job_id(job_id=123)
```
### Get Protocols by Runtime ID
```python
# Get protocol for a specific job execution
protocol = client.get_protocol_by_runtime_id(runtime_id="1698914697288")
```
### Get Protocols by Additional Reference
```python
# Get all protocols associated with a custom reference
protocols = client.get_protocol_by_additional_reference("my-batch-001")
```
### Protocol Categories
```python
# Get all available protocol categories
categories = client.get_all_protocol_categories()
for cat in categories.categories:
print(f"Category: {cat.name} (ID: {cat.id})")
# Get recent protocols from a specific category
cat_protocols = client.get_protocol_by_category(category_id="error-logs", limit=10)
```
## Error Handling
The client provides specific exception types for different error scenarios:
```python
from elytra_client.exceptions import (
ElytraAuthenticationError,
ElytraNotFoundError,
ElytraAPIError,
ElytraValidationError,
)
try:
job = client.get_job_detail(job_id=999)
except ElytraAuthenticationError:
print("Authentication failed - check credentials")
except ElytraNotFoundError:
print("Job not found")
except ElytraAPIError as e:
print(f"API error: {e}")
except ElytraValidationError as e:
print(f"Response validation error: {e}")
```
## Authentication Methods
### Username/Password
Credentials are passed as URL parameters or in JSON request body depending on the endpoint.
```python
auth = RestApiAuth.from_username_password("admin", "secret_password")
```
### API Token
API tokens are domain-specific (e.g., "Jobs", "Protokolle") and included in the Authorization header.
```python
# API tokens are created per domain in the web client
auth = RestApiAuth.from_api_token("admin", "e96129a5-a314-4100-a44d-f851bf68dc18", domain="Jobs")
```
Note: A token created for "Jobs" cannot be used for "Protokolle" (protocols).
## Models
### Job Models
- `JobInfo`: Basic job information
- `JobDetailInfo`: Detailed job information with error level
- `JobOverviewResponse`: Response containing multiple jobs
- `JobExecutionResponse`: Response from executing a job
- `JobControlRequest`: Request body for job control
- `JobControlResponse`: Response from job control endpoint
### Protocol Models
- `ProtocolInfo`: Single protocol/log information
- `ProtocolEntry`: Individual log entry
- `ProtocolListResponse`: Response containing multiple protocols
- `ProtocolCategoryInfo`: Protocol category information
- `ProtocolCategoryListResponse`: Response containing categories
## Common Scenarios
### Monitor Job Execution
```python
import time
auth = RestApiAuth.from_username_password("admin", "password")
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
# Start a job
result = client.execute_job(job_id=123)
runtime_id = result.runtimeId
# Poll for completion
while True:
running = client.get_running_job_instances()
job_running = any(j.runtimeId == runtime_id for j in running.jobInfoObjects)
if not job_running:
print("Job completed!")
break
print("Job still running...")
time.sleep(5)
# Get the protocol for this execution
protocol = client.get_protocol_by_runtime_id(runtime_id)
print(f"Protocol: {protocol}")
```
### Batch Job Execution
```python
# Execute multiple jobs in sequence with queue management
for item_id in ["item_001", "item_002", "item_003"]:
result = client.control_job(
job_id=456, # Import job
action="start",
queueId="batch-processing",
parameters={"item_id": item_id},
additional_reference=f"batch-item-{item_id}",
)
print(f"Queued: {result.jobIdentifier}")
```
## Versioning
This client supports Lobster PIM starting from version 2023 R1 for API tokens. Some features require specific versions:
- **API Tokens**: 2023 R1+
- **Additional Reference in Job Control**: 2022 R4+
- **Queue Management**: 2024 R1+
- **Temporary Job Limiting**: 2023 R3+
## References
- [Lobster PIM REST API Documentation](https://documentation.lobster-pim.de/)
- Lobster PIM Web Client Manual - Chapter on API Token generation and REST API usage

View file

@ -0,0 +1,34 @@
"""Lobster PIM Legacy REST API client and utilities"""
from .auth import AuthMethod, RestApiAuth
from .client import LobsterRestApiClient
from .models import (
ErrorResponse,
JobControlRequest,
JobControlResponse,
JobDetailInfo,
JobExecutionResponse,
JobInfo,
JobOverviewResponse,
ProtocolCategoryInfo,
ProtocolCategoryListResponse,
ProtocolInfo,
ProtocolListResponse,
)
__all__ = [
"RestApiAuth",
"AuthMethod",
"LobsterRestApiClient",
"JobInfo",
"JobDetailInfo",
"JobOverviewResponse",
"JobExecutionResponse",
"JobControlRequest",
"JobControlResponse",
"ProtocolInfo",
"ProtocolListResponse",
"ProtocolCategoryInfo",
"ProtocolCategoryListResponse",
"ErrorResponse",
]

View file

@ -0,0 +1,101 @@
"""Authentication module for the Lobster PIM Legacy REST API"""
from enum import Enum
from typing import Any, Dict, Optional
class AuthMethod(Enum):
"""Authentication methods supported by the REST API"""
USERNAME_PASSWORD = "username_password"
API_TOKEN = "api_token"
class RestApiAuth:
"""Handles authentication for the Lobster PIM REST API"""
def __init__(
self,
username: str,
password_or_token: str,
auth_method: AuthMethod = AuthMethod.USERNAME_PASSWORD,
domain: Optional[str] = None,
):
"""
Initialize authentication.
Args:
username: Username or account name
password_or_token: Password (for USERNAME_PASSWORD) or API token (for API_TOKEN)
auth_method: Type of authentication to use
domain: Domain for API token (e.g., "Jobs", "Protokolle"). Required for API_TOKEN.
"""
self.username = username
self.password_or_token = password_or_token
self.auth_method = auth_method
self.domain = domain
if auth_method == AuthMethod.API_TOKEN and not domain:
raise ValueError("Domain is required when using API_TOKEN authentication")
def get_auth_header(self) -> Dict[str, str]:
"""
Get the Authorization header for requests.
Returns:
Dictionary with Authorization header
Raises:
ValueError: If authentication method is not supported
"""
if self.auth_method == AuthMethod.USERNAME_PASSWORD:
return {} # Username/password are passed as URL parameters
elif self.auth_method == AuthMethod.API_TOKEN:
# Format: "username:api_token"
auth_value = f"{self.username}:{self.password_or_token}"
return {"Authorization": auth_value}
else:
raise ValueError(f"Unsupported authentication method: {self.auth_method}")
def get_url_parameters(self) -> Dict[str, str]:
"""
Get URL parameters for authentication.
Returns:
Dictionary with URL parameters (empty for API_TOKEN, username/password for other methods)
"""
if self.auth_method == AuthMethod.USERNAME_PASSWORD:
return {
"username": self.username,
"password": self.password_or_token,
}
else:
return {}
def get_json_body_params(self) -> Dict[str, str]:
"""
Get JSON body parameters for authentication (for control endpoint).
Returns:
Dictionary with username and password for JSON request body
"""
return {
"username": self.username,
"password": self.password_or_token,
}
@classmethod
def from_username_password(cls, username: str, password: str) -> "RestApiAuth":
"""Create authentication from username and password."""
return cls(username, password, AuthMethod.USERNAME_PASSWORD)
@classmethod
def from_api_token(cls, username: str, token: str, domain: str) -> "RestApiAuth":
"""
Create authentication from API token.
Args:
username: Username or account name
token: API token
domain: Domain for the token (e.g., "Jobs", "Protokolle")
"""
return cls(username, token, AuthMethod.API_TOKEN, domain)

View file

@ -0,0 +1,438 @@
"""Client for the Lobster PIM Legacy REST API"""
from typing import Any, Dict, List, Optional, Type, TypeVar, cast
from urllib.parse import urljoin
import requests
from pydantic import BaseModel, ValidationError
from ..exceptions import (
ElytraAPIError,
ElytraAuthenticationError,
ElytraNotFoundError,
ElytraValidationError,
)
from .auth import AuthMethod, RestApiAuth
from .models import (
JobControlRequest,
JobControlResponse,
JobDetailInfo,
JobExecutionResponse,
JobInfo,
JobOverviewResponse,
ProtocolCategoryInfo,
ProtocolCategoryListResponse,
ProtocolInfo,
ProtocolListResponse,
)
T = TypeVar('T', bound=BaseModel)
class LobsterRestApiClient:
"""
Client for the Lobster PIM Legacy REST API.
Provides access to scheduled jobs and protocol logs via REST endpoints.
Supports both username/password and API token authentication.
Args:
base_url: The base URL of the Lobster PIM server (e.g., http://localhost:8080)
auth: RestApiAuth instance for authentication
timeout: Request timeout in seconds (default: 30)
"""
def __init__(
self,
base_url: str,
auth: RestApiAuth,
timeout: int = 30,
):
"""Initialize the Lobster REST API client"""
self.base_url = base_url.rstrip("/")
self.auth = auth
self.timeout = timeout
self.session = requests.Session()
self._setup_headers()
def _setup_headers(self) -> None:
"""Set up request headers including authentication"""
self.session.headers.update(
{
"Content-Type": "application/json",
"Accept": "application/json",
}
)
self.session.headers.update(self.auth.get_auth_header())
def _handle_response(
self,
response: requests.Response,
expected_model: Type[T],
) -> T:
"""
Handle API response and parse into Pydantic model.
Args:
response: Response from requests
expected_model: Pydantic model to deserialize into
Returns:
Parsed response as Pydantic model
Raises:
ElytraAuthenticationError: If authentication fails
ElytraNotFoundError: If resource not found
ElytraAPIError: For other API errors
ElytraValidationError: If response validation fails
"""
if response.status_code == 401:
raise ElytraAuthenticationError("Authentication failed")
elif response.status_code == 404:
raise ElytraNotFoundError("Resource not found")
elif response.status_code == 429:
raise ElytraAPIError("Too many requests - rate limit exceeded")
elif response.status_code >= 400:
try:
error_data = response.json()
error_msg = error_data.get("error") or error_data.get("message", response.text)
except Exception:
error_msg = response.text
raise ElytraAPIError(f"API error {response.status_code}: {error_msg}")
try:
data = response.json()
except Exception as e:
raise ElytraAPIError(f"Failed to parse response as JSON: {str(e)}")
try:
return expected_model.model_validate(data)
except ValidationError as e:
raise ElytraValidationError(f"Response validation failed: {str(e)}")
def _make_request(
self,
method: str,
endpoint: str,
expected_model: Type[T],
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None,
) -> T:
"""
Make HTTP request to the REST API.
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint path
expected_model: Pydantic model for response
params: Query parameters
json_data: JSON request body
Returns:
Parsed response as Pydantic model
"""
url = urljoin(self.base_url, f"/rest/{endpoint}")
# Add authentication parameters for GET requests
if method.upper() == "GET" and self.auth.auth_method == AuthMethod.USERNAME_PASSWORD:
if params is None:
params = {}
params.update(self.auth.get_url_parameters())
response = self.session.request(
method=method,
url=url,
params=params,
json=json_data,
timeout=self.timeout,
)
return self._handle_response(response, expected_model)
# ============= Job Endpoints =============
def get_job_overview(self) -> JobOverviewResponse:
"""
Get overview of all active jobs.
Returns:
JobOverviewResponse containing list of active jobs
"""
return self._make_request(
"GET",
"job/overview",
JobOverviewResponse,
)
def get_job_html_overview(self) -> str:
"""
Get HTML overview of all active jobs.
Returns:
HTML page content with job overview
"""
url = urljoin(self.base_url, "/rest/job/overview")
params = None
if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD:
params = self.auth.get_url_parameters()
response = self.session.get(url, params=params, timeout=self.timeout)
if response.status_code >= 400:
raise ElytraAPIError(f"Failed to get job overview: {response.status_code}")
return response.text
def get_all_active_jobs(self) -> JobOverviewResponse:
"""
Get list of all active jobs.
Returns:
JobOverviewResponse containing list of all active jobs
"""
return self._make_request(
"GET",
"job/job_id",
JobOverviewResponse,
)
def get_job_detail(self, job_id: int) -> JobDetailInfo:
"""
Get details of a specific active job.
Args:
job_id: ID of the job
Returns:
JobDetailInfo with job details
"""
return self._make_request(
"GET",
f"job/job_id/{job_id}",
JobDetailInfo,
)
def execute_job(
self,
job_id: int,
) -> JobExecutionResponse:
"""
Execute a job and get details of the started job.
Args:
job_id: ID of the job to execute
Returns:
JobExecutionResponse with execution details
"""
return self._make_request(
"GET",
f"job/execute/{job_id}",
JobExecutionResponse,
)
def get_running_job_instances(self) -> JobOverviewResponse:
"""
Get list of running job instances.
Returns:
JobOverviewResponse containing list of running job instances
"""
return self._make_request(
"GET",
"job/runtime_id",
JobOverviewResponse,
)
def get_running_job_instance(self, runtime_id: str) -> JobOverviewResponse:
"""
Get details of a specific running job instance.
Args:
runtime_id: Runtime ID of the job instance
Returns:
JobOverviewResponse with instance details
"""
return self._make_request(
"GET",
f"job/runtime_id/{runtime_id}",
JobOverviewResponse,
)
def control_job(
self,
job_id: int,
action: str = "start",
additional_reference: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None,
queue_id: Optional[str] = None,
max_job_duration_seconds: Optional[int] = None,
) -> JobControlResponse:
"""
Control a job using the control endpoint (POST).
Supports starting jobs with parameter overrides and queue management.
Args:
job_id: ID of the job to control
action: Action to perform (default: "start")
additional_reference: Optional reference for external tracking
parameters: Optional parameters to override job settings
queue_id: Optional queue ID for serialized execution
max_job_duration_seconds: Max duration in seconds (default 12 hours)
Returns:
JobControlResponse with execution details
"""
request_body = {
"action": action,
"objectId": job_id,
"objectType": "job",
}
# Add authentication credentials for POST
request_body.update(self.auth.get_json_body_params())
if additional_reference:
request_body["additionalReference"] = additional_reference
if parameters:
request_body["parameter"] = parameters
if queue_id:
request_body["queueId"] = queue_id
if max_job_duration_seconds:
request_body["maxJobDurationSeconds"] = max_job_duration_seconds
return self._make_request(
"POST",
"job/control",
JobControlResponse,
json_data=request_body,
)
# ============= Protocol Endpoints =============
def get_protocols(self, limit: int = 50) -> ProtocolListResponse:
"""
Get the last N protocols.
Args:
limit: Number of protocols to retrieve (default: 50)
Returns:
ProtocolListResponse containing list of protocols
"""
response = self.session.get(
urljoin(self.base_url, "/rest/protocol"),
params={"limit": limit, **self.auth.get_url_parameters()},
timeout=self.timeout,
)
return self._handle_response(response, ProtocolListResponse)
def get_protocol(self, protocol_id: str) -> ProtocolInfo:
"""
Get details of a specific protocol.
Args:
protocol_id: ID of the protocol
Returns:
ProtocolInfo with protocol details
"""
return self._make_request(
"GET",
f"protocol/{protocol_id}",
ProtocolInfo,
)
def get_protocol_by_job_id(self, job_id: int) -> ProtocolListResponse:
"""
Get all protocols for a specific job ID.
Args:
job_id: ID of the job
Returns:
ProtocolListResponse containing protocols for the job
"""
return self._make_request(
"GET",
f"protocol/job/{job_id}",
ProtocolListResponse,
)
def get_protocol_by_runtime_id(self, runtime_id: str) -> ProtocolInfo:
"""
Get protocol for a specific job instance runtime ID.
Args:
runtime_id: Runtime ID of the job instance
Returns:
ProtocolInfo with protocol details
"""
return self._make_request(
"GET",
f"protocol/job/{runtime_id}",
ProtocolInfo,
)
def get_protocol_by_additional_reference(self, reference: str) -> ProtocolListResponse:
"""
Get all protocols for a specific additional reference.
Args:
reference: Additional reference value
Returns:
ProtocolListResponse containing protocols for the reference
"""
return self._make_request(
"GET",
f"protocol/job/{reference}",
ProtocolListResponse,
)
def get_all_protocol_categories(self) -> ProtocolCategoryListResponse:
"""
Get all available protocol categories.
Returns:
ProtocolCategoryListResponse containing list of categories
"""
return self._make_request(
"GET",
"protocol/category",
ProtocolCategoryListResponse,
)
def get_protocol_by_category(self, category_id: str, limit: int = 50) -> ProtocolListResponse:
"""
Get the last N protocols from a specific category.
Args:
category_id: ID of the protocol category
limit: Number of protocols to retrieve (default: 50)
Returns:
ProtocolListResponse containing protocols for the category
"""
return self._make_request(
"GET",
f"protocol/category/{category_id}",
ProtocolListResponse,
params={"limit": limit},
)
def close(self) -> None:
"""Close the session and clean up resources."""
self.session.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()

View file

@ -0,0 +1,129 @@
"""Models for the Lobster PIM Legacy REST API"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class JobInfo(BaseModel):
"""Base job information model"""
id: int = Field(..., description="The ID of the job")
name: str = Field(..., description="The name of the job")
jobIdentifier: str = Field(..., description="The unique job identifier")
jobDescription: Optional[str] = Field(None, description="Description of the job")
status: str = Field(..., description="Current status of the job")
nextExecutionDate: str = Field(..., description="Next scheduled execution date")
previousExecutionDate: Optional[str] = Field(None, description="Previous execution date")
protocolId: Optional[str] = Field(None, description="ID of the associated protocol")
errors: List[str] = Field(default_factory=list, description="List of errors")
messages: List[str] = Field(default_factory=list, description="List of messages")
warnings: List[str] = Field(default_factory=list, description="List of warnings")
class JobDetailInfo(JobInfo):
"""Detailed job information including error level and runtime ID"""
errorLevel: Optional[str] = Field(None, description="Error level (e.g., 'Erfolgreich')")
runtimeId: Optional[str] = Field(None, description="Runtime ID for active job execution")
class JobOverviewResponse(BaseModel):
"""Response containing multiple job information items"""
jobInfoObjects: List[JobDetailInfo] = Field(..., description="List of job information objects")
errors: List[str] = Field(default_factory=list, description="List of errors")
warnings: List[str] = Field(default_factory=list, description="List of warnings")
class JobExecutionResponse(BaseModel):
"""Response from executing a job"""
id: int = Field(..., description="The ID of the job")
name: str = Field(..., description="The name of the job")
jobIdentifier: str = Field(..., description="The unique job identifier")
jobDescription: Optional[str] = Field(None, description="Description of the job")
status: str = Field(..., description="Status after execution")
nextExecutionDate: str = Field(..., description="Next execution date")
protocolId: str = Field(..., description="ID of the protocol for this execution")
runtimeId: str = Field(..., description="Runtime ID for tracking execution")
errors: List[str] = Field(default_factory=list, description="List of errors")
messages: List[str] = Field(default_factory=list, description="List of messages (e.g., JOB_START_OK)")
warnings: List[str] = Field(default_factory=list, description="List of warnings")
class JobControlRequest(BaseModel):
"""Request body for job control endpoint"""
action: str = Field(..., description="Action to perform (e.g., 'start')")
objectId: int = Field(..., description="The ID of the job to control")
objectType: str = Field(default="job", description="Type of object")
username: str = Field(..., description="Username for authentication")
password: str = Field(..., description="Password for authentication")
additionalReference: Optional[str] = Field(
None, description="Custom reference for external processing tracking"
)
parameter: Optional[Dict[str, Any]] = Field(
None, description="Parameters to override job settings"
)
queueId: Optional[str] = Field(
None, description="Queue ID for serialized job execution"
)
maxJobDurationSeconds: Optional[int] = Field(
default=43200, description="Max duration in seconds (default 12 hours)"
)
class JobControlResponse(BaseModel):
"""Response from job control endpoint"""
jobIdentifier: str = Field(..., description="The job identifier")
runtimeId: str = Field(..., description="Runtime ID for tracking")
errors: List[str] = Field(default_factory=list, description="List of errors")
messages: List[str] = Field(default_factory=list, description="List of messages")
warnings: List[str] = Field(default_factory=list, description="List of warnings")
class ProtocolEntry(BaseModel):
"""A single entry in a protocol log"""
timestamp: Optional[str] = Field(None, description="Timestamp of the entry")
level: Optional[str] = Field(None, description="Log level (ERROR, WARNING, INFO, etc.)")
message: Optional[str] = Field(None, description="Message content")
class ProtocolInfo(BaseModel):
"""Protocol/Log information"""
id: Optional[int] = Field(None, description="Protocol ID")
protocolId: Optional[str] = Field(None, description="Protocol ID as string")
jobId: Optional[int] = Field(None, description="Associated job ID")
runtimeId: Optional[str] = Field(None, description="Runtime ID of the job execution")
jobIdentifier: Optional[str] = Field(None, description="Job identifier")
status: Optional[str] = Field(None, description="Status of the job")
startTime: Optional[str] = Field(None, description="Start time of execution")
endTime: Optional[str] = Field(None, description="End time of execution")
errors: List[str] = Field(default_factory=list, description="List of errors")
messages: List[str] = Field(default_factory=list, description="List of messages")
entries: Optional[List[ProtocolEntry]] = Field(
None, description="Protocol entries"
)
class ProtocolListResponse(BaseModel):
"""Response containing list of protocols"""
protocols: Optional[List[ProtocolInfo]] = Field(None, description="List of protocols")
errors: List[str] = Field(default_factory=list, description="List of errors")
warnings: List[str] = Field(default_factory=list, description="List of warnings")
class ProtocolCategoryInfo(BaseModel):
"""Protocol category information"""
id: str = Field(..., description="Category ID")
name: str = Field(..., description="Category name")
description: Optional[str] = Field(None, description="Category description")
class ProtocolCategoryListResponse(BaseModel):
"""Response containing list of protocol categories"""
categories: List[ProtocolCategoryInfo] = Field(..., description="List of protocol categories")
errors: List[str] = Field(default_factory=list, description="List of errors")
class ErrorResponse(BaseModel):
"""Error response from the REST API"""
error: str = Field(..., description="Error message")
errorCode: Optional[str] = Field(None, description="Error code")
details: Optional[str] = Field(None, description="Error details")

View file

@ -0,0 +1,271 @@
"""Examples of using the Lobster PIM Legacy REST API client"""
from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth
def example_1_basic_usage():
"""Example 1: Basic usage with job overview"""
print("=" * 60)
print("Example 1: Basic Job Overview")
print("=" * 60)
# Create authentication
auth = RestApiAuth.from_username_password("admin", "password")
# Create client
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
try:
# Get all active jobs
jobs = client.get_all_active_jobs()
print(f"\nFound {len(jobs.jobInfoObjects)} active jobs:")
for job in jobs.jobInfoObjects:
print(f" - {job.name} (ID: {job.id})")
print(f" Status: {job.status}")
print(f" Next Execution: {job.nextExecutionDate}")
finally:
client.close()
def example_2_api_token_auth():
"""Example 2: Using API token authentication"""
print("\n" + "=" * 60)
print("Example 2: API Token Authentication")
print("=" * 60)
# Create authentication with API token
# Note: API token is domain-specific (e.g., "Jobs", "Protokolle")
auth = RestApiAuth.from_api_token(
username="admin",
token="e96129a5-a314-4100-a44d-f851bf68dc18",
domain="Jobs"
)
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
try:
jobs = client.get_all_active_jobs()
print(f"\nAuthenticated with API token. Found {len(jobs.jobInfoObjects)} jobs.")
finally:
client.close()
def example_3_execute_job():
"""Example 3: Execute a job"""
print("\n" + "=" * 60)
print("Example 3: Execute a Job")
print("=" * 60)
auth = RestApiAuth.from_username_password("admin", "password")
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
try:
# Execute a job
job_id = 172475107 # Example job ID
print(f"\nExecuting job {job_id}...")
result = client.execute_job(job_id=job_id)
print(f"Job execution started:")
print(f" Job Name: {result.name}")
print(f" Runtime ID: {result.runtimeId}")
print(f" Status: {result.status}")
print(f" Protocol ID: {result.protocolId}")
print(f" Messages: {result.messages}")
finally:
client.close()
def example_4_job_with_parameters():
"""Example 4: Execute job with parameter overrides"""
print("\n" + "=" * 60)
print("Example 4: Job with Parameter Overrides")
print("=" * 60)
auth = RestApiAuth.from_username_password("admin", "password")
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
try:
# Execute job with language parameter override
print("\nExecuting job with language parameter override (en)...")
result = client.control_job(
job_id=2966557,
action="start",
parameters={
"sprache": {
"defaultlanguage": "en"
}
},
additional_reference="example-import-en"
)
print(f"Job started with parameters:")
print(f" Job Identifier: {result.jobIdentifier}")
print(f" Runtime ID: {result.runtimeId}")
finally:
client.close()
def example_5_queue_management():
"""Example 5: Batch job execution with queue management"""
print("\n" + "=" * 60)
print("Example 5: Queue Management (v2024 R1+)")
print("=" * 60)
auth = RestApiAuth.from_username_password("admin", "password")
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
try:
# Execute multiple jobs in sequence using a queue
queue_id = "article-updates-batch-1"
print(f"\nExecuting jobs in queue: {queue_id}")
articles = [
{"id": "ARTICLE-001", "name": "Article 1"},
{"id": "ARTICLE-002", "name": "Article 2"},
{"id": "ARTICLE-003", "name": "Article 3"},
]
for article in articles:
result = client.control_job(
job_id=123,
action="start",
queueId=queue_id,
maxJobDurationSeconds=3600, # 1 hour timeout per job
parameters={
"article_id": article["id"],
"article_name": article["name"]
},
additional_reference=f"batch-{article['id']}"
)
print(f" Queued: {article['id']} - Runtime ID: {result.runtimeId}")
finally:
client.close()
def example_6_get_protocols():
"""Example 6: Access protocol/log information"""
print("\n" + "=" * 60)
print("Example 6: Protocol/Log Access")
print("=" * 60)
auth = RestApiAuth.from_username_password("admin", "password")
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
try:
# Get recent protocols
print("\nGetting last 10 protocols...")
protocols = client.get_protocols(limit=10)
if protocols.protocols:
for protocol in protocols.protocols:
print(f" Protocol {protocol.protocolId}: {protocol.status}")
else:
print(" No protocols available")
# Get protocols by job ID
job_id = 172475107
print(f"\nGetting protocols for job {job_id}...")
job_protocols = client.get_protocol_by_job_id(job_id)
# Get protocol categories
print("\nAvailable protocol categories:")
categories = client.get_all_protocol_categories()
for cat in categories.categories:
print(f" - {cat.name} (ID: {cat.id})")
finally:
client.close()
def example_7_context_manager():
"""Example 7: Using client as context manager"""
print("\n" + "=" * 60)
print("Example 7: Context Manager Usage")
print("=" * 60)
auth = RestApiAuth.from_username_password("admin", "password")
# Client is automatically closed when exiting the context
with LobsterRestApiClient("http://localhost:8080", auth=auth) as client:
jobs = client.get_all_active_jobs()
print(f"\nUsing context manager - Found {len(jobs.jobInfoObjects)} jobs")
# Client automatically closed here
def example_8_error_handling():
"""Example 8: Error handling"""
print("\n" + "=" * 60)
print("Example 8: Error Handling")
print("=" * 60)
from elytra_client.exceptions import (
ElytraAPIError,
ElytraAuthenticationError,
ElytraNotFoundError,
)
auth = RestApiAuth.from_username_password("admin", "password")
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
try:
# Try to get a non-existent job
try:
job = client.get_job_detail(job_id=999999)
except ElytraNotFoundError:
print("\nJob not found - this is expected for non-existent IDs")
except ElytraAuthenticationError:
print("\nAuthentication failed - check credentials")
except ElytraAPIError as e:
print(f"\nAPI error: {e}")
finally:
client.close()
def example_9_running_jobs():
"""Example 9: Monitor running jobs"""
print("\n" + "=" * 60)
print("Example 9: Monitor Running Jobs")
print("=" * 60)
auth = RestApiAuth.from_username_password("admin", "password")
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
try:
# Get all running job instances
print("\nGetting running job instances...")
running = client.get_running_job_instances()
if running.jobInfoObjects:
print(f"Found {len(running.jobInfoObjects)} running jobs:")
for job in running.jobInfoObjects:
print(f" - {job.name}")
print(f" Runtime ID: {job.runtimeId}")
print(f" Status: {job.status}")
else:
print("No jobs currently running")
finally:
client.close()
if __name__ == "__main__":
# Run examples
# Note: These examples assume a running Lobster PIM server at localhost:8080
# with user "admin" / "password"
print("\nLobster PIM Legacy REST API Examples")
print("=" * 60)
print("\nNote: These examples require a running Lobster PIM server")
print("Uncomment the examples you want to run below:\n")
# Uncomment to run examples:
# example_1_basic_usage()
# example_2_api_token_auth()
# example_3_execute_job()
# example_4_job_with_parameters()
# example_5_queue_management()
# example_6_get_protocols()
# example_7_context_manager()
# example_8_error_handling()
# example_9_running_jobs()

409
tests/test_rest_api.py Normal file
View file

@ -0,0 +1,409 @@
"""Tests for the Lobster PIM Legacy REST API client"""
from unittest.mock import MagicMock, Mock, patch
import pytest
import requests
from elytra_client.exceptions import (
ElytraAPIError,
ElytraAuthenticationError,
ElytraNotFoundError,
ElytraValidationError,
)
from elytra_client.rest_api import (
AuthMethod,
JobControlResponse,
JobDetailInfo,
JobExecutionResponse,
JobOverviewResponse,
LobsterRestApiClient,
ProtocolInfo,
RestApiAuth,
)
class TestRestApiAuth:
"""Tests for RestApiAuth"""
def test_username_password_auth(self):
"""Test username/password authentication"""
auth = RestApiAuth.from_username_password("testuser", "password123")
assert auth.username == "testuser"
assert auth.password_or_token == "password123"
assert auth.auth_method == AuthMethod.USERNAME_PASSWORD
assert auth.get_url_parameters() == {"username": "testuser", "password": "password123"}
assert auth.get_auth_header() == {}
def test_api_token_auth(self):
"""Test API token authentication"""
auth = RestApiAuth.from_api_token("admin", "token123", "Jobs")
assert auth.username == "admin"
assert auth.password_or_token == "token123"
assert auth.domain == "Jobs"
assert auth.auth_method == AuthMethod.API_TOKEN
assert auth.get_auth_header() == {"Authorization": "admin:token123"}
assert auth.get_url_parameters() == {}
def test_api_token_auth_without_domain(self):
"""Test that API token auth requires a domain"""
with pytest.raises(ValueError, match="Domain is required"):
RestApiAuth.from_api_token("admin", "token123", None)
def test_json_body_params(self):
"""Test JSON body parameters"""
auth = RestApiAuth.from_username_password("user", "pass")
params = auth.get_json_body_params()
assert params == {"username": "user", "password": "pass"}
class TestLobsterRestApiClient:
"""Tests for LobsterRestApiClient"""
@pytest.fixture
def auth(self):
"""Fixture for test authentication"""
return RestApiAuth.from_username_password("testuser", "password123")
@pytest.fixture
def client(self, auth):
"""Fixture for test client"""
return LobsterRestApiClient("http://localhost:8080", auth=auth)
def test_client_initialization(self, auth):
"""Test client initialization"""
client = LobsterRestApiClient("http://localhost:8080/", auth=auth)
assert client.base_url == "http://localhost:8080"
assert client.auth == auth
assert client.timeout == 30
def test_client_with_custom_timeout(self, auth):
"""Test client with custom timeout"""
client = LobsterRestApiClient("http://localhost:8080", auth=auth, timeout=60)
assert client.timeout == 60
@patch("requests.Session.request")
def test_get_job_overview(self, mock_request, client):
"""Test getting job overview"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"jobInfoObjects": [
{
"id": 123,
"name": "Test Job",
"jobIdentifier": "TEST_JOB",
"status": "Warten",
"nextExecutionDate": "manual execution",
"errors": [],
"messages": [],
"warnings": [],
}
],
"errors": [],
"warnings": [],
}
mock_request.return_value = mock_response
result = client.get_job_overview()
assert isinstance(result, JobOverviewResponse)
assert len(result.jobInfoObjects) == 1
assert result.jobInfoObjects[0].name == "Test Job"
@patch("requests.Session.request")
def test_get_all_active_jobs(self, mock_request, client):
"""Test getting all active jobs"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"jobInfoObjects": [],
"errors": [],
"warnings": [],
}
mock_request.return_value = mock_response
result = client.get_all_active_jobs()
assert isinstance(result, JobOverviewResponse)
mock_request.assert_called()
@patch("requests.Session.request")
def test_get_job_detail(self, mock_request, client):
"""Test getting job detail"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"id": 123,
"name": "Test Job",
"jobIdentifier": "TEST_JOB",
"status": "Warten",
"nextExecutionDate": "manual execution",
"errorLevel": "Erfolgreich",
"errors": [],
"messages": [],
"warnings": [],
}
mock_request.return_value = mock_response
result = client.get_job_detail(123)
assert isinstance(result, JobDetailInfo)
assert result.id == 123
assert result.errorLevel == "Erfolgreich"
@patch("requests.Session.request")
def test_execute_job(self, mock_request, client):
"""Test executing a job"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"id": 123,
"name": "Test Job",
"jobIdentifier": "TEST_JOB",
"status": "Wird ausgeführt",
"nextExecutionDate": "manual execution",
"protocolId": "456",
"runtimeId": "1698914697288",
"errors": [],
"messages": ["JOB_START_OK"],
"warnings": [],
}
mock_request.return_value = mock_response
result = client.execute_job(123)
assert isinstance(result, JobExecutionResponse)
assert result.runtimeId == "1698914697288"
assert "JOB_START_OK" in result.messages
@patch("requests.Session.request")
def test_control_job(self, mock_request, client):
"""Test controlling a job"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"jobIdentifier": "TEST_JOB.tmp_(1698914396035)",
"runtimeId": "1698914396035",
"errors": [],
"messages": [],
"warnings": [],
}
mock_request.return_value = mock_response
result = client.control_job(123, action="start")
assert isinstance(result, JobControlResponse)
assert "tmp_" in result.jobIdentifier
@patch("requests.Session.request")
def test_control_job_with_parameters(self, mock_request, client):
"""Test controlling a job with parameter overrides"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"jobIdentifier": "TEST_JOB.tmp_(1698914396035)",
"runtimeId": "1698914396035",
"errors": [],
"messages": [],
"warnings": [],
}
mock_request.return_value = mock_response
result = client.control_job(
123,
action="start",
parameters={"defaultlanguage": "en"},
additional_reference="my-ref",
queue_id="queue-1",
)
assert isinstance(result, JobControlResponse)
# Verify the request body includes the parameters
call_args = mock_request.call_args
json_data = call_args.kwargs.get("json")
assert json_data is not None
assert json_data.get("parameter") == {"defaultlanguage": "en"}
assert json_data.get("additionalReference") == "my-ref"
assert json_data.get("queueId") == "queue-1"
@patch("requests.Session.request")
def test_get_protocol(self, mock_request, client):
"""Test getting protocol details"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"id": 456,
"protocolId": "456",
"jobId": 123,
"status": "Success",
"errors": [],
"messages": [],
}
mock_request.return_value = mock_response
result = client.get_protocol("456")
assert isinstance(result, ProtocolInfo)
assert result.protocolId == "456"
@patch("requests.Session.request")
def test_get_protocol_by_job_id(self, mock_request, client):
"""Test getting protocol by job ID"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"protocols": [
{
"protocolId": "456",
"jobId": 123,
"status": "Success",
"errors": [],
"messages": [],
}
],
"errors": [],
"warnings": [],
}
mock_request.return_value = mock_response
result = client.get_protocol_by_job_id(123)
assert mock_request.call_count == 1
@patch("requests.Session.request")
def test_get_running_job_instances(self, mock_request, client):
"""Test getting running job instances"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"jobInfoObjects": [],
"errors": [],
"warnings": [],
}
mock_request.return_value = mock_response
result = client.get_running_job_instances()
assert isinstance(result, JobOverviewResponse)
@patch("requests.Session.request")
def test_authentication_error(self, mock_request, client):
"""Test authentication error handling"""
mock_response = Mock()
mock_response.status_code = 401
mock_request.return_value = mock_response
with pytest.raises(ElytraAuthenticationError):
client.get_job_overview()
@patch("requests.Session.request")
def test_not_found_error(self, mock_request, client):
"""Test not found error handling"""
mock_response = Mock()
mock_response.status_code = 404
mock_request.return_value = mock_response
with pytest.raises(ElytraNotFoundError):
client.get_job_detail(999)
@patch("requests.Session.request")
def test_rate_limit_error(self, mock_request, client):
"""Test rate limit error handling"""
mock_response = Mock()
mock_response.status_code = 429
mock_request.return_value = mock_response
with pytest.raises(ElytraAPIError, match="rate limit"):
client.get_job_overview()
@patch("requests.Session.request")
def test_json_parse_error(self, mock_request, client):
"""Test JSON parsing error"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.side_effect = Exception("Invalid JSON")
mock_request.return_value = mock_response
with pytest.raises(ElytraAPIError):
client.get_job_overview()
@patch("requests.Session.request")
def test_validation_error(self, mock_request, client):
"""Test response validation error"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"jobInfoObjects": [
{"invalid": "data"} # Missing required fields
],
"errors": [],
"warnings": [],
}
mock_request.return_value = mock_response
with pytest.raises(ElytraValidationError):
client.get_job_overview()
def test_context_manager(self, auth):
"""Test using client as context manager"""
with LobsterRestApiClient("http://localhost:8080", auth=auth) as client:
assert client is not None
class TestIntegration:
"""Integration tests with example scenarios"""
@pytest.fixture
def auth(self):
"""Fixture for test authentication"""
return RestApiAuth.from_username_password("admin", "password")
@pytest.fixture
def client(self, auth):
"""Fixture for test client"""
return LobsterRestApiClient("http://localhost:8080", auth=auth)
@patch("requests.Session.request")
def test_job_execution_workflow(self, mock_request, client):
"""Test complete job execution workflow"""
# Step 1: Get job details
mock_response1 = Mock()
mock_response1.status_code = 200
mock_response1.json.return_value = {
"id": 123,
"name": "Import Job",
"jobIdentifier": "IMPORT_PRODUCTS",
"status": "Warten",
"nextExecutionDate": "manual execution",
"errorLevel": "Erfolgreich",
"errors": [],
"messages": [],
"warnings": [],
}
# Step 2: Execute job
mock_response2 = Mock()
mock_response2.status_code = 200
mock_response2.json.return_value = {
"id": 123,
"name": "Import Job",
"jobIdentifier": "IMPORT_PRODUCTS",
"status": "Wird ausgeführt",
"nextExecutionDate": "manual execution",
"protocolId": "789",
"runtimeId": "1698914697288",
"errors": [],
"messages": ["JOB_START_OK"],
"warnings": [],
}
mock_request.side_effect = [mock_response1, mock_response2]
# Get job details
job_details = client.get_job_detail(123)
assert job_details.name == "Import Job"
# Execute the job
execution_result = client.execute_job(123)
assert execution_result.runtimeId == "1698914697288"