podcastrr/app/services/podcast_search.py
2025-06-17 16:00:46 -07:00

451 lines
21 KiB
Python

"""
Podcast search service for Podcastrr.
"""
import hashlib
import logging
import re
from datetime import datetime
from email.utils import parsedate_to_datetime

import feedparser
import requests
# Set up logging
logger = logging.getLogger(__name__)
def search_podcasts(query=None, podcast_id=None, timeout=10):
    """
    Search for podcasts using the iTunes API.

    When ``podcast_id`` is given it takes precedence and a single podcast is
    looked up; otherwise ``query`` is used for a free-text search limited to
    20 results.

    Args:
        query (str): Search query for podcasts.
        podcast_id (str): iTunes podcast ID to get specific podcast.
        timeout (float): Seconds to wait for the iTunes API before giving up.

    Returns:
        list: List of podcast dictionaries if query is provided
            (empty on error or when the query is an empty string).
        dict: Podcast dictionary if podcast_id is provided, or None when the
            lookup finds nothing. If the request fails, None is returned
            unless ``query`` was also supplied, in which case ``[]`` is
            returned.
    """
    if not query and not podcast_id:
        # Nothing to look up: mirror the return shape the caller asked for.
        return [] if query is not None else None

    try:
        if podcast_id:
            # Get specific podcast by ID. Passing ``params`` lets requests
            # URL-encode the values instead of splicing them into the URL raw.
            response = requests.get(
                "https://itunes.apple.com/lookup",
                params={'id': podcast_id, 'entity': 'podcast'},
                timeout=timeout,
            )
            response.raise_for_status()
            data = response.json()
            if data['resultCount'] == 0:
                return None
            return _format_podcast(data['results'][0])

        # Search for podcasts
        response = requests.get(
            "https://itunes.apple.com/search",
            params={'term': query, 'entity': 'podcast', 'limit': 20},
            timeout=timeout,
        )
        response.raise_for_status()
        data = response.json()
        return [_format_podcast(podcast) for podcast in data['results']]
    except Exception as e:
        # Best-effort service boundary: network, HTTP, and payload-shape
        # errors are all logged and reported as an empty result.
        logger.error(f"Error searching podcasts: {str(e)}")
        return [] if query is not None else None
def _format_podcast(podcast):
    """
    Convert one raw iTunes API result into the podcast dictionary used here.

    Args:
        podcast (dict): Podcast data from iTunes API.

    Returns:
        dict: Formatted podcast data with the keys ``title``, ``author``,
            ``description``, ``image_url``, ``feed_url``, ``external_id``,
            ``genre`` and ``country``. Missing source fields become empty
            strings.
    """
    name = podcast.get('collectionName', '')
    rss_url = podcast.get('feedUrl', '')

    # Log feed URL for debugging
    logger.info(f"Podcast: {name}, Feed URL: {rss_url}")
    if not rss_url:
        logger.warning(f"No feed URL found for podcast: {name}")

    # Prefer the 600px artwork and fall back to the 100px variant.
    small_artwork = podcast.get('artworkUrl100', '')
    artwork = podcast.get('artworkUrl600', small_artwork)

    formatted = {}
    formatted['title'] = name
    formatted['author'] = podcast.get('artistName', '')
    formatted['description'] = podcast.get('description', '')
    formatted['image_url'] = artwork
    formatted['feed_url'] = rss_url
    formatted['external_id'] = str(podcast.get('collectionId', ''))
    formatted['genre'] = podcast.get('primaryGenreName', '')
    formatted['country'] = podcast.get('country', '')
    return formatted
# Title patterns tried, in order, when the feed carries no explicit episode number.
_EPISODE_TITLE_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'Episode\s+(\d+)',  # "Episode 123"
        r'Ep\s*(\d+)',  # "Ep123" or "Ep 123"
        r'#(\d+)',  # "#123"
        r'E(\d+)',  # "E123" or "S1E123"
    )
)

# Matches an episode number that is already in combined "S01E02" form.
_SEASON_EPISODE_RE = re.compile(r'^S\d+E\d+$', re.IGNORECASE)


