Do not remove session from db until it is successfully written

* For activity pinger only
This commit is contained in:
JonnyWong16 2016-03-05 13:07:26 -08:00
parent 0569abd00d
commit d73e379dcf
4 changed files with 82 additions and 34 deletions

View file

@ -395,7 +395,7 @@ def dbcheck():
# sessions table :: This is a temp table that logs currently active sessions
c_db.execute(
'CREATE TABLE IF NOT EXISTS sessions (id INTEGER PRIMARY KEY AUTOINCREMENT, '
'session_key INTEGER, rating_key INTEGER, section_id INTEGER, media_type TEXT, started INTEGER, '
'session_key INTEGER, rating_key INTEGER, section_id INTEGER, media_type TEXT, started INTEGER, stopped INTEGER, '
'paused_counter INTEGER DEFAULT 0, state TEXT, user_id INTEGER, user TEXT, friendly_name TEXT, '
'ip_address TEXT, machine_id TEXT, player TEXT, platform TEXT, title TEXT, parent_title TEXT, '
'grandparent_title TEXT, parent_rating_key INTEGER, grandparent_rating_key INTEGER, '
@ -619,6 +619,15 @@ def dbcheck():
'ALTER TABLE sessions ADD COLUMN section_id INTEGER'
)
# Upgrade sessions table from earlier versions
try:
c_db.execute('SELECT stopped FROM sessions')
except sqlite3.OperationalError:
logger.debug(u"Altering database. Updating database table sessions.")
c_db.execute(
'ALTER TABLE sessions ADD COLUMN stopped INTEGER'
)
# Upgrade session_history table from earlier versions
try:
c_db.execute('SELECT reference_id FROM session_history')

View file

@ -77,7 +77,8 @@ class ActivityHandler(object):
if not force_stop:
ap.set_session_state(session_key=self.get_session_key(),
state=self.timeline['state'],
view_offset=self.timeline['viewOffset'])
view_offset=self.timeline['viewOffset'],
stopped=int(time.time()))
# Retrieve the session data from our temp table
db_session = ap.get_session_by_key(session_key=self.get_session_key())
@ -91,6 +92,7 @@ class ActivityHandler(object):
monitor_proc.write_session_history(session=db_session)
# Remove the session from our temp session table
logger.debug(u"PlexPy ActivityHandler :: Removing session %s from session queue" % str(self.get_session_key()))
ap.delete_session(session_key=self.get_session_key())
def on_pause(self):

View file

