271 lines
8.6 KiB
Python
271 lines
8.6 KiB
Python
"""Examples of using the Lobster PIM Legacy REST API client"""
|
|
|
|
from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth
|
|
|
|
|
|
def example_1_basic_usage():
|
|
"""Example 1: Basic usage with job overview"""
|
|
print("=" * 60)
|
|
print("Example 1: Basic Job Overview")
|
|
print("=" * 60)
|
|
|
|
# Create authentication
|
|
auth = RestApiAuth.from_username_password("admin", "password")
|
|
|
|
# Create client
|
|
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
|
|
|
|
try:
|
|
# Get all active jobs
|
|
jobs = client.get_all_active_jobs()
|
|
print(f"\nFound {len(jobs.jobInfoObjects)} active jobs:")
|
|
for job in jobs.jobInfoObjects:
|
|
print(f" - {job.name} (ID: {job.id})")
|
|
print(f" Status: {job.status}")
|
|
print(f" Next Execution: {job.nextExecutionDate}")
|
|
finally:
|
|
client.close()
|
|
|
|
|
|
def example_2_api_token_auth():
|
|
"""Example 2: Using API token authentication"""
|
|
print("\n" + "=" * 60)
|
|
print("Example 2: API Token Authentication")
|
|
print("=" * 60)
|
|
|
|
# Create authentication with API token
|
|
# Note: API token is domain-specific (e.g., "Jobs", "Protokolle")
|
|
auth = RestApiAuth.from_api_token(
|
|
username="admin",
|
|
token="e96129a5-a314-4100-a44d-f851bf68dc18",
|
|
domain="Jobs"
|
|
)
|
|
|
|
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
|
|
|
|
try:
|
|
jobs = client.get_all_active_jobs()
|
|
print(f"\nAuthenticated with API token. Found {len(jobs.jobInfoObjects)} jobs.")
|
|
finally:
|
|
client.close()
|
|
|
|
|
|
def example_3_execute_job():
|
|
"""Example 3: Execute a job"""
|
|
print("\n" + "=" * 60)
|
|
print("Example 3: Execute a Job")
|
|
print("=" * 60)
|
|
|
|
auth = RestApiAuth.from_username_password("admin", "password")
|
|
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
|
|
|
|
try:
|
|
# Execute a job
|
|
job_id = 172475107 # Example job ID
|
|
print(f"\nExecuting job {job_id}...")
|
|
|
|
result = client.execute_job(job_id=job_id)
|
|
|
|
print(f"Job execution started:")
|
|
print(f" Job Name: {result.name}")
|
|
print(f" Runtime ID: {result.runtimeId}")
|
|
print(f" Status: {result.status}")
|
|
print(f" Protocol ID: {result.protocolId}")
|
|
print(f" Messages: {result.messages}")
|
|
finally:
|
|
client.close()
|
|
|
|
|
|
def example_4_job_with_parameters():
|
|
"""Example 4: Execute job with parameter overrides"""
|
|
print("\n" + "=" * 60)
|
|
print("Example 4: Job with Parameter Overrides")
|
|
print("=" * 60)
|
|
|
|
auth = RestApiAuth.from_username_password("admin", "password")
|
|
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
|
|
|
|
try:
|
|
# Execute job with language parameter override
|
|
print("\nExecuting job with language parameter override (en)...")
|
|
|
|
result = client.control_job(
|
|
job_id=2966557,
|
|
action="start",
|
|
parameters={
|
|
"sprache": {
|
|
"defaultlanguage": "en"
|
|
}
|
|
},
|
|
additional_reference="example-import-en"
|
|
) # Note: parameters is nested dict following API spec
|
|
|
|
print(f"Job started with parameters:")
|
|
print(f" Job Identifier: {result.jobIdentifier}")
|
|
print(f" Runtime ID: {result.runtimeId}")
|
|
finally:
|
|
client.close()
|
|
|
|
|
|
def example_5_queue_management():
|
|
"""Example 5: Batch job execution with queue management"""
|
|
print("\n" + "=" * 60)
|
|
print("Example 5: Queue Management (v2024 R1+)")
|
|
print("=" * 60)
|
|
|
|
auth = RestApiAuth.from_username_password("admin", "password")
|
|
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
|
|
|
|
try:
|
|
# Execute multiple jobs in sequence using a queue
|
|
queue_id = "article-updates-batch-1"
|
|
|
|
print(f"\nExecuting jobs in queue: {queue_id}")
|
|
|
|
articles = [
|
|
{"id": "ARTICLE-001", "name": "Article 1"},
|
|
{"id": "ARTICLE-002", "name": "Article 2"},
|
|
{"id": "ARTICLE-003", "name": "Article 3"},
|
|
]
|
|
|
|
for article in articles:
|
|
result = client.control_job(
|
|
job_id=123,
|
|
action="start",
|
|
queue_id=queue_id,
|
|
max_job_duration_seconds=3600, # 1 hour timeout per job
|
|
parameters={
|
|
"article_id": article["id"],
|
|
"article_name": article["name"]
|
|
},
|
|
additional_reference=f"batch-{article['id']}"
|
|
)
|
|
print(f" Queued: {article['id']} - Runtime ID: {result.runtimeId}")
|
|
finally:
|
|
client.close()
|
|
|
|
|
|
def example_6_get_protocols():
|
|
"""Example 6: Access protocol/log information"""
|
|
print("\n" + "=" * 60)
|
|
print("Example 6: Protocol/Log Access")
|
|
print("=" * 60)
|
|
|
|
auth = RestApiAuth.from_username_password("admin", "password")
|
|
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
|
|
|
|
try:
|
|
# Get recent protocols
|
|
print("\nGetting last 10 protocols...")
|
|
protocols = client.get_protocols(limit=10)
|
|
|
|
if protocols.protocols:
|
|
for protocol in protocols.protocols:
|
|
print(f" Protocol {protocol.protocolId}: {protocol.status}")
|
|
else:
|
|
print(" No protocols available")
|
|
|
|
# Get protocols by job ID
|
|
job_id = 172475107
|
|
print(f"\nGetting protocols for job {job_id}...")
|
|
job_protocols = client.get_protocol_by_job_id(job_id)
|
|
|
|
# Get protocol categories
|
|
print("\nAvailable protocol categories:")
|
|
categories = client.get_all_protocol_categories()
|
|
for cat in categories.categories:
|
|
print(f" - {cat.name} (ID: {cat.id})")
|
|
finally:
|
|
client.close()
|
|
|
|
|
|
def example_7_context_manager():
|
|
"""Example 7: Using client as context manager"""
|
|
print("\n" + "=" * 60)
|
|
print("Example 7: Context Manager Usage")
|
|
print("=" * 60)
|
|
|
|
auth = RestApiAuth.from_username_password("admin", "password")
|
|
|
|
# Client is automatically closed when exiting the context
|
|
with LobsterRestApiClient("http://localhost:8080", auth=auth) as client:
|
|
jobs = client.get_all_active_jobs()
|
|
print(f"\nUsing context manager - Found {len(jobs.jobInfoObjects)} jobs")
|
|
# Client automatically closed here
|
|
|
|
|
|
def example_8_error_handling():
|
|
"""Example 8: Error handling"""
|
|
print("\n" + "=" * 60)
|
|
print("Example 8: Error Handling")
|
|
print("=" * 60)
|
|
|
|
from elytra_client.exceptions import (
|
|
ElytraAPIError,
|
|
ElytraAuthenticationError,
|
|
ElytraNotFoundError,
|
|
)
|
|
|
|
auth = RestApiAuth.from_username_password("admin", "password")
|
|
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
|
|
|
|
try:
|
|
# Try to get a non-existent job
|
|
try:
|
|
job = client.get_job_detail(job_id=999999)
|
|
except ElytraNotFoundError:
|
|
print("\nJob not found - this is expected for non-existent IDs")
|
|
except ElytraAuthenticationError:
|
|
print("\nAuthentication failed - check credentials")
|
|
except ElytraAPIError as e:
|
|
print(f"\nAPI error: {e}")
|
|
finally:
|
|
client.close()
|
|
|
|
|
|
def example_9_running_jobs():
|
|
"""Example 9: Monitor running jobs"""
|
|
print("\n" + "=" * 60)
|
|
print("Example 9: Monitor Running Jobs")
|
|
print("=" * 60)
|
|
|
|
auth = RestApiAuth.from_username_password("admin", "password")
|
|
client = LobsterRestApiClient("http://localhost:8080", auth=auth)
|
|
|
|
try:
|
|
# Get all running job instances
|
|
print("\nGetting running job instances...")
|
|
running = client.get_running_job_instances()
|
|
|
|
if running.jobInfoObjects:
|
|
print(f"Found {len(running.jobInfoObjects)} running jobs:")
|
|
for job in running.jobInfoObjects:
|
|
print(f" - {job.name}")
|
|
print(f" Runtime ID: {job.runtimeId}")
|
|
print(f" Status: {job.status}")
|
|
else:
|
|
print("No jobs currently running")
|
|
finally:
|
|
client.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Run examples
|
|
# Note: These examples assume a running Lobster PIM server at localhost:8080
|
|
# with user "admin" / "password"
|
|
|
|
print("\nLobster PIM Legacy REST API Examples")
|
|
print("=" * 60)
|
|
print("\nNote: These examples require a running Lobster PIM server")
|
|
print("Uncomment the examples you want to run below:\n")
|
|
|
|
# Uncomment to run examples:
|
|
# example_1_basic_usage()
|
|
# example_2_api_token_auth()
|
|
# example_3_execute_job()
|
|
# example_4_job_with_parameters()
|
|
# example_5_queue_management()
|
|
# example_6_get_protocols()
|
|
# example_7_context_manager()
|
|
# example_8_error_handling()
|
|
# example_9_running_jobs()
|