"""
Podcast search service for Podcastrr.

Provides iTunes-backed podcast search/lookup and RSS feed parsing that turns
feed entries into plain episode dictionaries.
"""
import hashlib
import logging
import re
from datetime import datetime
from email.utils import parsedate_to_datetime

import feedparser
import requests

# Set up logging
logger = logging.getLogger(__name__)

_ITUNES_LOOKUP_URL = "https://itunes.apple.com/lookup"
_ITUNES_SEARCH_URL = "https://itunes.apple.com/search"

# Timeout (seconds) for iTunes API calls so a stalled connection cannot hang
# the caller indefinitely.
_ITUNES_TIMEOUT = 10

# Title patterns tried, in order, when a feed entry carries no explicit
# episode number.
_EPISODE_NUMBER_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'Episode\s+(\d+)',  # "Episode 123"
        r'Ep\s*(\d+)',       # "Ep123" or "Ep 123"
        r'#(\d+)',           # "#123"
        r'E(\d+)',           # "E123" or "S1E123"
    )
)

# Matches an episode number that is already in "S01E02" form.
_SEASON_EPISODE_RE = re.compile(r'^S\d+E\d+$', re.IGNORECASE)

# Date layouts accepted by _parse_date before the more lenient fallbacks.
_DATE_FORMATS = (
    '%a, %d %b %Y %H:%M:%S %z',
    '%a, %d %b %Y %H:%M:%S %Z',
    '%Y-%m-%dT%H:%M:%S%z',
)

# HEAD responses that only say "HEAD is not supported here", not "URL is bad".
_HEAD_UNSUPPORTED_STATUSES = (405, 501)


def search_podcasts(query=None, podcast_id=None):
    """
    Search for podcasts using the iTunes API.

    When ``podcast_id`` is given it takes precedence and a single podcast is
    looked up; otherwise ``query`` is used for a search limited to 20 results.

    Args:
        query (str): Search query for podcasts.
        podcast_id (str): iTunes podcast ID to get specific podcast.

    Returns:
        list: List of podcast dictionaries if query is provided.
        dict: Podcast dictionary if podcast_id is provided (None if not found).
        On any error the "empty" value is returned: ``[]`` when ``query`` is
        not None, otherwise ``None``.
    """
    empty_result = [] if query is not None else None

    if not query and not podcast_id:
        return empty_result

    try:
        if podcast_id:
            # Get specific podcast by ID
            data = _fetch_itunes_json(
                _ITUNES_LOOKUP_URL,
                {'id': podcast_id, 'entity': 'podcast'},
            )
            results = data.get('results') or []
            if not results:
                return None
            return _format_podcast(results[0])

        # Search for podcasts
        data = _fetch_itunes_json(
            _ITUNES_SEARCH_URL,
            {'term': query, 'entity': 'podcast', 'limit': 20},
        )
        return [_format_podcast(podcast) for podcast in data.get('results') or []]
    except Exception as e:
        # Best-effort boundary: callers get the empty value instead of an error.
        logger.error(f"Error searching podcasts: {str(e)}")
        return empty_result


def _fetch_itunes_json(url, params):
    """
    GET an iTunes API endpoint and return the decoded JSON body.

    Args:
        url (str): Endpoint URL without a query string.
        params (dict): Query parameters; passed via ``params=`` so that
            requests URL-encodes them (queries may contain spaces, '&', '#').

    Returns:
        The decoded JSON payload.

    Raises:
        requests.RequestException: On network failure or an HTTP error status.
        ValueError: If the body is not valid JSON.
    """
    response = requests.get(url, params=params, timeout=_ITUNES_TIMEOUT)
    response.raise_for_status()
    return response.json()


def _format_podcast(podcast):
    """
    Format podcast data from iTunes API.

    Args:
        podcast (dict): Podcast data from iTunes API.

    Returns:
        dict: Formatted podcast data with the keys title, author, description,
        image_url, feed_url, external_id, genre and country. Missing source
        fields become empty strings.
    """
    feed_url = podcast.get('feedUrl', '')

    # Log feed URL for debugging
    logger.info(f"Podcast: {podcast.get('collectionName', '')}, Feed URL: {feed_url}")

    if not feed_url:
        logger.warning(f"No feed URL found for podcast: {podcast.get('collectionName', '')}")

    return {
        'title': podcast.get('collectionName', ''),
        'author': podcast.get('artistName', ''),
        'description': podcast.get('description', ''),
        # Prefer the 600px artwork, fall back to the 100px one.
        'image_url': podcast.get('artworkUrl600', podcast.get('artworkUrl100', '')),
        'feed_url': feed_url,
        'external_id': str(podcast.get('collectionId', '')),
        'genre': podcast.get('primaryGenreName', ''),
        'country': podcast.get('country', ''),
    }


def get_podcast_episodes(feed_url):
    """
    Get podcast episodes from RSS feed.

    The feed URL is first probed with a HEAD request (following redirects),
    then parsed with feedparser. Every entry that yields a GUID and an audio
    URL that passes validation becomes one episode dictionary.

    Args:
        feed_url (str): URL of the podcast RSS feed.

    Returns:
        list: List of episode dictionaries. Each has the keys title,
        description, published_date, guid, duration, season, episode_number,
        explicit, audio_url and file_size, plus image_url when the entry has
        an image. An empty list is returned on any unexpected error.
    """
    try:
        if not feed_url:
            logger.error("Empty feed URL provided")
            return []

        logger.info(f"Fetching episodes from feed: {feed_url}")

        feed = _load_feed(_resolve_feed_url(feed_url))
        logger.info(f"Found {len(feed.entries)} entries in feed")

        episodes = []
        for entry in feed.entries:
            episode = _build_episode(entry)
            if episode is not None and _has_usable_audio(episode):
                episodes.append(episode)

        logger.info(f"Processed {len(episodes)} valid episodes")
        return episodes
    except Exception as e:
        # Top-level boundary: never let a malformed feed crash the caller.
        logger.error(f"Error getting podcast episodes: {str(e)}")
        return []


def _resolve_feed_url(feed_url):
    """Probe the feed URL with HEAD and return the final URL after redirects."""
    try:
        response = requests.head(feed_url, allow_redirects=True, timeout=10)
        if response.status_code != 200:
            logger.error(f"Feed URL returned status code {response.status_code}: {feed_url}")

        if response.url != feed_url:
            logger.info(f"Feed URL redirected from {feed_url} to {response.url}")
            return response.url
    except Exception as e:
        # The probe is advisory only; parsing is still attempted afterwards.
        logger.warning(f"Error checking feed URL: {str(e)}")
    return feed_url


def _load_feed(feed_url):
    """Parse the feed, retrying via an explicit download if no entries came back."""
    feed = feedparser.parse(feed_url)

    # Check for parsing errors
    bozo_exception = getattr(feed, 'bozo_exception', None)
    if bozo_exception:
        logger.error(f"Error parsing feed: {bozo_exception}")

    # Fetch the document ourselves when feedparser's own fetch produced nothing.
    if not feed.entries:
        try:
            logger.info("Trying alternative method to fetch feed")
            response = requests.get(feed_url, timeout=10)
            feed = feedparser.parse(response.content)
            logger.info(f"Alternative method found {len(feed.entries)} entries")
        except Exception as e:
            logger.error(f"Alternative method also failed: {str(e)}")

    return feed


def _build_episode(entry):
    """Turn one feed entry into an episode dict, or None if no GUID can be derived."""
    title = entry.get('title', '')
    logger.debug(f"Processing entry: {entry.get('title', 'No title')}")

    published_date = _parse_date(entry.get('published'))
    season = _extract_season(entry)
    raw_episode_number = _extract_episode_number(entry, title)

    episode = {
        'title': title,
        'description': entry.get('description', ''),
        'published_date': published_date,
        'guid': entry.get('id', ''),
        'duration': _parse_duration(entry.get('itunes_duration', '')),
        'season': season,
        'episode_number': _normalize_episode_number(season, raw_episode_number, published_date),
        'explicit': _extract_explicit(entry),
    }

    # Generate a GUID if one is not provided
    if not episode['guid']:
        episode['guid'] = _derive_guid(entry, title, published_date)
        if not episode['guid']:
            logger.warning("Could not generate GUID for episode, skipping")
            return None

    audio = _find_audio(entry)
    if audio is None:
        logger.warning(f"No audio URL found for episode: {episode['title']}")
    else:
        episode['audio_url'], episode['file_size'], source = audio
        logger.debug(f"Found audio URL in {source}: {episode['audio_url']}")

    # Get image URL
    if 'image' in entry and 'href' in entry.image:
        episode['image_url'] = entry.image.href

    return episode


