409 lines
14 KiB
Python
409 lines
14 KiB
Python
"""Tests for the Lobster PIM Legacy REST API client"""
|
|
|
|
from unittest.mock import MagicMock, Mock, patch
|
|
|
|
import pytest
|
|
import requests
|
|
|
|
from elytra_client.exceptions import (
|
|
ElytraAPIError,
|
|
ElytraAuthenticationError,
|
|
ElytraNotFoundError,
|
|
ElytraValidationError,
|
|
)
|
|
from elytra_client.rest_api import (
|
|
AuthMethod,
|
|
JobControlResponse,
|
|
JobDetailInfo,
|
|
JobExecutionResponse,
|
|
JobOverviewResponse,
|
|
LobsterRestApiClient,
|
|
ProtocolInfo,
|
|
RestApiAuth,
|
|
)
|
|
|
|
|
|
class TestRestApiAuth:
    """Unit tests covering the ``RestApiAuth`` factory methods and accessors."""

    def test_username_password_auth(self):
        """Username/password credentials come back as URL parameters, with no header."""
        credentials = RestApiAuth.from_username_password("testuser", "password123")

        assert credentials.username == "testuser"
        assert credentials.password_or_token == "password123"
        assert credentials.auth_method == AuthMethod.USERNAME_PASSWORD

        # For this auth method the URL parameters carry the credentials and
        # the header dict is expected to stay empty.
        expected_query = {"username": "testuser", "password": "password123"}
        assert credentials.get_url_parameters() == expected_query
        assert credentials.get_auth_header() == {}

    def test_api_token_auth(self):
        """API token credentials come back as an Authorization header, with no URL params."""
        token_auth = RestApiAuth.from_api_token("admin", "token123", "Jobs")

        assert token_auth.username == "admin"
        assert token_auth.password_or_token == "token123"
        assert token_auth.domain == "Jobs"
        assert token_auth.auth_method == AuthMethod.API_TOKEN

        # The header value is expected in "<username>:<token>" form.
        expected_header = {"Authorization": "admin:token123"}
        assert token_auth.get_auth_header() == expected_header
        assert token_auth.get_url_parameters() == {}

    def test_api_token_auth_without_domain(self):
        """Building API token credentials without a domain raises ValueError."""
        missing_domain = pytest.raises(ValueError, match="Domain is required")
        with missing_domain:
            RestApiAuth.from_api_token("admin", "token123", None)  # type: ignore

    def test_json_body_params(self):
        """Username/password credentials are also available as JSON body parameters."""
        credentials = RestApiAuth.from_username_password("user", "pass")

        body = credentials.get_json_body_params()

        assert body == {"username": "user", "password": "pass"}
