diff --git a/plexpy/__init__.py b/plexpy/__init__.py index 27340070..2efdbc95 100644 --- a/plexpy/__init__.py +++ b/plexpy/__init__.py @@ -32,6 +32,7 @@ import cherrypy from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger +import activity_handler import activity_pinger import config import database @@ -372,7 +373,7 @@ def initialize_scheduler(): try: SCHED.start() except Exception as e: - logger.info(e) + logger.error(e) def schedule_job(function, name, hours=0, minutes=0, seconds=0, args=None): @@ -402,6 +403,9 @@ def start(): global _STARTED if _INITIALIZED: + # Start the scheduler for stale stream callbacks + activity_handler.ACTIVITY_SCHED.start() + # Start background notification thread notification_handler.start_threads(num_threads=CONFIG.NOTIFICATION_THREADS) diff --git a/plexpy/activity_handler.py b/plexpy/activity_handler.py index d591e665..cf0bc93c 100644 --- a/plexpy/activity_handler.py +++ b/plexpy/activity_handler.py @@ -13,9 +13,13 @@ # You should have received a copy of the GNU General Public License # along with PlexPy. If not, see . +import datetime import threading import time +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.date import DateTrigger + import plexpy import activity_processor import datafactory @@ -26,6 +30,8 @@ import notifiers import pmsconnect +ACTIVITY_SCHED = BackgroundScheduler() + RECENTLY_ADDED_QUEUE = {} class ActivityHandler(object): @@ -206,6 +212,9 @@ class ActivityHandler(object): # If we already have this session in the temp table, check for state changes if db_session: + # Re-schedule the callback to reset the 5 minutes timer + schedule_callback(self.get_session_key(), args=[self.get_session_key()], minutes=5) + last_state = db_session['state'] last_key = str(db_session['rating_key']) @@ -225,6 +234,10 @@ class ActivityHandler(object): self.on_resume() elif this_state == 'stopped': self.on_stop() + + # Remove the callback if the stream is stopped + schedule_callback(self.get_session_key(), remove_job=True) + elif this_state == 'buffering': self.on_buffer() # If a client doesn't register stop events (I'm looking at you PHT!) check if the ratingKey has changed @@ -250,6 +263,10 @@ class ActivityHandler(object): if this_state != 'buffering': self.on_start() + # Schedule a callback to force stop a stale stream 5 minutes later + schedule_callback(self.get_session_key(), args=[self.get_session_key()], minutes=5) + + class TimelineHandler(object): def __init__(self, timeline): @@ -439,3 +456,50 @@ def del_keys(key): del_keys(child_key) elif key in RECENTLY_ADDED_QUEUE: del_keys(RECENTLY_ADDED_QUEUE.pop(key)) + + +def schedule_callback(id, remove_job=False, args=None, **kwargs): + if ACTIVITY_SCHED.get_job(str(id)): + if remove_job: + ACTIVITY_SCHED.remove_job(str(id)) + else: + ACTIVITY_SCHED.reschedule_job( + str(id), args=args, trigger=DateTrigger( + run_date=datetime.datetime.now() + datetime.timedelta(**kwargs))) + elif not remove_job: + ACTIVITY_SCHED.add_job( + force_stop_stream, args=args, id=str(id), trigger=DateTrigger( + run_date=datetime.datetime.now() + datetime.timedelta(**kwargs))) + + +def force_stop_stream(session_key): + ap = activity_processor.ActivityProcessor() + session = ap.get_session_by_key(session_key=session_key) + + success = ap.write_session_history(session=session) + + if success: + # If session is written to the databaase successfully, remove the session from the session table + logger.info(u"PlexPy ActivityHandler :: Removing stale stream with sessionKey %s ratingKey %s from session queue" + % (session['session_key'], session['rating_key'])) + ap.delete_session(session_key=session_key) + + else: + sessions['write_attempts'] += 1 + + if sessions['write_attempts'] < plexpy.CONFIG.SESSION_DB_WRITE_ATTEMPTS: + logger.warn(u"PlexPy ActivityHandler :: Failed to write stream with sessionKey %s ratingKey %s to the database. " \ + "Will try again in 30 seconds. Write attempt %s." + % (sessions['session_key'], sessions['rating_key'], str(sessions['write_attempts']))) + ap.increment_write_attempts(session_key=session_key) + + # Reschedule for 30 seconds later + schedule_callback(session_key, args=[session_key], seconds=30) + + else: + logger.warn(u"PlexPy Monitor :: Failed to write stream with sessionKey %s ratingKey %s to the database. " \ + "Removing session from the database. Write attempt %s." + % (sessions['session_key'], sessions['rating_key'], str(sessions['write_attempts']))) + logger.info(u"PlexPy Monitor :: Removing stale stream with sessionKey %s ratingKey %s from session queue" + % (sessions['session_key'], sessions['rating_key'])) + ap.delete_session(session_key=session_key) \ No newline at end of file diff --git a/plexpy/activity_processor.py b/plexpy/activity_processor.py index 2736805c..978fc301 100644 --- a/plexpy/activity_processor.py +++ b/plexpy/activity_processor.py @@ -404,4 +404,10 @@ class ActivityProcessor(object): def set_temp_stopped(self): stopped_time = int(time.time()) - self.db.action('UPDATE sessions SET stopped = ?', [stopped_time]) \ No newline at end of file + self.db.action('UPDATE sessions SET stopped = ?', [stopped_time]) + + def increment_write_attempts(self, session_key=None): + if str(session_key).isdigit(): + session = self.get_session_by_key(session_key=session_key) + self.db.action('UPDATE sessions SET write_attempts = ? WHERE session_key = ?', + [session['write_attempts'] + 1, session_key])