"""
Podcast downloader service for Podcastrr.
"""
import logging
import os
from datetime import datetime, timedelta

import requests
from flask import current_app

from app.models.database import db
from app.models.settings import Settings

# Set up logging
logger = logging.getLogger(__name__)

# (connect, read) timeouts in seconds. The read timeout bounds the gap between
# received bytes, not the whole transfer, so large episodes still download.
_REQUEST_TIMEOUT = (10, 60)
_CHUNK_SIZE = 8192

# Ordered (Content-Type fragment, file extension) pairs; the first match wins.
_CONTENT_TYPE_EXTENSIONS = (
    ('mp3', '.mp3'),
    ('mpeg', '.mp3'),
    ('mp4', '.m4a'),
    ('m4a', '.m4a'),
    ('ogg', '.ogg'),
    ('wav', '.wav'),
)
_DEFAULT_EXTENSION = '.mp3'

_MAX_FILENAME_LENGTH = 100
_INVALID_FILENAME_CHARS = '<>:"/\\|?*'
# Maps reserved characters and ASCII control characters (including NUL) to '_'.
_FILENAME_TRANSLATION = str.maketrans({
    **{chr(code): '_' for code in range(32)},
    **{char: '_' for char in _INVALID_FILENAME_CHARS},
})
# Last-resort name used when every path component turns out to be empty.
_FALLBACK_NAME = 'untitled'


def _extension_for_content_type(content_type):
    """Return the file extension matching a Content-Type header value."""
    # MIME types are case-insensitive, so compare in lowercase.
    lowered = (content_type or '').lower()
    for fragment, extension in _CONTENT_TYPE_EXTENSIONS:
        if fragment in lowered:
            return extension
    return _DEFAULT_EXTENSION


def _discard_file(path):
    """Remove ``path`` if possible; cleanup is best-effort by design."""
    try:
        os.remove(path)
    except OSError:
        # Nothing useful can be done here; the original error is re-raised
        # by the caller.
        pass


def _save_response_body(response, file_path):
    """Stream the response body to ``file_path`` without leaving partial files."""
    partial_path = file_path + '.part'
    try:
        with open(partial_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=_CHUNK_SIZE):
                if chunk:
                    f.write(chunk)
        # Only a fully written file ever appears under the final name.
        os.replace(partial_path, file_path)
    except BaseException:
        _discard_file(partial_path)
        raise


def _clean_relative_path(path):
    """Drop empty, '.' and '..' components so the path stays relative."""
    components = path.replace(os.sep, '/').split('/')
    kept = [
        component for component in components
        if component.strip() and component not in ('.', '..')
    ]
    return '/'.join(kept)


def download_episode(episode):
    """
    Download a podcast episode.

    The file is stored below the configured download path, named according to
    the configured naming format, with an extension derived from the response
    Content-Type (defaulting to ``.mp3``). On success the episode is marked as
    downloaded and its ``file_path`` is saved to the database.

    Args:
        episode: Episode model instance.

    Returns:
        str: Path to the downloaded file.

    Raises:
        ValueError: If the episode has no audio URL.
        Exception: Any error raised while downloading or saving is logged and
            re-raised unchanged.
    """
    if not episode.audio_url:
        raise ValueError("Episode has no audio URL")

    # Get settings, creating the default row on first use
    settings = Settings.query.first()
    if not settings:
        settings = Settings(
            download_path=current_app.config['DOWNLOAD_PATH'],
            naming_format="{podcast_title}/{episode_title}"
        )
        db.session.add(settings)
        db.session.commit()

    # Create download directory
    download_path = settings.download_path
    os.makedirs(download_path, exist_ok=True)

    # Format filename using the naming format
    podcast = episode.podcast
    filename = format_filename(settings.naming_format, podcast, episode)

    # Ensure the directory exists (the naming format may contain sub-folders)
    base_path = os.path.join(download_path, filename)
    os.makedirs(os.path.dirname(base_path), exist_ok=True)

    # Download the file
    try:
        response = requests.get(
            episode.audio_url, stream=True, timeout=_REQUEST_TIMEOUT
        )
        try:
            response.raise_for_status()

            # Get content type and set appropriate extension
            content_type = response.headers.get('Content-Type', '')
            file_path = base_path + _extension_for_content_type(content_type)

            # Write the file
            _save_response_body(response, file_path)
        finally:
            # A streamed response holds its connection until it is closed.
            response.close()

        # Update episode in database
        episode.downloaded = True
        episode.file_path = file_path
        db.session.commit()

        logger.info(f"Downloaded episode: {episode.title}")
        return file_path
    except Exception as e:
        logger.error(f"Error downloading episode: {str(e)}")
        raise