|
|
|
|
|
|
class TestLobsterRestApiClient:
    """Tests for LobsterRestApiClient.

    Tests that invoke API methods patch ``requests.Session.request`` and hand
    the client a canned response object instead of performing real HTTP calls.
    """

    @staticmethod
    def _fake_response(status_code, payload=None):
        """Return a ``Mock`` standing in for an HTTP response.

        Args:
            status_code: Value exposed as ``response.status_code``.
            payload: Optional object returned by ``response.json()``. When
                omitted, ``json`` is left as an unconfigured ``Mock`` method.

        Returns:
            The configured ``Mock`` instance.
        """
        response = Mock()
        response.status_code = status_code
        if payload is not None:
            response.json.return_value = payload
        return response

    @pytest.fixture
    def auth(self):
        """Provide username/password credentials for the client under test."""
        return RestApiAuth.from_username_password("testuser", "password123")

    @pytest.fixture
    def client(self, auth):
        """Provide a client pointed at a local base URL."""
        return LobsterRestApiClient("http://localhost:8080", auth=auth)

    def test_client_initialization(self, auth):
        """The trailing slash is stripped from the base URL and timeout defaults to 30."""
        client = LobsterRestApiClient("http://localhost:8080/", auth=auth)

        assert client.base_url == "http://localhost:8080"
        assert client.auth == auth
        assert client.timeout == 30

    def test_client_with_custom_timeout(self, auth):
        """An explicit ``timeout`` argument is stored on the client."""
        client = LobsterRestApiClient("http://localhost:8080", auth=auth, timeout=60)

        assert client.timeout == 60

    @patch("requests.Session.request")
    def test_get_job_overview(self, mock_request, client):
        """A job overview payload is parsed into a JobOverviewResponse."""
        mock_request.return_value = self._fake_response(
            200,
            {
                "jobInfoObjects": [
                    {
                        "id": 123,
                        "name": "Test Job",
                        "jobIdentifier": "TEST_JOB",
                        "status": "Warten",
                        "nextExecutionDate": "manual execution",
                        "errors": [],
                        "messages": [],
                        "warnings": [],
                    }
                ],
                "errors": [],
                "warnings": [],
            },
        )

        overview = client.get_job_overview()

        assert isinstance(overview, JobOverviewResponse)
        assert len(overview.jobInfoObjects) == 1
        assert overview.jobInfoObjects[0].name == "Test Job"

    @patch("requests.Session.request")
    def test_get_all_active_jobs(self, mock_request, client):
        """Listing active jobs issues a request and yields a JobOverviewResponse."""
        mock_request.return_value = self._fake_response(
            200,
            {
                "jobInfoObjects": [],
                "errors": [],
                "warnings": [],
            },
        )

        active_jobs = client.get_all_active_jobs()

        assert isinstance(active_jobs, JobOverviewResponse)
        mock_request.assert_called()

    @patch("requests.Session.request")
    def test_get_job_detail(self, mock_request, client):
        """A job detail payload is parsed into a JobDetailInfo."""
        mock_request.return_value = self._fake_response(
            200,
            {
                "id": 123,
                "name": "Test Job",
                "jobIdentifier": "TEST_JOB",
                "status": "Warten",
                "nextExecutionDate": "manual execution",
                "errorLevel": "Erfolgreich",
                "errors": [],
                "messages": [],
                "warnings": [],
            },
        )

        detail = client.get_job_detail(123)

        assert isinstance(detail, JobDetailInfo)
        assert detail.id == 123
        assert detail.errorLevel == "Erfolgreich"

    @patch("requests.Session.request")
    def test_execute_job(self, mock_request, client):
        """Executing a job returns a JobExecutionResponse with runtime id and messages."""
        mock_request.return_value = self._fake_response(
            200,
            {
                "id": 123,
                "name": "Test Job",
                "jobIdentifier": "TEST_JOB",
                "status": "Wird ausgeführt",
                "nextExecutionDate": "manual execution",
                "protocolId": "456",
                "runtimeId": "1698914697288",
                "errors": [],
                "messages": ["JOB_START_OK"],
                "warnings": [],
            },
        )

        execution = client.execute_job(123)

        assert isinstance(execution, JobExecutionResponse)
        assert execution.runtimeId == "1698914697288"
        assert "JOB_START_OK" in execution.messages

    @patch("requests.Session.request")
    def test_control_job(self, mock_request, client):
        """Controlling a job returns a JobControlResponse carrying the job identifier."""
        mock_request.return_value = self._fake_response(
            200,
            {
                "jobIdentifier": "TEST_JOB.tmp_(1698914396035)",
                "runtimeId": "1698914396035",
                "errors": [],
                "messages": [],
                "warnings": [],
            },
        )

        control = client.control_job(123, action="start")

        assert isinstance(control, JobControlResponse)
        assert "tmp_" in control.jobIdentifier

    @patch("requests.Session.request")
    def test_control_job_with_parameters(self, mock_request, client):
        """Parameter overrides, reference and queue id are forwarded in the JSON body."""
        mock_request.return_value = self._fake_response(
            200,
            {
                "jobIdentifier": "TEST_JOB.tmp_(1698914396035)",
                "runtimeId": "1698914396035",
                "errors": [],
                "messages": [],
                "warnings": [],
            },
        )

        control = client.control_job(
            123,
            action="start",
            parameters={"defaultlanguage": "en"},
            additional_reference="my-ref",
            queue_id="queue-1",
        )

        assert isinstance(control, JobControlResponse)

        # Inspect the keyword arguments of the most recent request to confirm
        # the Python-side argument names were mapped onto the wire-format keys.
        sent_body = mock_request.call_args.kwargs.get("json")
        assert sent_body is not None
        assert sent_body.get("parameter") == {"defaultlanguage": "en"}
        assert sent_body.get("additionalReference") == "my-ref"
        assert sent_body.get("queueId") == "queue-1"

    @patch("requests.Session.request")
    def test_get_protocol(self, mock_request, client):
        """A protocol payload is parsed into a ProtocolInfo."""
        mock_request.return_value = self._fake_response(
            200,
            {
                "id": 456,
                "protocolId": "456",
                "jobId": 123,
                "status": "Success",
                "errors": [],
                "messages": [],
            },
        )

        protocol = client.get_protocol("456")

        assert isinstance(protocol, ProtocolInfo)
        assert protocol.protocolId == "456"

    @patch("requests.Session.request")
    def test_get_protocol_by_job_id(self, mock_request, client):
        """Looking up protocols by job id issues exactly one request."""
        mock_request.return_value = self._fake_response(
            200,
            {
                "protocols": [
                    {
                        "protocolId": "456",
                        "jobId": 123,
                        "status": "Success",
                        "errors": [],
                        "messages": [],
                    }
                ],
                "errors": [],
                "warnings": [],
            },
        )

        # The return value is deliberately not bound: this test only pins the
        # number of HTTP requests, not the shape of the parsed result.
        client.get_protocol_by_job_id(123)

        assert mock_request.call_count == 1

    @patch("requests.Session.request")
    def test_get_running_job_instances(self, mock_request, client):
        """Listing running job instances yields a JobOverviewResponse."""
        mock_request.return_value = self._fake_response(
            200,
            {
                "jobInfoObjects": [],
                "errors": [],
                "warnings": [],
            },
        )

        running = client.get_running_job_instances()

        assert isinstance(running, JobOverviewResponse)

    @patch("requests.Session.request")
    def test_authentication_error(self, mock_request, client):
        """An HTTP 401 response raises ElytraAuthenticationError."""
        mock_request.return_value = self._fake_response(401)

        with pytest.raises(ElytraAuthenticationError):
            client.get_job_overview()

    @patch("requests.Session.request")
    def test_not_found_error(self, mock_request, client):
        """An HTTP 404 response raises ElytraNotFoundError."""
        mock_request.return_value = self._fake_response(404)

        with pytest.raises(ElytraNotFoundError):
            client.get_job_detail(999)

    @patch("requests.Session.request")
    def test_rate_limit_error(self, mock_request, client):
        """An HTTP 429 response raises ElytraAPIError mentioning the rate limit."""
        mock_request.return_value = self._fake_response(429)

        with pytest.raises(ElytraAPIError, match="rate limit"):
            client.get_job_overview()

    @patch("requests.Session.request")
    def test_json_parse_error(self, mock_request, client):
        """A failure while decoding the response body raises ElytraAPIError."""
        undecodable = self._fake_response(200)
        # NOTE(review): requests itself raises a ValueError subclass from
        # Response.json(); a bare Exception only exercises a broad handler.
        # Confirm against the client before narrowing this.
        undecodable.json.side_effect = Exception("Invalid JSON")
        mock_request.return_value = undecodable

        with pytest.raises(ElytraAPIError):
            client.get_job_overview()

    @patch("requests.Session.request")
    def test_validation_error(self, mock_request, client):
        """A payload that does not fit the response model raises ElytraValidationError."""
        mock_request.return_value = self._fake_response(
            200,
            {
                "jobInfoObjects": [
                    {"invalid": "data"}  # Missing required fields
                ],
                "errors": [],
                "warnings": [],
            },
        )

        with pytest.raises(ElytraValidationError):
            client.get_job_overview()

    def test_context_manager(self, auth):
        """The client can be used in a ``with`` statement and yields an object."""
        with LobsterRestApiClient("http://localhost:8080", auth=auth) as client:
            assert client is not None
