""" Podcast search service for Podcastrr. """ import requests import feedparser from datetime import datetime import logging # Set up logging logger = logging.getLogger(__name__) def search_podcasts(query=None, podcast_id=None): """ Search for podcasts using the iTunes API. Args: query (str): Search query for podcasts. podcast_id (str): iTunes podcast ID to get specific podcast. Returns: list: List of podcast dictionaries if query is provided. dict: Podcast dictionary if podcast_id is provided. """ if not query and not podcast_id: return [] if query is not None else None try: if podcast_id: # Get specific podcast by ID url = f"https://itunes.apple.com/lookup?id={podcast_id}&entity=podcast" response = requests.get(url) data = response.json() if data['resultCount'] == 0: return None podcast = data['results'][0] return _format_podcast(podcast) else: # Search for podcasts url = f"https://itunes.apple.com/search?term={query}&entity=podcast&limit=20" response = requests.get(url) data = response.json() results = [] for podcast in data['results']: results.append(_format_podcast(podcast)) return results except Exception as e: logger.error(f"Error searching podcasts: {str(e)}") return [] if query is not None else None def _format_podcast(podcast): """ Format podcast data from iTunes API. Args: podcast (dict): Podcast data from iTunes API. Returns: dict: Formatted podcast data. """ feed_url = podcast.get('feedUrl', '') # Log feed URL for debugging logger.info(f"Podcast: {podcast.get('collectionName', '')}, Feed URL: {feed_url}") if not feed_url: logger.warning(f"No feed URL found for podcast: {podcast.get('collectionName', '')}") return { 'title': podcast.get('collectionName', ''), 'author': podcast.get('artistName', ''), 'description': podcast.get('description', ''), 'image_url': podcast.get('artworkUrl600', podcast.get('artworkUrl100', '')), 'feed_url': feed_url, 'external_id': str(podcast.get('collectionId', '')), 'genre': podcast.get('primaryGenreName', ''), 'country': podcast.get('country', '') } def get_podcast_episodes(feed_url): """ Get podcast episodes from RSS feed. Args: feed_url (str): URL of the podcast RSS feed. Returns: list: List of episode dictionaries. """ try: if not feed_url: logger.error("Empty feed URL provided") return [] logger.info(f"Fetching episodes from feed: {feed_url}") # Check if the feed URL is valid and follow redirects try: import requests response = requests.head(feed_url, allow_redirects=True, timeout=10) if response.status_code != 200: logger.error(f"Feed URL returned status code {response.status_code}: {feed_url}") if response.url != feed_url: logger.info(f"Feed URL redirected from {feed_url} to {response.url}") feed_url = response.url except Exception as e: logger.warning(f"Error checking feed URL: {str(e)}") # Parse the feed feed = feedparser.parse(feed_url) # Check for parsing errors if hasattr(feed, 'bozo_exception') and feed.bozo_exception: logger.error(f"Error parsing feed: {feed.bozo_exception}") # Try to parse the feed with requests if feedparser fails if len(feed.entries) == 0: try: logger.info("Trying alternative method to fetch feed") response = requests.get(feed_url, timeout=10) feed = feedparser.parse(response.content) logger.info(f"Alternative method found {len(feed.entries)} entries") except Exception as e: logger.error(f"Alternative method also failed: {str(e)}") logger.info(f"Found {len(feed.entries)} entries in feed") episodes = [] for entry in feed.entries: # Log entry details for debugging logger.debug(f"Processing entry: {entry.get('title', 'No title')}") # Extract basic episode info episode = { 'title': entry.get('title', ''), 'description': entry.get('description', ''), 'published_date': _parse_date(entry.get('published')), 'guid': entry.get('id', ''), 'duration': _parse_duration(entry.get('itunes_duration', '')), 'season': entry.get('itunes_season'), # Season number 'episode_number': entry.get('itunes_episode', ''), # Episode number within season 'explicit': False # Default to False } # Handle explicit flag safely itunes_explicit = entry.get('itunes_explicit', '') if isinstance(itunes_explicit, str) and itunes_explicit: episode['explicit'] = itunes_explicit.lower() == 'yes' # Generate a GUID if one is not provided if not episode['guid']: # Try to use a link as GUID for link in entry.get('links', []): if link.get('rel') == 'alternate' or link.get('type') == 'text/html': episode['guid'] = link.get('href', '') logger.debug(f"Generated GUID from link: {episode['guid']}") break # If still no GUID, generate one from title and date if not episode['guid'] and episode['title']: import hashlib # Create a hash from the title and published date (if available) hash_input = episode['title'] if episode['published_date']: hash_input += episode['published_date'].isoformat() episode['guid'] = hashlib.md5(hash_input.encode('utf-8')).hexdigest() logger.debug(f"Generated GUID from title and date: {episode['guid']}") # If still no GUID (no title), skip this episode if not episode['guid']: logger.warning("Could not generate GUID for episode, skipping") continue # Get audio URL audio_found = False # Method 1: Check links for link in entry.get('links', []): if link.get('type', '').startswith('audio/'): episode['audio_url'] = link.get('href', '') episode['file_size'] = link.get('length', 0) audio_found = True logger.debug(f"Found audio URL in links: {episode['audio_url']}") break # Method 2: Check enclosures if not audio_found and hasattr(entry, 'enclosures') and entry.enclosures: for enclosure in entry.enclosures: if enclosure.get('type', '').startswith('audio/'): episode['audio_url'] = enclosure.get('href', '') episode['file_size'] = enclosure.get('length', 0) audio_found = True logger.debug(f"Found audio URL in enclosure: {episode['audio_url']}") break # Method 3: Check media:content if not audio_found and hasattr(entry, 'media_content'): for media in entry.media_content: if media.get('type', '').startswith('audio/'): episode['audio_url'] = media.get('url', '') episode['file_size'] = media.get('fileSize', 0) audio_found = True logger.debug(f"Found audio URL in media:content: {episode['audio_url']}") break # Method 4: Check for generic enclosure if not audio_found and hasattr(entry, 'enclosures') and entry.enclosures: # Try any enclosure if we haven't found an audio URL yet enclosure = entry.enclosures[0] episode['audio_url'] = enclosure.get('href', '') episode['file_size'] = enclosure.get('length', 0) audio_found = True logger.debug(f"Found audio URL in generic enclosure: {episode['audio_url']}") if not audio_found: logger.warning(f"No audio URL found for episode: {episode['title']}") # Get image URL if 'image' in entry and 'href' in entry.image: episode['image_url'] = entry.image.href # Only add episodes with audio URLs if audio_found and 'audio_url' in episode and episode['audio_url']: # Validate the audio URL try: # Check if the URL is valid if not episode['audio_url'].startswith(('http://', 'https://')): logger.warning(f"Invalid audio URL format: {episode['audio_url']}") continue # Try to validate the URL without downloading the file import requests head_response = requests.head(episode['audio_url'], timeout=5, allow_redirects=True) # Check if the URL is accessible if head_response.status_code >= 400: logger.warning(f"Audio URL returned status code {head_response.status_code}: {episode['audio_url']}") continue # Check if the content type is audio content_type = head_response.headers.get('Content-Type', '') if not content_type.startswith('audio/') and 'application/octet-stream' not in content_type: logger.warning(f"Audio URL has non-audio content type: {content_type}") # Don't skip here as some servers might not report the correct content type # If we got here, the audio URL is probably valid episodes.append(episode) logger.debug(f"Added episode with valid audio URL: {episode['title']}") except Exception as e: # If we can't validate the URL, still add the episode but log a warning logger.warning(f"Could not validate audio URL: {str(e)}") episodes.append(episode) logger.debug(f"Added episode with unvalidated audio URL: {episode['title']}") else: logger.warning(f"Skipping episode without audio URL: {episode['title']}") logger.info(f"Processed {len(episodes)} valid episodes") return episodes except Exception as e: logger.error(f"Error getting podcast episodes: {str(e)}") return [] def _parse_date(date_str): """ Parse date string to datetime object. Args: date_str (str): Date string from RSS feed. Returns: datetime: Parsed datetime object or None. """ if not date_str: return None try: return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %z') except ValueError: try: return datetime.strptime(date_str, '%a, %d %b %Y %H:%M:%S %Z') except ValueError: try: return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S%z') except ValueError: logger.warning(f"Could not parse date: {date_str}") return None def _parse_duration(duration_str): """ Parse duration string to seconds. Args: duration_str (str): Duration string from RSS feed. Returns: int: Duration in seconds or None. """ if not duration_str: return None try: # Try to parse as seconds return int(duration_str) except ValueError: try: # Try to parse as HH:MM:SS parts = duration_str.split(':') if len(parts) == 3: hours, minutes, seconds = parts return int(hours) * 3600 + int(minutes) * 60 + int(seconds) elif len(parts) == 2: minutes, seconds = parts return int(minutes) * 60 + int(seconds) else: return None except ValueError: logger.warning(f"Could not parse duration: {duration_str}") return None