"""Examples of using the Lobster PIM Legacy REST API client""" from elytra_client.rest_api import LobsterRestApiClient, RestApiAuth def example_1_basic_usage(): """Example 1: Basic usage with job overview""" print("=" * 60) print("Example 1: Basic Job Overview") print("=" * 60) # Create authentication auth = RestApiAuth.from_username_password("admin", "password") # Create client client = LobsterRestApiClient("http://localhost:8080", auth=auth) try: # Get all active jobs jobs = client.get_all_active_jobs() print(f"\nFound {len(jobs.jobInfoObjects)} active jobs:") for job in jobs.jobInfoObjects: print(f" - {job.name} (ID: {job.id})") print(f" Status: {job.status}") print(f" Next Execution: {job.nextExecutionDate}") finally: client.close() def example_2_api_token_auth(): """Example 2: Using API token authentication""" print("\n" + "=" * 60) print("Example 2: API Token Authentication") print("=" * 60) # Create authentication with API token # Note: API token is domain-specific (e.g., "Jobs", "Protokolle") auth = RestApiAuth.from_api_token( username="admin", token="e96129a5-a314-4100-a44d-f851bf68dc18", domain="Jobs" ) client = LobsterRestApiClient("http://localhost:8080", auth=auth) try: jobs = client.get_all_active_jobs() print(f"\nAuthenticated with API token. Found {len(jobs.jobInfoObjects)} jobs.") finally: client.close() def example_3_execute_job(): """Example 3: Execute a job""" print("\n" + "=" * 60) print("Example 3: Execute a Job") print("=" * 60) auth = RestApiAuth.from_username_password("admin", "password") client = LobsterRestApiClient("http://localhost:8080", auth=auth) try: # Execute a job job_id = 172475107 # Example job ID print(f"\nExecuting job {job_id}...") result = client.execute_job(job_id=job_id) print(f"Job execution started:") print(f" Job Name: {result.name}") print(f" Runtime ID: {result.runtimeId}") print(f" Status: {result.status}") print(f" Protocol ID: {result.protocolId}") print(f" Messages: {result.messages}") finally: client.close() def example_4_job_with_parameters(): """Example 4: Execute job with parameter overrides""" print("\n" + "=" * 60) print("Example 4: Job with Parameter Overrides") print("=" * 60) auth = RestApiAuth.from_username_password("admin", "password") client = LobsterRestApiClient("http://localhost:8080", auth=auth) try: # Execute job with language parameter override print("\nExecuting job with language parameter override (en)...") result = client.control_job( job_id=2966557, action="start", parameters={ "sprache": { "defaultlanguage": "en" } }, additional_reference="example-import-en" ) # Note: parameters is nested dict following API spec print(f"Job started with parameters:") print(f" Job Identifier: {result.jobIdentifier}") print(f" Runtime ID: {result.runtimeId}") finally: client.close() def example_5_queue_management(): """Example 5: Batch job execution with queue management""" print("\n" + "=" * 60) print("Example 5: Queue Management (v2024 R1+)") print("=" * 60) auth = RestApiAuth.from_username_password("admin", "password") client = LobsterRestApiClient("http://localhost:8080", auth=auth) try: # Execute multiple jobs in sequence using a queue queue_id = "article-updates-batch-1" print(f"\nExecuting jobs in queue: {queue_id}") articles = [ {"id": "ARTICLE-001", "name": "Article 1"}, {"id": "ARTICLE-002", "name": "Article 2"}, {"id": "ARTICLE-003", "name": "Article 3"}, ] for article in articles: result = client.control_job( job_id=123, action="start", queue_id=queue_id, max_job_duration_seconds=3600, # 1 hour timeout per job parameters={ "article_id": article["id"], "article_name": article["name"] }, additional_reference=f"batch-{article['id']}" ) print(f" Queued: {article['id']} - Runtime ID: {result.runtimeId}") finally: client.close() def example_6_get_protocols(): """Example 6: Access protocol/log information""" print("\n" + "=" * 60) print("Example 6: Protocol/Log Access") print("=" * 60) auth = RestApiAuth.from_username_password("admin", "password") client = LobsterRestApiClient("http://localhost:8080", auth=auth) try: # Get recent protocols print("\nGetting last 10 protocols...") protocols = client.get_protocols(limit=10) if protocols.protocols: for protocol in protocols.protocols: print(f" Protocol {protocol.protocolId}: {protocol.status}") else: print(" No protocols available") # Get protocols by job ID job_id = 172475107 print(f"\nGetting protocols for job {job_id}...") job_protocols = client.get_protocol_by_job_id(job_id) # Get protocol categories print("\nAvailable protocol categories:") categories = client.get_all_protocol_categories() for cat in categories.categories: print(f" - {cat.name} (ID: {cat.id})") finally: client.close() def example_7_context_manager(): """Example 7: Using client as context manager""" print("\n" + "=" * 60) print("Example 7: Context Manager Usage") print("=" * 60) auth = RestApiAuth.from_username_password("admin", "password") # Client is automatically closed when exiting the context with LobsterRestApiClient("http://localhost:8080", auth=auth) as client: jobs = client.get_all_active_jobs() print(f"\nUsing context manager - Found {len(jobs.jobInfoObjects)} jobs") # Client automatically closed here def example_8_error_handling(): """Example 8: Error handling""" print("\n" + "=" * 60) print("Example 8: Error Handling") print("=" * 60) from elytra_client.exceptions import ( ElytraAPIError, ElytraAuthenticationError, ElytraNotFoundError, ) auth = RestApiAuth.from_username_password("admin", "password") client = LobsterRestApiClient("http://localhost:8080", auth=auth) try: # Try to get a non-existent job try: job = client.get_job_detail(job_id=999999) except ElytraNotFoundError: print("\nJob not found - this is expected for non-existent IDs") except ElytraAuthenticationError: print("\nAuthentication failed - check credentials") except ElytraAPIError as e: print(f"\nAPI error: {e}") finally: client.close() def example_9_running_jobs(): """Example 9: Monitor running jobs""" print("\n" + "=" * 60) print("Example 9: Monitor Running Jobs") print("=" * 60) auth = RestApiAuth.from_username_password("admin", "password") client = LobsterRestApiClient("http://localhost:8080", auth=auth) try: # Get all running job instances print("\nGetting running job instances...") running = client.get_running_job_instances() if running.jobInfoObjects: print(f"Found {len(running.jobInfoObjects)} running jobs:") for job in running.jobInfoObjects: print(f" - {job.name}") print(f" Runtime ID: {job.runtimeId}") print(f" Status: {job.status}") else: print("No jobs currently running") finally: client.close() if __name__ == "__main__": # Run examples # Note: These examples assume a running Lobster PIM server at localhost:8080 # with user "admin" / "password" print("\nLobster PIM Legacy REST API Examples") print("=" * 60) print("\nNote: These examples require a running Lobster PIM server") print("Uncomment the examples you want to run below:\n") # Uncomment to run examples: # example_1_basic_usage() # example_2_api_token_auth() # example_3_execute_job() # example_4_job_with_parameters() # example_5_queue_management() # example_6_get_protocols() # example_7_context_manager() # example_8_error_handling() # example_9_running_jobs()