def get_podcast_episodes(feed_url):
    """
    Get podcast episodes from RSS feed.

    The feed is fetched and parsed, podcast-level metadata is extracted, and
    every entry is converted into an episode dictionary. Entries for which no
    GUID can be derived, or which have no usable http(s) audio URL, are
    skipped. An entry that raises while being processed is logged and skipped
    so that it does not discard the rest of the feed.

    Args:
        feed_url (str): URL of the podcast RSS feed.

    Returns:
        tuple: (list of episode dictionaries, podcast metadata dictionary).
            ``([], {})`` is returned when ``feed_url`` is empty or the feed
            cannot be processed at all. Each episode dictionary has the keys
            ``title``, ``description``, ``published_date``, ``guid``,
            ``duration``, ``season``, ``episode_number``, ``explicit``,
            ``audio_url``, ``file_size``, ``download_error`` and
            ``status_code``, plus ``image_url`` when the entry has an image.
    """
    try:
        if not feed_url:
            logger.error("Empty feed URL provided")
            return [], {}

        logger.info(f"Fetching episodes from feed: {feed_url}")
        feed = _load_feed(feed_url)
        logger.info(f"Found {len(feed.entries)} entries in feed")

        podcast_metadata = _extract_podcast_metadata(feed)
        logger.info(f"Extracted podcast metadata: title='{podcast_metadata['title']}', image_url={podcast_metadata['image_url']}")

        episodes = []
        for entry in feed.entries:
            try:
                episode = _build_episode(entry)
            except Exception as e:
                # One malformed entry should not throw away the whole feed.
                logger.warning(f"Skipping entry that could not be processed: {str(e)}")
                continue
            if episode is None:
                continue
            episodes.append(episode)
            logger.debug(f"Added episode: {episode['title']} (Status: {episode.get('status_code')})")

        logger.info(f"Processed {len(episodes)} valid episodes")
        return episodes, podcast_metadata
    except Exception as e:
        logger.error(f"Error getting podcast episodes: {str(e)}")
        return [], {}


def _load_feed(feed_url):
    """Fetch and parse the feed, following redirects and retrying via requests if no entries were parsed."""
    # Check if the feed URL is valid and follow redirects
    try:
        response = requests.head(feed_url, allow_redirects=True, timeout=10)
        if response.status_code != 200:
            # Logged only; parsing is still attempted below.
            logger.error(f"Feed URL returned status code {response.status_code}: {feed_url}")
        if response.url != feed_url:
            logger.info(f"Feed URL redirected from {feed_url} to {response.url}")
            feed_url = response.url
    except Exception as e:
        logger.warning(f"Error checking feed URL: {str(e)}")

    # Parse the feed
    feed = feedparser.parse(feed_url)

    # Check for parsing errors
    if hasattr(feed, 'bozo_exception') and feed.bozo_exception:
        logger.error(f"Error parsing feed: {feed.bozo_exception}")

    # If nothing was parsed, download the document ourselves and parse the bytes.
    if len(feed.entries) == 0:
        try:
            logger.info("Trying alternative method to fetch feed")
            response = requests.get(feed_url, timeout=10)
            feed = feedparser.parse(response.content)
            logger.info(f"Alternative method found {len(feed.entries)} entries")
        except Exception as e:
            logger.error(f"Alternative method also failed: {str(e)}")

    return feed


def _extract_podcast_metadata(feed):
    """Build the podcast-level metadata dict (title, description, author, image_url) from a parsed feed."""
    channel = feed.feed
    metadata = {
        'title': channel.get('title', ''),
        'description': channel.get('description', channel.get('subtitle', '')),
        'author': channel.get('author', channel.get('itunes_author', '')),
        'image_url': None,  # Stays None when no image can be located below
    }

    # Try to get podcast image URL from various locations in the feed
    if hasattr(channel, 'image') and hasattr(channel.image, 'href'):
        metadata['image_url'] = channel.image.href
        logger.debug(f"Found podcast image in feed.image.href: {metadata['image_url']}")
    elif hasattr(channel, 'itunes_image') and hasattr(channel.itunes_image, 'href'):
        metadata['image_url'] = channel.itunes_image.href
        logger.debug(f"Found podcast image in feed.itunes_image.href: {metadata['image_url']}")
    elif 'image' in channel and 'href' in channel.image:
        metadata['image_url'] = channel.image.href
        logger.debug(f"Found podcast image in feed.image['href']: {metadata['image_url']}")

    return metadata