@ -58,12 +58,16 @@ def check_active_sessions(ws_request=False):
# Here we can check the play states
if session['state'] != stream['state']:
if session['state'] == 'paused':
logger.debug(u"PlexPy Monitor :: Session %s has been paused." % stream['session_key'])
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='pause')).start()
if session['state'] == 'playing' and stream['state'] == 'paused':
logger.debug(u"PlexPy Monitor :: Session %s has been resumed." % stream['session_key'])
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
@ -122,8 +126,9 @@ def check_active_sessions(ws_request=False):
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='buffer')).start()
logger.debug(u"PlexPy Monitor :: Stream buffering. Count is now %s. Last triggered %s."
% (buffer_values[0]['buffer_count'],
logger.debug(u"PlexPy Monitor :: Session %s is buffering. Count is now %s. Last triggered %s."
% (stream['session_key'],
buffer_values[0]['buffer_count'],
buffer_values[0]['buffer_last_triggered']))
# Check if the user has reached the offset in the media we defined as the "watched" percent
@ -139,10 +144,14 @@ def check_active_sessions(ws_request=False):
else:
# The user has stopped playing a stream
logger.debug(u"PlexPy Monitor :: Removing sessionKey %s ratingKey %s from session queue"
% (stream['session_key'], stream['rating_key']))
monitor_db.action('DELETE FROM sessions WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
if stream['state'] != 'stopped':
logger.debug(u"PlexPy Monitor :: Session %s has stopped." % stream['session_key'])
# Set the stream stop time
stream['stopped'] = int(time.time())
monitor_db.action('UPDATE sessions SET stopped = ?, state = ? '
'WHERE session_key = ? AND rating_key = ?',
[stream['stopped'], 'stopped', stream['session_key'], stream['rating_key']])
# Check if the user has reached the offset in the media we defined as the "watched" percent
if stream['view_offset'] and stream['duration']:
@ -158,11 +167,25 @@ def check_active_sessions(ws_request=False):
kwargs=dict(stream_data=stream, notify_action='stop')).start()
# Write the item history on playback stop
monitor_process.write_session_history(session=stream)
success = monitor_process.write_session_history(session=stream)
if success:
# If session is written to the databaase successfully, remove the session from the session table
logger.debug(u"PlexPy Monitor :: Removing sessionKey %s ratingKey %s from session queue"
% (stream['session_key'], stream['rating_key']))
monitor_db.action('DELETE FROM sessions WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
else:
logger.warn(u"PlexPy Monitor :: Failed to write sessionKey %s ratingKey %s to the database. " \
"Will try again on the next pass." % (stream['session_key'], stream['rating_key']))
# Process the newly received session data
for session in media_container:
monitor_process.write_session(session)
new_session = monitor_process.write_session(session)
if new_session:
logger.debug(u"PlexPy Monitor :: Session %s has started." % session['session_key'])
else:
logger.debug(u"PlexPy Monitor :: Unable to read session list.")

View file

@ -97,6 +97,8 @@ class ActivityProcessor(object):
ip_address = {'ip_address': ip_address}
self.db.upsert('sessions', ip_address, keys)
return True
def write_session_history(self, session=None, import_metadata=None, is_import=False, import_ignore_interval=0):
from plexpy import users, libraries
@ -108,6 +110,10 @@ class ActivityProcessor(object):
library_data = libraries.Libraries()
library_details = library_data.get_details(section_id=section_id)
# Return false if failed to retrieve user or library details
if not user_details or not library_details:
return False
if session:
logging_enabled = False
@ -117,7 +123,7 @@ class ActivityProcessor(object):
else:
stopped = int(time.time())
else:
stopped = int(time.time())
stopped = int(session['stopped'])
if plexpy.CONFIG.MOVIE_LOGGING_ENABLE and str(session['rating_key']).isdigit() and \
session['media_type'] == 'movie':
@ -167,6 +173,19 @@ class ActivityProcessor(object):
logger.debug(u"PlexPy ActivityProcessor :: History logging for library '%s' is disabled." % library_details['section_name'])
if logging_enabled:
# Fetch metadata first so we can return false if it fails
if not is_import:
logger.debug(u"PlexPy ActivityProcessor :: Fetching metadata for item ratingKey %s" % session['rating_key'])
pms_connect = pmsconnect.PmsConnect()
result = pms_connect.get_metadata_details(rating_key=str(session['rating_key']))
if result:
metadata = result['metadata']
else:
return False
else:
metadata = import_metadata
# logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history table...")
query = 'INSERT INTO session_history (started, stopped, rating_key, parent_rating_key, ' \
'grandparent_rating_key, media_type, user_id, user, ip_address, paused_counter, player, ' \
@ -247,14 +266,6 @@ class ActivityProcessor(object):
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_media_info transaction...")
self.db.action(query=query, args=args)
if not is_import:
logger.debug(u"PlexPy ActivityProcessor :: Fetching metadata for item ratingKey %s" % session['rating_key'])
pms_connect = pmsconnect.PmsConnect()
result = pms_connect.get_metadata_details(rating_key=str(session['rating_key']))
metadata = result['metadata']
else:
metadata = import_metadata
# Write the session_history_metadata table
directors = ";".join(metadata['directors'])
writers = ";".join(metadata['writers'])
@ -290,6 +301,9 @@ class ActivityProcessor(object):
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_metadata transaction...")
self.db.action(query=query, args=args)
# Return true when the session is successfully written to the database
return True
def find_session_ip(self, rating_key=None, machine_id=None):
logger.debug(u"PlexPy ActivityProcessor :: Requesting log lines...")
@ -361,11 +375,11 @@ class ActivityProcessor(object):
return None
def set_session_state(self, session_key=None, state=None, view_offset=0):
def set_session_state(self, session_key=None, **kwargs):
if str(session_key).isdigit() and str(view_offset).isdigit():
values = {'view_offset': int(view_offset)}
if state:
values['state'] = state
values = {}
for k,v in kwargs.iteritems():
values[k] = v
keys = {'session_key': session_key}
result = self.db.upsert('sessions', values, keys)