diff --git a/lib/database_cleanup/__init__.py b/lib/database_cleanup/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/database_cleanup/prune_deleted_content.py b/lib/database_cleanup/prune_deleted_content.py new file mode 100644 index 00000000..80d98eb0 --- /dev/null +++ b/lib/database_cleanup/prune_deleted_content.py @@ -0,0 +1,168 @@ +""" +Tautulli Orphaned History Pruner +Standalone script to remove watch history entries for media no longer in Plex +""" + +import argparse +import asyncio +import logging +import sqlite3 +from contextlib import contextmanager +from plexapi.server import PlexServer +from typing import Set, List + +# Configure logging +logger = logging.getLogger("tautulli-pruner") +log_handler = logging.StreamHandler() +log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") +log_handler.setFormatter(log_formatter) +logger.addHandler(log_handler) + + +@contextmanager +def database_connection(db_path: str): + """Context manager for SQLite database connections.""" + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + try: + yield conn + finally: + conn.close() + + +class PlexManager: + """Handles Plex server communication with async support""" + + def __init__(self, base_url: str, token: str, server_name: str = None): + self.plex = PlexServer(base_url, token) + self.server_name = server_name or self.plex.friendlyName + + async def fetch_all_media_ids(self) -> Set[int]: + """Fetch all rating keys from relevant Plex libraries""" + media_ids = set() + loop = asyncio.get_event_loop() + + try: + sections = await loop.run_in_executor(None, self.plex.library.sections) + for section in sections: + if section.type in ("movie", "show"): + try: + items = await loop.run_in_executor(None, section.all) + media_ids.update(item.ratingKey for item in items) + logger.debug(f"Processed {section.title} ({len(items)} items)") + except Exception as e: + logger.error(f"Error processing {section.title}: {e}") + except Exception as e: + logger.error(f"Plex connection failed: {e}") + + return media_ids + + +class HistoryPruner: + """Handles orphaned history detection and removal""" + + def __init__(self, db_path: str): + self.db_path = db_path + + def get_watch_history(self) -> List[int]: + """Retrieve all rating keys from watch history""" + with database_connection(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT rating_key FROM watch_history") + return [row["rating_key"] for row in cursor.fetchall()] + + def delete_orphans(self, rating_keys: List[int]): + """Batch delete orphaned entries efficiently""" + if not rating_keys: + logger.info("No orphans to delete") + return + + chunk_size = 999 # SQLite parameter limit + total_deleted = 0 + + with database_connection(self.db_path) as conn: + cursor = conn.cursor() + try: + cursor.execute("BEGIN TRANSACTION") + for i in range(0, len(rating_keys), chunk_size): + chunk = rating_keys[i : i + chunk_size] + placeholders = ",".join(["?"] * len(chunk)) + cursor.execute( + f"DELETE FROM watch_history WHERE rating_key IN ({placeholders})", + chunk, + ) + total_deleted += cursor.rowcount + cursor.execute("COMMIT") + logger.info(f"Deleted {total_deleted} orphaned entries") + except sqlite3.Error as e: + cursor.execute("ROLLBACK") + logger.error(f"Database error: {e}") + raise + + +async def main(args): + """Orphan pruning workflow""" + logger.setLevel(args.loglevel) + + # Initialize components + plex = PlexManager(args.plex_url, args.plex_token, args.plex_server) + pruner = HistoryPruner(args.db_path) + + try: + # Fetch data from sources + logger.info("Fetching Plex media IDs...") + plex_ids = await plex.fetch_all_media_ids() + logger.info(f"Found {len(plex_ids)} Plex media items") + + logger.info("Fetching Tautulli watch history...") + history_ids = pruner.get_watch_history() + logger.info(f"Found {len(history_ids)} watch history entries") + + # Calculate orphans + orphans = list(set(history_ids) - plex_ids) + logger.info(f"Identified {len(orphans)} orphaned entries") + + pruner.delete_orphans(orphans) + + except Exception as e: + logger.error(f"Pruning failed: {e}") + raise + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Tautulli orphaned history pruner", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + # Required arguments + parser.add_argument( + "--db-path", required=True, help="Path to Tautulli database file (tautulli.db)" + ) + parser.add_argument( + "--plex-url", required=True, help="Plex server URL (e.g. http://plex:32400)" + ) + parser.add_argument("--plex-token", required=True, help="Plex authentication token") + + # Optional arguments + parser.add_argument("--plex-server", help="Plex server name (if multiple servers)") + parser.add_argument( + "-v", "--verbose", action="count", default=0, help="Increase logging verbosity" + ) + + args = parser.parse_args() + + # Set log level based on verbosity + args.loglevel = logging.WARNING + if args.verbose == 1: + args.loglevel = logging.INFO + elif args.verbose >= 2: + args.loglevel = logging.DEBUG + + try: + asyncio.run(main(args)) + except KeyboardInterrupt: + logger.error("Operation cancelled by user") + except Exception as e: + logger.critical(f"Fatal error: {e}") + exit(1) diff --git a/lib/database_cleanup/tests/__init__.py b/lib/database_cleanup/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/database_cleanup/tests/test_prune_deleted_content.py b/lib/database_cleanup/tests/test_prune_deleted_content.py new file mode 100644 index 00000000..65765ba0 --- /dev/null +++ b/lib/database_cleanup/tests/test_prune_deleted_content.py @@ -0,0 +1,123 @@ +import pytest +from unittest.mock import MagicMock, patch +import sqlite3 + +from lib.database_cleanup.prune_deleted_content import ( + PlexManager, + HistoryPruner, + main as pruner_main, +) +import logging + + +@pytest.fixture +def mock_plex(): + plex = MagicMock() + section_movie = MagicMock() + section_movie.type = "movie" + section_movie.title = "Movies" + section_movie.all.return_value = [ + MagicMock(ratingKey=1001), + MagicMock(ratingKey=1002), + ] + + section_show = MagicMock() + section_show.type = "show" + section_show.title = "TV Shows" + section_show.all.return_value = [ + MagicMock(ratingKey=2001), + MagicMock(ratingKey=2002), + ] + + section_music = MagicMock() + section_music.type = "artist" + plex.library.sections.return_value = [section_movie, section_show, section_music] + return plex + + +@pytest.fixture +def test_db(tmp_path): + db_path = tmp_path / "test.db" + conn = sqlite3.connect(db_path) + conn.execute( + """ + CREATE TABLE watch_history ( + id INTEGER PRIMARY KEY, + rating_key INTEGER NOT NULL, + timestamp INTEGER + ) + """ + ) + test_data = [ + (1001, 1625097600), + (1002, 1625184000), + (9999, 1625270400), # Orphan + (8888, 1625356800), # Orphan + ] + conn.executemany( + "INSERT INTO watch_history (rating_key, timestamp) VALUES (?, ?)", test_data + ) + conn.commit() + conn.close() + return db_path + + +@pytest.mark.asyncio +async def test_plex_manager_fetch_ids(mock_plex): + manager = PlexManager("http://test", "token") + with patch("pruner.PlexServer", return_value=mock_plex): + ids = await manager.fetch_all_media_ids() + assert ids == {1001, 1002, 2001, 2002} + + +def test_history_pruner_get_history(test_db): + pruner = HistoryPruner(test_db) + assert set(pruner.get_watch_history()) == {1001, 1002, 9999, 8888} + + +def test_delete_orphans(test_db): + pruner = HistoryPruner(test_db) + pruner.delete_orphans([9999, 8888]) + + conn = sqlite3.connect(test_db) + remaining = conn.execute("SELECT rating_key FROM watch_history").fetchall() + assert len(remaining) == 2 + assert {r[0] for r in remaining} == {1001, 1002} + + +@pytest.mark.asyncio +async def test_main_workflow(test_db, mock_plex, caplog): + with patch("pruner.PlexManager") as mock_manager: + mock_instance = mock_manager.return_value + mock_instance.fetch_all_media_ids.return_value = {1001, 1002, 2001, 2002} + + args = MagicMock( + db_path=test_db, + plex_url="http://test", + plex_token="token", + plex_server=None, + loglevel=logging.INFO, + ) + + await pruner_main(args) + + # Verify deletions + conn = sqlite3.connect(test_db) + remaining = conn.execute("SELECT rating_key FROM watch_history").fetchall() + assert len(remaining) == 2 + + # Verify logs + assert "Found 4 watch history entries" in caplog.text + assert "Identified 2 orphaned entries" in caplog.text + assert "Deleted 2 orphaned entries" in caplog.text + + +def test_empty_database(tmp_path): + db_path = tmp_path / "empty.db" + conn = sqlite3.connect(db_path) + conn.execute("CREATE TABLE watch_history (rating_key INTEGER)") + conn.close() + + pruner = HistoryPruner(db_path) + assert pruner.get_watch_history() == [] + pruner.delete_orphans([123]) # Shouldn't error