def _build_episode(entry):
    """Convert one feed entry into an episode dict, or return None when the entry must be skipped."""
    # Log entry details for debugging
    logger.debug(f"Processing entry: {entry.get('title', 'No title')}")

    title = entry.get('title', '')
    published_date = _parse_date(entry.get('published'))
    season = _extract_season(entry)
    raw_number = _extract_episode_number(entry, title)

    episode = {
        'title': title,
        'description': entry.get('description', ''),
        'published_date': published_date,
        'guid': _resolve_guid(entry, title, published_date),
        'duration': _parse_duration(entry.get('itunes_duration', '')),
        'season': season,
        'episode_number': _normalize_episode_number(season, raw_number, published_date),
        'explicit': _extract_explicit(entry),
    }

    # Without any GUID the episode cannot be identified, so skip it.
    if not episode['guid']:
        logger.warning("Could not generate GUID for episode, skipping")
        return None

    audio = _find_audio(entry)
    if audio is None:
        logger.warning(f"No audio URL found for episode: {title}")
    else:
        episode['audio_url'], episode['file_size'] = audio

    # Get image URL
    if 'image' in entry and 'href' in entry.image:
        episode['image_url'] = entry.image.href

    # Only add episodes with audio URLs
    audio_url = episode.get('audio_url')
    if not audio_url:
        logger.warning(f"Skipping episode without audio URL: {title}")
        return None
    if not isinstance(audio_url, str) or not audio_url.startswith(('http://', 'https://')):
        logger.warning(f"Invalid audio URL format: {audio_url}")
        return None

    # The URL is deliberately not probed here: a request per episode can
    # stall imports of large feeds. Reachability is left to download time.
    logger.debug(f"Skipping audio URL validation for {title}")
    episode['download_error'] = None
    episode['status_code'] = 200  # Assume success
    return episode


def _extract_season(entry):
    """Return the entry's season number as an int, or None when it cannot be determined."""
    try:
        # Try as attribute first
        if hasattr(entry, 'itunes_season'):
            season = int(entry.itunes_season) if entry.itunes_season else None
            logger.debug(f"Found season as attribute: {season}")
            return season
        # Try as dictionary key
        if entry.get('itunes_season'):
            season = int(entry.get('itunes_season'))
            logger.debug(f"Found season as dict key: {season}")
            return season
        # Try looking in tags such as "Season 3"
        if hasattr(entry, 'tags'):
            for tag in entry.tags:
                term = tag.get('term') or ''  # a tag's term may be None
                if not term.startswith('Season'):
                    continue
                try:
                    season = int(term.replace('Season', '').strip())
                except (ValueError, TypeError):
                    continue
                logger.debug(f"Found season in tags: {season}")
                return season
    except Exception as e:
        logger.warning(f"Error parsing season: {str(e)}")
    return None


def _extract_episode_number(entry, title):
    """Return the raw episode number from the entry's iTunes field or its title, or None."""
    try:
        # Try as attribute first (itunes_episode)
        if hasattr(entry, 'itunes_episode') and entry.itunes_episode:
            number = entry.itunes_episode
            logger.debug(f"Found episode number as attribute: {number}")
            return number
        # Try as dictionary key
        if entry.get('itunes_episode'):
            number = entry.get('itunes_episode')
            logger.debug(f"Found episode number as dict key: {number}")
            return number
        # Try to extract from title if it contains "Episode X" or "Ep X" or "#X"
        if title:
            for pattern in _EPISODE_TITLE_PATTERNS:
                match = pattern.search(title)
                if match:
                    number = match.group(1)
                    logger.debug(f"Extracted episode number from title: {number}")
                    return number
    except Exception as e:
        logger.warning(f"Error parsing episode number: {str(e)}")
    return None


def _extract_explicit(entry):
    """Return True when the entry is flagged explicit, accepting both bool and 'yes'/'true' text values."""
    try:
        # Try as attribute first, then as dictionary key
        if hasattr(entry, 'itunes_explicit'):
            value, origin = entry.itunes_explicit, 'attribute'
        elif entry.get('itunes_explicit'):
            value, origin = entry.get('itunes_explicit'), 'dict key'
        else:
            return False

        # NOTE(review): feedparser appears to hand this field back already
        # normalised to a bool/None rather than the raw text, in which case a
        # string-only check would never see an explicit episode; confirm
        # against the installed feedparser version.
        if isinstance(value, bool):
            explicit = value
        elif isinstance(value, str):
            explicit = value.lower() in ('yes', 'true')
        else:
            explicit = False
        logger.debug(f"Found explicit as {origin}: {explicit}")
        return explicit
    except Exception as e:
        logger.warning(f"Error parsing explicit flag: {str(e)}")
        return False