def _extract_season(entry):
    """Return the season as an int from itunes:season or a "Season N" tag, else None."""
    try:
        raw_season = entry.get('itunes_season')
        if raw_season:
            season = int(raw_season)
            logger.debug(f"Found season: {season}")
            return season

        for tag in entry.get('tags') or []:
            # 'term' may be present with a None value, so do not rely on the default.
            term = tag.get('term') or ''
            if not term.startswith('Season'):
                continue
            try:
                season = int(term.replace('Season', '').strip())
            except (ValueError, TypeError):
                continue
            logger.debug(f"Found season in tags: {season}")
            return season
    except Exception as e:
        logger.warning(f"Error parsing season: {str(e)}")
    return None


def _extract_episode_number(entry, title):
    """Return the raw episode number from itunes:episode or the title, else None."""
    try:
        number = entry.get('itunes_episode')
        if number:
            logger.debug(f"Found episode number: {number}")
            return number

        # Try to extract from title if it contains "Episode X", "Ep X" or "#X"
        if title:
            for pattern in _EPISODE_NUMBER_PATTERNS:
                match = pattern.search(title)
                if match:
                    logger.debug(f"Extracted episode number from title: {match.group(1)}")
                    return match.group(1)
    except Exception as e:
        logger.warning(f"Error parsing episode number: {str(e)}")
    return None


def _extract_explicit(entry):
    """Return True when the entry is flagged explicit, False otherwise."""
    try:
        value = entry.get('itunes_explicit')
        # feedparser normalises itunes:explicit to True/False/None, so accept
        # booleans as well as raw strings.
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.strip().lower() in ('yes', 'true')
    except Exception as e:
        logger.warning(f"Error parsing explicit flag: {str(e)}")
    return False


def _normalize_episode_number(season, episode_number, published_date):
    """
    Combine season, episode number and date into the stored episode number.

    Rules:
        no season, no number  -> "YYYYMMDD" (or "unknown" without a date)
        no season, number     -> number unchanged
        season, no number     -> "S01_YYYYMMDD" (or "S01_unknown")
        season and number     -> "S01E02"; a number already in that form is
                                 kept, a non-numeric one becomes "S01_<number>"
    """
    missing = episode_number is None or episode_number == ''
    date_part = published_date.strftime('%Y%m%d') if published_date else "unknown"

    if season is None:
        return date_part if missing else episode_number

    if missing:
        return f"S{season:02d}_{date_part}"

    if _SEASON_EPISODE_RE.match(str(episode_number)):
        return episode_number

    try:
        return f"S{season:02d}E{int(episode_number):02d}"
    except (ValueError, TypeError):
        # Non-numeric episode identifier: keep it, prefixed with the season.
        return f"S{season:02d}_{episode_number}"


def _derive_guid(entry, title, published_date):
    """Derive a GUID from the entry's HTML link, else from a title/date hash."""
    # Try to use a link as GUID; the first matching link decides.
    for link in entry.get('links', []):
        if link.get('rel') == 'alternate' or link.get('type') == 'text/html':
            href = link.get('href', '')
            if href:
                logger.debug(f"Generated GUID from link: {href}")
                return href
            break

    if not title:
        return ''

    # MD5 is only a stable identifier here, not a security measure; the
    # algorithm is kept so previously generated GUIDs stay valid.
    hash_input = title
    if published_date:
        hash_input += published_date.isoformat()
    guid = hashlib.md5(hash_input.encode('utf-8')).hexdigest()
    logger.debug(f"Generated GUID from title and date: {guid}")
    return guid


def _is_audio(item):
    """Return True if a link/enclosure/media dict declares an audio MIME type."""
    return (item.get('type') or '').startswith('audio/')


def _find_audio(entry):
    """
    Locate the audio file of an entry.

    Sources are tried in order: links, enclosures, media:content, and finally
    the first enclosure regardless of its type.

    Returns:
        tuple: (audio_url, file_size, source_name), or None if nothing matched.
    """
    # Method 1: Check links
    for link in entry.get('links', []):
        if _is_audio(link):
            return link.get('href', ''), link.get('length', 0), 'links'

    # Method 2: Check enclosures
    enclosures = entry.get('enclosures') or []
    for enclosure in enclosures:
        if _is_audio(enclosure):
            return enclosure.get('href', ''), enclosure.get('length', 0), 'enclosure'

    # Method 3: Check media:content
    for media in entry.get('media_content') or []:
        if _is_audio(media):
            return media.get('url', ''), media.get('fileSize', 0), 'media:content'

    # Method 4: Fall back to the first enclosure even without an audio type
    if enclosures:
        first = enclosures[0]
        return first.get('href', ''), first.get('length', 0), 'generic enclosure'

    return None


