"""
|
|
OPML import/export functionality for Podcastrr.
|
|
"""
|
|
import logging
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from xml.dom import minidom

from flask import current_app

# Set up logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def parse_opml(opml_content):
|
|
"""
|
|
Parse OPML content and extract podcast feed URLs.
|
|
|
|
Args:
|
|
opml_content (str): OPML file content.
|
|
|
|
Returns:
|
|
list: List of dictionaries containing podcast information.
|
|
"""
|
|
try:
|
|
root = ET.fromstring(opml_content)
|
|
|
|
# Find all outline elements that represent podcasts
|
|
podcasts = []
|
|
|
|
# Look for outlines in the body
|
|
body = root.find('body')
|
|
if body is None:
|
|
logger.error("OPML file has no body element")
|
|
return []
|
|
|
|
# Process all outline elements
|
|
for outline in body.findall('.//outline'):
|
|
# Check if this is a podcast outline (has xmlUrl attribute)
|
|
xml_url = outline.get('xmlUrl')
|
|
if xml_url:
|
|
podcast = {
|
|
'feed_url': xml_url,
|
|
'title': outline.get('title') or outline.get('text', 'Unknown Podcast'),
|
|
'description': outline.get('description', ''),
|
|
'html_url': outline.get('htmlUrl', '')
|
|
}
|
|
podcasts.append(podcast)
|
|
|
|
logger.info(f"Parsed OPML file and found {len(podcasts)} podcasts")
|
|
return podcasts
|
|
except Exception as e:
|
|
logger.error(f"Error parsing OPML file: {str(e)}")
|
|
return []
|
|
|
|
def generate_opml(podcasts):
|
|
"""
|
|
Generate OPML content from a list of podcasts.
|
|
|
|
Args:
|
|
podcasts (list): List of Podcast model instances.
|
|
|
|
Returns:
|
|
str: OPML file content.
|
|
"""
|
|
try:
|
|
# Create the root element
|
|
root = ET.Element('opml')
|
|
root.set('version', '2.0')
|
|
|
|
# Create the head element
|
|
head = ET.SubElement(root, 'head')
|
|
title = ET.SubElement(head, 'title')
|
|
title.text = 'Podcastrr Subscriptions'
|
|
date_created = ET.SubElement(head, 'dateCreated')
|
|
date_created.text = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
|
|
|
|
# Create the body element
|
|
body = ET.SubElement(root, 'body')
|
|
|
|
# Add each podcast as an outline element
|
|
for podcast in podcasts:
|
|
outline = ET.SubElement(body, 'outline')
|
|
outline.set('type', 'rss')
|
|
outline.set('text', podcast.title)
|
|
outline.set('title', podcast.title)
|
|
outline.set('xmlUrl', podcast.feed_url)
|
|
if podcast.description:
|
|
outline.set('description', podcast.description)
|
|
|
|
# Convert to pretty-printed XML
|
|
xml_str = ET.tostring(root, encoding='utf-8')
|
|
parsed_xml = minidom.parseString(xml_str)
|
|
pretty_xml = parsed_xml.toprettyxml(indent=" ")
|
|
|
|
logger.info(f"Generated OPML file with {len(podcasts)} podcasts")
|
|
return pretty_xml
|
|
except Exception as e:
|
|
logger.error(f"Error generating OPML file: {str(e)}")
|
|
return ""
|
|
|
|
def import_podcasts_from_opml(opml_content, progress_callback=None):
|
|
"""
|
|
Import podcasts from OPML content into the database.
|
|
|
|
Args:
|
|
opml_content (str): OPML file content.
|
|
progress_callback (callable, optional): Function to call with progress updates.
|
|
|
|
Returns:
|
|
dict: Statistics about the import process.
|
|
"""
|
|
from app.models.podcast import Podcast
|
|
from app.models.database import db
|
|
from app.services.podcast_updater import update_podcast
|
|
|
|
podcasts = parse_opml(opml_content)
|
|
|
|
stats = {
|
|
'total': len(podcasts),
|
|
'imported': 0,
|
|
'skipped': 0,
|
|
'errors': 0
|
|
}
|
|
|
|
# Initial progress update
|
|
if progress_callback:
|
|
progress_callback(0, f"Starting import of {len(podcasts)} podcasts")
|
|
|
|
for i, podcast_data in enumerate(podcasts):
|
|
try:
|
|
# Check if podcast already exists
|
|
existing = Podcast.query.filter_by(feed_url=podcast_data['feed_url']).first()
|
|
|
|
if existing:
|
|
logger.info(f"Podcast already exists: {podcast_data['title']}")
|
|
stats['skipped'] += 1
|
|
continue
|
|
|
|
# Create new podcast
|
|
podcast = Podcast(
|
|
title=podcast_data['title'],
|
|
description=podcast_data.get('description', ''),
|
|
feed_url=podcast_data['feed_url']
|
|
)
|
|
|
|
db.session.add(podcast)
|
|
db.session.commit()
|
|
|
|
# Update podcast to fetch episodes
|
|
try:
|
|
update_podcast(podcast.id)
|
|
except Exception as e:
|
|
logger.error(f"Error updating podcast {podcast.title}: {str(e)}")
|
|
|
|
stats['imported'] += 1
|
|
logger.info(f"Imported podcast: {podcast.title}")
|
|
except Exception as e:
|
|
stats['errors'] += 1
|
|
logger.error(f"Error importing podcast: {str(e)}")
|
|
|
|
# Update progress during the loop
|
|
if progress_callback and len(podcasts) > 0:
|
|
progress = int((i + 1) / len(podcasts) * 100)
|
|
progress_callback(progress, f"Processed {i + 1}/{len(podcasts)} podcasts")
|
|
|
|
# Final progress update
|
|
if progress_callback:
|
|
progress_callback(100, f"Import completed. Imported: {stats['imported']}, Skipped: {stats['skipped']}, Errors: {stats['errors']}")
|
|
|
|
return stats
|