Refactor activity handler

This commit is contained in:
JonnyWong16 2023-02-16 17:02:30 -08:00
commit 599c54c9e1
No known key found for this signature in database
GPG key ID: B1F1F9807184697A

View file

@ -53,38 +53,33 @@ class ActivityHandler(object):
def __init__(self, timeline): def __init__(self, timeline):
self.ap = activity_processor.ActivityProcessor() self.ap = activity_processor.ActivityProcessor()
self.timeline = timeline self.timeline = timeline
self.session_key = None
self.rating_key = None
self.is_valid_session = ('sessionKey' in self.timeline and str(self.timeline['sessionKey']).isdigit())
if self.is_valid_session:
self.session_key = int(self.timeline['sessionKey'])
self.rating_key = str(self.timeline['ratingKey'])
self.key = self.timeline.get('key')
self.state = self.timeline.get('state')
self.view_offset = self.timeline.get('viewOffset')
self.transcode_key = self.timeline.get('transcodeSession', '')
self.db_session = None self.db_session = None
self.session = None self.session = None
self.metadata = None self.metadata = None
def is_valid_session(self):
if 'sessionKey' in self.timeline:
if str(self.timeline['sessionKey']).isdigit():
return True
return False
def get_session_key(self):
if self.is_valid_session():
return int(self.timeline['sessionKey'])
return None
def get_rating_key(self):
if self.is_valid_session():
return self.timeline['ratingKey']
return None
def get_db_session(self): def get_db_session(self):
# Retrieve the session data from our temp table # Retrieve the session data from our temp table
self.db_session = self.ap.get_session_by_key(session_key=self.get_session_key()) self.db_session = self.ap.get_session_by_key(session_key=self.session_key)
def get_metadata(self, skip_cache=False): def get_metadata(self, skip_cache=False):
if self.metadata is None: if self.metadata is None:
cache_key = None if skip_cache else self.get_session_key() cache_key = None if skip_cache else self.session_key
pms_connect = pmsconnect.PmsConnect() pms_connect = pmsconnect.PmsConnect()
metadata = pms_connect.get_metadata_details(rating_key=self.get_rating_key(), cache_key=cache_key) metadata = pms_connect.get_metadata_details(rating_key=self.rating_key, cache_key=cache_key)
if metadata: if metadata:
self.metadata = metadata self.metadata = metadata
@ -95,12 +90,12 @@ class ActivityHandler(object):
if session_list: if session_list:
for session in session_list['sessions']: for session in session_list['sessions']:
if int(session['session_key']) == self.get_session_key(): if int(session['session_key']) == self.session_key:
# Live sessions don't have rating keys in sessions # Live sessions don't have rating keys in sessions
# Get it from the websocket data # Get it from the websocket data
if not session['rating_key']: if not session['rating_key']:
session['rating_key'] = self.get_rating_key() session['rating_key'] = self.rating_key
session['rating_key_websocket'] = self.get_rating_key() session['rating_key_websocket'] = self.rating_key
self.session = session self.session = session
return session return session
@ -116,9 +111,9 @@ class ActivityHandler(object):
self.get_db_session() self.get_db_session()
def set_session_state(self): def set_session_state(self):
self.ap.set_session_state(session_key=self.get_session_key(), self.ap.set_session_state(session_key=self.session_key,
state=self.timeline['state'], state=self.state,
view_offset=self.timeline['viewOffset'], view_offset=self.view_offset,
stopped=helpers.timestamp()) stopped=helpers.timestamp())
def put_notification(self, notify_action, **kwargs): def put_notification(self, notify_action, **kwargs):
@ -148,19 +143,19 @@ class ActivityHandler(object):
self.update_db_session(notify=True) self.update_db_session(notify=True)
# Schedule a callback to force stop a stale stream 5 minutes later # Schedule a callback to force stop a stale stream 5 minutes later
schedule_callback('session_key-{}'.format(self.get_session_key()), schedule_callback('session_key-{}'.format(self.session_key),
func=force_stop_stream, func=force_stop_stream,
args=[self.get_session_key(), self.session['full_title'], self.session['username']], args=[self.session_key, self.session['full_title'], self.session['username']],
minutes=5) minutes=5)
self.check_markers() self.check_markers()
def on_stop(self, force_stop=False): def on_stop(self, force_stop=False):
logger.debug("Tautulli ActivityHandler :: Session %s %sstopped." logger.debug("Tautulli ActivityHandler :: Session %s %sstopped."
% (str(self.get_session_key()), 'force ' if force_stop else '')) % (str(self.session_key), 'force ' if force_stop else ''))
# Set the session last_paused timestamp # Set the session last_paused timestamp
self.ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None) self.ap.set_session_last_paused(session_key=self.session_key, timestamp=None)
# Update the session state and viewOffset # Update the session state and viewOffset
# Set force_stop to true to disable the state set # Set force_stop to true to disable the state set
@ -173,25 +168,25 @@ class ActivityHandler(object):
if row_id: if row_id:
self.put_notification('on_stop') self.put_notification('on_stop')
schedule_callback('session_key-{}'.format(self.get_session_key()), remove_job=True) schedule_callback('session_key-{}'.format(self.session_key), remove_job=True)
# Remove the session from our temp session table # Remove the session from our temp session table
logger.debug("Tautulli ActivityHandler :: Removing sessionKey %s ratingKey %s from session queue" logger.debug("Tautulli ActivityHandler :: Removing sessionKey %s ratingKey %s from session queue"
% (str(self.get_session_key()), str(self.get_rating_key()))) % (str(self.session_key), str(self.rating_key)))
self.ap.delete_session(row_id=row_id) self.ap.delete_session(row_id=row_id)
delete_metadata_cache(self.get_session_key()) delete_metadata_cache(self.session_key)
else: else:
schedule_callback('session_key-{}'.format(self.get_session_key()), schedule_callback('session_key-{}'.format(self.session_key),
func=force_stop_stream, func=force_stop_stream,
args=[self.get_session_key(), self.db_session['full_title'], self.db_session['user']], args=[self.session_key, self.db_session['full_title'], self.db_session['user']],
seconds=30) seconds=30)
def on_pause(self, still_paused=False): def on_pause(self, still_paused=False):
if not still_paused: if not still_paused:
logger.debug("Tautulli ActivityHandler :: Session %s paused." % str(self.get_session_key())) logger.debug("Tautulli ActivityHandler :: Session %s paused." % str(self.session_key))
# Set the session last_paused timestamp # Set the session last_paused timestamp
self.ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=helpers.timestamp()) self.ap.set_session_last_paused(session_key=self.session_key, timestamp=helpers.timestamp())
self.update_db_session() self.update_db_session()
@ -199,52 +194,52 @@ class ActivityHandler(object):
self.put_notification('on_pause') self.put_notification('on_pause')
def on_resume(self): def on_resume(self):
logger.debug("Tautulli ActivityHandler :: Session %s resumed." % str(self.get_session_key())) logger.debug("Tautulli ActivityHandler :: Session %s resumed." % str(self.session_key))
# Set the session last_paused timestamp # Set the session last_paused timestamp
self.ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None) self.ap.set_session_last_paused(session_key=self.session_key, timestamp=None)
self.update_db_session() self.update_db_session()
self.put_notification('on_resume') self.put_notification('on_resume')
def on_buffer(self): def on_buffer(self):
logger.debug("Tautulli ActivityHandler :: Session %s is buffering." % self.get_session_key()) logger.debug("Tautulli ActivityHandler :: Session %s is buffering." % self.session_key)
# Increment our buffer count # Increment our buffer count
self.ap.increment_session_buffer_count(session_key=self.get_session_key()) self.ap.increment_session_buffer_count(session_key=self.session_key)
# Get our current buffer count # Get our current buffer count
current_buffer_count = self.ap.get_session_buffer_count(self.get_session_key()) current_buffer_count = self.ap.get_session_buffer_count(self.session_key)
logger.debug("Tautulli ActivityHandler :: Session %s buffer count is %s." % logger.debug("Tautulli ActivityHandler :: Session %s buffer count is %s." %
(self.get_session_key(), current_buffer_count)) (self.session_key, current_buffer_count))
# Get our last triggered time # Get our last triggered time
buffer_last_triggered = self.ap.get_session_buffer_trigger_time(self.get_session_key()) buffer_last_triggered = self.ap.get_session_buffer_trigger_time(self.session_key)
self.update_db_session() self.update_db_session()
time_since_last_trigger = 0 time_since_last_trigger = 0
if buffer_last_triggered: if buffer_last_triggered:
logger.debug("Tautulli ActivityHandler :: Session %s buffer last triggered at %s." % logger.debug("Tautulli ActivityHandler :: Session %s buffer last triggered at %s." %
(self.get_session_key(), buffer_last_triggered)) (self.session_key, buffer_last_triggered))
time_since_last_trigger = helpers.timestamp() - int(buffer_last_triggered) time_since_last_trigger = helpers.timestamp() - int(buffer_last_triggered)
if current_buffer_count >= plexpy.CONFIG.BUFFER_THRESHOLD and time_since_last_trigger == 0 or \ if current_buffer_count >= plexpy.CONFIG.BUFFER_THRESHOLD and time_since_last_trigger == 0 or \
time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT: time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT:
self.ap.set_session_buffer_trigger_time(session_key=self.get_session_key()) self.ap.set_session_buffer_trigger_time(session_key=self.session_key)
self.put_notification('on_buffer') self.put_notification('on_buffer')
def on_error(self): def on_error(self):
logger.debug("Tautulli ActivityHandler :: Session %s encountered an error." % str(self.get_session_key())) logger.debug("Tautulli ActivityHandler :: Session %s encountered an error." % str(self.session_key))
self.update_db_session() self.update_db_session()
self.put_notification('on_error') self.put_notification('on_error')
def on_change(self): def on_change(self):
logger.debug("Tautulli ActivityHandler :: Session %s has changed transcode decision." % str(self.get_session_key())) logger.debug("Tautulli ActivityHandler :: Session %s has changed transcode decision." % str(self.session_key))
self.update_db_session() self.update_db_session()
@ -252,17 +247,17 @@ class ActivityHandler(object):
def on_intro(self, marker): def on_intro(self, marker):
if self.get_live_session(): if self.get_live_session():
logger.debug("Tautulli ActivityHandler :: Session %s intro marker reached." % str(self.get_session_key())) logger.debug("Tautulli ActivityHandler :: Session %s intro marker reached." % str(self.session_key))
self.put_notification('on_intro', marker=marker) self.put_notification('on_intro', marker=marker)
def on_credits(self, marker): def on_credits(self, marker):
if self.get_live_session(): if self.get_live_session():
logger.debug("Tautulli ActivityHandler :: Session %s credits marker reached." % str(self.get_session_key())) logger.debug("Tautulli ActivityHandler :: Session %s credits marker reached." % str(self.session_key))
self.put_notification('on_credits', marker=marker) self.put_notification('on_credits', marker=marker)
def on_watched(self): def on_watched(self):
logger.debug("Tautulli ActivityHandler :: Session %s watched." % str(self.get_session_key())) logger.debug("Tautulli ActivityHandler :: Session %s watched." % str(self.session_key))
watched_notifiers = notification_handler.get_notify_state_enabled( watched_notifiers = notification_handler.get_notify_state_enabled(
session=self.db_session, notify_action='on_watched', notified=False) session=self.db_session, notify_action='on_watched', notified=False)
@ -272,88 +267,85 @@ class ActivityHandler(object):
# This function receives events from our websocket connection # This function receives events from our websocket connection
def process(self): def process(self):
if self.is_valid_session(): if not self.is_valid_session:
self.get_db_session() return
this_state = self.timeline['state'] self.get_db_session()
this_rating_key = str(self.timeline['ratingKey'])
this_key = self.timeline['key']
this_transcode_key = self.timeline.get('transcodeSession', '')
# Get the live tv session uuid if not self.db_session:
this_live_uuid = this_key.split('/')[-1] if this_key.startswith('/livetv/sessions') else None # We don't have this session in our table yet, start a new one.
if self.state != 'buffering':
self.on_start()
return
# If we already have this session in the temp table, check for state changes # If we already have this session in the temp table, check for state changes
if self.db_session: # Re-schedule the callback to reset the 5 minutes timer
# Re-schedule the callback to reset the 5 minutes timer schedule_callback('session_key-{}'.format(self.session_key),
schedule_callback('session_key-{}'.format(self.get_session_key()), func=force_stop_stream,
func=force_stop_stream, args=[self.session_key, self.db_session['full_title'], self.db_session['user']],
args=[self.get_session_key(), self.db_session['full_title'], self.db_session['user']], minutes=5)
minutes=5)
last_state = self.db_session['state'] last_state = self.db_session['state']
last_rating_key = str(self.db_session['rating_key']) last_rating_key = str(self.db_session['rating_key'])
last_live_uuid = self.db_session['live_uuid'] last_live_uuid = self.db_session['live_uuid']
last_transcode_key = self.db_session['transcode_key'].split('/')[-1] last_transcode_key = self.db_session['transcode_key'].split('/')[-1]
last_paused = self.db_session['last_paused'] last_paused = self.db_session['last_paused']
last_rating_key_websocket = self.db_session['rating_key_websocket'] last_rating_key_websocket = self.db_session['rating_key_websocket']
last_guid = self.db_session['guid'] last_guid = self.db_session['guid']
this_guid = last_guid # Get the live tv session uuid
# Check guid for live TV metadata every 60 seconds this_live_uuid = self.key.split('/')[-1] if self.key.startswith('/livetv/sessions') else None
if self.db_session['live'] and helpers.timestamp() - self.db_session['stopped'] > 60:
self.get_metadata(skip_cache=True)
if self.metadata:
this_guid = self.metadata['guid']
# Make sure the same item is being played this_guid = last_guid
if (this_rating_key == last_rating_key # Check guid for live TV metadata every 60 seconds
or this_rating_key == last_rating_key_websocket if self.db_session['live'] and helpers.timestamp() - self.db_session['stopped'] > 60:
or this_live_uuid == last_live_uuid) \ self.get_metadata(skip_cache=True)
and this_guid == last_guid: if self.metadata:
# Update the session state and viewOffset this_guid = self.metadata['guid']
if this_state == 'playing':
# Update the session in our temp session table
# if the last set temporary stopped time exceeds 60 seconds
if helpers.timestamp() - self.db_session['stopped'] > 60:
self.update_db_session()
# Start our state checks # Make sure the same item is being played
if this_state != last_state: if (self.rating_key == last_rating_key
if this_state == 'paused': or self.rating_key == last_rating_key_websocket
self.on_pause() or this_live_uuid == last_live_uuid) \
elif last_paused and this_state == 'playing': and this_guid == last_guid:
self.on_resume() # Update the session state and viewOffset
elif this_state == 'stopped': if self.state == 'playing':
self.on_stop() # Update the session in our temp session table
elif this_state == 'error': # if the last set temporary stopped time exceeds 60 seconds
self.on_error() if helpers.timestamp() - self.db_session['stopped'] > 60:
self.update_db_session()
elif this_state == 'paused': # Start our state checks
# Update the session last_paused timestamp if self.state != last_state:
self.on_pause(still_paused=True) if self.state == 'paused':
self.on_pause()
elif last_paused and self.state == 'playing':
self.on_resume()
elif self.state == 'stopped':
self.on_stop()
elif self.state == 'error':
self.on_error()
if this_state == 'buffering': elif self.state == 'paused':
self.on_buffer() # Update the session last_paused timestamp
self.on_pause(still_paused=True)
if this_transcode_key != last_transcode_key and this_state != 'stopped': if self.state == 'buffering':
self.on_change() self.on_buffer()
# If a client doesn't register stop events (I'm looking at you PHT!) check if the ratingKey has changed if self.transcode_key != last_transcode_key and self.state != 'stopped':
else: self.on_change()
# Manually stop and start
# Set force_stop so that we don't overwrite our last viewOffset
self.on_stop(force_stop=True)
self.on_start()
# Check for stream offset notifications # If a client doesn't register stop events (I'm looking at you PHT!) check if the ratingKey has changed
self.check_markers() else:
self.check_watched() # Manually stop and start
# Set force_stop so that we don't overwrite our last viewOffset
self.on_stop(force_stop=True)
self.on_start()
else: # Check for stream offset notifications
# We don't have this session in our table yet, start a new one. self.check_markers()
if this_state != 'buffering': self.check_watched()
self.on_start()
def check_markers(self): def check_markers(self):
# Monitor if the stream has reached the intro or credit marker offsets # Monitor if the stream has reached the intro or credit marker offsets
@ -364,20 +356,20 @@ class ActivityHandler(object):
for marker_idx, marker in enumerate(self.metadata['markers'], start=1): for marker_idx, marker in enumerate(self.metadata['markers'], start=1):
# Websocket events only fire every 10 seconds # Websocket events only fire every 10 seconds
# Check if the marker is within 10 seconds of the current viewOffset # Check if the marker is within 10 seconds of the current viewOffset
if marker['start_time_offset'] - 10000 <= self.timeline['viewOffset'] <= marker['end_time_offset']: if marker['start_time_offset'] - 10000 <= self.view_offset <= marker['end_time_offset']:
marker_flag = True marker_flag = True
if self.db_session['marker'] != marker_idx: if self.db_session['marker'] != marker_idx:
self.ap.set_marker(session_key=self.get_session_key(), marker_idx=marker_idx, marker_type=marker['type']) self.ap.set_marker(session_key=self.session_key, marker_idx=marker_idx, marker_type=marker['type'])
callback_func = getattr(self, 'on_{}'.format(marker['type'])) callback_func = getattr(self, 'on_{}'.format(marker['type']))
if self.timeline['viewOffset'] < marker['start_time_offset']: if self.view_offset < marker['start_time_offset']:
# Schedule a callback for the exact offset of the marker # Schedule a callback for the exact offset of the marker
schedule_callback( schedule_callback(
'session_key-{}-marker-{}'.format(self.get_session_key(), marker_idx), 'session_key-{}-marker-{}'.format(self.session_key, marker_idx),
func=callback_func, func=callback_func,
args=[marker], args=[marker],
milliseconds=marker['start_time_offset'] - self.timeline['viewOffset'] milliseconds=marker['start_time_offset'] - self.view_offset
) )
else: else:
callback_func(marker) callback_func(marker)
@ -385,7 +377,7 @@ class ActivityHandler(object):
break break
if not marker_flag: if not marker_flag:
self.ap.set_marker(session_key=self.get_session_key(), marker_idx=0) self.ap.set_marker(session_key=self.session_key, marker_idx=0)
def check_watched(self): def check_watched(self):
# Monitor if the stream has reached the watch percentage for notifications # Monitor if the stream has reached the watch percentage for notifications
@ -399,7 +391,7 @@ class ActivityHandler(object):
} }
if progress_percent >= watched_percent.get(self.db_session['media_type'], 101): if progress_percent >= watched_percent.get(self.db_session['media_type'], 101):
self.ap.set_watched(session_key=self.get_session_key()) self.ap.set_watched(session_key=self.session_key)
self.on_watched() self.on_watched()
@ -408,121 +400,106 @@ class TimelineHandler(object):
def __init__(self, timeline): def __init__(self, timeline):
self.timeline = timeline self.timeline = timeline
def is_item(self): self.rating_key = None
if 'itemID' in self.timeline:
return True
return False self.is_item = ('itemID' in self.timeline)
if self.is_item:
self.rating_key = int(self.timeline['itemID'])
def get_rating_key(self): self.parent_rating_key = helpers.cast_to_int(self.timeline.get('parentItemID')) or None
if self.is_item(): self.grandparent_rating_key = helpers.cast_to_int(self.timeline.get('rootItemID')) or None
return int(self.timeline['itemID']) self.identifier = self.timeline.get('identifier')
self.state_type = self.timeline.get('state')
return None self.media_type = common.MEDIA_TYPE_VALUES.get(self.timeline.get('type'))
self.section_id = helpers.cast_to_int(self.timeline.get('sectionID', 0))
def get_metadata(self): self.title = self.timeline.get('title', 'Unknown')
pms_connect = pmsconnect.PmsConnect() self.metadata_state = self.timeline.get('metadataState')
metadata = pms_connect.get_metadata_details(self.get_rating_key()) self.media_state = self.timeline.get('mediaState')
self.queue_size = self.timeline.get('queueSize')
if metadata:
return metadata
return None
# This function receives events from our websocket connection # This function receives events from our websocket connection
def process(self): def process(self):
if self.is_item(): if not self.is_item:
global RECENTLY_ADDED_QUEUE return
rating_key = self.get_rating_key() # Return if it is not a library event (i.e. DVR EPG event)
parent_rating_key = helpers.cast_to_int(self.timeline.get('parentItemID')) or None if self.identifier != 'com.plexapp.plugins.library':
grandparent_rating_key = helpers.cast_to_int(self.timeline.get('rootItemID')) or None return
identifier = self.timeline.get('identifier') global RECENTLY_ADDED_QUEUE
state_type = self.timeline.get('state')
media_type = common.MEDIA_TYPE_VALUES.get(self.timeline.get('type'))
section_id = helpers.cast_to_int(self.timeline.get('sectionID', 0))
title = self.timeline.get('title', 'Unknown')
metadata_state = self.timeline.get('metadataState')
media_state = self.timeline.get('mediaState')
queue_size = self.timeline.get('queueSize')
# Return if it is not a library event (i.e. DVR EPG event) # Add a new media item to the recently added queue
if identifier != 'com.plexapp.plugins.library': if self.media_type and self.section_id > 0 and self.state_type == 0 and self.metadata_state == 'created':
return
# Add a new media item to the recently added queue if self.media_type in ('episode', 'track'):
if media_type and section_id > 0 and state_type == 0 and metadata_state == 'created': grandparent_set = RECENTLY_ADDED_QUEUE.get(self.grandparent_rating_key, set())
grandparent_set.add(self.parent_rating_key)
RECENTLY_ADDED_QUEUE[self.grandparent_rating_key] = grandparent_set
if media_type in ('episode', 'track'): parent_set = RECENTLY_ADDED_QUEUE.get(self.parent_rating_key, set())
grandparent_set = RECENTLY_ADDED_QUEUE.get(grandparent_rating_key, set()) parent_set.add(self.rating_key)
grandparent_set.add(parent_rating_key) RECENTLY_ADDED_QUEUE[self.parent_rating_key] = parent_set
RECENTLY_ADDED_QUEUE[grandparent_rating_key] = grandparent_set
parent_set = RECENTLY_ADDED_QUEUE.get(parent_rating_key, set()) RECENTLY_ADDED_QUEUE[self.rating_key] = {self.grandparent_rating_key}
parent_set.add(rating_key)
RECENTLY_ADDED_QUEUE[parent_rating_key] = parent_set
RECENTLY_ADDED_QUEUE[rating_key] = {grandparent_rating_key} logger.debug("Tautulli TimelineHandler :: Library item '%s' (%s, grandparent %s) "
"added to recently added queue."
% (self.title, str(self.rating_key), str(self.grandparent_rating_key)))
logger.debug("Tautulli TimelineHandler :: Library item '%s' (%s, grandparent %s) " # Schedule a callback to clear the recently added queue
"added to recently added queue." schedule_callback('rating_key-{}'.format(self.grandparent_rating_key),
% (title, str(rating_key), str(grandparent_rating_key))) func=clear_recently_added_queue,
args=[self.grandparent_rating_key, self.title],
seconds=plexpy.CONFIG.NOTIFY_RECENTLY_ADDED_DELAY)
# Schedule a callback to clear the recently added queue elif self.media_type in ('season', 'album'):
schedule_callback('rating_key-{}'.format(grandparent_rating_key), parent_set = RECENTLY_ADDED_QUEUE.get(self.parent_rating_key, set())
func=clear_recently_added_queue, parent_set.add(self.rating_key)
args=[grandparent_rating_key, title], RECENTLY_ADDED_QUEUE[self.parent_rating_key] = parent_set
seconds=plexpy.CONFIG.NOTIFY_RECENTLY_ADDED_DELAY)
elif media_type in ('season', 'album'): logger.debug("Tautulli TimelineHandler :: Library item '%s' (%s , parent %s) "
parent_set = RECENTLY_ADDED_QUEUE.get(parent_rating_key, set()) "added to recently added queue."
parent_set.add(rating_key) % (self.title, str(self.rating_key), str(self.parent_rating_key)))
RECENTLY_ADDED_QUEUE[parent_rating_key] = parent_set
logger.debug("Tautulli TimelineHandler :: Library item '%s' (%s , parent %s) " # Schedule a callback to clear the recently added queue
"added to recently added queue." schedule_callback('rating_key-{}'.format(self.parent_rating_key),
% (title, str(rating_key), str(parent_rating_key))) func=clear_recently_added_queue,
args=[self.parent_rating_key, self.title],
seconds=plexpy.CONFIG.NOTIFY_RECENTLY_ADDED_DELAY)
# Schedule a callback to clear the recently added queue elif self.media_type in ('movie', 'show', 'artist'):
schedule_callback('rating_key-{}'.format(parent_rating_key), queue_set = RECENTLY_ADDED_QUEUE.get(self.rating_key, set())
func=clear_recently_added_queue, RECENTLY_ADDED_QUEUE[self.rating_key] = queue_set
args=[parent_rating_key, title],
seconds=plexpy.CONFIG.NOTIFY_RECENTLY_ADDED_DELAY)
elif media_type in ('movie', 'show', 'artist'):
queue_set = RECENTLY_ADDED_QUEUE.get(rating_key, set())
RECENTLY_ADDED_QUEUE[rating_key] = queue_set
logger.debug("Tautulli TimelineHandler :: Library item '%s' (%s) "
"added to recently added queue."
% (title, str(rating_key)))
# Schedule a callback to clear the recently added queue
schedule_callback('rating_key-{}'.format(rating_key),
func=clear_recently_added_queue,
args=[rating_key, title],
seconds=plexpy.CONFIG.NOTIFY_RECENTLY_ADDED_DELAY)
# A movie, show, or artist is done processing
elif media_type in ('movie', 'show', 'artist') and section_id > 0 and \
state_type == 5 and metadata_state is None and queue_size is None and \
rating_key in RECENTLY_ADDED_QUEUE:
logger.debug("Tautulli TimelineHandler :: Library item '%s' (%s) " logger.debug("Tautulli TimelineHandler :: Library item '%s' (%s) "
"done processing metadata." "added to recently added queue."
% (title, str(rating_key))) % (self.title, str(self.rating_key)))
# An item was deleted, make sure it is removed from the queue # Schedule a callback to clear the recently added queue
elif state_type == 9 and metadata_state == 'deleted': schedule_callback('rating_key-{}'.format(self.rating_key),
if rating_key in RECENTLY_ADDED_QUEUE and not RECENTLY_ADDED_QUEUE[rating_key]: func=clear_recently_added_queue,
logger.debug("Tautulli TimelineHandler :: Library item %s " args=[self.rating_key, self.title],
"removed from recently added queue." seconds=plexpy.CONFIG.NOTIFY_RECENTLY_ADDED_DELAY)
% str(rating_key))
del_keys(rating_key)
# Remove the callback if the item is removed # A movie, show, or artist is done processing
schedule_callback('rating_key-{}'.format(rating_key), remove_job=True) elif self.media_type in ('movie', 'show', 'artist') and self.section_id > 0 and \
self.state_type == 5 and self.metadata_state is None and self.queue_size is None and \
self.rating_key in RECENTLY_ADDED_QUEUE:
logger.debug("Tautulli TimelineHandler :: Library item '%s' (%s) "
"done processing metadata."
% (self.title, str(self.rating_key)))
# An item was deleted, make sure it is removed from the queue
elif self.state_type == 9 and self.metadata_state == 'deleted':
if self.rating_key in RECENTLY_ADDED_QUEUE and not RECENTLY_ADDED_QUEUE[self.rating_key]:
logger.debug("Tautulli TimelineHandler :: Library item %s "
"removed from recently added queue."
% str(self.rating_key))
del_keys(self.rating_key)
# Remove the callback if the item is removed
schedule_callback('rating_key-{}'.format(self.rating_key), remove_job=True)
class ReachabilityHandler(object): class ReachabilityHandler(object):
@ -530,10 +507,7 @@ class ReachabilityHandler(object):
def __init__(self, data): def __init__(self, data):
self.data = data self.data = data
def is_reachable(self): self.is_reachable = self.data.get('reachable', False)
if 'reachability' in self.data:
return self.data['reachability']
return False
def remote_access_enabled(self): def remote_access_enabled(self):
pms_connect = pmsconnect.PmsConnect() pms_connect = pmsconnect.PmsConnect()
@ -552,42 +526,44 @@ class ReachabilityHandler(object):
return return
# Do nothing if remote access is still up and hasn't changed # Do nothing if remote access is still up and hasn't changed
if self.is_reachable() and plexpy.PLEX_REMOTE_ACCESS_UP: if self.is_reachable and plexpy.PLEX_REMOTE_ACCESS_UP:
return return
pms_connect = pmsconnect.PmsConnect() pms_connect = pmsconnect.PmsConnect()
server_response = pms_connect.get_server_response() server_response = pms_connect.get_server_response()
if server_response: if not server_response:
# Waiting for port mapping return
if server_response['mapping_state'] == 'waiting':
logger.warn("Tautulli ReachabilityHandler :: Remote access waiting for port mapping.")
elif plexpy.PLEX_REMOTE_ACCESS_UP is not False and server_response['reason']: # Waiting for port mapping
logger.warn("Tautulli ReachabilityHandler :: Remote access failed: %s" % server_response['reason']) if server_response['mapping_state'] == 'waiting':
logger.info("Tautulli ReachabilityHandler :: Plex remote access is down.") logger.warn("Tautulli ReachabilityHandler :: Remote access waiting for port mapping.")
plexpy.PLEX_REMOTE_ACCESS_UP = False elif plexpy.PLEX_REMOTE_ACCESS_UP is not False and server_response['reason']:
logger.warn("Tautulli ReachabilityHandler :: Remote access failed: %s" % server_response['reason'])
logger.info("Tautulli ReachabilityHandler :: Plex remote access is down.")
if not ACTIVITY_SCHED.get_job('on_extdown'): plexpy.PLEX_REMOTE_ACCESS_UP = False
logger.debug("Tautulli ReachabilityHandler :: Scheduling remote access down callback in %d seconds.",
plexpy.CONFIG.NOTIFY_REMOTE_ACCESS_THRESHOLD)
schedule_callback('on_extdown', func=self.on_extdown, args=[server_response],
seconds=plexpy.CONFIG.NOTIFY_REMOTE_ACCESS_THRESHOLD)
elif plexpy.PLEX_REMOTE_ACCESS_UP is False and not server_response['reason']: if not ACTIVITY_SCHED.get_job('on_extdown'):
logger.info("Tautulli ReachabilityHandler :: Plex remote access is back up.") logger.debug("Tautulli ReachabilityHandler :: Scheduling remote access down callback in %d seconds.",
plexpy.CONFIG.NOTIFY_REMOTE_ACCESS_THRESHOLD)
schedule_callback('on_extdown', func=self.on_extdown, args=[server_response],
seconds=plexpy.CONFIG.NOTIFY_REMOTE_ACCESS_THRESHOLD)
plexpy.PLEX_REMOTE_ACCESS_UP = True elif plexpy.PLEX_REMOTE_ACCESS_UP is False and not server_response['reason']:
logger.info("Tautulli ReachabilityHandler :: Plex remote access is back up.")
if ACTIVITY_SCHED.get_job('on_extdown'): plexpy.PLEX_REMOTE_ACCESS_UP = True
logger.debug("Tautulli ReachabilityHandler :: Cancelling scheduled remote access down callback.")
schedule_callback('on_extdown', remove_job=True)
else:
self.on_extup(server_response)
elif plexpy.PLEX_REMOTE_ACCESS_UP is None: if ACTIVITY_SCHED.get_job('on_extdown'):
plexpy.PLEX_REMOTE_ACCESS_UP = self.is_reachable() logger.debug("Tautulli ReachabilityHandler :: Cancelling scheduled remote access down callback.")
schedule_callback('on_extdown', remove_job=True)
else:
self.on_extup(server_response)
elif plexpy.PLEX_REMOTE_ACCESS_UP is None:
plexpy.PLEX_REMOTE_ACCESS_UP = self.is_reachable
def del_keys(key): def del_keys(key):