From 8d4fcc7d1198fd47b511b439c9426b42d2f2554e Mon Sep 17 00:00:00 2001 From: claudi Date: Fri, 20 Feb 2026 10:35:34 +0100 Subject: [PATCH] Implement Lobster PIM Legacy REST API client with authentication, job control, and protocol access - Added `rest_api` module with client and authentication classes. - Implemented `LobsterRestApiClient` for interacting with the REST API. - Created models for job and protocol responses using Pydantic. - Developed examples demonstrating basic usage, API token authentication, job execution, and error handling. - Added unit tests for authentication and client methods, including error handling scenarios. --- README.md | 56 ++++ elytra_client/__init__.py | 3 + elytra_client/rest_api/README.md | 325 ++++++++++++++++++++ elytra_client/rest_api/__init__.py | 34 +++ elytra_client/rest_api/auth.py | 101 ++++++ elytra_client/rest_api/client.py | 438 +++++++++++++++++++++++++++ elytra_client/rest_api/models.py | 129 ++++++++ examples/legacy_rest_api_examples.py | 271 +++++++++++++++++ tests/test_rest_api.py | 409 +++++++++++++++++++++++++ 9 files changed, 1766 insertions(+) create mode 100644 elytra_client/rest_api/README.md create mode 100644 elytra_client/rest_api/__init__.py create mode 100644 elytra_client/rest_api/auth.py create mode 100644 elytra_client/rest_api/client.py create mode 100644 elytra_client/rest_api/models.py create mode 100644 examples/legacy_rest_api_examples.py create mode 100644 tests/test_rest_api.py diff --git a/README.md b/README.md index c400374..c6328bb 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,62 @@ config = ElytraConfig.from_env() client = ElytraClient(base_url=config.base_url, api_key=config.api_key) ``` +## Legacy REST API (Lobster PIM) + +The package also includes a subpackage for the older Lobster PIM REST API, which provides access to scheduled jobs and protocol logs that are not yet available in the newer API. + +### Quick Start + +```python +from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth + +# Create authentication +auth = RestApiAuth.from_username_password("username", "password") + +# Create client for the REST API +client = LobsterRestApiClient("http://lobster-server:8080", auth=auth) + +# Get all active jobs +jobs = client.get_all_active_jobs() +for job in jobs.jobInfoObjects: + print(f"Job: {job.name} - Status: {job.status}") + +# Execute a job +result = client.execute_job(job_id=123) +print(f"Execution started with runtime ID: {result.runtimeId}") + +# Get protocol/logs +protocols = client.get_protocols() +``` + +### Features + +- 📋 **Job Management**: Access, monitor, and execute scheduled jobs +- 📜 **Protocol/Logs**: Retrieve execution logs and protocol information +- 🔐 **Flexible Authentication**: Username/password or API token authentication +- ⏳ **Job Control**: Execute jobs with parameter overrides and queue management +- 🎯 **Type Safety**: Full Pydantic validation for all responses + +### Authentication Methods + +```python +# Username/Password +auth = RestApiAuth.from_username_password("admin", "password") + +# API Token (domain-specific) +auth = RestApiAuth.from_api_token("admin", "token-id", domain="Jobs") +``` + +### Documentation + +See [elytra_client/rest_api/README.md](elytra_client/rest_api/README.md) for comprehensive REST API documentation, including: +- Complete API reference +- Authentication details +- Job management examples +- Protocol/log access +- Error handling +- Common usage scenarios + ## API Methods All methods return Pydantic models with full type validation and IDE autocompletion support. diff --git a/elytra_client/__init__.py b/elytra_client/__init__.py index 7c93921..11ad15f 100644 --- a/elytra_client/__init__.py +++ b/elytra_client/__init__.py @@ -3,6 +3,7 @@ __version__ = "0.2.0" __author__ = "Your Name" +from . import rest_api from .client import ElytraClient from .exceptions import ElytraAPIError, ElytraAuthenticationError from .models import ( @@ -28,4 +29,6 @@ __all__ = [ "SingleUpdateProductRequestBody", "SingleNewProductGroupRequestBody", "SingleUpdateProductGroupRequestBody", + # Legacy REST API subpackage + "rest_api", ] diff --git a/elytra_client/rest_api/README.md b/elytra_client/rest_api/README.md new file mode 100644 index 0000000..215d6ba --- /dev/null +++ b/elytra_client/rest_api/README.md @@ -0,0 +1,325 @@ +# Lobster PIM Legacy REST API Client + +This subpackage provides a Python client for accessing the legacy REST API of Lobster PIM (Product Information Management system). It offers access to scheduled jobs and protocol logs through a clean, Pydantic-based interface. + +## Features + +- **Job Management**: Access, monitor, and execute scheduled jobs +- **Protocol/Log Access**: Retrieve execution logs and protocol information +- **Authentication**: Support for both username/password and API token authentication +- **Job Control**: Execute jobs with parameter overrides and queue management +- **Type Safety**: Full Pydantic model validation for all API responses +- **Error Handling**: Comprehensive exception handling for API errors + +## Installation + +The REST API client is included as a subpackage of `elytra-pim-client`. To use it: + +```python +from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth +``` + +## Quick Start + +### Username/Password Authentication + +```python +from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth + +# Create authentication +auth = RestApiAuth.from_username_password("username", "password") + +# Create client +client = LobsterRestApiClient("http://lobster-server:8080", auth=auth) + +# Get list of active jobs +jobs = client.get_all_active_jobs() +for job in jobs.jobInfoObjects: + print(f"Job: {job.name} (ID: {job.id}) - Status: {job.status}") + +# Close when done +client.close() +``` + +### API Token Authentication + +```python +from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth + +# Create authentication with API token +auth = RestApiAuth.from_api_token("username", "your-api-token", domain="Jobs") + +# Create client +client = LobsterRestApiClient("http://lobster-server:8080", auth=auth) +``` + +### Using as Context Manager + +```python +from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth + +auth = RestApiAuth.from_username_password("username", "password") + +with LobsterRestApiClient("http://lobster-server:8080", auth=auth) as client: + jobs = client.get_all_active_jobs() + # Client is automatically closed when exiting the context +``` + +## Job Management + +### Get Job Overview + +```python +# Get overview of all active jobs (HTML response) +html_overview = client.get_job_html_overview() + +# Get all active jobs as structured data +jobs = client.get_all_active_jobs() +for job in jobs.jobInfoObjects: + print(f"{job.name}: {job.status}") +``` + +### Get Specific Job Details + +```python +# Get details of a specific job +job = client.get_job_detail(job_id=123) +print(f"Job: {job.name}") +print(f"Description: {job.jobDescription}") +print(f"Status: {job.status}") +print(f"Error Level: {job.errorLevel}") +``` + +### Execute a Job + +```python +# Execute a job +result = client.execute_job(job_id=123) +print(f"Runtime ID: {result.runtimeId}") +print(f"Status: {result.status}") +print(f"Protocol ID: {result.protocolId}") +``` + +### Advanced Job Control + +Execute jobs with parameter overrides and queue management: + +```python +# Execute job with language parameter override +result = client.control_job( + job_id=123, + action="start", + parameters={"defaultlanguage": "en"}, + additional_reference="my-import-batch-001", +) +print(f"Job launched with runtime ID: {result.runtimeId}") +``` + +#### Queue Management (v2024 R1+) + +For serialized job execution using queues: + +```python +# Execute multiple jobs in sequence using a queue +for i in range(5): + result = client.control_job( + job_id=123, + action="start", + queueId="import-queue-1", + maxJobDurationSeconds=3600, # 1 hour timeout + parameters={"item_id": f"item_{i}"}, + ) +``` + +## Running Job Instances + +### Get All Running Instances + +```python +# Get all currently running job instances +running = client.get_running_job_instances() +for instance in running.jobInfoObjects: + print(f"{instance.name} (Runtime: {instance.runtimeId}) - {instance.status}") +``` + +### Get Specific Running Instance + +```python +# Get details of a specific running instance +instance = client.get_running_job_instance(runtime_id="1698914697288") +print(f"Job: {instance.jobInfoObjects[0].name}") +print(f"Status: {instance.jobInfoObjects[0].status}") +``` + +## Protocol/Log Management + +### Get Recent Protocols + +```python +# Get the last 50 protocols +protocols = client.get_protocols(limit=50) +``` + +### Get Protocol by ID + +```python +# Get details of a specific protocol +protocol = client.get_protocol(protocol_id="456") +``` + +### Get Protocols by Job ID + +```python +# Get all protocols for a specific job +protocols = client.get_protocol_by_job_id(job_id=123) +``` + +### Get Protocols by Runtime ID + +```python +# Get protocol for a specific job execution +protocol = client.get_protocol_by_runtime_id(runtime_id="1698914697288") +``` + +### Get Protocols by Additional Reference + +```python +# Get all protocols associated with a custom reference +protocols = client.get_protocol_by_additional_reference("my-batch-001") +``` + +### Protocol Categories + +```python +# Get all available protocol categories +categories = client.get_all_protocol_categories() +for cat in categories.categories: + print(f"Category: {cat.name} (ID: {cat.id})") + +# Get recent protocols from a specific category +cat_protocols = client.get_protocol_by_category(category_id="error-logs", limit=10) +``` + +## Error Handling + +The client provides specific exception types for different error scenarios: + +```python +from elytra_client.exceptions import ( + ElytraAuthenticationError, + ElytraNotFoundError, + ElytraAPIError, + ElytraValidationError, +) + +try: + job = client.get_job_detail(job_id=999) +except ElytraAuthenticationError: + print("Authentication failed - check credentials") +except ElytraNotFoundError: + print("Job not found") +except ElytraAPIError as e: + print(f"API error: {e}") +except ElytraValidationError as e: + print(f"Response validation error: {e}") +``` + +## Authentication Methods + +### Username/Password + +Credentials are passed as URL parameters or in JSON request body depending on the endpoint. + +```python +auth = RestApiAuth.from_username_password("admin", "secret_password") +``` + +### API Token + +API tokens are domain-specific (e.g., "Jobs", "Protokolle") and included in the Authorization header. + +```python +# API tokens are created per domain in the web client +auth = RestApiAuth.from_api_token("admin", "e96129a5-a314-4100-a44d-f851bf68dc18", domain="Jobs") +``` + +Note: A token created for "Jobs" cannot be used for "Protokolle" (protocols). + +## Models + +### Job Models + +- `JobInfo`: Basic job information +- `JobDetailInfo`: Detailed job information with error level +- `JobOverviewResponse`: Response containing multiple jobs +- `JobExecutionResponse`: Response from executing a job +- `JobControlRequest`: Request body for job control +- `JobControlResponse`: Response from job control endpoint + +### Protocol Models + +- `ProtocolInfo`: Single protocol/log information +- `ProtocolEntry`: Individual log entry +- `ProtocolListResponse`: Response containing multiple protocols +- `ProtocolCategoryInfo`: Protocol category information +- `ProtocolCategoryListResponse`: Response containing categories + +## Common Scenarios + +### Monitor Job Execution + +```python +import time + +auth = RestApiAuth.from_username_password("admin", "password") +client = LobsterRestApiClient("http://localhost:8080", auth=auth) + +# Start a job +result = client.execute_job(job_id=123) +runtime_id = result.runtimeId + +# Poll for completion +while True: + running = client.get_running_job_instances() + job_running = any(j.runtimeId == runtime_id for j in running.jobInfoObjects) + + if not job_running: + print("Job completed!") + break + + print("Job still running...") + time.sleep(5) + +# Get the protocol for this execution +protocol = client.get_protocol_by_runtime_id(runtime_id) +print(f"Protocol: {protocol}") +``` + +### Batch Job Execution + +```python +# Execute multiple jobs in sequence with queue management +for item_id in ["item_001", "item_002", "item_003"]: + result = client.control_job( + job_id=456, # Import job + action="start", + queueId="batch-processing", + parameters={"item_id": item_id}, + additional_reference=f"batch-item-{item_id}", + ) + print(f"Queued: {result.jobIdentifier}") +``` + +## Versioning + +This client supports Lobster PIM starting from version 2023 R1 for API tokens. Some features require specific versions: + +- **API Tokens**: 2023 R1+ +- **Additional Reference in Job Control**: 2022 R4+ +- **Queue Management**: 2024 R1+ +- **Temporary Job Limiting**: 2023 R3+ + +## References + +- [Lobster PIM REST API Documentation](https://documentation.lobster-pim.de/) +- Lobster PIM Web Client Manual - Chapter on API Token generation and REST API usage diff --git a/elytra_client/rest_api/__init__.py b/elytra_client/rest_api/__init__.py new file mode 100644 index 0000000..93f4111 --- /dev/null +++ b/elytra_client/rest_api/__init__.py @@ -0,0 +1,34 @@ +"""Lobster PIM Legacy REST API client and utilities""" + +from .auth import AuthMethod, RestApiAuth +from .client import LobsterRestApiClient +from .models import ( + ErrorResponse, + JobControlRequest, + JobControlResponse, + JobDetailInfo, + JobExecutionResponse, + JobInfo, + JobOverviewResponse, + ProtocolCategoryInfo, + ProtocolCategoryListResponse, + ProtocolInfo, + ProtocolListResponse, +) + +__all__ = [ + "RestApiAuth", + "AuthMethod", + "LobsterRestApiClient", + "JobInfo", + "JobDetailInfo", + "JobOverviewResponse", + "JobExecutionResponse", + "JobControlRequest", + "JobControlResponse", + "ProtocolInfo", + "ProtocolListResponse", + "ProtocolCategoryInfo", + "ProtocolCategoryListResponse", + "ErrorResponse", +] diff --git a/elytra_client/rest_api/auth.py b/elytra_client/rest_api/auth.py new file mode 100644 index 0000000..81eec98 --- /dev/null +++ b/elytra_client/rest_api/auth.py @@ -0,0 +1,101 @@ +"""Authentication module for the Lobster PIM Legacy REST API""" + +from enum import Enum +from typing import Any, Dict, Optional + + +class AuthMethod(Enum): + """Authentication methods supported by the REST API""" + USERNAME_PASSWORD = "username_password" + API_TOKEN = "api_token" + + +class RestApiAuth: + """Handles authentication for the Lobster PIM REST API""" + + def __init__( + self, + username: str, + password_or_token: str, + auth_method: AuthMethod = AuthMethod.USERNAME_PASSWORD, + domain: Optional[str] = None, + ): + """ + Initialize authentication. + + Args: + username: Username or account name + password_or_token: Password (for USERNAME_PASSWORD) or API token (for API_TOKEN) + auth_method: Type of authentication to use + domain: Domain for API token (e.g., "Jobs", "Protokolle"). Required for API_TOKEN. + """ + self.username = username + self.password_or_token = password_or_token + self.auth_method = auth_method + self.domain = domain + + if auth_method == AuthMethod.API_TOKEN and not domain: + raise ValueError("Domain is required when using API_TOKEN authentication") + + def get_auth_header(self) -> Dict[str, str]: + """ + Get the Authorization header for requests. + + Returns: + Dictionary with Authorization header + + Raises: + ValueError: If authentication method is not supported + """ + if self.auth_method == AuthMethod.USERNAME_PASSWORD: + return {} # Username/password are passed as URL parameters + elif self.auth_method == AuthMethod.API_TOKEN: + # Format: "username:api_token" + auth_value = f"{self.username}:{self.password_or_token}" + return {"Authorization": auth_value} + else: + raise ValueError(f"Unsupported authentication method: {self.auth_method}") + + def get_url_parameters(self) -> Dict[str, str]: + """ + Get URL parameters for authentication. + + Returns: + Dictionary with URL parameters (empty for API_TOKEN, username/password for other methods) + """ + if self.auth_method == AuthMethod.USERNAME_PASSWORD: + return { + "username": self.username, + "password": self.password_or_token, + } + else: + return {} + + def get_json_body_params(self) -> Dict[str, str]: + """ + Get JSON body parameters for authentication (for control endpoint). + + Returns: + Dictionary with username and password for JSON request body + """ + return { + "username": self.username, + "password": self.password_or_token, + } + + @classmethod + def from_username_password(cls, username: str, password: str) -> "RestApiAuth": + """Create authentication from username and password.""" + return cls(username, password, AuthMethod.USERNAME_PASSWORD) + + @classmethod + def from_api_token(cls, username: str, token: str, domain: str) -> "RestApiAuth": + """ + Create authentication from API token. + + Args: + username: Username or account name + token: API token + domain: Domain for the token (e.g., "Jobs", "Protokolle") + """ + return cls(username, token, AuthMethod.API_TOKEN, domain) diff --git a/elytra_client/rest_api/client.py b/elytra_client/rest_api/client.py new file mode 100644 index 0000000..f24b8a2 --- /dev/null +++ b/elytra_client/rest_api/client.py @@ -0,0 +1,438 @@ +"""Client for the Lobster PIM Legacy REST API""" + +from typing import Any, Dict, List, Optional, Type, TypeVar, cast +from urllib.parse import urljoin + +import requests +from pydantic import BaseModel, ValidationError + +from ..exceptions import ( + ElytraAPIError, + ElytraAuthenticationError, + ElytraNotFoundError, + ElytraValidationError, +) +from .auth import AuthMethod, RestApiAuth +from .models import ( + JobControlRequest, + JobControlResponse, + JobDetailInfo, + JobExecutionResponse, + JobInfo, + JobOverviewResponse, + ProtocolCategoryInfo, + ProtocolCategoryListResponse, + ProtocolInfo, + ProtocolListResponse, +) + +T = TypeVar('T', bound=BaseModel) + + +class LobsterRestApiClient: + """ + Client for the Lobster PIM Legacy REST API. + + Provides access to scheduled jobs and protocol logs via REST endpoints. + Supports both username/password and API token authentication. + + Args: + base_url: The base URL of the Lobster PIM server (e.g., http://localhost:8080) + auth: RestApiAuth instance for authentication + timeout: Request timeout in seconds (default: 30) + """ + + def __init__( + self, + base_url: str, + auth: RestApiAuth, + timeout: int = 30, + ): + """Initialize the Lobster REST API client""" + self.base_url = base_url.rstrip("/") + self.auth = auth + self.timeout = timeout + self.session = requests.Session() + self._setup_headers() + + def _setup_headers(self) -> None: + """Set up request headers including authentication""" + self.session.headers.update( + { + "Content-Type": "application/json", + "Accept": "application/json", + } + ) + self.session.headers.update(self.auth.get_auth_header()) + + def _handle_response( + self, + response: requests.Response, + expected_model: Type[T], + ) -> T: + """ + Handle API response and parse into Pydantic model. + + Args: + response: Response from requests + expected_model: Pydantic model to deserialize into + + Returns: + Parsed response as Pydantic model + + Raises: + ElytraAuthenticationError: If authentication fails + ElytraNotFoundError: If resource not found + ElytraAPIError: For other API errors + ElytraValidationError: If response validation fails + """ + if response.status_code == 401: + raise ElytraAuthenticationError("Authentication failed") + elif response.status_code == 404: + raise ElytraNotFoundError("Resource not found") + elif response.status_code == 429: + raise ElytraAPIError("Too many requests - rate limit exceeded") + elif response.status_code >= 400: + try: + error_data = response.json() + error_msg = error_data.get("error") or error_data.get("message", response.text) + except Exception: + error_msg = response.text + raise ElytraAPIError(f"API error {response.status_code}: {error_msg}") + + try: + data = response.json() + except Exception as e: + raise ElytraAPIError(f"Failed to parse response as JSON: {str(e)}") + + try: + return expected_model.model_validate(data) + except ValidationError as e: + raise ElytraValidationError(f"Response validation failed: {str(e)}") + + def _make_request( + self, + method: str, + endpoint: str, + expected_model: Type[T], + params: Optional[Dict[str, Any]] = None, + json_data: Optional[Dict[str, Any]] = None, + ) -> T: + """ + Make HTTP request to the REST API. + + Args: + method: HTTP method (GET, POST, etc.) + endpoint: API endpoint path + expected_model: Pydantic model for response + params: Query parameters + json_data: JSON request body + + Returns: + Parsed response as Pydantic model + """ + url = urljoin(self.base_url, f"/rest/{endpoint}") + + # Add authentication parameters for GET requests + if method.upper() == "GET" and self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: + if params is None: + params = {} + params.update(self.auth.get_url_parameters()) + + response = self.session.request( + method=method, + url=url, + params=params, + json=json_data, + timeout=self.timeout, + ) + + return self._handle_response(response, expected_model) + + # ============= Job Endpoints ============= + + def get_job_overview(self) -> JobOverviewResponse: + """ + Get overview of all active jobs. + + Returns: + JobOverviewResponse containing list of active jobs + """ + return self._make_request( + "GET", + "job/overview", + JobOverviewResponse, + ) + + def get_job_html_overview(self) -> str: + """ + Get HTML overview of all active jobs. + + Returns: + HTML page content with job overview + """ + url = urljoin(self.base_url, "/rest/job/overview") + params = None + if self.auth.auth_method == AuthMethod.USERNAME_PASSWORD: + params = self.auth.get_url_parameters() + + response = self.session.get(url, params=params, timeout=self.timeout) + if response.status_code >= 400: + raise ElytraAPIError(f"Failed to get job overview: {response.status_code}") + return response.text + + def get_all_active_jobs(self) -> JobOverviewResponse: + """ + Get list of all active jobs. + + Returns: + JobOverviewResponse containing list of all active jobs + """ + return self._make_request( + "GET", + "job/job_id", + JobOverviewResponse, + ) + + def get_job_detail(self, job_id: int) -> JobDetailInfo: + """ + Get details of a specific active job. + + Args: + job_id: ID of the job + + Returns: + JobDetailInfo with job details + """ + return self._make_request( + "GET", + f"job/job_id/{job_id}", + JobDetailInfo, + ) + + def execute_job( + self, + job_id: int, + ) -> JobExecutionResponse: + """ + Execute a job and get details of the started job. + + Args: + job_id: ID of the job to execute + + Returns: + JobExecutionResponse with execution details + """ + return self._make_request( + "GET", + f"job/execute/{job_id}", + JobExecutionResponse, + ) + + def get_running_job_instances(self) -> JobOverviewResponse: + """ + Get list of running job instances. + + Returns: + JobOverviewResponse containing list of running job instances + """ + return self._make_request( + "GET", + "job/runtime_id", + JobOverviewResponse, + ) + + def get_running_job_instance(self, runtime_id: str) -> JobOverviewResponse: + """ + Get details of a specific running job instance. + + Args: + runtime_id: Runtime ID of the job instance + + Returns: + JobOverviewResponse with instance details + """ + return self._make_request( + "GET", + f"job/runtime_id/{runtime_id}", + JobOverviewResponse, + ) + + def control_job( + self, + job_id: int, + action: str = "start", + additional_reference: Optional[str] = None, + parameters: Optional[Dict[str, Any]] = None, + queue_id: Optional[str] = None, + max_job_duration_seconds: Optional[int] = None, + ) -> JobControlResponse: + """ + Control a job using the control endpoint (POST). + + Supports starting jobs with parameter overrides and queue management. + + Args: + job_id: ID of the job to control + action: Action to perform (default: "start") + additional_reference: Optional reference for external tracking + parameters: Optional parameters to override job settings + queue_id: Optional queue ID for serialized execution + max_job_duration_seconds: Max duration in seconds (default 12 hours) + + Returns: + JobControlResponse with execution details + """ + request_body = { + "action": action, + "objectId": job_id, + "objectType": "job", + } + + # Add authentication credentials for POST + request_body.update(self.auth.get_json_body_params()) + + if additional_reference: + request_body["additionalReference"] = additional_reference + + if parameters: + request_body["parameter"] = parameters + + if queue_id: + request_body["queueId"] = queue_id + + if max_job_duration_seconds: + request_body["maxJobDurationSeconds"] = max_job_duration_seconds + + return self._make_request( + "POST", + "job/control", + JobControlResponse, + json_data=request_body, + ) + + # ============= Protocol Endpoints ============= + + def get_protocols(self, limit: int = 50) -> ProtocolListResponse: + """ + Get the last N protocols. + + Args: + limit: Number of protocols to retrieve (default: 50) + + Returns: + ProtocolListResponse containing list of protocols + """ + response = self.session.get( + urljoin(self.base_url, "/rest/protocol"), + params={"limit": limit, **self.auth.get_url_parameters()}, + timeout=self.timeout, + ) + return self._handle_response(response, ProtocolListResponse) + + def get_protocol(self, protocol_id: str) -> ProtocolInfo: + """ + Get details of a specific protocol. + + Args: + protocol_id: ID of the protocol + + Returns: + ProtocolInfo with protocol details + """ + return self._make_request( + "GET", + f"protocol/{protocol_id}", + ProtocolInfo, + ) + + def get_protocol_by_job_id(self, job_id: int) -> ProtocolListResponse: + """ + Get all protocols for a specific job ID. + + Args: + job_id: ID of the job + + Returns: + ProtocolListResponse containing protocols for the job + """ + return self._make_request( + "GET", + f"protocol/job/{job_id}", + ProtocolListResponse, + ) + + def get_protocol_by_runtime_id(self, runtime_id: str) -> ProtocolInfo: + """ + Get protocol for a specific job instance runtime ID. + + Args: + runtime_id: Runtime ID of the job instance + + Returns: + ProtocolInfo with protocol details + """ + return self._make_request( + "GET", + f"protocol/job/{runtime_id}", + ProtocolInfo, + ) + + def get_protocol_by_additional_reference(self, reference: str) -> ProtocolListResponse: + """ + Get all protocols for a specific additional reference. + + Args: + reference: Additional reference value + + Returns: + ProtocolListResponse containing protocols for the reference + """ + return self._make_request( + "GET", + f"protocol/job/{reference}", + ProtocolListResponse, + ) + + def get_all_protocol_categories(self) -> ProtocolCategoryListResponse: + """ + Get all available protocol categories. + + Returns: + ProtocolCategoryListResponse containing list of categories + """ + return self._make_request( + "GET", + "protocol/category", + ProtocolCategoryListResponse, + ) + + def get_protocol_by_category(self, category_id: str, limit: int = 50) -> ProtocolListResponse: + """ + Get the last N protocols from a specific category. + + Args: + category_id: ID of the protocol category + limit: Number of protocols to retrieve (default: 50) + + Returns: + ProtocolListResponse containing protocols for the category + """ + return self._make_request( + "GET", + f"protocol/category/{category_id}", + ProtocolListResponse, + params={"limit": limit}, + ) + + def close(self) -> None: + """Close the session and clean up resources.""" + self.session.close() + + def __enter__(self): + """Context manager entry""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.close() diff --git a/elytra_client/rest_api/models.py b/elytra_client/rest_api/models.py new file mode 100644 index 0000000..0bbabe9 --- /dev/null +++ b/elytra_client/rest_api/models.py @@ -0,0 +1,129 @@ +"""Models for the Lobster PIM Legacy REST API""" + +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + + +class JobInfo(BaseModel): + """Base job information model""" + id: int = Field(..., description="The ID of the job") + name: str = Field(..., description="The name of the job") + jobIdentifier: str = Field(..., description="The unique job identifier") + jobDescription: Optional[str] = Field(None, description="Description of the job") + status: str = Field(..., description="Current status of the job") + nextExecutionDate: str = Field(..., description="Next scheduled execution date") + previousExecutionDate: Optional[str] = Field(None, description="Previous execution date") + protocolId: Optional[str] = Field(None, description="ID of the associated protocol") + errors: List[str] = Field(default_factory=list, description="List of errors") + messages: List[str] = Field(default_factory=list, description="List of messages") + warnings: List[str] = Field(default_factory=list, description="List of warnings") + + +class JobDetailInfo(JobInfo): + """Detailed job information including error level and runtime ID""" + errorLevel: Optional[str] = Field(None, description="Error level (e.g., 'Erfolgreich')") + runtimeId: Optional[str] = Field(None, description="Runtime ID for active job execution") + + +class JobOverviewResponse(BaseModel): + """Response containing multiple job information items""" + jobInfoObjects: List[JobDetailInfo] = Field(..., description="List of job information objects") + errors: List[str] = Field(default_factory=list, description="List of errors") + warnings: List[str] = Field(default_factory=list, description="List of warnings") + + +class JobExecutionResponse(BaseModel): + """Response from executing a job""" + id: int = Field(..., description="The ID of the job") + name: str = Field(..., description="The name of the job") + jobIdentifier: str = Field(..., description="The unique job identifier") + jobDescription: Optional[str] = Field(None, description="Description of the job") + status: str = Field(..., description="Status after execution") + nextExecutionDate: str = Field(..., description="Next execution date") + protocolId: str = Field(..., description="ID of the protocol for this execution") + runtimeId: str = Field(..., description="Runtime ID for tracking execution") + errors: List[str] = Field(default_factory=list, description="List of errors") + messages: List[str] = Field(default_factory=list, description="List of messages (e.g., JOB_START_OK)") + warnings: List[str] = Field(default_factory=list, description="List of warnings") + + +class JobControlRequest(BaseModel): + """Request body for job control endpoint""" + action: str = Field(..., description="Action to perform (e.g., 'start')") + objectId: int = Field(..., description="The ID of the job to control") + objectType: str = Field(default="job", description="Type of object") + username: str = Field(..., description="Username for authentication") + password: str = Field(..., description="Password for authentication") + additionalReference: Optional[str] = Field( + None, description="Custom reference for external processing tracking" + ) + parameter: Optional[Dict[str, Any]] = Field( + None, description="Parameters to override job settings" + ) + queueId: Optional[str] = Field( + None, description="Queue ID for serialized job execution" + ) + maxJobDurationSeconds: Optional[int] = Field( + default=43200, description="Max duration in seconds (default 12 hours)" + ) + + +class JobControlResponse(BaseModel): + """Response from job control endpoint""" + jobIdentifier: str = Field(..., description="The job identifier") + runtimeId: str = Field(..., description="Runtime ID for tracking") + errors: List[str] = Field(default_factory=list, description="List of errors") + messages: List[str] = Field(default_factory=list, description="List of messages") + warnings: List[str] = Field(default_factory=list, description="List of warnings") + + +class ProtocolEntry(BaseModel): + """A single entry in a protocol log""" + timestamp: Optional[str] = Field(None, description="Timestamp of the entry") + level: Optional[str] = Field(None, description="Log level (ERROR, WARNING, INFO, etc.)") + message: Optional[str] = Field(None, description="Message content") + + +class ProtocolInfo(BaseModel): + """Protocol/Log information""" + id: Optional[int] = Field(None, description="Protocol ID") + protocolId: Optional[str] = Field(None, description="Protocol ID as string") + jobId: Optional[int] = Field(None, description="Associated job ID") + runtimeId: Optional[str] = Field(None, description="Runtime ID of the job execution") + jobIdentifier: Optional[str] = Field(None, description="Job identifier") + status: Optional[str] = Field(None, description="Status of the job") + startTime: Optional[str] = Field(None, description="Start time of execution") + endTime: Optional[str] = Field(None, description="End time of execution") + errors: List[str] = Field(default_factory=list, description="List of errors") + messages: List[str] = Field(default_factory=list, description="List of messages") + entries: Optional[List[ProtocolEntry]] = Field( + None, description="Protocol entries" + ) + + +class ProtocolListResponse(BaseModel): + """Response containing list of protocols""" + protocols: Optional[List[ProtocolInfo]] = Field(None, description="List of protocols") + errors: List[str] = Field(default_factory=list, description="List of errors") + warnings: List[str] = Field(default_factory=list, description="List of warnings") + + +class ProtocolCategoryInfo(BaseModel): + """Protocol category information""" + id: str = Field(..., description="Category ID") + name: str = Field(..., description="Category name") + description: Optional[str] = Field(None, description="Category description") + + +class ProtocolCategoryListResponse(BaseModel): + """Response containing list of protocol categories""" + categories: List[ProtocolCategoryInfo] = Field(..., description="List of protocol categories") + errors: List[str] = Field(default_factory=list, description="List of errors") + + +class ErrorResponse(BaseModel): + """Error response from the REST API""" + error: str = Field(..., description="Error message") + errorCode: Optional[str] = Field(None, description="Error code") + details: Optional[str] = Field(None, description="Error details") diff --git a/examples/legacy_rest_api_examples.py b/examples/legacy_rest_api_examples.py new file mode 100644 index 0000000..99be68c --- /dev/null +++ b/examples/legacy_rest_api_examples.py @@ -0,0 +1,271 @@ +"""Examples of using the Lobster PIM Legacy REST API client""" + +from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth + + +def example_1_basic_usage(): + """Example 1: Basic usage with job overview""" + print("=" * 60) + print("Example 1: Basic Job Overview") + print("=" * 60) + + # Create authentication + auth = RestApiAuth.from_username_password("admin", "password") + + # Create client + client = LobsterRestApiClient("http://localhost:8080", auth=auth) + + try: + # Get all active jobs + jobs = client.get_all_active_jobs() + print(f"\nFound {len(jobs.jobInfoObjects)} active jobs:") + for job in jobs.jobInfoObjects: + print(f" - {job.name} (ID: {job.id})") + print(f" Status: {job.status}") + print(f" Next Execution: {job.nextExecutionDate}") + finally: + client.close() + + +def example_2_api_token_auth(): + """Example 2: Using API token authentication""" + print("\n" + "=" * 60) + print("Example 2: API Token Authentication") + print("=" * 60) + + # Create authentication with API token + # Note: API token is domain-specific (e.g., "Jobs", "Protokolle") + auth = RestApiAuth.from_api_token( + username="admin", + token="e96129a5-a314-4100-a44d-f851bf68dc18", + domain="Jobs" + ) + + client = LobsterRestApiClient("http://localhost:8080", auth=auth) + + try: + jobs = client.get_all_active_jobs() + print(f"\nAuthenticated with API token. Found {len(jobs.jobInfoObjects)} jobs.") + finally: + client.close() + + +def example_3_execute_job(): + """Example 3: Execute a job""" + print("\n" + "=" * 60) + print("Example 3: Execute a Job") + print("=" * 60) + + auth = RestApiAuth.from_username_password("admin", "password") + client = LobsterRestApiClient("http://localhost:8080", auth=auth) + + try: + # Execute a job + job_id = 172475107 # Example job ID + print(f"\nExecuting job {job_id}...") + + result = client.execute_job(job_id=job_id) + + print(f"Job execution started:") + print(f" Job Name: {result.name}") + print(f" Runtime ID: {result.runtimeId}") + print(f" Status: {result.status}") + print(f" Protocol ID: {result.protocolId}") + print(f" Messages: {result.messages}") + finally: + client.close() + + +def example_4_job_with_parameters(): + """Example 4: Execute job with parameter overrides""" + print("\n" + "=" * 60) + print("Example 4: Job with Parameter Overrides") + print("=" * 60) + + auth = RestApiAuth.from_username_password("admin", "password") + client = LobsterRestApiClient("http://localhost:8080", auth=auth) + + try: + # Execute job with language parameter override + print("\nExecuting job with language parameter override (en)...") + + result = client.control_job( + job_id=2966557, + action="start", + parameters={ + "sprache": { + "defaultlanguage": "en" + } + }, + additional_reference="example-import-en" + ) + + print(f"Job started with parameters:") + print(f" Job Identifier: {result.jobIdentifier}") + print(f" Runtime ID: {result.runtimeId}") + finally: + client.close() + + +def example_5_queue_management(): + """Example 5: Batch job execution with queue management""" + print("\n" + "=" * 60) + print("Example 5: Queue Management (v2024 R1+)") + print("=" * 60) + + auth = RestApiAuth.from_username_password("admin", "password") + client = LobsterRestApiClient("http://localhost:8080", auth=auth) + + try: + # Execute multiple jobs in sequence using a queue + queue_id = "article-updates-batch-1" + + print(f"\nExecuting jobs in queue: {queue_id}") + + articles = [ + {"id": "ARTICLE-001", "name": "Article 1"}, + {"id": "ARTICLE-002", "name": "Article 2"}, + {"id": "ARTICLE-003", "name": "Article 3"}, + ] + + for article in articles: + result = client.control_job( + job_id=123, + action="start", + queueId=queue_id, + maxJobDurationSeconds=3600, # 1 hour timeout per job + parameters={ + "article_id": article["id"], + "article_name": article["name"] + }, + additional_reference=f"batch-{article['id']}" + ) + print(f" Queued: {article['id']} - Runtime ID: {result.runtimeId}") + finally: + client.close() + + +def example_6_get_protocols(): + """Example 6: Access protocol/log information""" + print("\n" + "=" * 60) + print("Example 6: Protocol/Log Access") + print("=" * 60) + + auth = RestApiAuth.from_username_password("admin", "password") + client = LobsterRestApiClient("http://localhost:8080", auth=auth) + + try: + # Get recent protocols + print("\nGetting last 10 protocols...") + protocols = client.get_protocols(limit=10) + + if protocols.protocols: + for protocol in protocols.protocols: + print(f" Protocol {protocol.protocolId}: {protocol.status}") + else: + print(" No protocols available") + + # Get protocols by job ID + job_id = 172475107 + print(f"\nGetting protocols for job {job_id}...") + job_protocols = client.get_protocol_by_job_id(job_id) + + # Get protocol categories + print("\nAvailable protocol categories:") + categories = client.get_all_protocol_categories() + for cat in categories.categories: + print(f" - {cat.name} (ID: {cat.id})") + finally: + client.close() + + +def example_7_context_manager(): + """Example 7: Using client as context manager""" + print("\n" + "=" * 60) + print("Example 7: Context Manager Usage") + print("=" * 60) + + auth = RestApiAuth.from_username_password("admin", "password") + + # Client is automatically closed when exiting the context + with LobsterRestApiClient("http://localhost:8080", auth=auth) as client: + jobs = client.get_all_active_jobs() + print(f"\nUsing context manager - Found {len(jobs.jobInfoObjects)} jobs") + # Client automatically closed here + + +def example_8_error_handling(): + """Example 8: Error handling""" + print("\n" + "=" * 60) + print("Example 8: Error Handling") + print("=" * 60) + + from elytra_client.exceptions import ( + ElytraAPIError, + ElytraAuthenticationError, + ElytraNotFoundError, + ) + + auth = RestApiAuth.from_username_password("admin", "password") + client = LobsterRestApiClient("http://localhost:8080", auth=auth) + + try: + # Try to get a non-existent job + try: + job = client.get_job_detail(job_id=999999) + except ElytraNotFoundError: + print("\nJob not found - this is expected for non-existent IDs") + except ElytraAuthenticationError: + print("\nAuthentication failed - check credentials") + except ElytraAPIError as e: + print(f"\nAPI error: {e}") + finally: + client.close() + + +def example_9_running_jobs(): + """Example 9: Monitor running jobs""" + print("\n" + "=" * 60) + print("Example 9: Monitor Running Jobs") + print("=" * 60) + + auth = RestApiAuth.from_username_password("admin", "password") + client = LobsterRestApiClient("http://localhost:8080", auth=auth) + + try: + # Get all running job instances + print("\nGetting running job instances...") + running = client.get_running_job_instances() + + if running.jobInfoObjects: + print(f"Found {len(running.jobInfoObjects)} running jobs:") + for job in running.jobInfoObjects: + print(f" - {job.name}") + print(f" Runtime ID: {job.runtimeId}") + print(f" Status: {job.status}") + else: + print("No jobs currently running") + finally: + client.close() + + +if __name__ == "__main__": + # Run examples + # Note: These examples assume a running Lobster PIM server at localhost:8080 + # with user "admin" / "password" + + print("\nLobster PIM Legacy REST API Examples") + print("=" * 60) + print("\nNote: These examples require a running Lobster PIM server") + print("Uncomment the examples you want to run below:\n") + + # Uncomment to run examples: + # example_1_basic_usage() + # example_2_api_token_auth() + # example_3_execute_job() + # example_4_job_with_parameters() + # example_5_queue_management() + # example_6_get_protocols() + # example_7_context_manager() + # example_8_error_handling() + # example_9_running_jobs() diff --git a/tests/test_rest_api.py b/tests/test_rest_api.py new file mode 100644 index 0000000..60898fb --- /dev/null +++ b/tests/test_rest_api.py @@ -0,0 +1,409 @@ +"""Tests for the Lobster PIM Legacy REST API client""" + +from unittest.mock import MagicMock, Mock, patch + +import pytest +import requests + +from elytra_client.exceptions import ( + ElytraAPIError, + ElytraAuthenticationError, + ElytraNotFoundError, + ElytraValidationError, +) +from elytra_client.rest_api import ( + AuthMethod, + JobControlResponse, + JobDetailInfo, + JobExecutionResponse, + JobOverviewResponse, + LobsterRestApiClient, + ProtocolInfo, + RestApiAuth, +) + + +class TestRestApiAuth: + """Tests for RestApiAuth""" + + def test_username_password_auth(self): + """Test username/password authentication""" + auth = RestApiAuth.from_username_password("testuser", "password123") + assert auth.username == "testuser" + assert auth.password_or_token == "password123" + assert auth.auth_method == AuthMethod.USERNAME_PASSWORD + assert auth.get_url_parameters() == {"username": "testuser", "password": "password123"} + assert auth.get_auth_header() == {} + + def test_api_token_auth(self): + """Test API token authentication""" + auth = RestApiAuth.from_api_token("admin", "token123", "Jobs") + assert auth.username == "admin" + assert auth.password_or_token == "token123" + assert auth.domain == "Jobs" + assert auth.auth_method == AuthMethod.API_TOKEN + assert auth.get_auth_header() == {"Authorization": "admin:token123"} + assert auth.get_url_parameters() == {} + + def test_api_token_auth_without_domain(self): + """Test that API token auth requires a domain""" + with pytest.raises(ValueError, match="Domain is required"): + RestApiAuth.from_api_token("admin", "token123", None) + + def test_json_body_params(self): + """Test JSON body parameters""" + auth = RestApiAuth.from_username_password("user", "pass") + params = auth.get_json_body_params() + assert params == {"username": "user", "password": "pass"} + + +class TestLobsterRestApiClient: + """Tests for LobsterRestApiClient""" + + @pytest.fixture + def auth(self): + """Fixture for test authentication""" + return RestApiAuth.from_username_password("testuser", "password123") + + @pytest.fixture + def client(self, auth): + """Fixture for test client""" + return LobsterRestApiClient("http://localhost:8080", auth=auth) + + def test_client_initialization(self, auth): + """Test client initialization""" + client = LobsterRestApiClient("http://localhost:8080/", auth=auth) + assert client.base_url == "http://localhost:8080" + assert client.auth == auth + assert client.timeout == 30 + + def test_client_with_custom_timeout(self, auth): + """Test client with custom timeout""" + client = LobsterRestApiClient("http://localhost:8080", auth=auth, timeout=60) + assert client.timeout == 60 + + @patch("requests.Session.request") + def test_get_job_overview(self, mock_request, client): + """Test getting job overview""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "jobInfoObjects": [ + { + "id": 123, + "name": "Test Job", + "jobIdentifier": "TEST_JOB", + "status": "Warten", + "nextExecutionDate": "manual execution", + "errors": [], + "messages": [], + "warnings": [], + } + ], + "errors": [], + "warnings": [], + } + mock_request.return_value = mock_response + + result = client.get_job_overview() + + assert isinstance(result, JobOverviewResponse) + assert len(result.jobInfoObjects) == 1 + assert result.jobInfoObjects[0].name == "Test Job" + + @patch("requests.Session.request") + def test_get_all_active_jobs(self, mock_request, client): + """Test getting all active jobs""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "jobInfoObjects": [], + "errors": [], + "warnings": [], + } + mock_request.return_value = mock_response + + result = client.get_all_active_jobs() + + assert isinstance(result, JobOverviewResponse) + mock_request.assert_called() + + @patch("requests.Session.request") + def test_get_job_detail(self, mock_request, client): + """Test getting job detail""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "id": 123, + "name": "Test Job", + "jobIdentifier": "TEST_JOB", + "status": "Warten", + "nextExecutionDate": "manual execution", + "errorLevel": "Erfolgreich", + "errors": [], + "messages": [], + "warnings": [], + } + mock_request.return_value = mock_response + + result = client.get_job_detail(123) + + assert isinstance(result, JobDetailInfo) + assert result.id == 123 + assert result.errorLevel == "Erfolgreich" + + @patch("requests.Session.request") + def test_execute_job(self, mock_request, client): + """Test executing a job""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "id": 123, + "name": "Test Job", + "jobIdentifier": "TEST_JOB", + "status": "Wird ausgeführt", + "nextExecutionDate": "manual execution", + "protocolId": "456", + "runtimeId": "1698914697288", + "errors": [], + "messages": ["JOB_START_OK"], + "warnings": [], + } + mock_request.return_value = mock_response + + result = client.execute_job(123) + + assert isinstance(result, JobExecutionResponse) + assert result.runtimeId == "1698914697288" + assert "JOB_START_OK" in result.messages + + @patch("requests.Session.request") + def test_control_job(self, mock_request, client): + """Test controlling a job""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "jobIdentifier": "TEST_JOB.tmp_(1698914396035)", + "runtimeId": "1698914396035", + "errors": [], + "messages": [], + "warnings": [], + } + mock_request.return_value = mock_response + + result = client.control_job(123, action="start") + + assert isinstance(result, JobControlResponse) + assert "tmp_" in result.jobIdentifier + + @patch("requests.Session.request") + def test_control_job_with_parameters(self, mock_request, client): + """Test controlling a job with parameter overrides""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "jobIdentifier": "TEST_JOB.tmp_(1698914396035)", + "runtimeId": "1698914396035", + "errors": [], + "messages": [], + "warnings": [], + } + mock_request.return_value = mock_response + + result = client.control_job( + 123, + action="start", + parameters={"defaultlanguage": "en"}, + additional_reference="my-ref", + queue_id="queue-1", + ) + + assert isinstance(result, JobControlResponse) + # Verify the request body includes the parameters + call_args = mock_request.call_args + json_data = call_args.kwargs.get("json") + assert json_data is not None + assert json_data.get("parameter") == {"defaultlanguage": "en"} + assert json_data.get("additionalReference") == "my-ref" + assert json_data.get("queueId") == "queue-1" + + @patch("requests.Session.request") + def test_get_protocol(self, mock_request, client): + """Test getting protocol details""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "id": 456, + "protocolId": "456", + "jobId": 123, + "status": "Success", + "errors": [], + "messages": [], + } + mock_request.return_value = mock_response + + result = client.get_protocol("456") + + assert isinstance(result, ProtocolInfo) + assert result.protocolId == "456" + + @patch("requests.Session.request") + def test_get_protocol_by_job_id(self, mock_request, client): + """Test getting protocol by job ID""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "protocols": [ + { + "protocolId": "456", + "jobId": 123, + "status": "Success", + "errors": [], + "messages": [], + } + ], + "errors": [], + "warnings": [], + } + mock_request.return_value = mock_response + + result = client.get_protocol_by_job_id(123) + + assert mock_request.call_count == 1 + + @patch("requests.Session.request") + def test_get_running_job_instances(self, mock_request, client): + """Test getting running job instances""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "jobInfoObjects": [], + "errors": [], + "warnings": [], + } + mock_request.return_value = mock_response + + result = client.get_running_job_instances() + + assert isinstance(result, JobOverviewResponse) + + @patch("requests.Session.request") + def test_authentication_error(self, mock_request, client): + """Test authentication error handling""" + mock_response = Mock() + mock_response.status_code = 401 + mock_request.return_value = mock_response + + with pytest.raises(ElytraAuthenticationError): + client.get_job_overview() + + @patch("requests.Session.request") + def test_not_found_error(self, mock_request, client): + """Test not found error handling""" + mock_response = Mock() + mock_response.status_code = 404 + mock_request.return_value = mock_response + + with pytest.raises(ElytraNotFoundError): + client.get_job_detail(999) + + @patch("requests.Session.request") + def test_rate_limit_error(self, mock_request, client): + """Test rate limit error handling""" + mock_response = Mock() + mock_response.status_code = 429 + mock_request.return_value = mock_response + + with pytest.raises(ElytraAPIError, match="rate limit"): + client.get_job_overview() + + @patch("requests.Session.request") + def test_json_parse_error(self, mock_request, client): + """Test JSON parsing error""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.side_effect = Exception("Invalid JSON") + mock_request.return_value = mock_response + + with pytest.raises(ElytraAPIError): + client.get_job_overview() + + @patch("requests.Session.request") + def test_validation_error(self, mock_request, client): + """Test response validation error""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "jobInfoObjects": [ + {"invalid": "data"} # Missing required fields + ], + "errors": [], + "warnings": [], + } + mock_request.return_value = mock_response + + with pytest.raises(ElytraValidationError): + client.get_job_overview() + + def test_context_manager(self, auth): + """Test using client as context manager""" + with LobsterRestApiClient("http://localhost:8080", auth=auth) as client: + assert client is not None + + +class TestIntegration: + """Integration tests with example scenarios""" + + @pytest.fixture + def auth(self): + """Fixture for test authentication""" + return RestApiAuth.from_username_password("admin", "password") + + @pytest.fixture + def client(self, auth): + """Fixture for test client""" + return LobsterRestApiClient("http://localhost:8080", auth=auth) + + @patch("requests.Session.request") + def test_job_execution_workflow(self, mock_request, client): + """Test complete job execution workflow""" + # Step 1: Get job details + mock_response1 = Mock() + mock_response1.status_code = 200 + mock_response1.json.return_value = { + "id": 123, + "name": "Import Job", + "jobIdentifier": "IMPORT_PRODUCTS", + "status": "Warten", + "nextExecutionDate": "manual execution", + "errorLevel": "Erfolgreich", + "errors": [], + "messages": [], + "warnings": [], + } + + # Step 2: Execute job + mock_response2 = Mock() + mock_response2.status_code = 200 + mock_response2.json.return_value = { + "id": 123, + "name": "Import Job", + "jobIdentifier": "IMPORT_PRODUCTS", + "status": "Wird ausgeführt", + "nextExecutionDate": "manual execution", + "protocolId": "789", + "runtimeId": "1698914697288", + "errors": [], + "messages": ["JOB_START_OK"], + "warnings": [], + } + + mock_request.side_effect = [mock_response1, mock_response2] + + # Get job details + job_details = client.get_job_detail(123) + assert job_details.name == "Import Job" + + # Execute the job + execution_result = client.execute_job(123) + assert execution_result.runtimeId == "1698914697288"