""" Task manager service for Podcastrr. Handles background tasks and provides status updates. """ import threading import time import uuid import logging from datetime import datetime from enum import Enum from flask import current_app # Set up logging logger = logging.getLogger(__name__) class TaskStatus(Enum): """Task status enum""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" class Task: """ Represents a background task with status tracking. """ def __init__(self, task_type, description, target_func, target_args=None, target_kwargs=None): self.id = str(uuid.uuid4()) self.type = task_type self.description = description self.status = TaskStatus.PENDING self.progress = 0 self.message = "Task created" self.result = None self.error = None self.created_at = datetime.utcnow() self.started_at = None self.completed_at = None self.target_func = target_func self.target_args = target_args or [] self.target_kwargs = target_kwargs or {} def to_dict(self): """Convert task to dictionary for API responses.""" return { 'id': self.id, 'type': self.type, 'description': self.description, 'status': self.status.value, 'progress': self.progress, 'message': self.message, 'created_at': self.created_at.isoformat() if self.created_at else None, 'started_at': self.started_at.isoformat() if self.started_at else None, 'completed_at': self.completed_at.isoformat() if self.completed_at else None, 'error': self.error } class TaskManager: """ Manages background tasks and provides status updates. """ _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(TaskManager, cls).__new__(cls) cls._instance.tasks = {} cls._instance.lock = threading.Lock() return cls._instance def create_task(self, task_type, description, target_func, *args, **kwargs): """ Create a new task and add it to the task manager. Args: task_type (str): Type of task (e.g., 'download', 'update') description (str): Human-readable description of the task target_func (callable): Function to execute in the background *args: Arguments to pass to the target function **kwargs: Keyword arguments to pass to the target function Returns: str: Task ID """ task = Task(task_type, description, target_func, args, kwargs) with self.lock: self.tasks[task.id] = task # Get the current Flask app app = current_app._get_current_object() # Start the task in a background thread thread = threading.Thread(target=self._run_task, args=(task.id, app)) thread.daemon = True thread.start() logger.info(f"Created task {task.id} of type {task_type}: {description}") return task.id def _run_task(self, task_id, app): """ Run a task in the background. Args: task_id (str): ID of the task to run app: Flask application instance """ task = self.get_task(task_id) if not task: logger.error(f"Task {task_id} not found") return # Update task status task.status = TaskStatus.RUNNING task.started_at = datetime.utcnow() task.message = "Task started" try: # Create a wrapper for the target function to update progress def progress_callback(progress, message=None): task.progress = progress if message: task.message = message # Add progress_callback to kwargs task.target_kwargs['progress_callback'] = progress_callback # Run the target function within Flask application context with app.app_context(): result = task.target_func(*task.target_args, **task.target_kwargs) # Update task status task.status = TaskStatus.COMPLETED task.completed_at = datetime.utcnow() task.progress = 100 task.result = result task.message = "Task completed successfully" logger.info(f"Task {task_id} completed successfully") except Exception as e: # Update task status on error task.status = TaskStatus.FAILED task.completed_at = datetime.utcnow() task.error = str(e) task.message = f"Task failed: {str(e)}" logger.error(f"Task {task_id} failed: {str(e)}", exc_info=True) # Clean up old tasks self.clean_old_tasks() def get_task(self, task_id): """ Get a task by ID. Args: task_id (str): ID of the task to get Returns: Task: Task object or None if not found """ with self.lock: return self.tasks.get(task_id) def get_all_tasks(self): """ Get all tasks. Returns: list: List of all tasks """ with self.lock: return list(self.tasks.values()) def clean_old_tasks(self, max_age_seconds=86400): """ Remove old completed or failed tasks. Args: max_age_seconds (int): Maximum age of tasks to keep in seconds (default: 24 hours) Returns: int: Number of tasks removed """ now = datetime.utcnow() to_remove = [] with self.lock: for task_id, task in self.tasks.items(): if task.status in (TaskStatus.COMPLETED, TaskStatus.FAILED): if task.completed_at and (now - task.completed_at).total_seconds() > max_age_seconds: to_remove.append(task_id) for task_id in to_remove: del self.tasks[task_id] return len(to_remove) # Create a global task manager instance task_manager = TaskManager()