def _has_usable_audio(episode):
    """
    Decide whether an episode's audio URL is good enough to keep the episode.

    The URL must be http(s). It is probed with a HEAD request: an error status
    rejects the episode, while a failed or unsupported probe keeps it.

    NOTE(review): this issues one HEAD request per episode, serially, which is
    slow for large feeds.
    """
    audio_url = episode.get('audio_url')
    if not audio_url:
        logger.warning(f"Skipping episode without audio URL: {episode['title']}")
        return False

    try:
        # Check if the URL is valid
        if not audio_url.startswith(('http://', 'https://')):
            logger.warning(f"Invalid audio URL format: {audio_url}")
            return False

        # Try to validate the URL without downloading the file
        head_response = requests.head(audio_url, timeout=5, allow_redirects=True)
        status_code = head_response.status_code

        if status_code in _HEAD_UNSUPPORTED_STATUSES:
            # The server rejects the HEAD method itself; that says nothing
            # about whether a GET would succeed.
            logger.warning(f"Audio URL does not support HEAD (status {status_code}), keeping unvalidated: {audio_url}")
            return True

        # Check if the URL is accessible
        if status_code >= 400:
            logger.warning(f"Audio URL returned status code {status_code}: {audio_url}")
            return False

        # Check if the content type is audio
        content_type = head_response.headers.get('Content-Type', '')
        if not content_type.startswith('audio/') and 'application/octet-stream' not in content_type:
            # Don't skip here as some servers might not report the correct content type
            logger.warning(f"Audio URL has non-audio content type: {content_type}")

        logger.debug(f"Added episode with valid audio URL: {episode['title']}")
        return True
    except Exception as e:
        # If we can't validate the URL, still add the episode but log a warning
        logger.warning(f"Could not validate audio URL: {str(e)}")
        logger.debug(f"Added episode with unvalidated audio URL: {episode['title']}")
        return True


def _parse_date(date_str):
    """
    Parse date string to datetime object.

    The RFC 822 style layouts and the basic ISO 8601 layout are tried first;
    after that ``email.utils.parsedate_to_datetime`` and
    ``datetime.fromisoformat`` serve as more lenient fallbacks.

    Args:
        date_str (str): Date string from RSS feed.

    Returns:
        datetime: Parsed datetime object or None. The result may be naive or
        timezone-aware depending on what the input specified.
    """
    if not date_str:
        return None

    if not isinstance(date_str, str):
        logger.warning(f"Could not parse date: {date_str}")
        return None

    text = date_str.strip()

    for date_format in _DATE_FORMATS:
        try:
            return datetime.strptime(text, date_format)
        except ValueError:
            continue

    # Lenient RFC 2822 parsing (missing weekday, named zones such as "EST").
    try:
        return parsedate_to_datetime(text)
    except (TypeError, ValueError, IndexError, OverflowError):
        pass

    # Remaining ISO 8601 variants (e.g. fractional seconds).
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        pass

    logger.warning(f"Could not parse date: {date_str}")
    return None


def _parse_duration(duration_str):
    """
    Parse duration string to seconds.

    Accepts plain seconds ("1234"), "MM:SS" and "HH:MM:SS". A fractional
    seconds component is truncated.

    Args:
        duration_str (str): Duration string from RSS feed.

    Returns:
        int: Duration in seconds or None.
    """
    if not duration_str:
        return None

    # Try to parse as seconds
    try:
        return int(duration_str)
    except (TypeError, ValueError):
        pass

    # Try to parse as [HH:]MM:SS, or as fractional seconds
    try:
        parts = str(duration_str).strip().split(':')
        if len(parts) > 3:
            raise ValueError("too many duration components")

        *leading, seconds = parts
        total = 0
        for part in leading:
            total = total * 60 + int(part)
        return total * 60 + int(float(seconds))
    except (TypeError, ValueError, OverflowError):
        logger.warning(f"Could not parse duration: {duration_str}")
        return None