# elytra_client/elytra_client/rest_api/models/jobs.py

"""Job management models"""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class JobInfo(BaseModel):
    """Base job information model"""

    # Field names are camelCase; presumably they mirror the keys of the
    # REST payload -- confirm against the API before renaming any of them.

    # --- Identification (id, name and jobIdentifier are required) ---
    id: int = Field(..., description="The ID of the job")
    name: str = Field(..., description="The name of the job")
    jobIdentifier: str = Field(..., description="The unique job identifier")
    jobDescription: Optional[str] = Field(
        default=None, description="Description of the job"
    )

    # --- State and scheduling ---
    # Dates are declared as plain strings; no date parsing is done here.
    status: str = Field(..., description="Current status of the job")
    nextExecutionDate: str = Field(
        ..., description="Next scheduled execution date"
    )
    previousExecutionDate: Optional[str] = Field(
        default=None, description="Previous execution date"
    )
    protocolId: Optional[str] = Field(
        default=None, description="ID of the associated protocol"
    )

    # --- Diagnostics ---
    # Each list falls back to a fresh empty list when the key is missing.
    errors: List[str] = Field(
        default_factory=list, description="List of errors"
    )
    messages: List[str] = Field(
        default_factory=list, description="List of messages"
    )
    warnings: List[str] = Field(
        default_factory=list, description="List of warnings"
    )
class JobDetailInfo(JobInfo):
    """Detailed job information including error level and runtime ID"""

    # Extends JobInfo with two extra attributes; both are optional and
    # default to None when not supplied.
    errorLevel: Optional[str] = Field(
        default=None, description="Error level (e.g., 'Erfolgreich')"
    )
    runtimeId: Optional[str] = Field(
        default=None, description="Runtime ID for active job execution"
    )
class JobOverviewResponse(BaseModel):
    """Response containing multiple job information items"""

    # Required: the job entries, each validated as a JobDetailInfo.
    jobInfoObjects: List[JobDetailInfo] = Field(
        ..., description="List of job information objects"
    )

    # Response-level diagnostics; each defaults to a fresh empty list.
    errors: List[str] = Field(
        default_factory=list, description="List of errors"
    )
    warnings: List[str] = Field(
        default_factory=list, description="List of warnings"
    )
class JobExecutionResponse(BaseModel):
    """Response from executing a job"""

    # --- Identification ---
    id: int = Field(..., description="The ID of the job")
    name: str = Field(..., description="The name of the job")
    jobIdentifier: str = Field(..., description="The unique job identifier")
    jobDescription: Optional[str] = Field(
        default=None, description="Description of the job"
    )

    # --- Execution state ---
    # protocolId and runtimeId have no default here, so both are required.
    status: str = Field(..., description="Status after execution")
    nextExecutionDate: str = Field(..., description="Next execution date")
    protocolId: str = Field(
        ..., description="ID of the protocol for this execution"
    )
    runtimeId: str = Field(
        ..., description="Runtime ID for tracking execution"
    )

    # --- Diagnostics (each defaults to a fresh empty list) ---
    errors: List[str] = Field(
        default_factory=list, description="List of errors"
    )
    messages: List[str] = Field(
        default_factory=list,
        description="List of messages (e.g., JOB_START_OK)",
    )
    warnings: List[str] = Field(
        default_factory=list, description="List of warnings"
    )
class JobControlRequest(BaseModel):
    """Request body for job control endpoint"""

    # --- What to do, and to which object ---
    action: str = Field(..., description="Action to perform (e.g., 'start')")
    objectId: int = Field(..., description="The ID of the job to control")
    objectType: str = Field(default="job", description="Type of object")

    # --- Credentials ---
    username: str = Field(..., description="Username for authentication")
    # repr=False keeps the password out of repr()/str() of the model, so it
    # does not end up in logs or tracebacks. Validation and serialization
    # of the field are unchanged.
    password: str = Field(
        ..., description="Password for authentication", repr=False
    )

    # --- Optional execution settings ---
    additionalReference: Optional[str] = Field(
        default=None,
        description="Custom reference for external processing tracking",
    )
    parameter: Optional[Dict[str, Any]] = Field(
        default=None, description="Parameters to override job settings"
    )
    queueId: Optional[str] = Field(
        default=None, description="Queue ID for serialized job execution"
    )
    # 43200 s = 12 h. The annotation is Optional, so an explicit None is
    # also accepted in place of the default.
    maxJobDurationSeconds: Optional[int] = Field(
        default=43200,
        description="Max duration in seconds (default 12 hours)",
    )
class JobControlResponse(BaseModel):
    """Response from job control endpoint"""

    # Both identifiers are required.
    jobIdentifier: str = Field(..., description="The job identifier")
    runtimeId: str = Field(..., description="Runtime ID for tracking")

    # Diagnostics; each defaults to a fresh empty list.
    errors: List[str] = Field(
        default_factory=list, description="List of errors"
    )
    messages: List[str] = Field(
        default_factory=list, description="List of messages"
    )
    warnings: List[str] = Field(
        default_factory=list, description="List of warnings"
    )