def format_filename(format_string, podcast, episode):
    """
    Format a filename using the provided format string and podcast/episode data.

    Available placeholders: ``podcast_title``, ``episode_title``,
    ``episode_number``, ``published_date`` (``YYYY-MM-DD``) and ``author``.
    If the format string is missing, malformed or uses an unknown placeholder,
    ``{podcast_title}/{episode_title}`` is used instead.

    The result is always a relative path: empty, ``.`` and ``..`` components
    are removed, so a blank placeholder can never produce an absolute path
    and the result cannot climb out of the download directory.

    Args:
        format_string (str): Format string with placeholders.
        podcast: Podcast model instance.
        episode: Episode model instance.

    Returns:
        str: Formatted filename (without extension), using ``/`` separators.
    """
    # Create a dictionary with all available variables
    format_vars = {
        'podcast_title': sanitize_filename(podcast.title),
        'episode_title': sanitize_filename(episode.title),
        # Compare against None so that episode number 0 is kept.
        'episode_number': (
            sanitize_filename(str(episode.episode_number))
            if episode.episode_number is not None else ''
        ),
        'published_date': (
            episode.published_date.strftime('%Y-%m-%d')
            if episode.published_date else ''
        ),
        'author': sanitize_filename(podcast.author) if podcast.author else ''
    }
    default_name = (
        f"{format_vars['podcast_title']}/{format_vars['episode_title']}"
    )

    # Format the string
    try:
        formatted = format_string.format(**format_vars)
    except (AttributeError, IndexError, KeyError, ValueError) as e:
        # KeyError: unknown placeholder; IndexError: positional "{}" / "{0}";
        # ValueError: malformed braces; AttributeError: format string is None
        # or a placeholder uses attribute access that does not exist.
        logger.warning(f"Invalid format variable: {str(e)}")
        # Fall back to a simple format
        formatted = default_name

    return (
        _clean_relative_path(formatted)
        or _clean_relative_path(default_name)
        or _FALLBACK_NAME
    )


def sanitize_filename(filename):
    """
    Sanitize a string to be used as a filename.

    Reserved characters (``< > : " / \\ | ? *``) and ASCII control characters
    are replaced with ``_``. The names ``.`` and ``..`` are rewritten because
    they refer to directories rather than files. Results longer than 100
    characters are cut to 97 characters followed by ``...``.

    Args:
        filename (str): Original filename. ``None`` is treated as empty.

    Returns:
        str: Sanitized filename.
    """
    if filename is None:
        return ''

    # Replace invalid characters in a single pass
    cleaned = str(filename).translate(_FILENAME_TRANSLATION)

    # "." and ".." would escape or alias the target directory
    if cleaned in ('.', '..'):
        cleaned = '_' * len(cleaned)

    # Limit length
    if len(cleaned) > _MAX_FILENAME_LENGTH:
        cleaned = cleaned[:_MAX_FILENAME_LENGTH - 3] + '...'

    return cleaned


def delete_old_episodes(days=30):
    """
    Delete episodes older than the specified number of days.

    When a settings row exists and its ``delete_after_days`` is set, that
    value takes precedence over the ``days`` argument. Only files that still
    exist on disk are removed; the matching episodes are then marked as not
    downloaded.

    Args:
        days (int): Number of days to keep episodes.

    Returns:
        int: Number of episodes deleted.
    """
    # Imported here rather than at module level — presumably to avoid a
    # circular import; confirm before moving it.
    from app.models.podcast import Episode

    settings = Settings.query.first()
    # An unset (None) setting must not replace the argument: timedelta would
    # raise TypeError.
    if settings and settings.delete_after_days is not None:
        days = settings.delete_after_days

    # Calculate the cutoff date.
    # NOTE(review): naive UTC is kept on the assumption that published_date
    # is stored as naive UTC — confirm before switching to aware datetimes.
    cutoff_date = datetime.utcnow() - timedelta(days=days)

    # Find episodes to delete
    episodes = Episode.query.filter(
        Episode.downloaded == True,  # noqa: E712 - SQLAlchemy column expression
        Episode.published_date < cutoff_date
    ).all()

    count = 0
    for episode in episodes:
        if episode.file_path and os.path.exists(episode.file_path):
            try:
                os.remove(episode.file_path)
            except OSError as e:
                logger.error(f"Error deleting episode file: {str(e)}")
            else:
                episode.file_path = None
                episode.downloaded = False
                count += 1

    db.session.commit()

    logger.info(f"Deleted {count} old episodes")
    return count