From 32b43202c2210feecacc40fc40c228c90602d00e Mon Sep 17 00:00:00 2001 From: JonnyWong16 Date: Mon, 15 Jan 2018 18:55:37 -0800 Subject: [PATCH] Attempt at fixing stuck sessions which require flishing the database --- plexpy/activity_handler.py | 12 ++++++------ plexpy/activity_pinger.py | 17 ++++++----------- plexpy/activity_processor.py | 18 +++++++++++++----- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/plexpy/activity_handler.py b/plexpy/activity_handler.py index 39a2b55f..026f0e67 100644 --- a/plexpy/activity_handler.py +++ b/plexpy/activity_handler.py @@ -121,15 +121,15 @@ class ActivityHandler(object): # Write it to the history table monitor_proc = activity_processor.ActivityProcessor() - success = monitor_proc.write_session_history(session=db_session) + row_id = monitor_proc.write_session_history(session=db_session) - if success: + if row_id: schedule_callback('session_key-{}'.format(self.get_session_key()), remove_job=True) # Remove the session from our temp session table logger.debug(u"Tautulli ActivityHandler :: Removing sessionKey %s ratingKey %s from session queue" % (str(self.get_session_key()), str(self.get_rating_key()))) - ap.delete_session(session_key=self.get_session_key()) + ap.delete_session(row_id=row_id) delete_metadata_cache(self.get_session_key()) else: schedule_callback('session_key-{}'.format(self.get_session_key()), func=force_stop_stream, @@ -441,13 +441,13 @@ def force_stop_stream(session_key): ap = activity_processor.ActivityProcessor() session = ap.get_session_by_key(session_key=session_key) - success = ap.write_session_history(session=session) + row_id = ap.write_session_history(session=session) - if success: + if row_id: # If session is written to the databaase successfully, remove the session from the session table logger.info(u"Tautulli ActivityHandler :: Removing stale stream with sessionKey %s ratingKey %s from session queue" % (session['session_key'], session['rating_key'])) - ap.delete_session(session_key=session_key) + ap.delete_session(row_id=row_id) delete_metadata_cache(session_key) else: diff --git a/plexpy/activity_pinger.py b/plexpy/activity_pinger.py index ae7341bf..13ff3063 100644 --- a/plexpy/activity_pinger.py +++ b/plexpy/activity_pinger.py @@ -160,14 +160,13 @@ def check_active_sessions(ws_request=False): plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_stop'}) # Write the item history on playback stop - success = monitor_process.write_session_history(session=stream) - - if success: + row_id = monitor_process.write_session_history(session=stream) + + if row_id: # If session is written to the databaase successfully, remove the session from the session table logger.debug(u"Tautulli Monitor :: Removing sessionKey %s ratingKey %s from session queue" % (stream['session_key'], stream['rating_key'])) - monitor_db.action('DELETE FROM sessions WHERE session_key = ? AND rating_key = ?', - [stream['session_key'], stream['rating_key']]) + monitor_process.delete_session(row_id=row_id) else: stream['write_attempts'] += 1 @@ -175,18 +174,14 @@ def check_active_sessions(ws_request=False): logger.warn(u"Tautulli Monitor :: Failed to write sessionKey %s ratingKey %s to the database. " \ "Will try again on the next pass. Write attempt %s." % (stream['session_key'], stream['rating_key'], str(stream['write_attempts']))) - monitor_db.action('UPDATE sessions SET write_attempts = ? ' - 'WHERE session_key = ? AND rating_key = ?', - [stream['write_attempts'], stream['session_key'], stream['rating_key']]) + monitor_process.increment_write_attempts(session_key=stream['session_key']) else: logger.warn(u"Tautulli Monitor :: Failed to write sessionKey %s ratingKey %s to the database. " \ "Removing session from the database. Write attempt %s." % (stream['session_key'], stream['rating_key'], str(stream['write_attempts']))) logger.debug(u"Tautulli Monitor :: Removing sessionKey %s ratingKey %s from session queue" % (stream['session_key'], stream['rating_key'])) - monitor_db.action('DELETE FROM sessions WHERE session_key = ? AND rating_key = ?', - [stream['session_key'], stream['rating_key']]) - + monitor_process.delete_session(session_key=stream['session_key']) # Process the newly received session data for session in media_container: diff --git a/plexpy/activity_processor.py b/plexpy/activity_processor.py index dec886ae..e58f9d2e 100644 --- a/plexpy/activity_processor.py +++ b/plexpy/activity_processor.py @@ -155,7 +155,12 @@ class ActivityProcessor(object): # Reload json from raw stream info if session.get('raw_stream_info'): - session.update(json.loads(session['raw_stream_info'])) + raw_stream_info = json.loads(session['raw_stream_info']) + # Don't overwrite id, session_key, stopped + raw_stream_info.pop('id', None) + raw_stream_info.pop('session_key', None) + raw_stream_info.pop('stopped', None) + session.update(raw_stream_info) session = defaultdict(str, session) @@ -177,6 +182,7 @@ class ActivityProcessor(object): else: logger.debug(u"Tautulli ActivityProcessor :: ratingKey %s not logged. Does not meet logging criteria. " u"Media type is '%s'" % (session['rating_key'], session['media_type'])) + return session['id'] if str(session['paused_counter']).isdigit(): real_play_time = stopped - session['started'] - int(session['paused_counter']) @@ -284,7 +290,7 @@ class ActivityProcessor(object): query = 'UPDATE session_history SET reference_id = ? WHERE id = ? ' # If rating_key is the same in the previous session, then set the reference_id to the previous row, else set the reference_id to the new id - if prev_session == new_session == None: + if prev_session is None and new_session is None: args = [last_id, last_id] elif prev_session['rating_key'] == new_session['rating_key'] and prev_session['view_offset'] <= new_session['view_offset']: args = [prev_session['reference_id'], new_session['id']] @@ -414,8 +420,8 @@ class ActivityProcessor(object): # logger.debug(u"Tautulli ActivityProcessor :: Writing session_history_metadata transaction...") self.db.upsert(table_name='session_history_metadata', key_dict=keys, value_dict=values) - # Return true when the session is successfully written to the database - return True + # Return the session row id when the session is successfully written to the database + return session['id'] def get_sessions(self, user_id=None, ip_address=None): query = 'SELECT * FROM sessions' @@ -456,9 +462,11 @@ class ActivityProcessor(object): return None - def delete_session(self, session_key=None): + def delete_session(self, session_key=None, row_id=None): if str(session_key).isdigit(): self.db.action('DELETE FROM sessions WHERE session_key = ?', [session_key]) + elif str(row_id).isdigit(): + self.db.action('DELETE FROM sessions WHERE id = ?', [row_id]) def set_session_last_paused(self, session_key=None, timestamp=None): if str(session_key).isdigit():