def _normalize_episode_number(season, number, published_date):
    """Combine season, raw episode number and publish date into the stored episode identifier."""
    number_missing = number is None or number == ''

    if season is None:
        if not number_missing:
            # No season, but episode number exists - keep episode number as is
            logger.debug(f"Using episode number without season: {number}")
            return number
        if published_date:
            # Use the publication date as a pseudo-episode number (YYYYMMDD)
            number = published_date.strftime('%Y%m%d')
            logger.debug(f"No season or episode number, using date as episode number: {number}")
            return number
        logger.debug("No season, episode number, or date available")
        return "unknown"

    if number_missing:
        if published_date:
            # Season prefix plus publication date, e.g. S01_20230101
            number = f"S{season:02d}_{published_date.strftime('%Y%m%d')}"
            logger.debug(f"Season without episode number, using season+date: {number}")
            return number
        number = f"S{season:02d}_unknown"
        logger.debug(f"Season without episode number or date: {number}")
        return number

    # Both season and episode exist - format as S01E02 unless already so
    if _SEASON_EPISODE_RE.match(str(number)):
        logger.debug(f"Episode already formatted correctly: {number}")
        return number
    try:
        formatted = f"S{season:02d}E{int(number):02d}"
    except (ValueError, TypeError):
        # Non-numeric episode value: keep it verbatim behind the season prefix
        formatted = f"S{season:02d}_{number}"
        logger.debug(f"Using season prefix with non-numeric episode: {formatted}")
        return formatted
    logger.debug(f"Formatted season and episode as: {formatted}")
    return formatted


def _resolve_guid(entry, title, published_date):
    """Return the entry's GUID, falling back to its HTML link and then to an MD5 of title plus date."""
    guid = entry.get('id', '')
    if guid:
        return guid

    # Try to use a link as GUID; the first alternate/HTML link wins
    for link in entry.get('links', []):
        if link.get('rel') == 'alternate' or link.get('type') == 'text/html':
            guid = link.get('href', '')
            logger.debug(f"Generated GUID from link: {guid}")
            break

    # If still no GUID, generate one from title and date
    if not guid and title:
        hash_input = title
        if published_date:
            hash_input += published_date.isoformat()
        # MD5 is used only to derive an identifier from the text
        guid = hashlib.md5(hash_input.encode('utf-8')).hexdigest()
        logger.debug(f"Generated GUID from title and date: {guid}")

    return guid


def _find_audio(entry):
    """Return ``(audio_url, file_size)`` from the entry's links, enclosures or media content, or None."""
    # Method 1: Check links
    for link in entry.get('links', []):
        if (link.get('type') or '').startswith('audio/'):
            audio_url = link.get('href', '')
            logger.debug(f"Found audio URL in links: {audio_url}")
            return audio_url, link.get('length', 0)

    # Method 2: Check enclosures
    enclosures = getattr(entry, 'enclosures', None)
    if enclosures:
        for enclosure in enclosures:
            if (enclosure.get('type') or '').startswith('audio/'):
                audio_url = enclosure.get('href', '')
                logger.debug(f"Found audio URL in enclosure: {audio_url}")
                return audio_url, enclosure.get('length', 0)

    # Method 3: Check media:content
    if hasattr(entry, 'media_content'):
        for media in entry.media_content:
            if (media.get('type') or '').startswith('audio/'):
                audio_url = media.get('url', '')
                logger.debug(f"Found audio URL in media:content: {audio_url}")
                return audio_url, media.get('fileSize', 0)

    # Method 4: Fall back to the first enclosure regardless of its type
    if enclosures:
        enclosure = enclosures[0]
        audio_url = enclosure.get('href', '')
        logger.debug(f"Found audio URL in generic enclosure: {audio_url}")
        return audio_url, enclosure.get('length', 0)

    return None
def _parse_date(date_str):
"""
Parse date string to datetime object.
Args:
date_str (str): Date string from RSS feed.
Returns:
datetime: Parsed datetime object or None.
"""
if not date_str:
return None
try:
return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %z')
except ValueError:
try:
return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %Z')
except ValueError:
try:
return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S%z')
except ValueError:
logger.warning(f"Could not parse date: {date_str}")
return None
def _parse_duration(duration_str):
"""
Parse duration string to seconds.
Args:
duration_str (str): Duration string from RSS feed.
Returns:
int: Duration in seconds or None.
"""
if not duration_str:
return None
try:
# Try to parse as seconds
return int(duration_str)
except ValueError:
try:
# Try to parse as HH:MM:SS
parts = duration_str.split(':')
if len(parts) == 3:
hours, minutes, seconds = parts
return int(hours) * 3600 + int(minutes) * 60 + int(seconds)
elif len(parts) == 2:
minutes, seconds = parts
return int(minutes) * 60 + int(seconds)
else:
return None
except ValueError:
logger.warning(f"Could not parse duration: {duration_str}")
return None