Add script for orphaned history pruning functionality

This commit is contained in:
lincmba 2025-05-09 01:15:40 +03:00
commit d7b6312cb2
No known key found for this signature in database
GPG key ID: 1EFC0193DEB2AEF0
4 changed files with 291 additions and 0 deletions

View file

View file

@ -0,0 +1,168 @@
"""
Tautulli Orphaned History Pruner
Standalone script to remove watch history entries for media no longer in Plex
"""
import argparse
import asyncio
import logging
import sqlite3
from contextlib import contextmanager
from plexapi.server import PlexServer
from typing import Set, List
# Configure logging
logger = logging.getLogger("tautulli-pruner")
log_handler = logging.StreamHandler()
log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
log_handler.setFormatter(log_formatter)
logger.addHandler(log_handler)
@contextmanager
def database_connection(db_path: str):
"""Context manager for SQLite database connections."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
class PlexManager:
"""Handles Plex server communication with async support"""
def __init__(self, base_url: str, token: str, server_name: str = None):
self.plex = PlexServer(base_url, token)
self.server_name = server_name or self.plex.friendlyName
async def fetch_all_media_ids(self) -> Set[int]:
"""Fetch all rating keys from relevant Plex libraries"""
media_ids = set()
loop = asyncio.get_event_loop()
try:
sections = await loop.run_in_executor(None, self.plex.library.sections)
for section in sections:
if section.type in ("movie", "show"):
try:
items = await loop.run_in_executor(None, section.all)
media_ids.update(item.ratingKey for item in items)
logger.debug(f"Processed {section.title} ({len(items)} items)")
except Exception as e:
logger.error(f"Error processing {section.title}: {e}")
except Exception as e:
logger.error(f"Plex connection failed: {e}")
return media_ids
class HistoryPruner:
"""Handles orphaned history detection and removal"""
def __init__(self, db_path: str):
self.db_path = db_path
def get_watch_history(self) -> List[int]:
"""Retrieve all rating keys from watch history"""
with database_connection(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT rating_key FROM watch_history")
return [row["rating_key"] for row in cursor.fetchall()]
def delete_orphans(self, rating_keys: List[int]):
"""Batch delete orphaned entries efficiently"""
if not rating_keys:
logger.info("No orphans to delete")
return
chunk_size = 999 # SQLite parameter limit
total_deleted = 0
with database_connection(self.db_path) as conn:
cursor = conn.cursor()
try:
cursor.execute("BEGIN TRANSACTION")
for i in range(0, len(rating_keys), chunk_size):
chunk = rating_keys[i : i + chunk_size]
placeholders = ",".join(["?"] * len(chunk))
cursor.execute(
f"DELETE FROM watch_history WHERE rating_key IN ({placeholders})",
chunk,
)
total_deleted += cursor.rowcount
cursor.execute("COMMIT")
logger.info(f"Deleted {total_deleted} orphaned entries")
except sqlite3.Error as e:
cursor.execute("ROLLBACK")
logger.error(f"Database error: {e}")
raise
async def main(args):
"""Orphan pruning workflow"""
logger.setLevel(args.loglevel)
# Initialize components
plex = PlexManager(args.plex_url, args.plex_token, args.plex_server)
pruner = HistoryPruner(args.db_path)
try:
# Fetch data from sources
logger.info("Fetching Plex media IDs...")
plex_ids = await plex.fetch_all_media_ids()
logger.info(f"Found {len(plex_ids)} Plex media items")
logger.info("Fetching Tautulli watch history...")
history_ids = pruner.get_watch_history()
logger.info(f"Found {len(history_ids)} watch history entries")
# Calculate orphans
orphans = list(set(history_ids) - plex_ids)
logger.info(f"Identified {len(orphans)} orphaned entries")
pruner.delete_orphans(orphans)
except Exception as e:
logger.error(f"Pruning failed: {e}")
raise
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Tautulli orphaned history pruner",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
# Required arguments
parser.add_argument(
"--db-path", required=True, help="Path to Tautulli database file (tautulli.db)"
)
parser.add_argument(
"--plex-url", required=True, help="Plex server URL (e.g. http://plex:32400)"
)
parser.add_argument("--plex-token", required=True, help="Plex authentication token")
# Optional arguments
parser.add_argument("--plex-server", help="Plex server name (if multiple servers)")
parser.add_argument(
"-v", "--verbose", action="count", default=0, help="Increase logging verbosity"
)
args = parser.parse_args()
# Set log level based on verbosity
args.loglevel = logging.WARNING
if args.verbose == 1:
args.loglevel = logging.INFO
elif args.verbose >= 2:
args.loglevel = logging.DEBUG
try:
asyncio.run(main(args))
except KeyboardInterrupt:
logger.error("Operation cancelled by user")
except Exception as e:
logger.critical(f"Fatal error: {e}")
exit(1)

View file

View file

@ -0,0 +1,123 @@
import pytest
from unittest.mock import MagicMock, patch
import sqlite3
from lib.database_cleanup.prune_deleted_content import (
PlexManager,
HistoryPruner,
main as pruner_main,
)
import logging
@pytest.fixture
def mock_plex():
plex = MagicMock()
section_movie = MagicMock()
section_movie.type = "movie"
section_movie.title = "Movies"
section_movie.all.return_value = [
MagicMock(ratingKey=1001),
MagicMock(ratingKey=1002),
]
section_show = MagicMock()
section_show.type = "show"
section_show.title = "TV Shows"
section_show.all.return_value = [
MagicMock(ratingKey=2001),
MagicMock(ratingKey=2002),
]
section_music = MagicMock()
section_music.type = "artist"
plex.library.sections.return_value = [section_movie, section_show, section_music]
return plex
@pytest.fixture
def test_db(tmp_path):
db_path = tmp_path / "test.db"
conn = sqlite3.connect(db_path)
conn.execute(
"""
CREATE TABLE watch_history (
id INTEGER PRIMARY KEY,
rating_key INTEGER NOT NULL,
timestamp INTEGER
)
"""
)
test_data = [
(1001, 1625097600),
(1002, 1625184000),
(9999, 1625270400), # Orphan
(8888, 1625356800), # Orphan
]
conn.executemany(
"INSERT INTO watch_history (rating_key, timestamp) VALUES (?, ?)", test_data
)
conn.commit()
conn.close()
return db_path
@pytest.mark.asyncio
async def test_plex_manager_fetch_ids(mock_plex):
manager = PlexManager("http://test", "token")
with patch("pruner.PlexServer", return_value=mock_plex):
ids = await manager.fetch_all_media_ids()
assert ids == {1001, 1002, 2001, 2002}
def test_history_pruner_get_history(test_db):
pruner = HistoryPruner(test_db)
assert set(pruner.get_watch_history()) == {1001, 1002, 9999, 8888}
def test_delete_orphans(test_db):
pruner = HistoryPruner(test_db)
pruner.delete_orphans([9999, 8888])
conn = sqlite3.connect(test_db)
remaining = conn.execute("SELECT rating_key FROM watch_history").fetchall()
assert len(remaining) == 2
assert {r[0] for r in remaining} == {1001, 1002}
@pytest.mark.asyncio
async def test_main_workflow(test_db, mock_plex, caplog):
with patch("pruner.PlexManager") as mock_manager:
mock_instance = mock_manager.return_value
mock_instance.fetch_all_media_ids.return_value = {1001, 1002, 2001, 2002}
args = MagicMock(
db_path=test_db,
plex_url="http://test",
plex_token="token",
plex_server=None,
loglevel=logging.INFO,
)
await pruner_main(args)
# Verify deletions
conn = sqlite3.connect(test_db)
remaining = conn.execute("SELECT rating_key FROM watch_history").fetchall()
assert len(remaining) == 2
# Verify logs
assert "Found 4 watch history entries" in caplog.text
assert "Identified 2 orphaned entries" in caplog.text
assert "Deleted 2 orphaned entries" in caplog.text
def test_empty_database(tmp_path):
db_path = tmp_path / "empty.db"
conn = sqlite3.connect(db_path)
conn.execute("CREATE TABLE watch_history (rating_key INTEGER)")
conn.close()
pruner = HistoryPruner(db_path)
assert pruner.get_watch_history() == []
pruner.delete_orphans([123]) # Shouldn't error