|
|
|
|
|
|
class TestIntegration:
    """Scenario tests that chain several client calls against mocked HTTP responses."""

    @staticmethod
    def _fake_response(status_code, payload):
        """Return a ``Mock`` response exposing ``status_code`` and a ``json()`` payload.

        Args:
            status_code: Value exposed as ``response.status_code``.
            payload: Object returned by ``response.json()``.

        Returns:
            The configured ``Mock`` instance.
        """
        response = Mock()
        response.status_code = status_code
        response.json.return_value = payload
        return response

    @pytest.fixture
    def auth(self):
        """Provide username/password credentials for the scenario client."""
        return RestApiAuth.from_username_password("admin", "password")

    @pytest.fixture
    def client(self, auth):
        """Provide a client pointed at a local base URL."""
        return LobsterRestApiClient("http://localhost:8080", auth=auth)

    @patch("requests.Session.request")
    def test_job_execution_workflow(self, mock_request, client):
        """Fetch a job's details, then execute it, using one queued response per call."""
        # Step 1: response served to the job-detail lookup.
        detail_response = self._fake_response(
            200,
            {
                "id": 123,
                "name": "Import Job",
                "jobIdentifier": "IMPORT_PRODUCTS",
                "status": "Warten",
                "nextExecutionDate": "manual execution",
                "errorLevel": "Erfolgreich",
                "errors": [],
                "messages": [],
                "warnings": [],
            },
        )

        # Step 2: response served to the execute call.
        execute_response = self._fake_response(
            200,
            {
                "id": 123,
                "name": "Import Job",
                "jobIdentifier": "IMPORT_PRODUCTS",
                "status": "Wird ausgeführt",
                "nextExecutionDate": "manual execution",
                "protocolId": "789",
                "runtimeId": "1698914697288",
                "errors": [],
                "messages": ["JOB_START_OK"],
                "warnings": [],
            },
        )

        # A list side_effect hands out one response per request, in order.
        mock_request.side_effect = [detail_response, execute_response]

        # Get job details
        job_details = client.get_job_detail(123)
        assert job_details.name == "Import Job"

        # Execute the job
        execution_result = client.execute_job(123)
        assert execution_result.runtimeId == "1698914697288"

        # Both queued responses must have been consumed, one per client call;
        # otherwise the assertions above could be satisfied by the wrong reply.
        assert mock_request.call_count == 2
|