mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-07-08 14:10:52 -07:00
Do not remove session from db until it is successfully written
* For activity pinger only
This commit is contained in:
parent
0569abd00d
commit
d73e379dcf
4 changed files with 82 additions and 34 deletions
|
@ -395,7 +395,7 @@ def dbcheck():
|
||||||
# sessions table :: This is a temp table that logs currently active sessions
|
# sessions table :: This is a temp table that logs currently active sessions
|
||||||
c_db.execute(
|
c_db.execute(
|
||||||
'CREATE TABLE IF NOT EXISTS sessions (id INTEGER PRIMARY KEY AUTOINCREMENT, '
|
'CREATE TABLE IF NOT EXISTS sessions (id INTEGER PRIMARY KEY AUTOINCREMENT, '
|
||||||
'session_key INTEGER, rating_key INTEGER, section_id INTEGER, media_type TEXT, started INTEGER, '
|
'session_key INTEGER, rating_key INTEGER, section_id INTEGER, media_type TEXT, started INTEGER, stopped INTEGER, '
|
||||||
'paused_counter INTEGER DEFAULT 0, state TEXT, user_id INTEGER, user TEXT, friendly_name TEXT, '
|
'paused_counter INTEGER DEFAULT 0, state TEXT, user_id INTEGER, user TEXT, friendly_name TEXT, '
|
||||||
'ip_address TEXT, machine_id TEXT, player TEXT, platform TEXT, title TEXT, parent_title TEXT, '
|
'ip_address TEXT, machine_id TEXT, player TEXT, platform TEXT, title TEXT, parent_title TEXT, '
|
||||||
'grandparent_title TEXT, parent_rating_key INTEGER, grandparent_rating_key INTEGER, '
|
'grandparent_title TEXT, parent_rating_key INTEGER, grandparent_rating_key INTEGER, '
|
||||||
|
@ -619,6 +619,15 @@ def dbcheck():
|
||||||
'ALTER TABLE sessions ADD COLUMN section_id INTEGER'
|
'ALTER TABLE sessions ADD COLUMN section_id INTEGER'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Upgrade sessions table from earlier versions
|
||||||
|
try:
|
||||||
|
c_db.execute('SELECT stopped FROM sessions')
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
logger.debug(u"Altering database. Updating database table sessions.")
|
||||||
|
c_db.execute(
|
||||||
|
'ALTER TABLE sessions ADD COLUMN stopped INTEGER'
|
||||||
|
)
|
||||||
|
|
||||||
# Upgrade session_history table from earlier versions
|
# Upgrade session_history table from earlier versions
|
||||||
try:
|
try:
|
||||||
c_db.execute('SELECT reference_id FROM session_history')
|
c_db.execute('SELECT reference_id FROM session_history')
|
||||||
|
|
|
@ -77,7 +77,8 @@ class ActivityHandler(object):
|
||||||
if not force_stop:
|
if not force_stop:
|
||||||
ap.set_session_state(session_key=self.get_session_key(),
|
ap.set_session_state(session_key=self.get_session_key(),
|
||||||
state=self.timeline['state'],
|
state=self.timeline['state'],
|
||||||
view_offset=self.timeline['viewOffset'])
|
view_offset=self.timeline['viewOffset'],
|
||||||
|
stopped=int(time.time()))
|
||||||
|
|
||||||
# Retrieve the session data from our temp table
|
# Retrieve the session data from our temp table
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
||||||
|
@ -91,6 +92,7 @@ class ActivityHandler(object):
|
||||||
monitor_proc.write_session_history(session=db_session)
|
monitor_proc.write_session_history(session=db_session)
|
||||||
|
|
||||||
# Remove the session from our temp session table
|
# Remove the session from our temp session table
|
||||||
|
logger.debug(u"PlexPy ActivityHandler :: Removing session %s from session queue" % str(self.get_session_key()))
|
||||||
ap.delete_session(session_key=self.get_session_key())
|
ap.delete_session(session_key=self.get_session_key())
|
||||||
|
|
||||||
def on_pause(self):
|
def on_pause(self):
|
||||||
|
|
|
@ -58,12 +58,16 @@ def check_active_sessions(ws_request=False):
|
||||||
# Here we can check the play states
|
# Here we can check the play states
|
||||||
if session['state'] != stream['state']:
|
if session['state'] != stream['state']:
|
||||||
if session['state'] == 'paused':
|
if session['state'] == 'paused':
|
||||||
|
logger.debug(u"PlexPy Monitor :: Session %s has been paused." % stream['session_key'])
|
||||||
|
|
||||||
# Push any notifications -
|
# Push any notifications -
|
||||||
# Push it on it's own thread so we don't hold up our db actions
|
# Push it on it's own thread so we don't hold up our db actions
|
||||||
threading.Thread(target=notification_handler.notify,
|
threading.Thread(target=notification_handler.notify,
|
||||||
kwargs=dict(stream_data=stream, notify_action='pause')).start()
|
kwargs=dict(stream_data=stream, notify_action='pause')).start()
|
||||||
|
|
||||||
if session['state'] == 'playing' and stream['state'] == 'paused':
|
if session['state'] == 'playing' and stream['state'] == 'paused':
|
||||||
|
logger.debug(u"PlexPy Monitor :: Session %s has been resumed." % stream['session_key'])
|
||||||
|
|
||||||
# Push any notifications -
|
# Push any notifications -
|
||||||
# Push it on it's own thread so we don't hold up our db actions
|
# Push it on it's own thread so we don't hold up our db actions
|
||||||
threading.Thread(target=notification_handler.notify,
|
threading.Thread(target=notification_handler.notify,
|
||||||
|
@ -122,8 +126,9 @@ def check_active_sessions(ws_request=False):
|
||||||
threading.Thread(target=notification_handler.notify,
|
threading.Thread(target=notification_handler.notify,
|
||||||
kwargs=dict(stream_data=stream, notify_action='buffer')).start()
|
kwargs=dict(stream_data=stream, notify_action='buffer')).start()
|
||||||
|
|
||||||
logger.debug(u"PlexPy Monitor :: Stream buffering. Count is now %s. Last triggered %s."
|
logger.debug(u"PlexPy Monitor :: Session %s is buffering. Count is now %s. Last triggered %s."
|
||||||
% (buffer_values[0]['buffer_count'],
|
% (stream['session_key'],
|
||||||
|
buffer_values[0]['buffer_count'],
|
||||||
buffer_values[0]['buffer_last_triggered']))
|
buffer_values[0]['buffer_last_triggered']))
|
||||||
|
|
||||||
# Check if the user has reached the offset in the media we defined as the "watched" percent
|
# Check if the user has reached the offset in the media we defined as the "watched" percent
|
||||||
|
@ -139,30 +144,48 @@ def check_active_sessions(ws_request=False):
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# The user has stopped playing a stream
|
# The user has stopped playing a stream
|
||||||
logger.debug(u"PlexPy Monitor :: Removing sessionKey %s ratingKey %s from session queue"
|
if stream['state'] != 'stopped':
|
||||||
% (stream['session_key'], stream['rating_key']))
|
logger.debug(u"PlexPy Monitor :: Session %s has stopped." % stream['session_key'])
|
||||||
monitor_db.action('DELETE FROM sessions WHERE session_key = ? AND rating_key = ?',
|
|
||||||
[stream['session_key'], stream['rating_key']])
|
|
||||||
|
|
||||||
# Check if the user has reached the offset in the media we defined as the "watched" percent
|
# Set the stream stop time
|
||||||
if stream['view_offset'] and stream['duration']:
|
stream['stopped'] = int(time.time())
|
||||||
if helpers.get_percent(stream['view_offset'],
|
monitor_db.action('UPDATE sessions SET stopped = ?, state = ? '
|
||||||
stream['duration']) > plexpy.CONFIG.NOTIFY_WATCHED_PERCENT:
|
'WHERE session_key = ? AND rating_key = ?',
|
||||||
# Push any notifications -
|
[stream['stopped'], 'stopped', stream['session_key'], stream['rating_key']])
|
||||||
# Push it on it's own thread so we don't hold up our db actions
|
|
||||||
threading.Thread(target=notification_handler.notify,
|
|
||||||
kwargs=dict(stream_data=stream, notify_action='watched')).start()
|
|
||||||
|
|
||||||
# Push any notifications - Push it on it's own thread so we don't hold up our db actions
|
# Check if the user has reached the offset in the media we defined as the "watched" percent
|
||||||
threading.Thread(target=notification_handler.notify,
|
if stream['view_offset'] and stream['duration']:
|
||||||
kwargs=dict(stream_data=stream, notify_action='stop')).start()
|
if helpers.get_percent(stream['view_offset'],
|
||||||
|
stream['duration']) > plexpy.CONFIG.NOTIFY_WATCHED_PERCENT:
|
||||||
|
# Push any notifications -
|
||||||
|
# Push it on it's own thread so we don't hold up our db actions
|
||||||
|
threading.Thread(target=notification_handler.notify,
|
||||||
|
kwargs=dict(stream_data=stream, notify_action='watched')).start()
|
||||||
|
|
||||||
|
# Push any notifications - Push it on it's own thread so we don't hold up our db actions
|
||||||
|
threading.Thread(target=notification_handler.notify,
|
||||||
|
kwargs=dict(stream_data=stream, notify_action='stop')).start()
|
||||||
|
|
||||||
# Write the item history on playback stop
|
# Write the item history on playback stop
|
||||||
monitor_process.write_session_history(session=stream)
|
success = monitor_process.write_session_history(session=stream)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
# If session is written to the databaase successfully, remove the session from the session table
|
||||||
|
logger.debug(u"PlexPy Monitor :: Removing sessionKey %s ratingKey %s from session queue"
|
||||||
|
% (stream['session_key'], stream['rating_key']))
|
||||||
|
monitor_db.action('DELETE FROM sessions WHERE session_key = ? AND rating_key = ?',
|
||||||
|
[stream['session_key'], stream['rating_key']])
|
||||||
|
else:
|
||||||
|
logger.warn(u"PlexPy Monitor :: Failed to write sessionKey %s ratingKey %s to the database. " \
|
||||||
|
"Will try again on the next pass." % (stream['session_key'], stream['rating_key']))
|
||||||
|
|
||||||
# Process the newly received session data
|
# Process the newly received session data
|
||||||
for session in media_container:
|
for session in media_container:
|
||||||
monitor_process.write_session(session)
|
new_session = monitor_process.write_session(session)
|
||||||
|
|
||||||
|
if new_session:
|
||||||
|
logger.debug(u"PlexPy Monitor :: Session %s has started." % session['session_key'])
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.debug(u"PlexPy Monitor :: Unable to read session list.")
|
logger.debug(u"PlexPy Monitor :: Unable to read session list.")
|
||||||
|
|
||||||
|
|
|
@ -97,6 +97,8 @@ class ActivityProcessor(object):
|
||||||
ip_address = {'ip_address': ip_address}
|
ip_address = {'ip_address': ip_address}
|
||||||
self.db.upsert('sessions', ip_address, keys)
|
self.db.upsert('sessions', ip_address, keys)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
def write_session_history(self, session=None, import_metadata=None, is_import=False, import_ignore_interval=0):
|
def write_session_history(self, session=None, import_metadata=None, is_import=False, import_ignore_interval=0):
|
||||||
from plexpy import users, libraries
|
from plexpy import users, libraries
|
||||||
|
|
||||||
|
@ -108,6 +110,10 @@ class ActivityProcessor(object):
|
||||||
library_data = libraries.Libraries()
|
library_data = libraries.Libraries()
|
||||||
library_details = library_data.get_details(section_id=section_id)
|
library_details = library_data.get_details(section_id=section_id)
|
||||||
|
|
||||||
|
# Return false if failed to retrieve user or library details
|
||||||
|
if not user_details or not library_details:
|
||||||
|
return False
|
||||||
|
|
||||||
if session:
|
if session:
|
||||||
logging_enabled = False
|
logging_enabled = False
|
||||||
|
|
||||||
|
@ -117,7 +123,7 @@ class ActivityProcessor(object):
|
||||||
else:
|
else:
|
||||||
stopped = int(time.time())
|
stopped = int(time.time())
|
||||||
else:
|
else:
|
||||||
stopped = int(time.time())
|
stopped = int(session['stopped'])
|
||||||
|
|
||||||
if plexpy.CONFIG.MOVIE_LOGGING_ENABLE and str(session['rating_key']).isdigit() and \
|
if plexpy.CONFIG.MOVIE_LOGGING_ENABLE and str(session['rating_key']).isdigit() and \
|
||||||
session['media_type'] == 'movie':
|
session['media_type'] == 'movie':
|
||||||
|
@ -167,6 +173,19 @@ class ActivityProcessor(object):
|
||||||
logger.debug(u"PlexPy ActivityProcessor :: History logging for library '%s' is disabled." % library_details['section_name'])
|
logger.debug(u"PlexPy ActivityProcessor :: History logging for library '%s' is disabled." % library_details['section_name'])
|
||||||
|
|
||||||
if logging_enabled:
|
if logging_enabled:
|
||||||
|
|
||||||
|
# Fetch metadata first so we can return false if it fails
|
||||||
|
if not is_import:
|
||||||
|
logger.debug(u"PlexPy ActivityProcessor :: Fetching metadata for item ratingKey %s" % session['rating_key'])
|
||||||
|
pms_connect = pmsconnect.PmsConnect()
|
||||||
|
result = pms_connect.get_metadata_details(rating_key=str(session['rating_key']))
|
||||||
|
if result:
|
||||||
|
metadata = result['metadata']
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
metadata = import_metadata
|
||||||
|
|
||||||
# logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history table...")
|
# logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history table...")
|
||||||
query = 'INSERT INTO session_history (started, stopped, rating_key, parent_rating_key, ' \
|
query = 'INSERT INTO session_history (started, stopped, rating_key, parent_rating_key, ' \
|
||||||
'grandparent_rating_key, media_type, user_id, user, ip_address, paused_counter, player, ' \
|
'grandparent_rating_key, media_type, user_id, user, ip_address, paused_counter, player, ' \
|
||||||
|
@ -247,14 +266,6 @@ class ActivityProcessor(object):
|
||||||
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_media_info transaction...")
|
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_media_info transaction...")
|
||||||
self.db.action(query=query, args=args)
|
self.db.action(query=query, args=args)
|
||||||
|
|
||||||
if not is_import:
|
|
||||||
logger.debug(u"PlexPy ActivityProcessor :: Fetching metadata for item ratingKey %s" % session['rating_key'])
|
|
||||||
pms_connect = pmsconnect.PmsConnect()
|
|
||||||
result = pms_connect.get_metadata_details(rating_key=str(session['rating_key']))
|
|
||||||
metadata = result['metadata']
|
|
||||||
else:
|
|
||||||
metadata = import_metadata
|
|
||||||
|
|
||||||
# Write the session_history_metadata table
|
# Write the session_history_metadata table
|
||||||
directors = ";".join(metadata['directors'])
|
directors = ";".join(metadata['directors'])
|
||||||
writers = ";".join(metadata['writers'])
|
writers = ";".join(metadata['writers'])
|
||||||
|
@ -290,6 +301,9 @@ class ActivityProcessor(object):
|
||||||
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_metadata transaction...")
|
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_metadata transaction...")
|
||||||
self.db.action(query=query, args=args)
|
self.db.action(query=query, args=args)
|
||||||
|
|
||||||
|
# Return true when the session is successfully written to the database
|
||||||
|
return True
|
||||||
|
|
||||||
def find_session_ip(self, rating_key=None, machine_id=None):
|
def find_session_ip(self, rating_key=None, machine_id=None):
|
||||||
|
|
||||||
logger.debug(u"PlexPy ActivityProcessor :: Requesting log lines...")
|
logger.debug(u"PlexPy ActivityProcessor :: Requesting log lines...")
|
||||||
|
@ -361,11 +375,11 @@ class ActivityProcessor(object):
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def set_session_state(self, session_key=None, state=None, view_offset=0):
|
def set_session_state(self, session_key=None, **kwargs):
|
||||||
if str(session_key).isdigit() and str(view_offset).isdigit():
|
if str(session_key).isdigit() and str(view_offset).isdigit():
|
||||||
values = {'view_offset': int(view_offset)}
|
values = {}
|
||||||
if state:
|
for k,v in kwargs.iteritems():
|
||||||
values['state'] = state
|
values[k] = v
|
||||||
|
|
||||||
keys = {'session_key': session_key}
|
keys = {'session_key': session_key}
|
||||||
result = self.db.upsert('sessions', values, keys)
|
result = self.db.upsert('sessions', values, keys)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue