Check on_watched condition before adding to queue

This commit is contained in:
JonnyWong16 2016-10-03 00:20:41 -07:00 committed by JonnyWong16
parent 7b2a7aff9f
commit 82f4c99025
5 changed files with 48 additions and 36 deletions

View file

@ -381,7 +381,6 @@ def start():
# Start background notification thread # Start background notification thread
if any([CONFIG.MOVIE_NOTIFY_ENABLE, CONFIG.TV_NOTIFY_ENABLE, if any([CONFIG.MOVIE_NOTIFY_ENABLE, CONFIG.TV_NOTIFY_ENABLE,
CONFIG.MUSIC_NOTIFY_ENABLE, CONFIG.NOTIFY_RECENTLY_ADDED]): CONFIG.MUSIC_NOTIFY_ENABLE, CONFIG.NOTIFY_RECENTLY_ADDED]):
logger.info(u"Starting background notification handler.")
notification_handler.start_thread(num_threads=3) notification_handler.start_thread(num_threads=3)
started = True started = True

View file

@ -74,15 +74,15 @@ class ActivityHandler(object):
session = self.get_live_session() session = self.get_live_session()
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=session, notify_action='on_play')) stream_data=session, notify_action='on_play'))
# Write the new session to our temp session table # Write the new session to our temp session table
self.update_db_session(session=session) self.update_db_session(session=session)
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=session, notify_action='on_concurrent')) stream_data=session, notify_action='on_concurrent'))
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=session, notify_action='on_newdevice')) stream_data=session, notify_action='on_newdevice'))
def on_stop(self, force_stop=False): def on_stop(self, force_stop=False):
@ -104,7 +104,7 @@ class ActivityHandler(object):
# Retrieve the session data from our temp table # Retrieve the session data from our temp table
db_session = ap.get_session_by_key(session_key=self.get_session_key()) db_session = ap.get_session_by_key(session_key=self.get_session_key())
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=db_session, notify_action='on_stop')) stream_data=db_session, notify_action='on_stop'))
# Write it to the history table # Write it to the history table
@ -132,7 +132,7 @@ class ActivityHandler(object):
# Retrieve the session data from our temp table # Retrieve the session data from our temp table
db_session = ap.get_session_by_key(session_key=self.get_session_key()) db_session = ap.get_session_by_key(session_key=self.get_session_key())
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=db_session, notify_action='on_pause')) stream_data=db_session, notify_action='on_pause'))
def on_resume(self): def on_resume(self):
@ -151,7 +151,7 @@ class ActivityHandler(object):
# Retrieve the session data from our temp table # Retrieve the session data from our temp table
db_session = ap.get_session_by_key(session_key=self.get_session_key()) db_session = ap.get_session_by_key(session_key=self.get_session_key())
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=db_session, notify_action='on_resume')) stream_data=db_session, notify_action='on_resume'))
def on_buffer(self): def on_buffer(self):
@ -181,7 +181,7 @@ class ActivityHandler(object):
time_since_last_trigger == 0 or time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT): time_since_last_trigger == 0 or time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT):
ap.set_session_buffer_trigger_time(session_key=self.get_session_key()) ap.set_session_buffer_trigger_time(session_key=self.get_session_key())
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=db_session, notify_action='on_buffer')) stream_data=db_session, notify_action='on_buffer'))
# This function receives events from our websocket connection # This function receives events from our websocket connection
@ -225,7 +225,11 @@ class ActivityHandler(object):
# Monitor if the stream has reached the watch percentage for notifications # Monitor if the stream has reached the watch percentage for notifications
# The only purpose of this is for notifications # The only purpose of this is for notifications
if this_state != 'buffering': if this_state != 'buffering':
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( progress_percent = helpers.get_percent(db_session['view_offset'], db_session['duration'])
notify_states = notification_handler.get_notify_state(session=db_session)
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \
and not any(d['notify_action'] == 'on_watched' for d in notify_states):
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=db_session, notify_action='on_watched')) stream_data=db_session, notify_action='on_watched'))
else: else:

View file

@ -48,7 +48,7 @@ def check_active_sessions(ws_request=False):
if int_ping_count >= 3: if int_ping_count >= 3:
logger.info(u"PlexPy Monitor :: The Plex Media Server is back up.") logger.info(u"PlexPy Monitor :: The Plex Media Server is back up.")
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
notify_action='on_intup')) notify_action='on_intup'))
int_ping_count = 0 int_ping_count = 0
@ -70,13 +70,13 @@ def check_active_sessions(ws_request=False):
if session['state'] == 'paused': if session['state'] == 'paused':
logger.debug(u"PlexPy Monitor :: Session %s has been paused." % stream['session_key']) logger.debug(u"PlexPy Monitor :: Session %s has been paused." % stream['session_key'])
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=stream, notify_action='on_pause')) stream_data=stream, notify_action='on_pause'))
if session['state'] == 'playing' and stream['state'] == 'paused': if session['state'] == 'playing' and stream['state'] == 'paused':
logger.debug(u"PlexPy Monitor :: Session %s has been resumed." % stream['session_key']) logger.debug(u"PlexPy Monitor :: Session %s has been resumed." % stream['session_key'])
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=stream, notify_action='on_resume')) stream_data=stream, notify_action='on_resume'))
if stream['state'] == 'paused' and not ws_request: if stream['state'] == 'paused' and not ws_request:
@ -115,7 +115,7 @@ def check_active_sessions(ws_request=False):
'WHERE session_key = ? AND rating_key = ?', 'WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']]) [stream['session_key'], stream['rating_key']])
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=stream, notify_action='on_buffer')) stream_data=stream, notify_action='on_buffer'))
else: else:
@ -130,7 +130,7 @@ def check_active_sessions(ws_request=False):
'WHERE session_key = ? AND rating_key = ?', 'WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']]) [stream['session_key'], stream['rating_key']])
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=stream, notify_action='on_buffer')) stream_data=stream, notify_action='on_buffer'))
logger.debug(u"PlexPy Monitor :: Session %s is buffering. Count is now %s. Last triggered %s." logger.debug(u"PlexPy Monitor :: Session %s is buffering. Count is now %s. Last triggered %s."
@ -141,7 +141,12 @@ def check_active_sessions(ws_request=False):
# Check if the user has reached the offset in the media we defined as the "watched" percent # Check if the user has reached the offset in the media we defined as the "watched" percent
# Don't trigger if state is buffer as some clients push the progress to the end when # Don't trigger if state is buffer as some clients push the progress to the end when
# buffering on start. # buffering on start.
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( if session['state'] != 'buffering':
progress_percent = helpers.get_percent(session['view_offset'], session['duration'])
notify_states = notification_handler.get_notify_state(session=session)
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \
and not any(d['notify_action'] == 'on_watched' for d in notify_states):
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=stream, notify_action='on_watched')) stream_data=stream, notify_action='on_watched'))
else: else:
@ -156,10 +161,14 @@ def check_active_sessions(ws_request=False):
'WHERE session_key = ? AND rating_key = ?', 'WHERE session_key = ? AND rating_key = ?',
[stream['stopped'], 'stopped', stream['session_key'], stream['rating_key']]) [stream['stopped'], 'stopped', stream['session_key'], stream['rating_key']])
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( progress_percent = helpers.get_percent(stream['view_offset'], stream['duration'])
notify_states = notification_handler.get_notify_state(session=stream)
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \
and not any(d['notify_action'] == 'on_watched' for d in notify_states):
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=stream, notify_action='on_watched')) stream_data=stream, notify_action='on_watched'))
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=stream, notify_action='on_stop')) stream_data=stream, notify_action='on_stop'))
# Write the item history on playback stop # Write the item history on playback stop
@ -212,7 +221,7 @@ def check_active_sessions(ws_request=False):
% str(int_ping_count)) % str(int_ping_count))
if int_ping_count == 3: if int_ping_count == 3:
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
notify_action='on_intdown')) notify_action='on_intdown'))
@ -266,7 +275,7 @@ def check_recently_added():
if 0 < time_threshold - int(item['added_at']) <= time_interval: if 0 < time_threshold - int(item['added_at']) <= time_interval:
logger.debug(u"PlexPy Monitor :: Library item %s has been added to Plex." % str(item['rating_key'])) logger.debug(u"PlexPy Monitor :: Library item %s has been added to Plex." % str(item['rating_key']))
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
timeline_data=item, notify_action='on_created')) timeline_data=item, notify_action='on_created'))
else: else:
@ -285,7 +294,7 @@ def check_recently_added():
logger.debug(u"PlexPy Monitor :: Library item %s has been added to Plex." % str(item['rating_key'])) logger.debug(u"PlexPy Monitor :: Library item %s has been added to Plex." % str(item['rating_key']))
# Check if any notification agents have notifications enabled # Check if any notification agents have notifications enabled
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
timeline_data=item, notify_action='on_created')) timeline_data=item, notify_action='on_created'))
@ -318,13 +327,13 @@ def check_server_response():
if ext_ping_count >= 3: if ext_ping_count >= 3:
logger.info(u"PlexPy Monitor :: Plex remote access is back up.") logger.info(u"PlexPy Monitor :: Plex remote access is back up.")
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
notify_action='on_extup')) notify_action='on_extup'))
ext_ping_count = 0 ext_ping_count = 0
if ext_ping_count == 3: if ext_ping_count == 3:
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
notify_action='on_extdown')) notify_action='on_extdown'))
@ -342,7 +351,7 @@ def check_server_updates():
if download_info['update_available']: if download_info['update_available']:
logger.info(u"PlexPy Monitor :: PMS update available version: %s", download_info['version']) logger.info(u"PlexPy Monitor :: PMS update available version: %s", download_info['version'])
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
notify_action='on_pmsupdate')) notify_action='on_pmsupdate'))
else: else:

View file

