From e7b3dcb8d0dfbf634b18760fcd2934c26bebf9ce Mon Sep 17 00:00:00 2001 From: ptaindia Date: Sun, 7 Dec 2025 23:58:09 +0530 Subject: [PATCH] Modernize routers with FastAPI 0.124+ patterns Updated all router files to use modern FastAPI patterns: - Added typed dependencies (DatabaseSession, RequiredAPIKey) - Added Annotated type hints with Doc for documentation - Added status.HTTP_* constants for all status codes - Added comprehensive OpenAPI response schemas with examples - Added proper tags for endpoint grouping - Created Pydantic response models with field documentation - Improved docstrings with detailed descriptions - Added model_config with json_schema_extra examples Files updated: - health.py: Health check and capabilities endpoints - jobs.py: Job management with SSE streaming - batch.py: Batch processing with validation - admin.py: Admin endpoints with proper access control --- api/routers/admin.py | 418 ++++++++++++++++++++++++++++++------------ api/routers/batch.py | 392 ++++++++++++++++++++++++++++----------- api/routers/health.py | 170 +++++++++++++++-- api/routers/jobs.py | 342 ++++++++++++++++++++++++---------- 4 files changed, 978 insertions(+), 344 deletions(-) diff --git a/api/routers/admin.py b/api/routers/admin.py index 45032a1..4baa9ef 100644 --- a/api/routers/admin.py +++ b/api/routers/admin.py @@ -1,19 +1,23 @@ """ -Admin endpoints for system management +Admin endpoints - System management with FastAPI 0.124+ patterns. + +Provides worker management, storage status, statistics, and cleanup operations. +Requires admin API key for access. """ -from typing import Dict, Any, List +from typing import Dict, Any, List, Annotated from datetime import datetime, timedelta -from fastapi import APIRouter, Depends, HTTPException, Query -from sqlalchemy.ext.asyncio import AsyncSession +from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy import select, func, and_ +from annotated_doc import Doc import structlog from api.config import settings -from api.dependencies import get_db, require_api_key -from api.models.job import Job, JobStatus +from api.dependencies import DatabaseSession, require_api_key +from api.models.job import Job, JobStatus, ErrorResponse from api.services.queue import QueueService from api.services.storage import StorageService +from pydantic import BaseModel logger = structlog.get_logger() router = APIRouter() @@ -22,92 +26,211 @@ storage_service = StorageService() +# Response models for OpenAPI documentation +class WorkerInfo(BaseModel): + """Worker status information.""" + id: Annotated[str, Doc("Worker identifier")] + status: Annotated[str, Doc("Current status: active, idle, offline")] + hostname: Annotated[str, Doc("Worker hostname")] + current_job: Annotated[str | None, Doc("Currently processing job ID")] + jobs_completed: Annotated[int, Doc("Total jobs completed")] + last_heartbeat: Annotated[datetime | None, Doc("Last heartbeat timestamp")] + + +class WorkersStatusResponse(BaseModel): + """Workers status response.""" + total_workers: Annotated[int, Doc("Total number of workers")] + workers: Annotated[List[Dict[str, Any]], Doc("List of worker details")] + summary: Annotated[Dict[str, int], Doc("Worker count by status")] + + +class StorageStatusResponse(BaseModel): + """Storage backends status response.""" + backends: Annotated[Dict[str, Dict[str, Any]], Doc("Status of each storage backend")] + default_backend: Annotated[str | None, Doc("Default storage backend name")] + policies: Annotated[Dict[str, Any], Doc("Storage routing 
policies")] + + +class SystemStatsResponse(BaseModel): + """System statistics response.""" + period: Annotated[str, Doc("Statistics period")] + start_time: Annotated[str, Doc("Period start timestamp")] + jobs: Annotated[Dict[str, Any], Doc("Job statistics")] + queue: Annotated[Dict[str, Any], Doc("Queue statistics")] + workers: Annotated[Dict[str, Any], Doc("Worker statistics")] + + +class CleanupResponse(BaseModel): + """Cleanup operation response.""" + dry_run: Annotated[bool, Doc("Whether this was a dry run")] + jobs_deleted: Annotated[int | None, Doc("Number of jobs deleted")] = None + jobs_to_delete: Annotated[int | None, Doc("Jobs that would be deleted (dry run)")] = None + errors: Annotated[List[Dict[str, Any]] | None, Doc("Any errors during cleanup")] = None + cutoff_date: Annotated[str, Doc("Cutoff date for cleanup")] + + +class PresetResponse(BaseModel): + """Encoding preset response.""" + name: Annotated[str, Doc("Preset name")] + description: Annotated[str | None, Doc("Preset description")] = None + settings: Annotated[Dict[str, Any], Doc("Encoding settings")] + + async def require_admin(api_key: str = Depends(require_api_key)) -> str: - """Require admin privileges.""" - # Check if API key is in the admin keys from environment + """ + Dependency to require admin privileges. + + Validates the API key against configured admin keys. + """ admin_keys = settings.ADMIN_API_KEYS.split(',') if hasattr(settings, 'ADMIN_API_KEYS') and settings.ADMIN_API_KEYS else [] - + if not admin_keys: logger.warning("No admin API keys configured - admin endpoints disabled") - raise HTTPException(status_code=503, detail="Admin functionality not configured") - + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail={"error": "not_configured", "message": "Admin functionality not configured"} + ) + if api_key not in admin_keys: - raise HTTPException(status_code=403, detail="Admin access required") + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={"error": "access_denied", "message": "Admin access required"} + ) return api_key -@router.get("/workers") +@router.get( + "/workers", + response_model=WorkersStatusResponse, + status_code=status.HTTP_200_OK, + summary="Get workers status", + description="Get the current status of all workers in the processing cluster.", + response_description="List of workers with their status and statistics", + responses={ + 200: {"description": "Workers status retrieved successfully"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Admin access required"}, + 503: {"model": ErrorResponse, "description": "Admin functionality not configured"}, + }, + tags=["admin"], +) async def get_workers_status( - admin: str = Depends(require_admin), -) -> Dict[str, Any]: + admin: Annotated[str, Depends(require_admin), Doc("Admin API key")], +) -> WorkersStatusResponse: """ Get status of all workers in the system. + + Returns worker health, current jobs, and performance metrics. + Only accessible with admin API key. 
""" try: workers = await queue_service.get_workers_status() - - return { - "total_workers": len(workers), - "workers": workers, - "summary": { + + return WorkersStatusResponse( + total_workers=len(workers), + workers=workers, + summary={ "active": sum(1 for w in workers if w.get("status") == "active"), "idle": sum(1 for w in workers if w.get("status") == "idle"), "offline": sum(1 for w in workers if w.get("status") == "offline"), }, - } + ) except Exception as e: logger.error("Failed to get workers status", error=str(e)) - raise HTTPException(status_code=500, detail="Failed to get workers status") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": "internal_error", "message": "Failed to get workers status"} + ) -@router.get("/storage") +@router.get( + "/storage", + response_model=StorageStatusResponse, + status_code=status.HTTP_200_OK, + summary="Get storage status", + description="Get the status of all configured storage backends.", + response_description="Storage backend health and configuration", + responses={ + 200: {"description": "Storage status retrieved successfully"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Admin access required"}, + 503: {"model": ErrorResponse, "description": "Admin functionality not configured"}, + }, + tags=["admin"], +) async def get_storage_status( - admin: str = Depends(require_admin), -) -> Dict[str, Any]: + admin: Annotated[str, Depends(require_admin), Doc("Admin API key")], +) -> StorageStatusResponse: """ Get status of all storage backends. + + Returns health status, available space, and configuration for each backend. """ try: storage_status = {} - + for name, backend in storage_service.backends.items(): try: # Get backend-specific status - status = await backend.get_status() + backend_status = await backend.get_status() storage_status[name] = { "status": "healthy", "type": backend.__class__.__name__, - **status, + **backend_status, } except Exception as e: storage_status[name] = { "status": "unhealthy", "error": str(e), } - - return { - "backends": storage_status, - "default_backend": storage_service.config.get("default_backend"), - "policies": storage_service.config.get("policies", {}), - } + + return StorageStatusResponse( + backends=storage_status, + default_backend=storage_service.config.get("default_backend"), + policies=storage_service.config.get("policies", {}), + ) except Exception as e: logger.error("Failed to get storage status", error=str(e)) - raise HTTPException(status_code=500, detail="Failed to get storage status") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": "internal_error", "message": "Failed to get storage status"} + ) -@router.get("/stats") +@router.get( + "/stats", + response_model=SystemStatsResponse, + status_code=status.HTTP_200_OK, + summary="Get system statistics", + description="Get aggregated system statistics for the specified time period.", + response_description="Job, queue, and worker statistics", + responses={ + 200: {"description": "Statistics retrieved successfully"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Admin access required"}, + 503: {"model": ErrorResponse, "description": "Admin functionality not configured"}, + }, + tags=["admin"], +) async def get_system_stats( - period: str = Query("24h", regex=r"^(\d+[hdwm])$"), - db: AsyncSession = Depends(get_db), - 
admin: str = Depends(require_admin), -) -> Dict[str, Any]: + period: Annotated[ + str, + Query(pattern=r"^(\d+[hdwm])$", description="Time period (e.g., 24h, 7d, 4w, 1m)"), + Doc("Statistics aggregation period") + ] = "24h", + db: DatabaseSession = None, + admin: Annotated[str, Depends(require_admin), Doc("Admin API key")] = None, +) -> SystemStatsResponse: """ Get system statistics for the specified period. + + Returns aggregated metrics for jobs, queue depth, and worker performance. + Period format: number + unit (h=hours, d=days, w=weeks, m=months). """ # Parse period unit = period[-1] value = int(period[:-1]) - + if unit == "h": delta = timedelta(hours=value) elif unit == "d": @@ -116,9 +239,9 @@ async def get_system_stats( delta = timedelta(weeks=value) elif unit == "m": delta = timedelta(days=value * 30) - + start_time = datetime.utcnow() - delta - + # Get job statistics stats_query = ( select( @@ -130,39 +253,64 @@ async def get_system_stats( .where(Job.created_at >= start_time) .group_by(Job.status) ) - + result = await db.execute(stats_query) job_stats = result.all() - + # Format statistics - stats = { - "period": period, - "start_time": start_time.isoformat(), - "jobs": { + stats = SystemStatsResponse( + period=period, + start_time=start_time.isoformat(), + jobs={ "total": sum(row.count for row in job_stats), "by_status": {row.status: row.count for row in job_stats}, "avg_processing_time": sum(row.avg_time or 0 for row in job_stats) / len(job_stats) if job_stats else 0, "avg_vmaf_score": sum(row.avg_vmaf or 0 for row in job_stats if row.avg_vmaf) / sum(1 for row in job_stats if row.avg_vmaf) if any(row.avg_vmaf for row in job_stats) else None, }, - "queue": await queue_service.get_queue_stats(), - "workers": await queue_service.get_workers_stats(), - } - + queue=await queue_service.get_queue_stats(), + workers=await queue_service.get_workers_stats(), + ) + return stats -@router.post("/cleanup") +@router.post( + "/cleanup", + response_model=CleanupResponse, + status_code=status.HTTP_200_OK, + summary="Cleanup old jobs", + description="Remove old completed jobs and their associated files from storage.", + response_description="Cleanup operation results", + responses={ + 200: {"description": "Cleanup completed or dry run results"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Admin access required"}, + 503: {"model": ErrorResponse, "description": "Admin functionality not configured"}, + }, + tags=["admin"], +) async def cleanup_old_jobs( - days: int = Query(7, ge=1, le=90), - dry_run: bool = Query(True), - db: AsyncSession = Depends(get_db), - admin: str = Depends(require_admin), -) -> Dict[str, Any]: + days: Annotated[ + int, + Query(ge=1, le=90, description="Delete jobs older than this many days"), + Doc("Age threshold in days for job cleanup") + ] = 7, + dry_run: Annotated[ + bool, + Query(description="If true, only report what would be deleted"), + Doc("Preview cleanup without deleting") + ] = True, + db: DatabaseSession = None, + admin: Annotated[str, Depends(require_admin), Doc("Admin API key")] = None, +) -> CleanupResponse: """ Clean up old completed jobs and their associated files. + + By default runs in dry-run mode to preview deletions. + Only removes jobs in terminal states (completed, failed, cancelled). 
""" cutoff_date = datetime.utcnow() - timedelta(days=days) - + # Find old jobs query = select(Job).where( and_( @@ -170,21 +318,21 @@ async def cleanup_old_jobs( Job.status.in_([JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]) ) ) - + result = await db.execute(query) old_jobs = result.scalars().all() - + if dry_run: - return { - "dry_run": True, - "jobs_to_delete": len(old_jobs), - "cutoff_date": cutoff_date.isoformat(), - } - + return CleanupResponse( + dry_run=True, + jobs_to_delete=len(old_jobs), + cutoff_date=cutoff_date.isoformat(), + ) + # Delete files and jobs deleted_count = 0 errors = [] - + for job in old_jobs: try: # Delete output file if it exists @@ -193,66 +341,108 @@ async def cleanup_old_jobs( backend = storage_service.backends.get(backend_name) if backend: await backend.delete(file_path) - + # Delete job record await db.delete(job) deleted_count += 1 - + except Exception as e: errors.append({ "job_id": str(job.id), "error": str(e), }) - + await db.commit() - + logger.info(f"Cleanup completed: {deleted_count} jobs deleted") - - return { - "dry_run": False, - "jobs_deleted": deleted_count, - "errors": errors, - "cutoff_date": cutoff_date.isoformat(), - } + + return CleanupResponse( + dry_run=False, + jobs_deleted=deleted_count, + errors=errors if errors else None, + cutoff_date=cutoff_date.isoformat(), + ) -@router.post("/presets") +@router.post( + "/presets", + response_model=PresetResponse, + status_code=status.HTTP_201_CREATED, + summary="Create encoding preset", + description="Create a new custom encoding preset for reuse across jobs.", + response_description="Created preset details", + responses={ + 201: {"description": "Preset created successfully"}, + 400: { + "model": ErrorResponse, + "description": "Invalid preset configuration", + "content": { + "application/json": { + "example": {"error": "validation_error", "message": "Preset name required"} + } + } + }, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Admin access required"}, + }, + tags=["admin"], +) async def create_preset( - preset: Dict[str, Any], - admin: str = Depends(require_admin), -) -> Dict[str, Any]: + preset: Annotated[Dict[str, Any], Doc("Preset configuration with name and settings")], + admin: Annotated[str, Depends(require_admin), Doc("Admin API key")], +) -> PresetResponse: """ Create a new encoding preset. + + Presets define reusable encoding configurations for common use cases. 
""" # Validate preset if "name" not in preset: - raise HTTPException(status_code=400, detail="Preset name required") - + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail={"error": "validation_error", "message": "Preset name required"} + ) + if "settings" not in preset: - raise HTTPException(status_code=400, detail="Preset settings required") - + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail={"error": "validation_error", "message": "Preset settings required"} + ) + # Save preset (in production, save to database) - # For now, just return success logger.info(f"Preset created: {preset['name']}") - - return { - "message": "Preset created successfully", - "preset": preset, - } + + return PresetResponse( + name=preset["name"], + description=preset.get("description"), + settings=preset["settings"], + ) -@router.get("/presets") -async def list_presets() -> List[Dict[str, Any]]: +@router.get( + "/presets", + response_model=List[PresetResponse], + status_code=status.HTTP_200_OK, + summary="List encoding presets", + description="Get all available encoding presets including built-in and custom presets.", + response_description="List of available presets", + responses={ + 200: {"description": "Presets retrieved successfully"}, + }, + tags=["admin"], +) +async def list_presets() -> List[PresetResponse]: """ List available encoding presets. + + Returns both built-in presets and any custom presets created by admins. + This endpoint does not require admin authentication. """ - # In production, load from database - # For now, return built-in presets return [ - { - "name": "web-1080p", - "description": "Standard 1080p for web streaming", - "settings": { + PresetResponse( + name="web-1080p", + description="Standard 1080p for web streaming", + settings={ "video": { "codec": "h264", "preset": "medium", @@ -264,11 +454,11 @@ async def list_presets() -> List[Dict[str, Any]]: "bitrate": "128k", }, }, - }, - { - "name": "web-720p", - "description": "Standard 720p for web streaming", - "settings": { + ), + PresetResponse( + name="web-720p", + description="Standard 720p for web streaming", + settings={ "video": { "codec": "h264", "preset": "medium", @@ -280,11 +470,11 @@ async def list_presets() -> List[Dict[str, Any]]: "bitrate": "128k", }, }, - }, - { - "name": "archive-high", - "description": "High quality for archival", - "settings": { + ), + PresetResponse( + name="archive-high", + description="High quality for archival", + settings={ "video": { "codec": "h265", "preset": "slow", @@ -294,5 +484,5 @@ async def list_presets() -> List[Dict[str, Any]]: "codec": "flac", }, }, - }, - ] \ No newline at end of file + ), + ] diff --git a/api/routers/batch.py b/api/routers/batch.py index 4fc9652..2c8fa81 100644 --- a/api/routers/batch.py +++ b/api/routers/batch.py @@ -1,21 +1,23 @@ """ -Batch processing endpoint for multiple media files +Batch processing endpoint - Multiple media file processing with FastAPI 0.124+ patterns. + +Provides efficient batch job creation, status monitoring, and cancellation. 
""" -from typing import Dict, Any, List +from typing import Dict, Any, List, Annotated, Optional from uuid import uuid4 -from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks -from sqlalchemy.ext.asyncio import AsyncSession +from fastapi import APIRouter, BackgroundTasks, HTTPException, status +from annotated_doc import Doc import structlog from api.config import settings -from api.dependencies import get_db, require_api_key -from api.models.job import Job, JobStatus, JobCreateResponse, JobResponse +from api.dependencies import DatabaseSession, RequiredAPIKey +from api.models.job import Job, JobStatus, JobResponse, ErrorResponse from api.services.queue import QueueService from api.services.storage import StorageService from api.utils.validators import validate_input_path, validate_output_path, validate_operations from api.utils.media_validator import media_validator -from pydantic import BaseModel +from pydantic import BaseModel, Field logger = structlog.get_logger() router = APIRouter() @@ -25,89 +27,191 @@ class BatchJob(BaseModel): - """Single job in a batch.""" - input: str - output: str - operations: List[Dict[str, Any]] = [] - options: Dict[str, Any] = {} - priority: str = "normal" + """Single job specification within a batch.""" + input: Annotated[str, Doc("Input file path or URI")] + output: Annotated[str, Doc("Output file path or URI")] + operations: Annotated[ + List[Dict[str, Any]], + Doc("List of processing operations to apply") + ] = [] + options: Annotated[ + Dict[str, Any], + Doc("Additional processing options") + ] = {} + priority: Annotated[ + str, + Doc("Job priority: low, normal, high") + ] = "normal" class BatchProcessRequest(BaseModel): """Batch processing request model.""" - jobs: List[BatchJob] - batch_name: str = "" - webhook_url: str = None - webhook_events: List[str] = [] - validate_files: bool = True + jobs: Annotated[ + List[BatchJob], + Field(min_length=1, max_length=100), + Doc("List of jobs to process (1-100 jobs)") + ] + batch_name: Annotated[ + str, + Doc("Optional name for this batch") + ] = "" + webhook_url: Annotated[ + Optional[str], + Doc("URL to receive job completion notifications") + ] = None + webhook_events: Annotated[ + List[str], + Doc("Events to send to webhook: started, progress, completed, failed") + ] = [] + validate_files: Annotated[ + bool, + Doc("Whether to validate input files before processing") + ] = True + + model_config = { + "json_schema_extra": { + "example": { + "jobs": [ + { + "input": "s3://bucket/video1.mp4", + "output": "s3://bucket/output/video1_720p.mp4", + "operations": [{"type": "transcode", "video": {"resolution": "1280x720"}}], + "priority": "normal" + }, + { + "input": "s3://bucket/video2.mp4", + "output": "s3://bucket/output/video2_720p.mp4", + "operations": [{"type": "transcode", "video": {"resolution": "1280x720"}}], + "priority": "normal" + } + ], + "batch_name": "720p conversion batch", + "webhook_url": "https://example.com/webhook", + "webhook_events": ["completed", "failed"], + "validate_files": True + } + } + } class BatchProcessResponse(BaseModel): """Batch processing response model.""" - batch_id: str - total_jobs: int - jobs: List[JobResponse] - estimated_cost: Dict[str, Any] - warnings: List[str] + batch_id: Annotated[str, Doc("Unique batch identifier")] + total_jobs: Annotated[int, Doc("Total number of jobs created")] + jobs: Annotated[List[JobResponse], Doc("List of created job details")] + estimated_cost: Annotated[Dict[str, Any], Doc("Cost and time estimates")] + warnings: 
Annotated[List[str], Doc("Any warnings during batch creation")] + +class BatchStatusResponse(BaseModel): + """Batch status response model.""" + batch_id: Annotated[str, Doc("Unique batch identifier")] + status: Annotated[str, Doc("Overall batch status")] + progress: Annotated[float, Doc("Overall progress percentage")] + statistics: Annotated[Dict[str, int], Doc("Job counts by status")] + jobs: Annotated[List[Dict[str, Any]], Doc("Individual job details")] -@router.post("/batch", response_model=BatchProcessResponse) + +class BatchCancelResponse(BaseModel): + """Batch cancellation response model.""" + batch_id: Annotated[str, Doc("Unique batch identifier")] + total_jobs: Annotated[int, Doc("Total jobs in batch")] + cancelled: Annotated[int, Doc("Number of jobs cancelled")] + failed_to_cancel: Annotated[int, Doc("Jobs that couldn't be cancelled")] + message: Annotated[str, Doc("Status message")] + + +@router.post( + "/batch", + response_model=BatchProcessResponse, + status_code=status.HTTP_201_CREATED, + summary="Create batch job", + description="Submit multiple media processing jobs in a single request for efficient batch processing.", + response_description="Batch creation result with job IDs", + responses={ + 201: {"description": "Batch created successfully"}, + 400: { + "model": ErrorResponse, + "description": "Invalid request (empty batch, validation failures)", + "content": { + "application/json": { + "example": {"error": "validation_error", "message": "Batch size exceeds maximum of 100 jobs"} + } + } + }, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 500: {"model": ErrorResponse, "description": "Internal server error"}, + }, + tags=["batch"], +) async def create_batch_job( - request: BatchProcessRequest, + request: Annotated[BatchProcessRequest, Doc("Batch processing request")], background_tasks: BackgroundTasks, - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), + db: DatabaseSession = None, + api_key: RequiredAPIKey = None, ) -> BatchProcessResponse: """ Create a batch of media processing jobs. - + This endpoint allows submitting multiple jobs at once for efficient processing. Jobs in a batch can have different operations and priorities. + + Features: + - Batch file validation before processing + - Per-job operation customization + - Webhook notifications for batch events + - Cost estimation for the entire batch """ try: if not request.jobs: - raise HTTPException(status_code=400, detail="No jobs provided in batch") - - if len(request.jobs) > 100: # Reasonable batch limit - raise HTTPException(status_code=400, detail="Batch size exceeds maximum of 100 jobs") - + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail={"error": "validation_error", "message": "No jobs provided in batch"} + ) + + if len(request.jobs) > 100: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail={"error": "validation_error", "message": "Batch size exceeds maximum of 100 jobs"} + ) + batch_id = str(uuid4()) created_jobs = [] warnings = [] total_estimated_time = 0 - + logger.info( "Starting batch job creation", batch_id=batch_id, total_jobs=len(request.jobs), api_key=api_key[:8] + "..." 
if len(api_key) > 8 else api_key ) - + # Validate all files first if requested if request.validate_files: file_paths = [job.input for job in request.jobs] - + # Get API key tier for validation limits api_key_tier = _get_api_key_tier(api_key) - + validation_results = await media_validator.validate_batch_files( file_paths, api_key_tier ) - + if validation_results['invalid_files'] > 0: invalid_files = [ - r for r in validation_results['results'] + r for r in validation_results['results'] if r['status'] == 'invalid' ] warnings.append(f"Found {len(invalid_files)} invalid files in batch") - - # Optionally fail the entire batch if any files are invalid + + # Fail the entire batch if all files are invalid if len(invalid_files) == len(request.jobs): raise HTTPException( - status_code=400, - detail="All files in batch failed validation" + status_code=status.HTTP_400_BAD_REQUEST, + detail={"error": "validation_error", "message": "All files in batch failed validation"} ) - + # Create individual jobs for i, job_request in enumerate(request.jobs): try: @@ -118,10 +222,10 @@ async def create_batch_job( output_backend, output_validated = await validate_output_path( job_request.output, storage_service ) - + # Validate operations operations_validated = validate_operations(job_request.operations) - + # Create job record job = Job( id=uuid4(), @@ -134,21 +238,21 @@ async def create_batch_job( api_key=api_key, webhook_url=request.webhook_url, webhook_events=request.webhook_events, - batch_id=batch_id, # Link to batch - batch_index=i, # Position in batch + batch_id=batch_id, + batch_index=i, ) - + # Add to database db.add(job) await db.commit() await db.refresh(job) - + # Queue the job await queue_service.enqueue_job( job_id=str(job.id), priority=job_request.priority, ) - + # Create job response job_response = JobResponse( id=job.id, @@ -165,13 +269,13 @@ async def create_batch_job( "batch": f"/api/v1/batch/{batch_id}" }, ) - + created_jobs.append(job_response) - - # Estimate processing time (simplified) + + # Estimate processing time estimated_time = _estimate_job_time(job_request) total_estimated_time += estimated_time - + logger.info( "Batch job created", job_id=str(job.id), @@ -179,7 +283,7 @@ async def create_batch_job( batch_index=i, input_path=job_request.input[:50] + "..." 
if len(job_request.input) > 50 else job_request.input ) - + except Exception as e: logger.error( "Failed to create batch job", @@ -187,11 +291,14 @@ async def create_batch_job( batch_index=i, error=str(e) ) - warnings.append(f"Job {i+1} failed to create: {str(e)}") - + warnings.append(f"Job {i + 1} failed to create: {str(e)}") + if not created_jobs: - raise HTTPException(status_code=500, detail="Failed to create any jobs in batch") - + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": "batch_creation_failed", "message": "Failed to create any jobs in batch"} + ) + # Estimate cost estimated_cost = { "processing_time_seconds": total_estimated_time, @@ -199,14 +306,14 @@ async def create_batch_job( "jobs_created": len(created_jobs), "jobs_failed": len(request.jobs) - len(created_jobs) } - + logger.info( "Batch job creation completed", batch_id=batch_id, jobs_created=len(created_jobs), total_estimated_time=total_estimated_time ) - + return BatchProcessResponse( batch_id=batch_id, total_jobs=len(created_jobs), @@ -214,21 +321,51 @@ async def create_batch_job( estimated_cost=estimated_cost, warnings=warnings ) - + except HTTPException: raise except Exception as e: logger.error("Batch job creation failed", error=str(e)) - raise HTTPException(status_code=500, detail="Failed to create batch job") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": "internal_error", "message": "Failed to create batch job"} + ) -@router.get("/batch/{batch_id}") +@router.get( + "/batch/{batch_id}", + response_model=BatchStatusResponse, + status_code=status.HTTP_200_OK, + summary="Get batch status", + description="Get the current status and progress of all jobs in a batch.", + response_description="Batch status with individual job progress", + responses={ + 200: {"description": "Batch status retrieved successfully"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Access denied to this batch"}, + 404: { + "model": ErrorResponse, + "description": "Batch not found", + "content": { + "application/json": { + "example": {"error": "not_found", "message": "Batch not found"} + } + } + }, + }, + tags=["batch"], +) async def get_batch_status( - batch_id: str, - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), -) -> Dict[str, Any]: - """Get status of a batch job.""" + batch_id: Annotated[str, Doc("Unique batch identifier")], + db: DatabaseSession = None, + api_key: RequiredAPIKey = None, +) -> BatchStatusResponse: + """ + Get status of a batch job. + + Returns overall batch progress and status of each individual job. + Only the API key that created the batch can view its status. 
+ """ try: # Query all jobs in the batch from sqlalchemy import select @@ -236,21 +373,24 @@ async def get_batch_status( select(Job).where(Job.batch_id == batch_id, Job.api_key == api_key) ) batch_jobs = result.scalars().all() - + if not batch_jobs: - raise HTTPException(status_code=404, detail="Batch not found") - + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"error": "not_found", "message": "Batch not found"} + ) + # Calculate batch statistics total_jobs = len(batch_jobs) completed_jobs = sum(1 for job in batch_jobs if job.status == JobStatus.COMPLETED) failed_jobs = sum(1 for job in batch_jobs if job.status == JobStatus.FAILED) processing_jobs = sum(1 for job in batch_jobs if job.status == JobStatus.PROCESSING) queued_jobs = sum(1 for job in batch_jobs if job.status == JobStatus.QUEUED) - + # Calculate overall progress total_progress = sum(job.progress or 0 for job in batch_jobs) overall_progress = total_progress / total_jobs if total_jobs > 0 else 0 - + # Determine batch status if completed_jobs == total_jobs: batch_status = "completed" @@ -262,19 +402,19 @@ async def get_batch_status( batch_status = "processing" else: batch_status = "unknown" - - return { - "batch_id": batch_id, - "status": batch_status, - "progress": overall_progress, - "statistics": { + + return BatchStatusResponse( + batch_id=batch_id, + status=batch_status, + progress=overall_progress, + statistics={ "total_jobs": total_jobs, "completed": completed_jobs, "failed": failed_jobs, "processing": processing_jobs, "queued": queued_jobs }, - "jobs": [ + jobs=[ { "id": str(job.id), "status": job.status, @@ -287,22 +427,44 @@ async def get_batch_status( } for job in sorted(batch_jobs, key=lambda x: x.batch_index or 0) ] - } - + ) + except HTTPException: raise except Exception as e: logger.error("Failed to get batch status", batch_id=batch_id, error=str(e)) - raise HTTPException(status_code=500, detail="Failed to get batch status") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": "internal_error", "message": "Failed to get batch status"} + ) -@router.delete("/batch/{batch_id}") +@router.delete( + "/batch/{batch_id}", + response_model=BatchCancelResponse, + status_code=status.HTTP_200_OK, + summary="Cancel batch", + description="Cancel all queued and processing jobs in a batch.", + response_description="Batch cancellation result", + responses={ + 200: {"description": "Batch cancellation processed"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Access denied to this batch"}, + 404: {"model": ErrorResponse, "description": "Batch not found"}, + }, + tags=["batch"], +) async def cancel_batch( - batch_id: str, - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), -) -> Dict[str, Any]: - """Cancel all jobs in a batch.""" + batch_id: Annotated[str, Doc("Unique batch identifier to cancel")], + db: DatabaseSession = None, + api_key: RequiredAPIKey = None, +) -> BatchCancelResponse: + """ + Cancel all jobs in a batch. + + Only queued and processing jobs can be cancelled. + Already completed or failed jobs are not affected. 
+ """ try: # Query all jobs in the batch from sqlalchemy import select, update @@ -310,13 +472,16 @@ async def cancel_batch( select(Job).where(Job.batch_id == batch_id, Job.api_key == api_key) ) batch_jobs = result.scalars().all() - + if not batch_jobs: - raise HTTPException(status_code=404, detail="Batch not found") - + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"error": "not_found", "message": "Batch not found"} + ) + cancelled_count = 0 failed_to_cancel = 0 - + for job in batch_jobs: if job.status in [JobStatus.QUEUED, JobStatus.PROCESSING]: try: @@ -325,10 +490,10 @@ async def cancel_batch( success = await queue_service.cancel_job(str(job.id)) else: # PROCESSING success = await queue_service.cancel_running_job( - str(job.id), + str(job.id), job.worker_id or "" ) - + if success: # Update job status await db.execute( @@ -339,7 +504,7 @@ async def cancel_batch( cancelled_count += 1 else: failed_to_cancel += 1 - + except Exception as e: logger.error( "Failed to cancel job in batch", @@ -348,22 +513,25 @@ async def cancel_batch( error=str(e) ) failed_to_cancel += 1 - + await db.commit() - - return { - "batch_id": batch_id, - "total_jobs": len(batch_jobs), - "cancelled": cancelled_count, - "failed_to_cancel": failed_to_cancel, - "message": f"Cancelled {cancelled_count} jobs in batch" - } - + + return BatchCancelResponse( + batch_id=batch_id, + total_jobs=len(batch_jobs), + cancelled=cancelled_count, + failed_to_cancel=failed_to_cancel, + message=f"Cancelled {cancelled_count} jobs in batch" + ) + except HTTPException: raise except Exception as e: logger.error("Failed to cancel batch", batch_id=batch_id, error=str(e)) - raise HTTPException(status_code=500, detail="Failed to cancel batch") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": "internal_error", "message": "Failed to cancel batch"} + ) def _get_api_key_tier(api_key: str) -> str: @@ -381,7 +549,7 @@ def _get_api_key_tier(api_key: str) -> str: def _estimate_job_time(job_request: BatchJob) -> int: """Estimate processing time for a single job in seconds.""" base_time = 60 # Base processing time - + # Add time based on operations for operation in job_request.operations: op_type = operation.get('type', '') @@ -393,5 +561,5 @@ def _estimate_job_time(job_request: BatchJob) -> int: base_time += 60 # Filter operations else: base_time += 30 # Other operations - - return base_time \ No newline at end of file + + return base_time diff --git a/api/routers/health.py b/api/routers/health.py index 488fda7..986d2cf 100644 --- a/api/routers/health.py +++ b/api/routers/health.py @@ -1,30 +1,78 @@ """ -Health check endpoints +Health check endpoints with FastAPI 0.124+ patterns. + +Provides system health monitoring without authentication requirement. 
""" from datetime import datetime -from typing import Dict, Any +from typing import Dict, Any, Annotated -from fastapi import APIRouter, Depends +from fastapi import APIRouter, status +from fastapi.responses import JSONResponse +from annotated_doc import Doc from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import text import structlog from api.config import settings -from api.dependencies import get_db +from api.dependencies import DatabaseSession from api.services.queue import QueueService from api.services.storage import StorageService logger = structlog.get_logger() + router = APIRouter() queue_service = QueueService() storage_service = StorageService() -@router.get("/health") +# Response models for OpenAPI documentation +class HealthResponse(Dict[str, Any]): + """Health check response schema.""" + pass + + +@router.get( + "/health", + response_model=Dict[str, Any], + status_code=status.HTTP_200_OK, + summary="Basic health check", + description="Quick health check endpoint for load balancers and monitoring systems.", + response_description="Service health status", + responses={ + 200: { + "description": "Service is healthy", + "content": { + "application/json": { + "example": { + "status": "healthy", + "timestamp": "2025-01-15T10:30:00Z", + "version": "1.0.0" + } + } + } + }, + 503: { + "description": "Service is unhealthy", + "content": { + "application/json": { + "example": { + "status": "unhealthy", + "timestamp": "2025-01-15T10:30:00Z", + "error": "Database connection failed" + } + } + } + } + }, + tags=["health"], +) async def health_check() -> Dict[str, Any]: """ Basic health check endpoint. + + Returns a simple health status without requiring authentication. + Use `/health/detailed` for comprehensive component status. """ return { "status": "healthy", @@ -33,12 +81,47 @@ async def health_check() -> Dict[str, Any]: } -@router.get("/health/detailed") +@router.get( + "/health/detailed", + response_model=Dict[str, Any], + status_code=status.HTTP_200_OK, + summary="Detailed health check", + description="Comprehensive health check with status of all system components.", + response_description="Detailed component health status", + responses={ + 200: { + "description": "Health check completed (may include unhealthy components)", + "content": { + "application/json": { + "example": { + "status": "healthy", + "timestamp": "2025-01-15T10:30:00Z", + "version": "1.0.0", + "components": { + "database": {"status": "healthy", "type": "postgresql"}, + "queue": {"status": "healthy", "type": "redis"}, + "storage": {"status": "healthy", "backends": ["local", "s3"]}, + "ffmpeg": {"status": "healthy", "version": "ffmpeg version 6.1"} + } + } + } + } + } + }, + tags=["health"], +) async def detailed_health_check( - db: AsyncSession = Depends(get_db), + db: DatabaseSession, ) -> Dict[str, Any]: """ Detailed health check with component status. 
+ + Checks: + - Database connectivity and response time + - Message queue (Redis/Valkey) status + - Storage backend availability + - FFmpeg binary availability + - Hardware acceleration detection """ health_status = { "status": "healthy", @@ -46,13 +129,13 @@ async def detailed_health_check( "version": settings.VERSION, "components": {}, } - + # Check database try: result = await db.execute(text("SELECT 1")) health_status["components"]["database"] = { "status": "healthy", - "type": "postgresql", + "type": "postgresql" if "postgresql" in settings.DATABASE_URL else "sqlite", } except Exception as e: health_status["status"] = "unhealthy" @@ -60,7 +143,7 @@ async def detailed_health_check( "status": "unhealthy", "error": str(e), } - + # Check queue try: queue_health = await queue_service.health_check() @@ -71,7 +154,7 @@ async def detailed_health_check( "status": "unhealthy", "error": str(e), } - + # Check storage backends try: storage_health = await storage_service.health_check() @@ -82,7 +165,7 @@ async def detailed_health_check( "status": "unhealthy", "error": str(e), } - + # Check FFmpeg try: import asyncio @@ -92,7 +175,7 @@ async def detailed_health_check( stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=5.0) - + if proc.returncode == 0: version_line = stdout.decode().split("\n")[0] health_status["components"]["ffmpeg"] = { @@ -107,14 +190,61 @@ async def detailed_health_check( "status": "unhealthy", "error": str(e), } - + + # Return appropriate status code + if health_status["status"] == "unhealthy": + return JSONResponse( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + content=health_status + ) + return health_status -@router.get("/capabilities") +@router.get( + "/capabilities", + response_model=Dict[str, Any], + status_code=status.HTTP_200_OK, + summary="Get system capabilities", + description="Returns supported formats, codecs, operations, and hardware acceleration info.", + response_description="System capabilities and supported features", + responses={ + 200: { + "description": "Capabilities retrieved successfully", + "content": { + "application/json": { + "example": { + "version": "1.0.0", + "features": { + "api_version": "v1", + "max_file_size": 10737418240, + "concurrent_jobs": 10 + }, + "formats": { + "input": {"video": ["mp4", "avi", "mov"], "audio": ["mp3", "wav"]}, + "output": {"containers": ["mp4", "webm", "hls"]} + }, + "hardware_acceleration": { + "available": ["nvidia"], + "types": ["nvidia", "vaapi", "qsv"] + } + } + } + } + } + }, + tags=["health"], +) async def get_capabilities() -> Dict[str, Any]: """ Get system capabilities and supported formats. 
+ + Returns comprehensive information about: + - Supported input/output formats + - Available video and audio codecs + - Supported operations and filters + - Quality analysis metrics + - Hardware acceleration availability """ return { "version": settings.VERSION, @@ -164,7 +294,7 @@ async def get_capabilities() -> Dict[str, Any]: async def check_hardware_acceleration() -> list: """Check available hardware acceleration.""" available = [] - + # Check NVIDIA try: import asyncio @@ -174,15 +304,15 @@ async def check_hardware_acceleration() -> list: stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=5.0) - + if proc.returncode == 0: available.append("nvidia") except: pass - + # Check VAAPI (Linux) import os if os.path.exists("/dev/dri/renderD128"): available.append("vaapi") - - return available \ No newline at end of file + + return available diff --git a/api/routers/jobs.py b/api/routers/jobs.py index e9b49ec..fbab1dc 100644 --- a/api/routers/jobs.py +++ b/api/routers/jobs.py @@ -1,69 +1,106 @@ """ -Jobs endpoint - Job management and monitoring +Jobs endpoint - Job management and monitoring with FastAPI 0.124+ patterns. + +Provides job listing, status, cancellation, and real-time progress streaming. """ -from typing import Optional, List +from typing import Optional, Annotated from uuid import UUID import asyncio import json from datetime import datetime -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, HTTPException, Query, status from fastapi.responses import StreamingResponse -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, and_, func +from annotated_doc import Doc +from sqlalchemy import select, func import structlog from api.config import settings -from api.dependencies import get_db, require_api_key -from api.models.job import Job, JobStatus, JobResponse, JobListResponse, JobProgress +from api.dependencies import DatabaseSession, RequiredAPIKey +from api.models.job import Job, JobStatus, JobResponse, JobListResponse, JobProgress, ErrorResponse from api.services.queue import QueueService logger = structlog.get_logger() + router = APIRouter() queue_service = QueueService() -@router.get("/jobs", response_model=JobListResponse) +@router.get( + "/jobs", + response_model=JobListResponse, + status_code=status.HTTP_200_OK, + summary="List jobs", + description="List all jobs for the authenticated API key with filtering and pagination.", + response_description="Paginated list of jobs", + responses={ + 200: {"description": "Jobs retrieved successfully"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + }, + tags=["jobs"], +) async def list_jobs( - status: Optional[JobStatus] = None, - page: int = Query(1, ge=1), - per_page: int = Query(20, ge=1, le=100), - sort: str = Query("created_at:desc"), - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), + status_filter: Annotated[ + Optional[JobStatus], + Query(alias="status", description="Filter by job status"), + Doc("Filter jobs by their current status") + ] = None, + page: Annotated[ + int, + Query(ge=1, description="Page number (1-indexed)"), + Doc("Page number for pagination") + ] = 1, + per_page: Annotated[ + int, + Query(ge=1, le=100, description="Items per page (max 100)"), + Doc("Number of items per page") + ] = 20, + sort: Annotated[ + str, + Query( + description="Sort field and order (e.g., 'created_at:desc')", + pattern=r"^[a-z_]+:(asc|desc)$" + ), + Doc("Sort 
specification: field:order") + ] = "created_at:desc", + db: DatabaseSession = None, + api_key: RequiredAPIKey = None, ) -> JobListResponse: """ List jobs with optional filtering and pagination. + + Supports filtering by status and sorting by any field. + Only returns jobs owned by the authenticated API key. """ # Parse sort parameter sort_field, sort_order = sort.split(":") if ":" in sort else (sort, "asc") - + # Build query query = select(Job).where(Job.api_key == api_key) - - if status: - query = query.where(Job.status == status) - + + if status_filter: + query = query.where(Job.status == status_filter) + # Apply sorting order_column = getattr(Job, sort_field, Job.created_at) if sort_order == "desc": query = query.order_by(order_column.desc()) else: query = query.order_by(order_column.asc()) - + # Count total count_query = select(func.count()).select_from(query.subquery()) total = await db.scalar(count_query) - + # Apply pagination offset = (page - 1) * per_page query = query.offset(offset).limit(per_page) - + # Execute query result = await db.execute(query) jobs = result.scalars().all() - + # Convert to response models job_responses = [] for job in jobs: @@ -83,15 +120,15 @@ async def list_jobs( "logs": f"/api/v1/jobs/{job.id}/logs", }, ) - + if job.status == JobStatus.FAILED: job_response.error = { "message": job.error_message, "details": job.error_details, } - + job_responses.append(job_response) - + return JobListResponse( jobs=job_responses, total=total, @@ -102,25 +139,51 @@ async def list_jobs( ) -@router.get("/jobs/{job_id}", response_model=JobResponse) +@router.get( + "/jobs/{job_id}", + response_model=JobResponse, + status_code=status.HTTP_200_OK, + summary="Get job details", + description="Get detailed information about a specific job including progress and quality metrics.", + response_description="Job details", + responses={ + 200: {"description": "Job found"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Access denied to this job"}, + 404: {"model": ErrorResponse, "description": "Job not found"}, + }, + tags=["jobs"], +) async def get_job( - job_id: UUID, - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), + job_id: Annotated[UUID, Doc("Unique job identifier")], + db: DatabaseSession = None, + api_key: RequiredAPIKey = None, ) -> JobResponse: """ Get detailed information about a specific job. 
+ + Returns full job details including: + - Current status and progress + - Quality metrics (VMAF, PSNR, SSIM) if available + - Error details if failed + - HATEOAS links for related resources """ # Get job from database job = await db.get(Job, job_id) - + if not job: - raise HTTPException(status_code=404, detail="Job not found") - + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"error": "not_found", "message": "Job not found"} + ) + # Check ownership if job.api_key != api_key: - raise HTTPException(status_code=403, detail="Access denied") - + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={"error": "access_denied", "message": "Access denied"} + ) + # Build response response = JobResponse( id=job.id, @@ -139,7 +202,7 @@ async def get_job( "cancel": f"/api/v1/jobs/{job.id}" if job.status in [JobStatus.QUEUED, JobStatus.PROCESSING] else None, }, ) - + # Add progress details if job.status == JobStatus.PROCESSING: response.progress_details = { @@ -148,7 +211,7 @@ async def get_job( "fps": job.fps, "eta_seconds": job.eta_seconds, } - + # Add error details if failed if job.status == JobStatus.FAILED: response.error = { @@ -156,7 +219,7 @@ async def get_job( "details": job.error_details, "retry_count": job.retry_count, } - + # Add quality metrics if available if job.vmaf_score or job.psnr_score or job.ssim_score: response.progress_details = response.progress_details or {} @@ -167,50 +230,76 @@ async def get_job( response.progress_details["quality"]["psnr"] = job.psnr_score if job.ssim_score: response.progress_details["quality"]["ssim"] = job.ssim_score - + return response -@router.delete("/jobs/{job_id}") +@router.delete( + "/jobs/{job_id}", + status_code=status.HTTP_200_OK, + summary="Cancel job", + description="Cancel a queued or processing job.", + response_description="Cancellation result", + responses={ + 200: {"description": "Job cancelled successfully"}, + 400: {"model": ErrorResponse, "description": "Job cannot be cancelled (already completed)"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Access denied to this job"}, + 404: {"model": ErrorResponse, "description": "Job not found"}, + }, + tags=["jobs"], +) async def cancel_job( - job_id: UUID, - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), + job_id: Annotated[UUID, Doc("Unique job identifier to cancel")], + db: DatabaseSession = None, + api_key: RequiredAPIKey = None, ) -> dict: """ Cancel a queued or processing job. + + Only jobs with status 'queued' or 'processing' can be cancelled. + Completed, failed, or already cancelled jobs cannot be cancelled. 
""" # Get job job = await db.get(Job, job_id) - + if not job: - raise HTTPException(status_code=404, detail="Job not found") - + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"error": "not_found", "message": "Job not found"} + ) + # Check ownership if job.api_key != api_key: - raise HTTPException(status_code=403, detail="Access denied") - + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={"error": "access_denied", "message": "Access denied"} + ) + # Check if job can be cancelled if job.status not in [JobStatus.QUEUED, JobStatus.PROCESSING]: raise HTTPException( - status_code=400, - detail=f"Cannot cancel job with status: {job.status}" + status_code=status.HTTP_400_BAD_REQUEST, + detail={ + "error": "invalid_state", + "message": f"Cannot cancel job with status: {job.status}" + } ) - + # Cancel in queue if job.status == JobStatus.QUEUED: await queue_service.cancel_job(str(job_id)) elif job.status == JobStatus.PROCESSING: # Send cancel signal to worker await queue_service.cancel_running_job(str(job_id), job.worker_id) - + # Update job status job.status = JobStatus.CANCELLED job.completed_at = datetime.utcnow() await db.commit() - - logger.info(f"Job cancelled: {job_id}") - + + logger.info("Job cancelled", job_id=str(job_id)) + return { "id": str(job_id), "status": "cancelled", @@ -218,43 +307,76 @@ async def cancel_job( } -@router.get("/jobs/{job_id}/events") +@router.get( + "/jobs/{job_id}/events", + summary="Stream job progress", + description="Stream real-time job progress updates using Server-Sent Events (SSE).", + response_description="SSE stream of progress events", + responses={ + 200: { + "description": "SSE stream started", + "content": { + "text/event-stream": { + "example": "event: progress\ndata: {\"percentage\": 45.5, \"stage\": \"encoding\"}\n\n" + } + } + }, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Access denied to this job"}, + 404: {"model": ErrorResponse, "description": "Job not found"}, + }, + tags=["jobs"], +) async def job_events( - job_id: UUID, - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), + job_id: Annotated[UUID, Doc("Unique job identifier")], + db: DatabaseSession = None, + api_key: RequiredAPIKey = None, ): """ Stream job progress events using Server-Sent Events. + + Events include: + - `progress`: Periodic progress updates with percentage, stage, FPS + - `completed`: Job completed successfully + - `failed`: Job failed with error details + - `cancelled`: Job was cancelled + + The stream ends when the job reaches a terminal state. 
""" # Verify job exists and user has access job = await db.get(Job, job_id) - + if not job: - raise HTTPException(status_code=404, detail="Job not found") - + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"error": "not_found", "message": "Job not found"} + ) + if job.api_key != api_key: - raise HTTPException(status_code=403, detail="Access denied") - + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={"error": "access_denied", "message": "Access denied"} + ) + async def event_generator(): """Generate SSE events for job progress.""" last_progress = -1 - + while True: # Refresh job from database await db.refresh(job) - + # Send progress update if changed if job.progress != last_progress: last_progress = job.progress - + progress_data = JobProgress( percentage=job.progress, stage=job.stage, fps=job.fps, eta_seconds=job.eta_seconds, ) - + # Add quality metrics if available if job.vmaf_score or job.psnr_score: progress_data.quality = {} @@ -262,9 +384,9 @@ async def event_generator(): progress_data.quality["vmaf"] = job.vmaf_score if job.psnr_score: progress_data.quality["psnr"] = job.psnr_score - + yield f"event: progress\ndata: {progress_data.model_dump_json()}\n\n" - + # Check if job completed if job.status in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: # Send final event @@ -272,63 +394,90 @@ async def event_generator(): "status": job.status, "message": "Job completed" if job.status == JobStatus.COMPLETED else f"Job {job.status}", } - + if job.status == JobStatus.COMPLETED: final_event["output_path"] = job.output_path if job.output_metadata: final_event["output_size"] = job.output_metadata.get("size") elif job.status == JobStatus.FAILED: final_event["error"] = job.error_message - + yield f"event: {job.status.lower()}\ndata: {json.dumps(final_event)}\n\n" break - + # Wait before next check await asyncio.sleep(1) - + return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable nginx buffering }, ) -@router.get("/jobs/{job_id}/logs") +@router.get( + "/jobs/{job_id}/logs", + status_code=status.HTTP_200_OK, + summary="Get job logs", + description="Get FFmpeg processing logs for a job.", + response_description="Job processing logs", + responses={ + 200: {"description": "Logs retrieved successfully"}, + 401: {"model": ErrorResponse, "description": "Authentication required"}, + 403: {"model": ErrorResponse, "description": "Access denied to this job"}, + 404: {"model": ErrorResponse, "description": "Job not found"}, + }, + tags=["jobs"], +) async def get_job_logs( - job_id: UUID, - lines: int = Query(100, ge=1, le=1000), - db: AsyncSession = Depends(get_db), - api_key: str = Depends(require_api_key), + job_id: Annotated[UUID, Doc("Unique job identifier")], + lines: Annotated[ + int, + Query(ge=1, le=1000, description="Number of log lines to return"), + Doc("Maximum number of log lines") + ] = 100, + db: DatabaseSession = None, + api_key: RequiredAPIKey = None, ) -> dict: """ Get FFmpeg processing logs for a job. + + Returns the last N lines of processing logs. + For running jobs, returns live logs from the worker. + For completed jobs, returns stored logs. 
""" # Get job job = await db.get(Job, job_id) - + if not job: - raise HTTPException(status_code=404, detail="Job not found") - + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"error": "not_found", "message": "Job not found"} + ) + # Check ownership if job.api_key != api_key: - raise HTTPException(status_code=403, detail="Access denied") - + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={"error": "access_denied", "message": "Access denied"} + ) + # Get logs from worker or storage - # In production, this would fetch from a log aggregation service logs = [] - + if job.status == JobStatus.PROCESSING and job.worker_id: # Get live logs from worker logs = await queue_service.get_worker_logs(job.worker_id, str(job_id), lines) else: # Get stored logs from database and log aggregation system from api.services.job_service import JobService - + stored_logs = await JobService.get_job_logs(db, job_id, lines) - + if stored_logs: logs = stored_logs else: @@ -336,27 +485,24 @@ async def get_job_logs( logs = [ f"[{job.created_at.isoformat()}] Job created: {job_id}", f"[{job.created_at.isoformat()}] Status: {job.status.value}", - f"[{job.created_at.isoformat()}] Input: {job.input_url or 'N/A'}", - f"[{job.created_at.isoformat()}] Output: {job.output_url or 'N/A'}", + f"[{job.created_at.isoformat()}] Input: {job.input_path or 'N/A'}", + f"[{job.created_at.isoformat()}] Output: {job.output_path or 'N/A'}", ] - + if job.started_at: logs.append(f"[{job.started_at.isoformat()}] Processing started") - + if job.completed_at: logs.append(f"[{job.completed_at.isoformat()}] Processing completed") - + if job.error_message: logs.append(f"[{(job.completed_at or job.started_at or job.created_at).isoformat()}] ERROR: {job.error_message}") - + if job.progress > 0: logs.append(f"[{(job.completed_at or job.started_at or job.created_at).isoformat()}] Progress: {job.progress}%") - + return { "job_id": str(job_id), "lines": len(logs), "logs": logs, } - - -# End of file \ No newline at end of file