@ -99,7 +99,7 @@ class ActivityProcessor(object):
# Check if any notification agents have notifications enabled # Check if any notification agents have notifications enabled
if notify: if notify:
values.update({'ip_address': session['ip_address']}) values.update({'ip_address': session['ip_address']})
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=values, notify_action='on_play')) stream_data=values, notify_action='on_play'))
# If it's our first write then time stamp it. # If it's our first write then time stamp it.
@ -116,9 +116,9 @@ class ActivityProcessor(object):
self.db.upsert('sessions', ip_address, keys) self.db.upsert('sessions', ip_address, keys)
if notify: if notify:
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=values, notify_action='on_concurrent')) stream_data=values, notify_action='on_concurrent'))
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_notify_queue( plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
stream_data=values, notify_action='on_newdevice')) stream_data=values, notify_action='on_newdevice'))
return True return True

View file

@ -41,13 +41,14 @@ def process_queue():
def start_thread(num_threads=1): def start_thread(num_threads=1):
logger.info(u"PlexPy NotificationHandler :: Starting background notification handler.")
for x in range(num_threads): for x in range(num_threads):
thread = threading.Thread(target=process_queue) thread = threading.Thread(target=process_queue)
thread.daemon = True thread.daemon = True
thread.start() thread.start()
def add_to_notify_queue(notify_action=None, stream_data=None, timeline_data=None): def add_to_queue(notify_action=None, stream_data=None, timeline_data=None):
if not notify_action: if not notify_action:
logger.debug(u"PlexPy NotificationHandler :: Notify called but no action received.") logger.debug(u"PlexPy NotificationHandler :: Notify called but no action received.")
return return
@ -99,8 +100,7 @@ def notify_conditions(notifier=None, notify_action=None, stream_data=None, timel
conditions = \ conditions = \
{'on_stop': plexpy.CONFIG.NOTIFY_CONSECUTIVE or progress_percent < plexpy.CONFIG.NOTIFY_WATCHED_PERCENT, {'on_stop': plexpy.CONFIG.NOTIFY_CONSECUTIVE or progress_percent < plexpy.CONFIG.NOTIFY_WATCHED_PERCENT,
'on_resume': plexpy.CONFIG.NOTIFY_CONSECUTIVE or progress_percent < 99, 'on_resume': plexpy.CONFIG.NOTIFY_CONSECUTIVE or progress_percent < 99,
'on_watched': progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT and \ 'on_watched': not any(d['agent_id'] == notifier['agent_id'] and d['notify_action'] == notify_action
not any(d['agent_id'] == notifier['agent_id'] and d['notify_action'] == notify_action
for d in get_notify_state(session=stream_data)), for d in get_notify_state(session=stream_data)),
'on_concurrent': len(user_sessions) >= plexpy.CONFIG.NOTIFY_CONCURRENT_THRESHOLD, 'on_concurrent': len(user_sessions) >= plexpy.CONFIG.NOTIFY_CONCURRENT_THRESHOLD,
'on_newdevice': stream_data['machine_id'] not in user_devices 'on_newdevice': stream_data['machine_id'] not in user_devices
@ -195,7 +195,7 @@ def set_notify_state(notify_action, notifier, subject, body, session=None, metad
'user_id': session.get('user_id', None), 'user_id': session.get('user_id', None),
'notifier_id': notifier['id'], 'notifier_id': notifier['id'],
'agent_id': notifier['agent_id'], 'agent_id': notifier['agent_id'],
'notify_action': notify_action.split('on_')[-1]} 'notify_action': notify_action}
values = {'parent_rating_key': session.get('parent_rating_key', None), values = {'parent_rating_key': session.get('parent_rating_key', None),
'grandparent_rating_key': session.get('grandparent_rating_key', None), 'grandparent_rating_key': session.get('grandparent_rating_key', None),
@ -378,7 +378,7 @@ def build_media_notify_params(notify_action=None, session=None, timeline=None):
'server_name': server_name, 'server_name': server_name,
'server_uptime': server_uptime, 'server_uptime': server_uptime,
'server_version': server_times.get('version',''), 'server_version': server_times.get('version',''),
'action': notify_action.title(), 'action': notify_action.split('on_')[-1].title(),
'datestamp': arrow.now().format(date_format), 'datestamp': arrow.now().format(date_format),
'timestamp': arrow.now().format(time_format), 'timestamp': arrow.now().format(time_format),
# Stream parameters # Stream parameters
@ -501,7 +501,7 @@ def build_server_notify_params(notify_action=None):
'server_name': server_name, 'server_name': server_name,
'server_uptime': server_uptime, 'server_uptime': server_uptime,
'server_version': server_times.get('version',''), 'server_version': server_times.get('version',''),
'action': notify_action.title(), 'action': notify_action.split('on_')[-1].title(),
'datestamp': arrow.now().format(date_format), 'datestamp': arrow.now().format(date_format),
'timestamp': arrow.now().format(time_format), 'timestamp': arrow.now().format(time_format),
# Update parameters # Update parameters